Skip to content

Commit dea009b

Browse files
authored
24-3: Break persistent locks on scheme tx (#11525)
1 parent afbd118 commit dea009b

9 files changed

+70
-10
lines changed

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2351,9 +2351,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
23512351
Value2 String,
23522352
PRIMARY KEY (Key)
23532353
);
2354+
)", TTxControl::NoTx()).ExtractValueSync();
2355+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2356+
2357+
result = db.ExecuteQuery(R"(
23542358
UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1");
23552359
SELECT * FROM TestDdlDml2;
23562360
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
2361+
)", TTxControl::NoTx()).ExtractValueSync();
2362+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
2363+
2364+
result = db.ExecuteQuery(R"(
2365+
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (1, "1");
2366+
SELECT * FROM TestDdlDml2;
23572367
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
23582368
SELECT * FROM TestDdlDml2;
23592369
CREATE TABLE TestDdlDml33 (
@@ -2363,7 +2373,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
23632373
)", TTxControl::NoTx()).ExtractValueSync();
23642374
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
23652375
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
2366-
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
2376+
CompareYson(R"([[[1u];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
23672377
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
23682378
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
23692379

ydb/core/tx/datashard/alter_table_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
151152
}
152153

153154
TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx);
154-
DataShard.AddUserTable(tableId, info);
155+
TDataShardLocksDb locksDb(DataShard, txc);
156+
DataShard.AddUserTable(tableId, info, &locksDb);
155157

156158
if (info->NeedSchemaSnapshots()) {
157159
DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/create_cdc_stream_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -40,7 +41,8 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
4041
Y_ABORT_UNLESS(version);
4142

4243
auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
43-
DataShard.AddUserTable(pathId, tableInfo);
44+
TDataShardLocksDb locksDb(DataShard, txc);
45+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
4446

4547
if (tableInfo->NeedSchemaSnapshots()) {
4648
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/datashard.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1872,7 +1872,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
18721872
newTableInfo->StatsNeedUpdate = true;
18731873

18741874
TDataShardLocksDb locksDb(*this, txc);
1875-
18761875
RemoveUserTable(prevId, &locksDb);
18771876
AddUserTable(newId, newTableInfo);
18781877

@@ -1951,8 +1950,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
19511950
}
19521951

19531952
newTableInfo->SetSchema(schema);
1954-
1955-
AddUserTable(pathId, newTableInfo);
1953+
TDataShardLocksDb locksDb(*this, txc);
1954+
AddUserTable(pathId, newTableInfo, &locksDb);
19561955

19571956
if (newTableInfo->NeedSchemaSnapshots()) {
19581957
AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1636,7 +1636,10 @@ class TDataShard
16361636
TableInfos.erase(tableId.LocalPathId);
16371637
}
16381638

1639-
void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
1639+
void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) {
1640+
if (locksDb) {
1641+
SysLocks.RemoveSchema(tableId, locksDb);
1642+
}
16401643
TableInfos[tableId.LocalPathId] = tableInfo;
16411644
SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes);
16421645
Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo);

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3682,6 +3682,44 @@ Y_UNIT_TEST_SUITE(Cdc) {
36823682
MustNotLoseSchemaSnapshot(true);
36833683
}
36843684

3685+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) {
3686+
TPortManager portManager;
3687+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3688+
.SetUseRealThreads(false)
3689+
.SetDomainName("Root")
3690+
);
3691+
3692+
auto& runtime = *server->GetRuntime();
3693+
const auto edgeActor = runtime.AllocateEdgeActor();
3694+
3695+
SetupLogging(runtime);
3696+
InitRoot(server, edgeActor);
3697+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3698+
3699+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3700+
Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));
3701+
3702+
ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);");
3703+
3704+
TString sessionId;
3705+
TString txId;
3706+
KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);");
3707+
3708+
UNIT_ASSERT_VALUES_EQUAL(
3709+
KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"),
3710+
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");
3711+
3712+
WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));
3713+
3714+
UNIT_ASSERT_VALUES_EQUAL(
3715+
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"),
3716+
"ERROR: ABORTED");
3717+
3718+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3719+
R"({"update":{"value":10},"key":[1]})",
3720+
});
3721+
}
3722+
36853723
Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
36863724
TPortManager portManager;
36873725
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

ydb/core/tx/datashard/drop_cdc_stream_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -40,7 +41,8 @@ class TDropCdcStreamUnit : public TExecutionUnit {
4041
Y_ABORT_UNLESS(version);
4142

4243
auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId);
43-
DataShard.AddUserTable(pathId, tableInfo);
44+
TDataShardLocksDb locksDb(DataShard, txc);
45+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
4446

4547
if (tableInfo->NeedSchemaSnapshots()) {
4648
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/drop_index_notice_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -52,7 +53,8 @@ class TDropIndexNoticeUnit : public TExecutionUnit {
5253
}
5354

5455
Y_ABORT_UNLESS(tableInfo);
55-
DataShard.AddUserTable(pathId, tableInfo);
56+
TDataShardLocksDb locksDb(DataShard, txc);
57+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
5658

5759
if (tableInfo->NeedSchemaSnapshots()) {
5860
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/initiate_build_index_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -53,7 +54,8 @@ class TInitiateBuildIndexUnit : public TExecutionUnit {
5354
}
5455

5556
Y_ABORT_UNLESS(tableInfo);
56-
DataShard.AddUserTable(pathId, tableInfo);
57+
TDataShardLocksDb locksDb(DataShard, txc);
58+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
5759

5860
if (tableInfo->NeedSchemaSnapshots()) {
5961
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

0 commit comments

Comments
 (0)