Skip to content

Commit 28bd906

Browse files
committed
Allow streams on index table (ydb-platform#6827)
1 parent 1cbae05 commit 28bd906

12 files changed

+428
-160
lines changed

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,12 @@ class TAlterCdcStream: public TSubOperation {
139139
.NotDeleted()
140140
.IsTable()
141141
.NotAsyncReplicaTable()
142-
.IsCommonSensePath()
143142
.NotUnderOperation();
144143

144+
if (checks && !tablePath.IsInsideTableIndexPath()) {
145+
checks.IsCommonSensePath();
146+
}
147+
145148
if (!checks) {
146149
result->SetError(checks.GetStatus(), checks.GetError());
147150
return result;
@@ -366,10 +369,13 @@ class TAlterCdcStreamAtTable: public TSubOperation {
366369
.NotDeleted()
367370
.IsTable()
368371
.NotAsyncReplicaTable()
369-
.IsCommonSensePath()
370372
.NotUnderDeleting()
371373
.NotUnderOperation();
372374

375+
if (checks && !tablePath.IsInsideTableIndexPath()) {
376+
checks.IsCommonSensePath();
377+
}
378+
373379
if (!checks) {
374380
result->SetError(checks.GetStatus(), checks.GetError());
375381
return result;
@@ -511,9 +517,12 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
511517
.NotDeleted()
512518
.IsTable()
513519
.NotAsyncReplicaTable()
514-
.IsCommonSensePath()
515520
.NotUnderOperation();
516521

522+
if (checks && !tablePath.IsInsideTableIndexPath()) {
523+
checks.IsCommonSensePath();
524+
}
525+
517526
if (!checks) {
518527
return {CreateReject(opId, checks.GetStatus(), checks.GetError())};
519528
}

ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,12 @@ class TNewCdcStream: public TSubOperation {
128128
.NotDeleted()
129129
.IsTable()
130130
.NotAsyncReplicaTable()
131-
.IsCommonSensePath()
132131
.NotUnderDeleting();
133132

133+
if (checks && !tablePath.IsInsideTableIndexPath()) {
134+
checks.IsCommonSensePath();
135+
}
136+
134137
if (!checks) {
135138
result->SetError(checks.GetStatus(), checks.GetError());
136139
return result;
@@ -522,10 +525,13 @@ class TNewCdcStreamAtTable: public TSubOperation {
522525
.IsAtLocalSchemeShard()
523526
.IsResolved()
524527
.NotDeleted()
525-
.IsCommonSensePath()
526528
.IsLikeDirectory()
527529
.NotUnderDeleting();
528530

531+
if (checks && !workingDir.IsTableIndex()) {
532+
checks.IsCommonSensePath();
533+
}
534+
529535
if (!checks) {
530536
result->SetError(checks.GetStatus(), checks.GetError());
531537
return result;
@@ -543,10 +549,12 @@ class TNewCdcStreamAtTable: public TSubOperation {
543549
.NotDeleted()
544550
.IsTable()
545551
.NotAsyncReplicaTable()
546-
.IsCommonSensePath()
547552
.NotUnderDeleting();
548553

549554
if (checks) {
555+
if (!tablePath.IsInsideTableIndexPath()) {
556+
checks.IsCommonSensePath();
557+
}
550558
if (InitialScan) {
551559
checks.IsUnderTheSameOperation(OperationId.GetTxId()); // lock op
552560
} else {
@@ -675,10 +683,13 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
675683
.NotDeleted()
676684
.IsTable()
677685
.NotAsyncReplicaTable()
678-
.IsCommonSensePath()
679686
.NotUnderDeleting()
680687
.NotUnderOperation();
681688

689+
if (checks && !tablePath.IsInsideTableIndexPath()) {
690+
checks.IsCommonSensePath();
691+
}
692+
682693
if (!checks) {
683694
return {CreateReject(opId, checks.GetStatus(), checks.GetError())};
684695
}

ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,12 @@ class TCreateLock: public TSubOperation {
124124
.IsResolved()
125125
.NotDeleted()
126126
.NotUnderDeleting()
127-
.IsCommonSensePath()
128127
.IsLikeDirectory();
129128

129+
if (checks && !parentPath.IsTableIndex()) {
130+
checks.IsCommonSensePath();
131+
}
132+
130133
if (!checks) {
131134
result->SetError(checks.GetStatus(), checks.GetError());
132135
return result;
@@ -143,8 +146,11 @@ class TCreateLock: public TSubOperation {
143146
.NotUnderDeleting()
144147
.NotUnderOperation()
145148
.IsTable()
146-
.NotAsyncReplicaTable()
147-
.IsCommonSensePath();
149+
.NotAsyncReplicaTable();
150+
151+
if (checks && !parentPath.IsTableIndex()) {
152+
checks.IsCommonSensePath();
153+
}
148154

149155
if (!checks) {
150156
result->SetError(checks.GetStatus(), checks.GetError());

ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,13 @@ class TDropCdcStream: public TSubOperation {
146146
.NotDeleted()
147147
.IsTable()
148148
.NotAsyncReplicaTable()
149-
.IsCommonSensePath()
150149
.IsUnderOperation()
151150
.IsUnderTheSameOperation(OperationId.GetTxId());
152151

152+
if (checks && !tablePath.IsInsideTableIndexPath()) {
153+
checks.IsCommonSensePath();
154+
}
155+
153156
if (!checks) {
154157
result->SetError(checks.GetStatus(), checks.GetError());
155158
return result;
@@ -328,10 +331,13 @@ class TDropCdcStreamAtTable: public TSubOperation {
328331
.NotDeleted()
329332
.IsTable()
330333
.NotAsyncReplicaTable()
331-
.IsCommonSensePath()
332334
.NotUnderDeleting()
333335
.NotUnderOperation();
334336

337+
if (checks && !tablePath.IsInsideTableIndexPath()) {
338+
checks.IsCommonSensePath();
339+
}
340+
335341
if (!checks) {
336342
result->SetError(checks.GetStatus(), checks.GetError());
337343
return result;
@@ -473,10 +479,13 @@ TVector<ISubOperation::TPtr> CreateDropCdcStream(TOperationId opId, const TTxTra
473479
.NotDeleted()
474480
.IsTable()
475481
.NotAsyncReplicaTable()
476-
.IsCommonSensePath()
477482
.NotUnderDeleting()
478483
.NotUnderOperation();
479484

485+
if (checks && !tablePath.IsInsideTableIndexPath()) {
486+
checks.IsCommonSensePath();
487+
}
488+
480489
if (!checks) {
481490
return {CreateReject(opId, checks.GetStatus(), checks.GetError())};
482491
}

ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -473,21 +473,22 @@ TVector<ISubOperation::TPtr> CreateDropIndex(TOperationId nextId, const TTxTrans
473473
result.push_back(CreateDropTableIndex(NextPartId(nextId, result), indexDropping));
474474
}
475475

476-
for (const auto& items: indexPath.Base()->GetChildren()) {
477-
Y_ABORT_UNLESS(context.SS->PathsById.contains(items.second));
478-
auto implPath = context.SS->PathsById.at(items.second);
479-
if (implPath->Dropped()) {
476+
for (const auto& [childName, childPathId] : indexPath.Base()->GetChildren()) {
477+
TPath child = indexPath.Child(childName);
478+
if (child.IsDeleted()) {
480479
continue;
481480
}
482481

483-
auto implTable = context.SS->PathsById.at(items.second);
484-
Y_ABORT_UNLESS(implTable->IsTable());
482+
Y_ABORT_UNLESS(child.Base()->IsTable());
485483

486484
auto implTableDropping = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
487485
auto operation = implTableDropping.MutableDrop();
488-
operation->SetName(items.first);
486+
operation->SetName(child.LeafName());
489487

490488
result.push_back(CreateDropTable(NextPartId(nextId, result), implTableDropping));
489+
if (auto reject = CascadeDropTableChildren(result, nextId, child)) {
490+
return {reject};
491+
}
491492
}
492493

493494
return result;

ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp

Lines changed: 2 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -443,102 +443,8 @@ TVector<ISubOperation::TPtr> CreateDropIndexedTable(TOperationId nextId, const T
443443

444444
TVector<ISubOperation::TPtr> result;
445445
result.push_back(CreateDropTable(NextPartId(nextId, result), tx));
446-
447-
for (const auto& [childName, childPathId] : table.Base()->GetChildren()) {
448-
TPath child = table.Child(childName);
449-
{
450-
TPath::TChecker checks = child.Check();
451-
checks
452-
.NotEmpty()
453-
.IsResolved();
454-
455-
if (checks) {
456-
if (child.IsDeleted()) {
457-
continue;
458-
}
459-
}
460-
461-
if (child.IsTableIndex()) {
462-
checks.IsTableIndex();
463-
} else if (child.IsCdcStream()) {
464-
checks.IsCdcStream();
465-
} else if (child.IsSequence()) {
466-
checks.IsSequence();
467-
}
468-
469-
checks.NotDeleted()
470-
.NotUnderDeleting()
471-
.NotUnderOperation();
472-
473-
if (!checks) {
474-
return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
475-
}
476-
}
477-
Y_ABORT_UNLESS(child.Base()->PathId == childPathId);
478-
479-
if (child.IsSequence()) {
480-
auto dropSequence = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence);
481-
dropSequence.MutableDrop()->SetName(ToString(child->Name));
482-
483-
result.push_back(CreateDropSequence(NextPartId(nextId, result), dropSequence));
484-
continue;
485-
} else if (child.IsTableIndex()) {
486-
auto dropIndex = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex);
487-
dropIndex.MutableDrop()->SetName(ToString(child.Base()->Name));
488-
489-
result.push_back(CreateDropTableIndex(NextPartId(nextId, result), dropIndex));
490-
} else if (child.IsCdcStream()) {
491-
auto dropStream = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl);
492-
dropStream.MutableDrop()->SetName(ToString(child.Base()->Name));
493-
494-
result.push_back(CreateDropCdcStreamImpl(NextPartId(nextId, result), dropStream));
495-
}
496-
497-
Y_ABORT_UNLESS(child.Base()->GetChildren().size() == 1);
498-
for (auto& [implName, implPathId] : child.Base()->GetChildren()) {
499-
Y_ABORT_UNLESS(implName == "indexImplTable" || implName == "streamImpl",
500-
"unexpected name %s", implName.c_str());
501-
502-
TPath implPath = child.Child(implName);
503-
{
504-
TPath::TChecker checks = implPath.Check();
505-
checks
506-
.NotEmpty()
507-
.IsResolved()
508-
.NotDeleted()
509-
.NotUnderDeleting()
510-
.NotUnderOperation();
511-
512-
if (checks) {
513-
if (implPath.Base()->IsTable()) {
514-
checks
515-
.IsTable()
516-
.IsInsideTableIndexPath();
517-
} else if (implPath.Base()->IsPQGroup()) {
518-
checks
519-
.IsPQGroup()
520-
.IsInsideCdcStreamPath();
521-
}
522-
}
523-
524-
if (!checks) {
525-
return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
526-
}
527-
}
528-
Y_ABORT_UNLESS(implPath.Base()->PathId == implPathId);
529-
530-
if (implPath.Base()->IsTable()) {
531-
auto dropIndexTable = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
532-
dropIndexTable.MutableDrop()->SetName(ToString(implPath.Base()->Name));
533-
534-
result.push_back(CreateDropTable(NextPartId(nextId, result), dropIndexTable));
535-
} else if (implPath.Base()->IsPQGroup()) {
536-
auto dropPQGroup = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup);
537-
dropPQGroup.MutableDrop()->SetName(ToString(implPath.Base()->Name));
538-
539-
result.push_back(CreateDropPQ(NextPartId(nextId, result), dropPQGroup));
540-
}
541-
}
446+
if (auto reject = CascadeDropTableChildren(result, nextId, table)) {
447+
return {reject};
542448
}
543449

544450
return result;

ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,12 @@ class TDropLock: public TSubOperation {
119119
.IsResolved()
120120
.NotDeleted()
121121
.NotUnderDeleting()
122-
.IsCommonSensePath()
123122
.IsLikeDirectory();
124123

124+
if (checks && !parentPath.IsTableIndex()) {
125+
checks.IsCommonSensePath();
126+
}
127+
125128
if (!checks) {
126129
result->SetError(checks.GetStatus(), checks.GetError());
127130
return result;
@@ -134,10 +137,12 @@ class TDropLock: public TSubOperation {
134137
checks
135138
.IsAtLocalSchemeShard()
136139
.IsResolved()
137-
.NotUnderDeleting()
138-
.IsCommonSensePath();
140+
.NotUnderDeleting();
139141

140142
if (checks) {
143+
if (!parentPath.IsTableIndex()) {
144+
checks.IsCommonSensePath();
145+
}
141146
if (dstPath.IsUnderOperation()) { // may be part of a consistent operation
142147
checks.IsUnderTheSameOperation(OperationId.GetTxId());
143148
} else {

0 commit comments

Comments
 (0)