Skip to content

Commit 1706b70

Browse files
authored
Merge 1ed48a8 into 7983705
2 parents 7983705 + 1ed48a8 commit 1706b70

16 files changed

+251
-25
lines changed

ydb/core/protos/flat_scheme_op.proto

+6
Original file line numberDiff line numberDiff line change
@@ -1294,6 +1294,10 @@ message TMoveIndex {
12941294
}
12951295

12961296
message TSequenceDescription {
1297+
message TSetVal {
1298+
optional sint64 NextValue = 1;
1299+
optional bool NextUsed = 2;
1300+
}
12971301
optional string Name = 1; // mandatory
12981302
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
12991303
optional uint64 Version = 3; // incremented every time sequence is altered
@@ -1305,6 +1309,7 @@ message TSequenceDescription {
13051309
optional uint64 Cache = 8; // number of items to cache, defaults to 1
13061310
optional sint64 Increment = 9; // increment at each call, defaults to 1
13071311
optional bool Cycle = 10; // true when cycle on overflow is allowed
1312+
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
13081313
}
13091314

13101315
message TSequenceSharding {
@@ -1613,6 +1618,7 @@ message TDescribeOptions {
16131618
optional bool ShowPrivateTable = 7 [default = false];
16141619
optional bool ReturnChannelsBinding = 8 [default = false];
16151620
optional bool ReturnRangeKey = 9 [default = true];
1621+
optional bool ReturnSetVal = 10 [default = false];
16161622
}
16171623

16181624
// Request to read scheme for a specific path

ydb/core/protos/tx_sequenceshard.proto

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ message TEvMarkSchemeShardPipe {
1818
}
1919

2020
message TEvCreateSequence {
21+
message TSetVal {
22+
sint64 NextValue = 1;
23+
bool NextUsed = 2;
24+
}
25+
2126
NKikimrProto.TPathID PathId = 1;
2227
uint64 TxId = 2;
2328
uint64 TxPartId = 3;
@@ -40,6 +45,7 @@ message TEvCreateSequence {
4045
bool Cycle = 9;
4146
}
4247
bool Frozen = 10; // defaults to false
48+
TSetVal SetVal = 11;
4349
}
4450

4551
message TEvCreateSequenceResult {

ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
137137
result.push_back(CreateCopyTable(NextPartId(nextId, result),
138138
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));
139139

140-
if (descr.GetOmitIndexes()) {
141-
continue;
142-
}
143-
144140
TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
145141
for (const auto& child: srcPath.Base()->GetChildren()) {
146142
const auto& name = child.first;
@@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
160156
continue;
161157
}
162158

159+
if (descr.GetOmitIndexes()) {
160+
continue;
161+
}
162+
163163
if (!srcIndexPath.IsTableIndex()) {
164164
continue;
165165
}
@@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
185185
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
186186
scheme.SetFailOnExist(true);
187187

188+
auto* copySequence = scheme.MutableCopySequence();
189+
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
188190
*scheme.MutableSequence() = std::move(sequenceDescription);
189191

190-
result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
192+
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
191193
}
192194
}
193195

ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp

+24-3
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class TConfigureParts : public TSubOperationState {
115115
event->Record.SetFrozen(true);
116116

117117
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
118-
"TCoptSequence TConfigureParts ProgressState"
118+
"TCopySequence TConfigureParts ProgressState"
119119
<< " sending TEvCreateSequence to tablet " << tabletId
120120
<< " operationId# " << OperationId
121121
<< " at tablet " << ssId);
@@ -274,6 +274,18 @@ class TProposedCopySequence : public TSubOperationState {
274274
<< " operationId#" << OperationId;
275275
}
276276

277+
void UpdateSequenceDescription(NKikimrSchemeOp::TSequenceDescription& descr) {
278+
descr.SetStartValue(GetSequenceResult.GetStartValue());
279+
descr.SetMinValue(GetSequenceResult.GetMinValue());
280+
descr.SetMaxValue(GetSequenceResult.GetMaxValue());
281+
descr.SetCache(GetSequenceResult.GetCache());
282+
descr.SetIncrement(GetSequenceResult.GetIncrement());
283+
descr.SetCycle(GetSequenceResult.GetCycle());
284+
auto* setValMsg = descr.MutableSetVal();
285+
setValMsg->SetNextValue(GetSequenceResult.GetNextValue());
286+
setValMsg->SetNextUsed(GetSequenceResult.GetNextUsed());
287+
}
288+
277289
public:
278290
TProposedCopySequence(TOperationId id)
279291
: OperationId(id)
@@ -333,7 +345,15 @@ class TProposedCopySequence : public TSubOperationState {
333345
return false;
334346
}
335347

348+
TPathId pathId = txState->TargetPathId;
349+
336350
NIceDb::TNiceDb db(context.GetDB());
351+
352+
auto sequenceInfo = context.SS->Sequences.at(pathId);
353+
UpdateSequenceDescription(sequenceInfo->Description);
354+
355+
context.SS->PersistSequence(db, pathId, *sequenceInfo);
356+
337357
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
338358
context.OnComplete.ActivateTx(OperationId);
339359
return true;
@@ -387,7 +407,7 @@ class TProposedCopySequence : public TSubOperationState {
387407
return false;
388408
}
389409

390-
auto getSequenceResult = ev->Get()->Record;
410+
GetSequenceResult = ev->Get()->Record;
391411

392412
Y_ABORT_UNLESS(txState->Shards.size() == 1);
393413
for (auto shard : txState->Shards) {
@@ -397,7 +417,8 @@ class TProposedCopySequence : public TSubOperationState {
397417
Y_ABORT_UNLESS(currentTabletId != InvalidTabletId);
398418

399419
auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvRestoreSequence>(
400-
txState->TargetPathId, getSequenceResult);
420+
txState->TargetPathId, GetSequenceResult);
421+
401422
event->Record.SetTxId(ui64(OperationId.GetTxId()));
402423
event->Record.SetTxPartId(OperationId.GetSubTxId());
403424

ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ class TConfigureParts : public TSubOperationState {
142142
if (alterData->Description.HasCycle()) {
143143
event->Record.SetCycle(alterData->Description.GetCycle());
144144
}
145+
if (alterData->Description.HasSetVal()) {
146+
event->Record.MutableSetVal()->SetNextValue(alterData->Description.GetSetVal().GetNextValue());
147+
event->Record.MutableSetVal()->SetNextUsed(alterData->Description.GetSetVal().GetNextUsed());
148+
}
145149

146150
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
147151
"TCreateSequence TConfigureParts ProgressState"

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ static NKikimrSchemeOp::TPathDescription GetTableDescription(TSchemeShard* ss, c
7676
opts.SetReturnPartitioningInfo(false);
7777
opts.SetReturnPartitionConfig(true);
7878
opts.SetReturnBoundaries(true);
79+
opts.SetReturnSetVal(true);
7980

8081
auto desc = DescribePath(ss, TlsActivationContext->AsActorContext(), pathId, opts);
8182
auto record = desc->GetRecord();
@@ -106,8 +107,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
106107
task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));
107108

108109
const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
109-
if (sourcePath.IsResolved()) {
110-
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
110+
const TPath exportPathItem = exportPath.Child(ToString(itemIdx));
111+
if (sourcePath.IsResolved() && exportPathItem.IsResolved()) {
112+
auto exportDescription = GetTableDescription(ss, exportPathItem.Base()->PathId);
113+
exportDescription.MutableTable()->SetName(sourcePath.LeafName());
114+
task.MutableTable()->CopyFrom(exportDescription);
111115
}
112116

113117
task.SetSnapshotStep(exportInfo->SnapshotStep);

ydb/core/tx/schemeshard/schemeshard_impl.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -992,8 +992,10 @@ class TSchemeShard
992992
void DescribeTableIndex(const TPathId& pathId, const TString& name, TTableIndexInfo::TPtr indexInfo, NKikimrSchemeOp::TIndexDescription& entry);
993993
void DescribeCdcStream(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TCdcStreamDescription& desc);
994994
void DescribeCdcStream(const TPathId& pathId, const TString& name, TCdcStreamInfo::TPtr info, NKikimrSchemeOp::TCdcStreamDescription& desc);
995-
void DescribeSequence(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TSequenceDescription& desc);
996-
void DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info, NKikimrSchemeOp::TSequenceDescription& desc);
995+
void DescribeSequence(const TPathId& pathId, const TString& name,
996+
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal = false);
997+
void DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info,
998+
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal = false);
997999
void DescribeReplication(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TReplicationDescription& desc);
9981000
void DescribeReplication(const TPathId& pathId, const TString& name, TReplicationInfo::TPtr info, NKikimrSchemeOp::TReplicationDescription& desc);
9991001
void DescribeBlobDepot(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TBlobDepotDescription& desc);

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
7272
if (fromSequence.has_cycle()) {
7373
seqDesc->SetCycle(fromSequence.cycle());
7474
}
75+
if (fromSequence.has_set_val()) {
76+
auto* setVal = seqDesc->MutableSetVal();
77+
setVal->SetNextUsed(fromSequence.set_val().next_used());
78+
setVal->SetNextValue(fromSequence.set_val().next_value());
79+
}
7580

7681
break;
7782
}

