Skip to content

Commit f90c9a3

Browse files
authored
Merge 82d5b85 into 7c5a9d4
2 parents 7c5a9d4 + 82d5b85 commit f90c9a3

14 files changed

+241
-19
lines changed

ydb/core/protos/flat_scheme_op.proto

+6
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,11 @@ message TMoveIndex {
12931293
optional bool AllowOverwrite = 4;
12941294
}
12951295

1296+
message TSetVal {
1297+
optional sint64 NextValue = 1;
1298+
optional bool NextUsed = 2;
1299+
}
1300+
12961301
message TSequenceDescription {
12971302
optional string Name = 1; // mandatory
12981303
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
@@ -1305,6 +1310,7 @@ message TSequenceDescription {
13051310
optional uint64 Cache = 8; // number of items to cache, defaults to 1
13061311
optional sint64 Increment = 9; // increment at each call, defaults to 1
13071312
optional bool Cycle = 10; // true when cycle on overflow is allowed
1313+
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
13081314
}
13091315

13101316
message TSequenceSharding {

ydb/core/protos/tx_sequenceshard.proto

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ message TEvMarkSchemeShardPipe {
1717
uint64 Round = 3;
1818
}
1919

20+
message TSetVal {
21+
sint64 NextValue = 1;
22+
bool NextUsed = 2;
23+
}
24+
2025
message TEvCreateSequence {
2126
NKikimrProto.TPathID PathId = 1;
2227
uint64 TxId = 2;
@@ -40,6 +45,9 @@ message TEvCreateSequence {
4045
bool Cycle = 9;
4146
}
4247
bool Frozen = 10; // defaults to false
48+
oneof OptionalSetVal {
49+
TSetVal SetVal = 11;
50+
}
4351
}
4452

4553
message TEvCreateSequenceResult {

ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
137137
result.push_back(CreateCopyTable(NextPartId(nextId, result),
138138
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));
139139

140-
if (descr.GetOmitIndexes()) {
141-
continue;
142-
}
143-
144140
TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
145141
for (const auto& child: srcPath.Base()->GetChildren()) {
146142
const auto& name = child.first;
@@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
160156
continue;
161157
}
162158

159+
if (descr.GetOmitIndexes()) {
160+
continue;
161+
}
162+
163163
if (!srcIndexPath.IsTableIndex()) {
164164
continue;
165165
}
@@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
185185
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
186186
scheme.SetFailOnExist(true);
187187

188+
auto* copySequence = scheme.MutableCopySequence();
189+
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
188190
*scheme.MutableSequence() = std::move(sequenceDescription);
189191

190-
result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
192+
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
191193
}
192194
}
193195

ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp

+24-3
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class TConfigureParts : public TSubOperationState {
115115
event->Record.SetFrozen(true);
116116

117117
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
118-
"TCoptSequence TConfigureParts ProgressState"
118+
"TCopySequence TConfigureParts ProgressState"
119119
<< " sending TEvCreateSequence to tablet " << tabletId
120120
<< " operationId# " << OperationId
121121
<< " at tablet " << ssId);
@@ -274,6 +274,18 @@ class TProposedCopySequence : public TSubOperationState {
274274
<< " operationId#" << OperationId;
275275
}
276276

277+
void UpdateSequenceDescription(NKikimrSchemeOp::TSequenceDescription& descr) {
278+
descr.SetStartValue(GetSequenceResult.GetStartValue());
279+
descr.SetMinValue(GetSequenceResult.GetMinValue());
280+
descr.SetMaxValue(GetSequenceResult.GetMaxValue());
281+
descr.SetCache(GetSequenceResult.GetCache());
282+
descr.SetIncrement(GetSequenceResult.GetIncrement());
283+
descr.SetCycle(GetSequenceResult.GetCycle());
284+
auto* setValMsg = descr.MutableSetVal();
285+
setValMsg->SetNextValue(GetSequenceResult.GetNextValue());
286+
setValMsg->SetNextUsed(GetSequenceResult.GetNextUsed());
287+
}
288+
277289
public:
278290
TProposedCopySequence(TOperationId id)
279291
: OperationId(id)
@@ -333,7 +345,15 @@ class TProposedCopySequence : public TSubOperationState {
333345
return false;
334346
}
335347

348+
TPathId pathId = txState->TargetPathId;
349+
336350
NIceDb::TNiceDb db(context.GetDB());
351+
352+
auto sequenceInfo = context.SS->Sequences.at(pathId);
353+
UpdateSequenceDescription(sequenceInfo->Description);
354+
355+
context.SS->PersistSequence(db, pathId, *sequenceInfo);
356+
337357
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
338358
context.OnComplete.ActivateTx(OperationId);
339359
return true;
@@ -387,7 +407,7 @@ class TProposedCopySequence : public TSubOperationState {
387407
return false;
388408
}
389409

390-
auto getSequenceResult = ev->Get()->Record;
410+
GetSequenceResult = ev->Get()->Record;
391411

392412
Y_ABORT_UNLESS(txState->Shards.size() == 1);
393413
for (auto shard : txState->Shards) {
@@ -397,7 +417,8 @@ class TProposedCopySequence : public TSubOperationState {
397417
Y_ABORT_UNLESS(currentTabletId != InvalidTabletId);
398418

399419
auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvRestoreSequence>(
400-
txState->TargetPathId, getSequenceResult);
420+
txState->TargetPathId, GetSequenceResult);
421+
401422
event->Record.SetTxId(ui64(OperationId.GetTxId()));
402423
event->Record.SetTxPartId(OperationId.GetSubTxId());
403424

ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ class TConfigureParts : public TSubOperationState {
142142
if (alterData->Description.HasCycle()) {
143143
event->Record.SetCycle(alterData->Description.GetCycle());
144144
}
145+
if (alterData->Description.HasSetVal()) {
146+
event->Record.MutableSetVal()->SetNextValue(alterData->Description.GetSetVal().GetNextValue());
147+
event->Record.MutableSetVal()->SetNextUsed(alterData->Description.GetSetVal().GetNextUsed());
148+
}
145149

146150
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
147151
"TCreateSequence TConfigureParts ProgressState"

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
106106
task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));
107107

108108
const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
109-
if (sourcePath.IsResolved()) {
110-
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
109+
const TPath exportPathItem = exportPath.Child(ToString(itemIdx));
110+
if (sourcePath.IsResolved() && exportPathItem.IsResolved()) {
111+
auto exportDescription = GetTableDescription(ss, exportPathItem.Base()->PathId);
112+
exportDescription.MutableTable()->SetName(sourcePath.LeafName());
113+
task.MutableTable()->CopyFrom(exportDescription);
111114
}
112115

113116
task.SetSnapshotStep(exportInfo->SnapshotStep);

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
7272
if (fromSequence.has_cycle()) {
7373
seqDesc->SetCycle(fromSequence.cycle());
7474
}
75+
if (fromSequence.has_set_val()) {
76+
auto* setVal = seqDesc->MutableSetVal();
77+
setVal->SetNextUsed(fromSequence.set_val().next_used());
78+
setVal->SetNextValue(fromSequence.set_val().next_value());
79+
}
7580

7681
break;
7782
}

ydb/core/tx/schemeshard/ut_helpers/helpers.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -2362,16 +2362,17 @@ namespace NSchemeShardUT_Private {
23622362
runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release()));
23632363
}
23642364

