Skip to content

Commit 42b6cd6

Browse files
committed
Allow streams on index table (ydb-platform#6827)
1 parent 66d836f commit 42b6cd6

19 files changed

+512
-411
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -913,10 +913,6 @@ message TCreateCdcStream {
913913
optional TCdcStreamDescription StreamDescription = 2;
914914
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
915915
optional uint32 TopicPartitions = 4;
916-
oneof IndexMode {
917-
google.protobuf.Empty AllIndexes = 5; // Create topic per each index
918-
string IndexName = 6;
919-
}
920916
}
921917

922918
message TAlterCdcStream {
@@ -1631,7 +1627,6 @@ message TIndexBuildControl {
16311627

16321628
message TLockConfig {
16331629
optional string Name = 1;
1634-
optional bool AllowIndexImplLock = 2;
16351630
}
16361631

16371632
message TLockGuard {

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,12 @@ class TAlterCdcStream: public TSubOperation {
143143
.NotDeleted()
144144
.IsTable()
145145
.NotAsyncReplicaTable()
146-
.IsCommonSensePath()
147146
.NotUnderOperation();
148147

148+
if (checks && !tablePath.IsInsideTableIndexPath()) {
149+
checks.IsCommonSensePath();
150+
}
151+
149152
if (!checks) {
150153
result->SetError(checks.GetStatus(), checks.GetError());
151154
return result;
@@ -370,10 +373,13 @@ class TAlterCdcStreamAtTable: public TSubOperation {
370373
.NotDeleted()
371374
.IsTable()
372375
.NotAsyncReplicaTable()
373-
.IsCommonSensePath()
374376
.NotUnderDeleting()
375377
.NotUnderOperation();
376378

379+
if (checks && !tablePath.IsInsideTableIndexPath()) {
380+
checks.IsCommonSensePath();
381+
}
382+
377383
if (!checks) {
378384
result->SetError(checks.GetStatus(), checks.GetError());
379385
return result;
@@ -476,10 +482,10 @@ class TAlterCdcStreamAtTable: public TSubOperation {
476482
} // anonymous
477483

478484
std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
479-
const TOperationId& opId,
480-
const TPath& workingDirPath,
481-
const TString& tableName,
482-
const TString& streamName)
485+
const TOperationId& opId,
486+
const TPath& workingDirPath,
487+
const TString& tableName,
488+
const TString& streamName)
483489
{
484490
const auto tablePath = workingDirPath.Child(tableName);
485491
{
@@ -492,9 +498,12 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
492498
.NotDeleted()
493499
.IsTable()
494500
.NotAsyncReplicaTable()
495-
.IsCommonSensePath()
496501
.NotUnderOperation();
497502

503+
if (checks && !tablePath.IsInsideTableIndexPath()) {
504+
checks.IsCommonSensePath();
505+
}
506+
498507
if (!checks) {
499508
return CreateReject(opId, checks.GetStatus(), checks.GetError());
500509
}
@@ -521,11 +530,11 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
521530
}
522531

523532
void DoAlterStream(
524-
const NKikimrSchemeOp::TAlterCdcStream& op,
525-
const TOperationId& opId,
526-
const TPath& workingDirPath,
527-
const TPath& tablePath,
528-
TVector<ISubOperation::TPtr>& result)
533+
TVector<ISubOperation::TPtr>& result,
534+
const NKikimrSchemeOp::TAlterCdcStream& op,
535+
const TOperationId& opId,
536+
const TPath& workingDirPath,
537+
const TPath& tablePath)
529538
{
530539
{
531540
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
@@ -601,7 +610,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
601610

602611
TVector<ISubOperation::TPtr> result;
603612

604-
DoAlterStream(op, opId, workingDirPath, tablePath, result);
613+
DoAlterStream(result, op, opId, workingDirPath, tablePath);
605614

606615
if (op.HasGetReady()) {
607616
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#pragma once
22

3-
#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
43
#include "schemeshard__operation_common.h"
4+
#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
55
#include "schemeshard__operation_part.h"
66
#include "schemeshard_impl.h"
77

@@ -17,10 +17,10 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
1717
const TString& streamName);
1818

1919
void DoAlterStream(
20+
TVector<ISubOperation::TPtr>& result,
2021
const NKikimrSchemeOp::TAlterCdcStream& op,
2122
const TOperationId& opId,
2223
const TPath& workingDirPath,
23-
const TPath& tablePath,
24-
TVector<ISubOperation::TPtr>& result);
24+
const TPath& tablePath);
2525

2626
} // namespace NKikimr::NSchemesShard::NCdc

ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
#include "schemeshard__operation_part.h"
1+
#include "schemeshard__operation_alter_cdc_stream.h"
22
#include "schemeshard__operation_common.h"
3+
#include "schemeshard__operation_part.h"
34
#include "schemeshard_impl.h"
45

5-
#include "schemeshard__operation_alter_cdc_stream.h"
6-
76
#include <ydb/core/tx/schemeshard/backup/constants.h>
87

98
#include <ydb/core/engine/mkql_proto.h>
@@ -111,7 +110,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons
111110

112111
TVector<ISubOperation::TPtr> result;
113112

114-
NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);
113+
NCdc::DoAlterStream(result, alterCdcStreamOp, opId, workingDirPath, tablePath);
115114

116115
if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) {
117116
DoCreateIncBackupTable(opId, backupTablePath, schema, result);

0 commit comments

Comments
 (0)