ydb/core/tx/schemeshard/schemeshard_path_describer.cpp

+9-4
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
211211
bool returnBackupInfo = Params.GetBackupInfo();
212212
bool returnBoundaries = false;
213213
bool returnRangeKey = true;
214+
bool returnSetVal = Params.GetReturnSetVal();
214215
if (Params.HasOptions()) {
215216
returnConfig = Params.GetOptions().GetReturnPartitionConfig();
216217
returnPartitioning = Params.GetOptions().GetReturnPartitioningInfo();
@@ -361,7 +362,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
361362
Self->DescribeCdcStream(childPathId, childName, *entry->AddCdcStreams());
362363
break;
363364
case NKikimrSchemeOp::EPathTypeSequence:
364-
Self->DescribeSequence(childPathId, childName, *entry->AddSequences());
365+
Self->DescribeSequence(childPathId, childName, *entry->AddSequences(), returnSetVal);
365366
break;
366367
default:
367368
Y_FAIL_S("Unexpected table's child"
@@ -1241,24 +1242,28 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
12411242
}
12421243

12431244
void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name,
1244-
NKikimrSchemeOp::TSequenceDescription& desc)
1245+
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal)
12451246
{
12461247
auto it = Sequences.find(pathId);
12471248
Y_VERIFY_S(it != Sequences.end(), "Sequence not found"
12481249
<< " pathId# " << pathId
12491250
<< " name# " << name);
1250-
DescribeSequence(pathId, name, it->second, desc);
1251+
DescribeSequence(pathId, name, it->second, desc, fillSetVal);
12511252
}
12521253

