Skip to content

Commit 4880f2d

Browse files
authored
Fix datashard sink tests (#15550)
1 parent 0a870d7 commit 4880f2d

10 files changed

+270
-91
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ class TKqpTransactionManager : public IKqpTransactionManager {
9191
auto& shardInfo = ShardsInfo.at(shardId);
9292
if (auto lockPtr = shardInfo.Locks.FindPtr(lock.GetKey()); lockPtr) {
9393
if (lock.Proto.GetHasWrites()) {
94-
AFL_ENSURE(!ReadOnly);
9594
lockPtr->Lock.Proto.SetHasWrites(true);
9695
}
9796

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+2-7
Original file line numberDiff line numberDiff line change
@@ -346,13 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
346346
}
347347

348348
void HandleFinalize(TEvents::TEvUndelivered::TPtr&) {
349-
if (Request.LocksOp == ELocksOp::Commit && !ReadOnlyTx) {
350-
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, "Buffer actor isn't available. Operation state unknown.");
351-
ReplyErrorAndDie(Ydb::StatusIds::UNDETERMINED, issue);
352-
} else {
353-
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, "Buffer actor isn't available.");
354-
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
355-
}
349+
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, "Buffer actor isn't available.");
350+
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
356351
}
357352

358353
void MakeResponseAndPassAway() {

ydb/core/kqp/runtime/kqp_write_actor.cpp

+22-13
Original file line numberDiff line numberDiff line change
@@ -797,19 +797,20 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
797797
return builder;
798798
}());
799799

800-
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
801-
Y_ABORT_UNLESS(Mode == EMode::WRITE);
802-
if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) {
803-
YQL_ENSURE(TxManager->BrokenLocks());
804-
NYql::TIssues issues;
805-
issues.AddIssue(*TxManager->GetLockIssue());
806-
RuntimeError(
807-
NYql::NDqProto::StatusIds::ABORTED,
808-
NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED,
809-
TStringBuilder() << "Transaction locks invalidated. Table `"
810-
<< TablePath << "`.",
811-
issues);
812-
return;
800+
if (Mode == EMode::WRITE) {
801+
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
802+
if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) {
803+
YQL_ENSURE(TxManager->BrokenLocks());
804+
NYql::TIssues issues;
805+
issues.AddIssue(*TxManager->GetLockIssue());
806+
RuntimeError(
807+
NYql::NDqProto::StatusIds::ABORTED,
808+
NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED,
809+
TStringBuilder() << "Transaction locks invalidated. Table `"
810+
<< TablePath << "`.",
811+
issues);
812+
return;
813+
}
813814
}
814815
}
815816

@@ -1164,6 +1165,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
11641165
Callbacks->OnError(statusCode, std::move(issues));
11651166
}
11661167

1168+
void Unlink() {
1169+
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
1170+
}
1171+
11671172
void PassAway() override {;
11681173
Counters->WriteActorsCount->Dec();
11691174
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
@@ -2677,6 +2682,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
26772682
});
26782683
ExecuterActorId = {};
26792684
Y_ABORT_UNLESS(GetTotalMemory() == 0);
2685+
2686+
for (auto& [_, info] : WriteInfos) {
2687+
info.WriteTableActor->Unlink();
2688+
}
26802689
}
26812690

