From eca6c9ff40005d78aa4b35920e36f5598d8e7ea8 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 19 Jul 2024 12:59:38 +0300 Subject: [PATCH] Stream on index table: additional checks & tests --- ...chemeshard__operation_alter_cdc_stream.cpp | 11 ++- ...hemeshard__operation_create_cdc_stream.cpp | 8 ++ ...schemeshard__operation_drop_cdc_stream.cpp | 8 ++ .../ut_cdc_stream/ut_cdc_stream.cpp | 75 ++++++++++++++++--- 4 files changed, 88 insertions(+), 14 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 7b95b2b3ac82..5ace1b97d44b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -539,18 +539,15 @@ void DoAlterStream( { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl); outTx.MutableAlterCdcStream()->CopyFrom(op); - if (op.HasGetReady()) { outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId()); } result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx)); } - { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable); outTx.MutableAlterCdcStream()->CopyFrom(op); - if (op.HasGetReady()) { outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId()); } @@ -622,6 +619,14 @@ TVector CreateAlterCdcStream(TOperationId opId, const TTxTr result.push_back(DropLock(NextPartId(opId, result), outTx)); } + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } + return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index b759660e815e..4715c2db1e29 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -955,6 +955,14 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran DoCreateLock(result, opId, workingDirPath, tablePath); } + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); auto table = context.SS->Tables.at(tablePath.Base()->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index fac3fe3443a8..1654ca58bdba 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -539,6 +539,14 @@ void DoDropStream( result.push_back(DropLock(NextPartId(opId, result), outTx)); } + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } + { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); outTx.MutableDrop()->SetName(streamPath.Base()->Name); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index 256ef3cdcf38..8d2a9cd92504 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -1224,6 +1224,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(2)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(3)}); + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( TableName: "indexImplTable" StreamDescription { @@ -1252,12 +1255,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { - NLs::PathExist, - }); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), { - NLs::PathExist, - }); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(3)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(4)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {NLs::PathExist}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {NLs::PathExist}); TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( TableName: "indexImplTable" @@ -1272,6 +1273,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(4)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(5)}); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), }); @@ -1287,12 +1290,62 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { - NLs::PathNotExist, - }); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), { - NLs::PathNotExist, + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(5)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(6)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {NLs::PathNotExist}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {NLs::PathNotExist}); + } + + Y_UNIT_TEST(StreamOnBuildingIndexTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + THolder blockedBuildIndexRequest; + auto blockBuildIndexRequest = runtime.AddObserver([&](auto& ev) { + blockedBuildIndexRequest.Reset(ev.Release()); }); + + AsyncBuildIndex(runtime, ++txId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", "Index", {"indexed"}); + const auto buildIndexId = txId; + { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&blockedBuildIndexRequest](IEventHandle&) { + return bool(blockedBuildIndexRequest); + }); + runtime.DispatchEvents(opts); + } + blockBuildIndexRequest.Remove(); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusMultipleModifications}); + + runtime.Send(blockedBuildIndexRequest.Release(), 0, true); + env.TestWaitNotification(runtime, buildIndexId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); } Y_UNIT_TEST(DropIndexWithStream) {