Skip to content

Commit 43b3f6e

Browse files
authored
24-3: Do not lose ScanShards when altering (#9377)
1 parent 604788d commit 43b3f6e

File tree

3 files changed

+91
-1
lines changed

3 files changed

+91
-1
lines changed

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TPropose: public TSubOperationState {
6161
NIceDb::TNiceDb db(context.GetDB());
6262

6363
context.SS->PersistCdcStream(db, pathId);
64-
context.SS->CdcStreams[pathId] = stream->AlterData;
64+
context.SS->CdcStreams[pathId]->FinishAlter();
6565

6666
context.SS->ClearDescribePathCaches(path);
6767
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);

ydb/core/tx/schemeshard/schemeshard_info_types.h

+14
Original file line numberDiff line numberDiff line change
@@ -2437,6 +2437,20 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
24372437
return result;
24382438
}
24392439

2440+
void FinishAlter() {
2441+
Y_ABORT_UNLESS(AlterData);
2442+
2443+
AlterVersion = AlterData->AlterVersion;
2444+
Mode = AlterData->Mode;
2445+
Format = AlterData->Format;
2446+
VirtualTimestamps = AlterData->VirtualTimestamps;
2447+
ResolvedTimestamps = AlterData->ResolvedTimestamps;
2448+
AwsRegion = AlterData->AwsRegion;
2449+
State = AlterData->State;
2450+
2451+
AlterData.Reset();
2452+
}
2453+
24402454
ui64 AlterVersion = 1;
24412455
EMode Mode;
24422456
EFormat Format;

ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp

+76
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#include <ydb/core/metering/metering.h>
2+
#include <ydb/core/testlib/actors/block_events.h>
23
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
34
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
45
#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
6+
#include <ydb/core/tx/schemeshard/schemeshard_private.h>
57

68
#include <library/cpp/json/json_reader.h>
79
#include <library/cpp/json/json_writer.h>
@@ -1754,6 +1756,80 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
17541756
env.TestWaitNotification(runtime, txId);
17551757
}
17561758

1759+
Y_UNIT_TEST(RacyAlterStreamAndRestart) {
1760+
TTestBasicRuntime runtime;
1761+
TTestEnv env(runtime, TTestEnvOptions()
1762+
.EnableChangefeedInitialScan(true));
1763+
ui64 txId = 100;
1764+
1765+
TActorId schemeShardActorId;
1766+
auto findActorId = runtime.AddObserver<TEvSchemeShard::TEvModifySchemeTransactionResult>([&](auto& ev) {
1767+
if (!schemeShardActorId) {
1768+
schemeShardActorId = ev->Sender;
1769+
}
1770+
});
1771+
1772+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
1773+
Name: "Table"
1774+
Columns { Name: "key" Type: "Uint64" }
1775+
Columns { Name: "value" Type: "Uint64" }
1776+
KeyColumnNames: ["key"]
1777+
)");
1778+
env.TestWaitNotification(runtime, txId);
1779+
1780+
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> blockedAlterStream(runtime, [&](auto& ev) {
1781+
const auto& record = ev->Get()->Record;
1782+
if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpAlterCdcStream) {
1783+
txId = record.GetTxId();
1784+
return true;
1785+
}
1786+
return false;
1787+
});
1788+
1789+
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
1790+
TableName: "Table"
1791+
StreamDescription {
1792+
Name: "Stream"
1793+
Mode: ECdcStreamModeKeysOnly
1794+
Format: ECdcStreamFormatProto
1795+
State: ECdcStreamStateScan
1796+
}
1797+
)");
1798+
env.TestWaitNotification(runtime, txId);
1799+
1800+
runtime.WaitFor("AlterCdcStream", [&]{ return blockedAlterStream.size(); });
1801+
blockedAlterStream.Stop();
1802+
1803+
UNIT_ASSERT(schemeShardActorId);
1804+
1805+
TBlockEvents<TEvPrivate::TEvProgressOperation> blockedProgress(runtime, [&](auto& ev) {
1806+
return schemeShardActorId == ev->Sender;
1807+
});
1808+
1809+
blockedAlterStream.Unblock();
1810+
runtime.WaitFor("Progress", [&]{ return blockedProgress.size(); });
1811+
blockedProgress.Stop();
1812+
1813+
RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
1814+
env.TestWaitNotification(runtime, txId);
1815+
1816+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
1817+
NLs::PathExist,
1818+
NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
1819+
});
1820+
1821+
TestDropCdcStream(runtime, ++txId, "/MyRoot", R"(
1822+
TableName: "Table"
1823+
StreamName: "Stream"
1824+
)");
1825+
env.TestWaitNotification(runtime, txId);
1826+
1827+
RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
1828+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
1829+
NLs::PathNotExist,
1830+
});
1831+
}
1832+
17571833
void Metering(bool serverless) {
17581834
TTestBasicRuntime runtime;
17591835
TTestEnv env(runtime, TTestEnvOptions()

0 commit comments

Comments
 (0)