26822691
void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, NYql::EYqlIssueCode id, const TString& message, const NYql::TIssues& subIssues) override {

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1751,7 +1751,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17511751
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << " send to: " << ExecuterId << " from: " << ev->Sender;
17521752

17531753
if (!QueryState || !QueryState->TxCtx || QueryState->TxCtx->BufferActorId != ev->Sender) {
1754-
LOG_E(logMsg << ": Old error.");
1754+
LOG_E(logMsg << ": Ignored error.");
17551755
return;
17561756
} else {
17571757
LOG_W(logMsg);

ydb/core/tx/datashard/datashard_ut_order.cpp

+28-9
Original file line numberDiff line numberDiff line change
@@ -2020,11 +2020,14 @@ Y_UNIT_TEST(TestPlannedTimeoutSplit) {
20202020
}
20212021
}
20222022

2023-
Y_UNIT_TEST(TestPlannedHalfOverloadedSplit) {
2023+
Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseSink) {
20242024
TPortManager pm;
20252025
TServerSettings serverSettings(pm.GetPort(2134));
2026+
NKikimrConfig::TAppConfig app;
2027+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
20262028
serverSettings.SetDomainName("Root")
2027-
.SetUseRealThreads(false);
2029+
.SetUseRealThreads(false)
2030+
.SetAppConfig(app);
20282031

20292032
Tests::TServer::TPtr server = new TServer(serverSettings);
20302033
auto &runtime = *server->GetRuntime();
@@ -2055,7 +2058,8 @@ Y_UNIT_TEST(TestPlannedHalfOverloadedSplit) {
20552058
TVector<THolder<IEventHandle>> txProposeResults;
20562059
auto captureMessages = [&](TAutoPtr<IEventHandle> &event) -> auto {
20572060
switch (event->GetTypeRewrite()) {
2058-
case TEvDataShard::EvProposeTransaction: {
2061+
case TEvDataShard::EvProposeTransaction:
2062+
case NKikimr::NEvents::TDataEvents::EvWrite: {
20592063
Cerr << "---- observed EvProposeTransactionResult ----" << Endl;
20602064
if (txProposes.size() == 0) {
20612065
// Capture the first propose
@@ -2064,7 +2068,8 @@ Y_UNIT_TEST(TestPlannedHalfOverloadedSplit) {
20642068
}
20652069
break;
20662070
}
2067-
case TEvDataShard::EvProposeTransactionResult: {
2071+
case TEvDataShard::EvProposeTransactionResult:
2072+
case NKikimr::NEvents::TDataEvents::EvWriteResult: {
20682073
Cerr << "---- observed EvProposeTransactionResult ----" << Endl;
20692074
if (txProposes.size() > 0) {
20702075
// Capture all propose results
@@ -2452,11 +2457,14 @@ Y_UNIT_TEST(TestReadTableSingleShardImmediate) {
24522457
UNIT_ASSERT_VALUES_EQUAL(seenPlanSteps, 0u);
24532458
}
24542459

2455-
Y_UNIT_TEST(TestImmediateQueueThenSplit) {
2460+
Y_UNIT_TEST_TWIN(TestImmediateQueueThenSplit, UseSink) {
24562461
TPortManager pm;
24572462
TServerSettings serverSettings(pm.GetPort(2134));
2463+
NKikimrConfig::TAppConfig app;
2464+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
24582465
serverSettings.SetDomainName("Root")
2459-
.SetUseRealThreads(false);
2466+
.SetUseRealThreads(false)
2467+
.SetAppConfig(app);
24602468

24612469
Tests::TServer::TPtr server = new TServer(serverSettings);
24622470
auto &runtime = *server->GetRuntime();
@@ -2499,6 +2507,7 @@ Y_UNIT_TEST(TestImmediateQueueThenSplit) {
24992507
}
25002508
break;
25012509
case TEvDataShard::EvProposeTransaction:
2510+
case NKikimr::NEvents::TDataEvents::EvWrite:
25022511
if (capturePropose) {
25032512
Cerr << "---- capture EvProposeTransaction ----" << Endl;
25042513
eventsPropose.emplace_back(event.Release());
@@ -2620,10 +2629,11 @@ Y_UNIT_TEST(TestImmediateQueueThenSplit) {
26202629
<< failures << " failures");
26212630
}
26222631

2623-
void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query) {
2632+
void TestLateKqpQueryAfterColumnDrop(bool dataQuery, bool useSink, const TString& query) {
26242633
TPortManager pm;
26252634
NKikimrConfig::TAppConfig app;
26262635
app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(false);
2636+
app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
26272637
TServerSettings serverSettings(pm.GetPort(2134));
26282638
serverSettings.SetDomainName("Root")
26292639
.SetUseRealThreads(false)
@@ -2671,6 +2681,15 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query) {
26712681
break;
26722682
}
26732683

2684+
case NKikimr::NEvents::TDataEvents::EvWrite: {
2685+
if (capturePropose) {
2686+
Cerr << "---- capture EvWrite ----" << Endl;
2687+
eventsPropose.emplace_back(ev.Release());
2688+
return TTestActorRuntime::EEventAction::DROP;
2689+
}
2690+
break;
2691+
}
2692+
26742693
case TEvDataShard::EvKqpScan: {
26752694
if (capturePropose) {
26762695
Cerr << "---- capture EvKqpScan ----" << Endl;
@@ -2735,8 +2754,8 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query) {
27352754
}
27362755
}
27372756

2738-
Y_UNIT_TEST(TestLateKqpScanAfterColumnDrop) {
2739-
TestLateKqpQueryAfterColumnDrop(false, "SELECT SUM(value2) FROM `/Root/table-1`");
2757+
Y_UNIT_TEST_TWIN(TestLateKqpScanAfterColumnDrop, UseSink) {
2758+
TestLateKqpQueryAfterColumnDrop(false, UseSink, "SELECT SUM(value2) FROM `/Root/table-1`");
27402759
}
27412760

27422761
Y_UNIT_TEST(TestSecondaryClearanceAfterShardRestartRace) {

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

+15-6
Original file line numberDiff line numberDiff line change
@@ -2570,16 +2570,19 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
25702570
});
25712571
}
25722572

2573-
Y_UNIT_TEST(ShouldReadFromHeadWithConflict) {
2573+
Y_UNIT_TEST_TWIN(ShouldReadFromHeadWithConflict, UseSink) {
25742574
// Similar to ShouldReadFromHead, but there is conflicting hanged operation.
25752575
// We will read all at once thus should not block
25762576

25772577
TPortManager pm;
25782578
TServerSettings serverSettings(pm.GetPort(2134));
2579+
NKikimrConfig::TAppConfig app;
2580+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
25792581
serverSettings.SetDomainName("Root")
25802582
.SetUseRealThreads(false)
25812583
// Blocked volatile transactions block reads, disable
2582-
.SetEnableDataShardVolatileTransactions(false);
2584+
.SetEnableDataShardVolatileTransactions(false)
2585+
.SetAppConfig(app);
25832586

25842587
const ui64 shardCount = 1;
25852588
TTestHelper helper(serverSettings, shardCount);
@@ -2624,16 +2627,19 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
26242627
}
26252628
}
26262629

2627-
Y_UNIT_TEST(ShouldReadFromHeadToMvccWithConflict) {
2630+
Y_UNIT_TEST_TWIN(ShouldReadFromHeadToMvccWithConflict, UseSink) {
26282631
// Similar to ShouldProperlyOrderConflictingTransactionsMvcc, but we read HEAD
26292632
//
26302633
// In this test HEAD read waits conflicting transaction: first time we read from HEAD and
26312634
// notice that result it not full. Then restart after conflicting operation finishes
26322635

26332636
TPortManager pm;
26342637
TServerSettings serverSettings(pm.GetPort(2134));
2638+
NKikimrConfig::TAppConfig app;
2639+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
26352640
serverSettings.SetDomainName("Root")
2636-
.SetUseRealThreads(false);
2641+
.SetUseRealThreads(false)
2642+
.SetAppConfig(app);
26372643

26382644
const ui64 shardCount = 1;
26392645
TTestHelper helper(serverSettings, shardCount);
@@ -2714,7 +2720,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
27142720
}
27152721
}
27162722

2717-
Y_UNIT_TEST(ShouldProperlyOrderConflictingTransactionsMvcc) {
2723+
Y_UNIT_TEST_TWIN(ShouldProperlyOrderConflictingTransactionsMvcc, UseSink) {
27182724
// 1. Start read-write multishard transaction: readset will be blocked
27192725
// to hang transaction. Write is the key we want to read.
27202726
// 2a. Check that we can read prior blocked step.
@@ -2727,8 +2733,11 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
27272733

27282734
TPortManager pm;
27292735
TServerSettings serverSettings(pm.GetPort(2134));
2736+
NKikimrConfig::TAppConfig app;
2737+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
27302738
serverSettings.SetDomainName("Root")
2731-
.SetUseRealThreads(false);
2739+
.SetUseRealThreads(false)
2740+
.SetAppConfig(app);
27322741

27332742
const ui64 shardCount = 1;
27342743
TTestHelper helper(serverSettings, shardCount);

ydb/core/tx/datashard/datashard_ut_replication.cpp

+12-4
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,14 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
228228
DoSplitMergeChanges(true);
229229
}
230230

231-
Y_UNIT_TEST(ReplicatedTable) {
231+
Y_UNIT_TEST_TWIN(ReplicatedTable, UseSink) {
232232
TPortManager pm;
233233
TServerSettings serverSettings(pm.GetPort(2134));
234+
NKikimrConfig::TAppConfig app;
235+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
234236
serverSettings.SetDomainName("Root")
235-
.SetUseRealThreads(false);
237+
.SetUseRealThreads(false)
238+
.SetAppConfig(app);
236239

237240
Tests::TServer::TPtr server = new TServer(serverSettings);
238241
auto &runtime = *server->GetRuntime();
@@ -244,8 +247,13 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
244247
CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions().Replicated(true));
245248

246249
ExecSQL(server, sender, "SELECT * FROM `/Root/table-1`");
247-
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true,
248-
Ydb::StatusIds::GENERIC_ERROR);
250+
if (UseSink) {
251+
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true,
252+
Ydb::StatusIds::BAD_REQUEST);
253+
} else {
254+
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true,
255+
Ydb::StatusIds::GENERIC_ERROR);
256+
}
249257

250258
WaitTxNotification(server, sender, AsyncAlterDropReplicationConfig(server, "/Root", "table-1"));
251259
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);");

ydb/core/tx/datashard/datashard_ut_rs.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,16 @@ struct IsReadSet {
6363
};
6464

6565
Y_UNIT_TEST_SUITE(TDataShardRSTest) {
66-
Y_UNIT_TEST(TestCleanupInRS) {
66+
Y_UNIT_TEST_TWIN(TestCleanupInRS, UseSink) {
6767
TPortManager pm;
6868
TServerSettings serverSettings(pm.GetPort(2134));
69+
NKikimrConfig::TAppConfig app;
70+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
6971
serverSettings.SetDomainName("Root")
7072
.SetUseRealThreads(false)
7173
// Volatile transactions avoid storing readsets in InReadSets table
72-
.SetEnableDataShardVolatileTransactions(false);
74+
.SetEnableDataShardVolatileTransactions(false)
75+
.SetAppConfig(app);
7376

7477
Tests::TServer::TPtr server = new TServer(serverSettings);
7578
auto &runtime = *server->GetRuntime();

0 commit comments

Comments
 (0)