2365-
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) {
2365+
i64 WaitNextValResult(
2366+
TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus) {
23662367
auto ev = runtime.GrabEdgeEventRethrow<NSequenceProxy::TEvSequenceProxy::TEvNextValResult>(sender);
23672368
auto* msg = ev->Get();
2368-
UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS);
2369+
UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
23692370
return msg->Value;
23702371
}
23712372

2372-
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) {
2373+
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus) {
23732374
auto sender = runtime.AllocateEdgeActor(0);
23742375
SendNextValRequest(runtime, sender, path);
2375-
return WaitNextValResult(runtime, sender);
2376+
return WaitNextValResult(runtime, sender, expectedStatus);
23762377
}
23772378
}

ydb/core/tx/schemeshard/ut_helpers/helpers.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,11 @@ namespace NSchemeShardUT_Private {
553553
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);
554554

555555
void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
556-
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender);
557-
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path);
556+
i64 WaitNextValResult(
557+
TTestActorRuntime& runtime, const TActorId& sender,
558+
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
559+
i64 DoNextVal(
560+
TTestActorRuntime& runtime, const TString& path,
561+
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
558562

559563
} //NSchemeShardUT_Private

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

+147
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,153 @@ value {
955955
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
956956
}
957957

958+
Y_UNIT_TEST(ShouldRestoreSequence) {
959+
TPortManager portManager;
960+
const ui16 port = portManager.GetPort();
961+
962+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
963+
UNIT_ASSERT(s3Mock.Start());
964+
965+
TTestBasicRuntime runtime;
966+
TTestEnv env(runtime);
967+
968+
ui64 txId = 100;
969+
970+
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
971+
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
972+
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
973+
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
974+
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);
975+
976+
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
977+
TableDescription {
978+
Name: "Original"
979+
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
980+
Columns { Name: "value" Type: "Uint64" }
981+
KeyColumnNames: ["key"]
982+
}
983+
SequenceDescription {
984+
Name: "myseq"
985+
}
986+
)");
987+
env.TestWaitNotification(runtime, txId);
988+
989+
i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
990+
UNIT_ASSERT_VALUES_EQUAL(value, 1);
991+
992+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
993+
ExportToS3Settings {
994+
endpoint: "localhost:%d"
995+
scheme: HTTP
996+
items {
997+
source_path: "/MyRoot/Original"
998+
destination_prefix: ""
999+
}
1000+
}
1001+
)", port));
1002+
env.TestWaitNotification(runtime, txId);
1003+
TestGetExport(runtime, txId, "/MyRoot");
1004+
1005+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1006+
ImportFromS3Settings {
1007+
endpoint: "localhost:%d"
1008+
scheme: HTTP
1009+
items {
1010+
source_prefix: ""
1011+
destination_path: "/MyRoot/Restored"
1012+
}
1013+
}
1014+
)", port));
1015+
env.TestWaitNotification(runtime, txId);
1016+
TestGetImport(runtime, txId, "/MyRoot");
1017+
1018+
const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
1019+
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);
1020+
1021+
const auto& table = desc.GetPathDescription().GetTable();
1022+
1023+
value = DoNextVal(runtime, "/MyRoot/Restored/myseq");
1024+
UNIT_ASSERT_VALUES_EQUAL(value, 2);
1025+
1026+
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
1027+
}
1028+
1029+
Y_UNIT_TEST(ShouldRestoreSequenceWithOverflow) {
1030+
TPortManager portManager;
1031+
const ui16 port = portManager.GetPort();
1032+
1033+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
1034+
UNIT_ASSERT(s3Mock.Start());
1035+
1036+
TTestBasicRuntime runtime;
1037+
TTestEnv env(runtime);
1038+
1039+
ui64 txId = 100;
1040+
1041+
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
1042+
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
1043+
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
1044+
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
1045+
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);
1046+
1047+
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
1048+
TableDescription {
1049+
Name: "Original"
1050+
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
1051+
Columns { Name: "value" Type: "Uint64" }
1052+
KeyColumnNames: ["key"]
1053+
}
1054+
SequenceDescription {
1055+
Name: "myseq"
1056+
MinValue: 1
1057+
MaxValue: 2
1058+
}
1059+
)");
1060+
env.TestWaitNotification(runtime, txId);
1061+
1062+
i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
1063+
UNIT_ASSERT_VALUES_EQUAL(value, 1);
1064+
1065+
value = DoNextVal(runtime, "/MyRoot/Original/myseq");
1066+
UNIT_ASSERT_VALUES_EQUAL(value, 2);
1067+
1068+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1069+
ExportToS3Settings {
1070+
endpoint: "localhost:%d"
1071+
scheme: HTTP
1072+
items {
1073+
source_path: "/MyRoot/Original"
1074+
destination_prefix: ""
1075+
}
1076+
}
1077+
)", port));
1078+
env.TestWaitNotification(runtime, txId);
1079+
TestGetExport(runtime, txId, "/MyRoot");
1080+
1081+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1082+
ImportFromS3Settings {
1083+
endpoint: "localhost:%d"
1084+
scheme: HTTP
1085+
items {
1086+
source_prefix: ""
1087+
destination_path: "/MyRoot/Restored"
1088+
}
1089+
}
1090+
)", port));
1091+
env.TestWaitNotification(runtime, txId);
1092+
TestGetImport(runtime, txId, "/MyRoot");
1093+
1094+
const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
1095+
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);
1096+
1097+
const auto& table = desc.GetPathDescription().GetTable();
1098+
1099+
value = DoNextVal(runtime, "/MyRoot/Restored/myseq", Ydb::StatusIds::SCHEME_ERROR);
1100+
UNIT_ASSERT_VALUES_EQUAL(value, 2);
1101+
1102+
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
1103+
}
1104+
9581105
Y_UNIT_TEST(ExportImportPg) {
9591106
TTestBasicRuntime runtime;
9601107
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));

