diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 5ace1b97d44b..9113d5d8edc9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -61,7 +61,7 @@ class TPropose: public TSubOperationState { NIceDb::TNiceDb db(context.GetDB()); context.SS->PersistCdcStream(db, pathId); - context.SS->CdcStreams[pathId] = stream->AlterData; + context.SS->CdcStreams[pathId]->FinishAlter(); context.SS->ClearDescribePathCaches(path); context.OnComplete.PublishToSchemeBoard(OperationId, pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 279ad7fee624..c227382a424c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2437,6 +2437,20 @@ struct TCdcStreamInfo : public TSimpleRefCount { return result; } + void FinishAlter() { + Y_ABORT_UNLESS(AlterData); + + AlterVersion = AlterData->AlterVersion; + Mode = AlterData->Mode; + Format = AlterData->Format; + VirtualTimestamps = AlterData->VirtualTimestamps; + ResolvedTimestamps = AlterData->ResolvedTimestamps; + AwsRegion = AlterData->AwsRegion; + State = AlterData->State; + + AlterData.Reset(); + } + ui64 AlterVersion = 1; EMode Mode; EFormat Format; diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index 560b63103c65..f81110939025 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -1,7 +1,9 @@ #include +#include #include #include #include +#include #include #include @@ -1754,6 +1756,80 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { env.TestWaitNotification(runtime, txId); } + Y_UNIT_TEST(RacyAlterStreamAndRestart) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableChangefeedInitialScan(true)); + ui64 txId = 100; + + TActorId schemeShardActorId; + auto findActorId = runtime.AddObserver([&](auto& ev) { + if (!schemeShardActorId) { + schemeShardActorId = ev->Sender; + } + }); + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TBlockEvents blockedAlterStream(runtime, [&](auto& ev) { + const auto& record = ev->Get()->Record; + if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpAlterCdcStream) { + txId = record.GetTxId(); + return true; + } + return false; + }); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId); + + runtime.WaitFor("AlterCdcStream", [&]{ return blockedAlterStream.size(); }); + blockedAlterStream.Stop(); + + UNIT_ASSERT(schemeShardActorId); + + TBlockEvents blockedProgress(runtime, [&](auto& ev) { + return schemeShardActorId == ev->Sender; + }); + + blockedAlterStream.Unblock(); + runtime.WaitFor("Progress", [&]{ return blockedProgress.size(); }); + blockedProgress.Stop(); + + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), + }); + + TestDropCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream" + )"); + env.TestWaitNotification(runtime, txId); + + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathNotExist, + }); + } + void Metering(bool serverless) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions()