Skip to content

Fix datashard sink tests #15550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/kqp/common/kqp_tx_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class TKqpTransactionManager : public IKqpTransactionManager {
auto& shardInfo = ShardsInfo.at(shardId);
if (auto lockPtr = shardInfo.Locks.FindPtr(lock.GetKey()); lockPtr) {
if (lock.Proto.GetHasWrites()) {
AFL_ENSURE(!ReadOnly);
lockPtr->Lock.Proto.SetHasWrites(true);
}

Expand Down
9 changes: 2 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,13 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void HandleFinalize(TEvents::TEvUndelivered::TPtr&) {
if (Request.LocksOp == ELocksOp::Commit && !ReadOnlyTx) {
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, "Buffer actor isn't available. Operation state unknown.");
ReplyErrorAndDie(Ydb::StatusIds::UNDETERMINED, issue);
} else {
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, "Buffer actor isn't available.");
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
}
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, "Buffer actor isn't available.");
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
}

void MakeResponseAndPassAway() {
Expand Down
35 changes: 22 additions & 13 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -778,19 +778,20 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
return builder;
}());

for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
Y_ABORT_UNLESS(Mode == EMode::WRITE);
if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) {
YQL_ENSURE(TxManager->BrokenLocks());
NYql::TIssues issues;
issues.AddIssue(*TxManager->GetLockIssue());
RuntimeError(
NYql::NDqProto::StatusIds::ABORTED,
NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED,
TStringBuilder() << "Transaction locks invalidated. Table `"
<< TablePath << "`.",
issues);
return;
if (Mode == EMode::WRITE) {
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) {
YQL_ENSURE(TxManager->BrokenLocks());
NYql::TIssues issues;
issues.AddIssue(*TxManager->GetLockIssue());
RuntimeError(
NYql::NDqProto::StatusIds::ABORTED,
NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED,
TStringBuilder() << "Transaction locks invalidated. Table `"
<< TablePath << "`.",
issues);
return;
}
}
}

Expand Down Expand Up @@ -1145,6 +1146,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
Callbacks->OnError(statusCode, std::move(issues));
}

void Unlink() {
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
}

void PassAway() override {;
Counters->WriteActorsCount->Dec();
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
Expand Down Expand Up @@ -2644,6 +2649,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
});
ExecuterActorId = {};
Y_ABORT_UNLESS(GetTotalMemory() == 0);

for (auto& [_, info] : WriteInfos) {
info.WriteTableActor->Unlink();
}
}

