diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 1ca101657cb6..a3986c61f20f 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -5,6 +5,7 @@ #include #include +#include namespace NKikimr { @@ -2988,6 +2989,94 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "ERROR: ABORTED"); } + Y_UNIT_TEST(UpsertDependenciesShardsRestart) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableDataShardVolatileTransactions(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE); + + InitRoot(server, sender); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSchemeExec(runtime, R"( + CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key)) + WITH (PARTITION_AT_KEYS = (10)); + )"), + "SUCCESS"); + + const auto shards = GetTableShards(server, sender, "/Root/table"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u); + + // We need to fill table with some data + Cerr << "========= Upserting initial values =========" << Endl; + KqpSimpleExec(runtime, R"( + UPSERT INTO `/Root/table` (key, subkey, value) + VALUES (1, 1), (11, 11) + )"); + + TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0)); + TBlockEvents blockedPlan(runtime, + [actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) { + return ev->GetRecipientRewrite() == actor; + }); + + Cerr << "========= Starting upsert 1 =========" << Endl; + auto upsertFuture1 = KqpSimpleSend(runtime, R"( + UPSERT INTO `/Root/table` (key, value) + VALUES (2, 2), (12, 12); + )"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + Cerr << "========= Starting upsert 2 =========" << Endl; + auto upsertFuture2 = KqpSimpleSend(runtime, R"( + UPSERT INTO `/Root/table` (key, value) + VALUES (2, 1002), (12, 1012); + )"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u); + + // We expect transaction to execute at shards[1] + // However at shards[0] it didn't even start due to blocked plans + // Now we need to restart both shards, without giving them a chance to communicate + std::vector shardActors{ + ResolveTablet(runtime, shards.at(0)), + ResolveTablet(runtime, shards.at(1)), + }; + for (auto& shardActor : shardActors) { + Cerr << "... killing actor " << shardActor << Endl; + // Perform a synchronous send, this makes sure both shards handle TEvPoison before anything else + runtime.Send(new IEventHandle(shardActor, TActorId(), new TEvents::TEvPoison), 0, /* viaActorSystem */ false); + } + + blockedPlan.Stop().clear(); + + // Both queries should abort with UNDETERMINED + Cerr << "... waiting for query results" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(runtime.WaitFuture(std::move(upsertFuture1))), + "ERROR: UNDETERMINED"); + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(runtime.WaitFuture(std::move(upsertFuture2))), + "ERROR: UNDETERMINED"); + + // Split the second shard, which makes sure it's not stuck + Cerr << "========= Splitting shard 2 =========" << Endl; + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(1), 15); + Cerr << "... split txId# " << txId << " started" << Endl; + WaitTxNotification(server, sender, txId); + Cerr << "... split finished" << Endl; + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 1a72b7c10783..38264b973bcc 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -271,9 +271,6 @@ namespace NKikimr::NDataShard { void TVolatileTxManager::Start(const TActorContext& ctx) { for (auto& pr : VolatileTxs) { - if (!pr.second->Dependencies.empty()) { - continue; - } switch (pr.second->State) { case EVolatileTxState::Waiting: for (ui64 target : pr.second->Participants) { @@ -875,7 +872,7 @@ namespace NKikimr::NDataShard { if (info->AddCommitted) { RunCommitCallbacks(info); } - if (info->Dependencies.empty() && ReadyToDbCommit(info)) { + if (ReadyToDbCommit(info)) { AddPendingCommit(txId); } } @@ -926,7 +923,9 @@ namespace NKikimr::NDataShard { case EVolatileTxState::Waiting: break; case EVolatileTxState::Committed: - AddPendingCommit(dependentTxId); + if (ReadyToDbCommit(dependent)) { + AddPendingCommit(dependentTxId); + } break; case EVolatileTxState::Aborting: Y_ABORT("FIXME: unexpected dependency removed from aborting tx");