ydb/core/tx/schemeshard/ut_restore/ya.make

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ PEERDIR(
2121
ydb/core/wrappers/ut_helpers
2222
ydb/core/ydb_convert
2323
ydb/library/yql/sql/pg
24-
ydb/library/yql/parser/pg_wrapper
24+
ydb/library/yql/parser/pg_wrapper
2525
)
2626

2727
SRCS(

ydb/core/tx/sequenceshard/tx_create_sequence.cpp

+9-2
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,15 @@ namespace NSequenceShard {
7474
sequence.Cycle = msg->Record.GetCycle();
7575
}
7676

77-
sequence.NextValue = sequence.StartValue;
78-
sequence.NextUsed = false;
77+
78+
if (msg->Record.OptionalSetVal_case() == NKikimrTxSequenceShard::TEvCreateSequence::kSetVal) {
79+
sequence.NextValue = msg->Record.GetSetVal().GetNextValue();
80+
sequence.NextUsed = msg->Record.GetSetVal().GetNextUsed();
81+
} else {
82+
sequence.NextUsed = false;
83+
sequence.NextValue = sequence.StartValue;
84+
}
85+
7986
if (msg->Record.OptionalCache_case() == NKikimrTxSequenceShard::TEvCreateSequence::kCache) {
8087
sequence.Cache = msg->Record.GetCache();
8188
if (sequence.Cache < 1) {

0 commit comments

Comments
 (0)