void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, NYql::EYqlIssueCode id, const TString& message, const NYql::TIssues& subIssues) override {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << " send to: " << ExecuterId << " from: " << ev->Sender;

if (!QueryState || !QueryState->TxCtx || QueryState->TxCtx->BufferActorId != ev->Sender) {
LOG_E(logMsg << ": Old error.");
LOG_E(logMsg << ": Ignored error.");
return;
} else {
LOG_W(logMsg);
Expand Down
37 changes: 28 additions & 9 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2020,11 +2020,14 @@ Y_UNIT_TEST(TestPlannedTimeoutSplit) {
}
}

Y_UNIT_TEST(TestPlannedHalfOverloadedSplit) {
Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseSink) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
.SetUseRealThreads(false)
.SetAppConfig(app);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
Expand Down Expand Up @@ -2055,7 +2058,8 @@ Y_UNIT_TEST(TestPlannedHalfOverloadedSplit) {
TVector<THolder<IEventHandle>> txProposeResults;
auto captureMessages = [&](TAutoPtr<IEventHandle> &event) -> auto {
switch (event->GetTypeRewrite()) {
case TEvDataShard::EvProposeTransaction: {
case TEvDataShard::EvProposeTransaction:
case NKikimr::NEvents::TDataEvents::EvWrite: {
Cerr << "---- observed EvProposeTransactionResult ----" << Endl;
if (txProposes.size() == 0) {
// Capture the first propose
Expand All @@ -2064,7 +2068,8 @@ Y_UNIT_TEST(TestPlannedHalfOverloadedSplit) {
}
break;
}
case TEvDataShard::EvProposeTransactionResult: {
case TEvDataShard::EvProposeTransactionResult:
case NKikimr::NEvents::TDataEvents::EvWriteResult: {
Cerr << "---- observed EvProposeTransactionResult ----" << Endl;
if (txProposes.size() > 0) {
// Capture all propose results
Expand Down Expand Up @@ -2452,11 +2457,14 @@ Y_UNIT_TEST(TestReadTableSingleShardImmediate) {
UNIT_ASSERT_VALUES_EQUAL(seenPlanSteps, 0u);
}

Y_UNIT_TEST(TestImmediateQueueThenSplit) {
Y_UNIT_TEST_TWIN(TestImmediateQueueThenSplit, UseSink) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
.SetUseRealThreads(false)
.SetAppConfig(app);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
Expand Down Expand Up @@ -2499,6 +2507,7 @@ Y_UNIT_TEST(TestImmediateQueueThenSplit) {
}
break;
case TEvDataShard::EvProposeTransaction:
case NKikimr::NEvents::TDataEvents::EvWrite:
if (capturePropose) {
Cerr << "---- capture EvProposeTransaction ----" << Endl;
eventsPropose.emplace_back(event.Release());
Expand Down Expand Up @@ -2620,10 +2629,11 @@ Y_UNIT_TEST(TestImmediateQueueThenSplit) {
<< failures << " failures");
}

void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query) {
void TestLateKqpQueryAfterColumnDrop(bool dataQuery, bool useSink, const TString& query) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(false);
app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
Expand Down Expand Up @@ -2671,6 +2681,15 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query) {
break;
}

case NKikimr::NEvents::TDataEvents::EvWrite: {
if (capturePropose) {
Cerr << "---- capture EvWrite ----" << Endl;
eventsPropose.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
break;
}

case TEvDataShard::EvKqpScan: {
if (capturePropose) {
Cerr << "---- capture EvKqpScan ----" << Endl;
Expand Down Expand Up @@ -2735,8 +2754,8 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query) {
}
}

Y_UNIT_TEST(TestLateKqpScanAfterColumnDrop) {
TestLateKqpQueryAfterColumnDrop(false, "SELECT SUM(value2) FROM `/Root/table-1`");
Y_UNIT_TEST_TWIN(TestLateKqpScanAfterColumnDrop, UseSink) {
TestLateKqpQueryAfterColumnDrop(false, UseSink, "SELECT SUM(value2) FROM `/Root/table-1`");
}

Y_UNIT_TEST(TestSecondaryClearanceAfterShardRestartRace) {
Expand Down
21 changes: 15 additions & 6 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2570,16 +2570,19 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
}

Y_UNIT_TEST(ShouldReadFromHeadWithConflict) {
Y_UNIT_TEST_TWIN(ShouldReadFromHeadWithConflict, UseSink) {
// Similar to ShouldReadFromHead, but there is conflicting hanged operation.
// We will read all at once thus should not block

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
// Blocked volatile transactions block reads, disable
.SetEnableDataShardVolatileTransactions(false);
.SetEnableDataShardVolatileTransactions(false)
.SetAppConfig(app);

const ui64 shardCount = 1;
TTestHelper helper(serverSettings, shardCount);
Expand Down Expand Up @@ -2624,16 +2627,19 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
}
}

Y_UNIT_TEST(ShouldReadFromHeadToMvccWithConflict) {
Y_UNIT_TEST_TWIN(ShouldReadFromHeadToMvccWithConflict, UseSink) {
// Similar to ShouldProperlyOrderConflictingTransactionsMvcc, but we read HEAD
//
// In this test HEAD read waits conflicting transaction: first time we read from HEAD and
// notice that result it not full. Then restart after conflicting operation finishes

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
.SetUseRealThreads(false)
.SetAppConfig(app);

const ui64 shardCount = 1;
TTestHelper helper(serverSettings, shardCount);
Expand Down Expand Up @@ -2714,7 +2720,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
}
}

Y_UNIT_TEST(ShouldProperlyOrderConflictingTransactionsMvcc) {
Y_UNIT_TEST_TWIN(ShouldProperlyOrderConflictingTransactionsMvcc, UseSink) {
// 1. Start read-write multishard transaction: readset will be blocked
// to hang transaction. Write is the key we want to read.
// 2a. Check that we can read prior blocked step.
Expand All @@ -2727,8 +2733,11 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
.SetUseRealThreads(false)
.SetAppConfig(app);

const ui64 shardCount = 1;
TTestHelper helper(serverSettings, shardCount);
Expand Down
16 changes: 12 additions & 4 deletions ydb/core/tx/datashard/datashard_ut_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,14 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
DoSplitMergeChanges(true);
}

Y_UNIT_TEST(ReplicatedTable) {
Y_UNIT_TEST_TWIN(ReplicatedTable, UseSink) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
.SetUseRealThreads(false)
.SetAppConfig(app);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
Expand All @@ -244,8 +247,13 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions().Replicated(true));

ExecSQL(server, sender, "SELECT * FROM `/Root/table-1`");
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true,
Ydb::StatusIds::GENERIC_ERROR);
if (UseSink) {
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true,
Ydb::StatusIds::BAD_REQUEST);
} else {
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true,
Ydb::StatusIds::GENERIC_ERROR);
}

WaitTxNotification(server, sender, AsyncAlterDropReplicationConfig(server, "/Root", "table-1"));
ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);");
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/datashard/datashard_ut_rs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ struct IsReadSet {
};

Y_UNIT_TEST_SUITE(TDataShardRSTest) {
Y_UNIT_TEST(TestCleanupInRS) {
Y_UNIT_TEST_TWIN(TestCleanupInRS, UseSink) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
// Volatile transactions avoid storing readsets in InReadSets table
.SetEnableDataShardVolatileTransactions(false);
.SetEnableDataShardVolatileTransactions(false)
.SetAppConfig(app);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
Expand Down
Loading
Loading