12531254
void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info,
1254-
NKikimrSchemeOp::TSequenceDescription& desc)
1255+
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal)
12551256
{
12561257
Y_VERIFY_S(info, "Empty sequence info"
12571258
<< " pathId# " << pathId
12581259
<< " name# " << name);
12591260

12601261
desc = info->Description;
12611262

1263+
if (!fillSetVal) {
1264+
desc.ClearSetVal();
1265+
}
1266+
12621267
desc.SetName(name);
12631268
PathIdFromPathId(pathId, desc.MutablePathId());
12641269
desc.SetVersion(info->AlterVersion);

ydb/core/tx/schemeshard/ut_helpers/helpers.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -2362,16 +2362,17 @@ namespace NSchemeShardUT_Private {
23622362
runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release()));
23632363
}
23642364

2365-
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) {
2365+
i64 WaitNextValResult(
2366+
TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus) {
23662367
auto ev = runtime.GrabEdgeEventRethrow<NSequenceProxy::TEvSequenceProxy::TEvNextValResult>(sender);
23672368
auto* msg = ev->Get();
2368-
UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS);
2369+
UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
23692370
return msg->Value;
23702371
}
23712372

2372-
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) {
2373+
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus) {
23732374
auto sender = runtime.AllocateEdgeActor(0);
23742375
SendNextValRequest(runtime, sender, path);
2375-
return WaitNextValResult(runtime, sender);
2376+
return WaitNextValResult(runtime, sender, expectedStatus);
23762377
}
23772378
}

ydb/core/tx/schemeshard/ut_helpers/helpers.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,11 @@ namespace NSchemeShardUT_Private {
553553
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);
554554

555555
void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
556-
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender);
557-
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path);
556+
i64 WaitNextValResult(
557+
TTestActorRuntime& runtime, const TActorId& sender,
558+
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
559+
i64 DoNextVal(
560+
TTestActorRuntime& runtime, const TString& path,
561+
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
558562

559563
} //NSchemeShardUT_Private

0 commit comments

Comments
 (0)