Skip to content

Commit 9335b61

Browse files
authored
Fix volatile transactions getting stuck after a restart (#10679)
1 parent b91414f commit 9335b61

File tree

2 files changed

+93
-5
lines changed

2 files changed

+93
-5
lines changed

ydb/core/tx/datashard/datashard_ut_volatile.cpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <ydb/core/base/blobstorage.h>
77
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
8+
#include <ydb/core/testlib/actors/block_events.h>
89

910
namespace NKikimr {
1011

@@ -2989,6 +2990,94 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
29892990
"ERROR: ABORTED");
29902991
}
29912992

2993+
Y_UNIT_TEST(UpsertDependenciesShardsRestart) {
2994+
TPortManager pm;
2995+
TServerSettings serverSettings(pm.GetPort(2134));
2996+
serverSettings.SetDomainName("Root")
2997+
.SetUseRealThreads(false)
2998+
.SetEnableDataShardVolatileTransactions(true);
2999+
3000+
Tests::TServer::TPtr server = new TServer(serverSettings);
3001+
auto &runtime = *server->GetRuntime();
3002+
auto sender = runtime.AllocateEdgeActor();
3003+
3004+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3005+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3006+
3007+
InitRoot(server, sender);
3008+
3009+
UNIT_ASSERT_VALUES_EQUAL(
3010+
KqpSchemeExec(runtime, R"(
3011+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3012+
WITH (PARTITION_AT_KEYS = (10));
3013+
)"),
3014+
"SUCCESS");
3015+
3016+
const auto shards = GetTableShards(server, sender, "/Root/table");
3017+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
3018+
3019+
// We need to fill table with some data
3020+
Cerr << "========= Upserting initial values =========" << Endl;
3021+
KqpSimpleExec(runtime, R"(
3022+
UPSERT INTO `/Root/table` (key, subkey, value)
3023+
VALUES (1, 1), (11, 11)
3024+
)");
3025+
3026+
TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
3027+
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
3028+
[actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
3029+
return ev->GetRecipientRewrite() == actor;
3030+
});
3031+
3032+
Cerr << "========= Starting upsert 1 =========" << Endl;
3033+
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
3034+
UPSERT INTO `/Root/table` (key, value)
3035+
VALUES (2, 2), (12, 12);
3036+
)");
3037+
runtime.SimulateSleep(TDuration::Seconds(1));
3038+
3039+
Cerr << "========= Starting upsert 2 =========" << Endl;
3040+
auto upsertFuture2 = KqpSimpleSend(runtime, R"(
3041+
UPSERT INTO `/Root/table` (key, value)
3042+
VALUES (2, 1002), (12, 1012);
3043+
)");
3044+
runtime.SimulateSleep(TDuration::Seconds(1));
3045+
3046+
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
3047+
3048+
// We expect transaction to execute at shards[1]
3049+
// However at shards[0] it didn't even start due to blocked plans
3050+
// Now we need to restart both shards, without giving them a chance to communicate
3051+
std::vector<TActorId> shardActors{
3052+
ResolveTablet(runtime, shards.at(0)),
3053+
ResolveTablet(runtime, shards.at(1)),
3054+
};
3055+
for (auto& shardActor : shardActors) {
3056+
Cerr << "... killing actor " << shardActor << Endl;
3057+
// Perform a synchronous send, this makes sure both shards handle TEvPoison before anything else
3058+
runtime.Send(new IEventHandle(shardActor, TActorId(), new TEvents::TEvPoison), 0, /* viaActorSystem */ false);
3059+
}
3060+
3061+
blockedPlan.Stop().clear();
3062+
3063+
// Both queries should abort with UNDETERMINED
3064+
Cerr << "... waiting for query results" << Endl;
3065+
UNIT_ASSERT_VALUES_EQUAL(
3066+
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
3067+
"ERROR: UNDETERMINED");
3068+
UNIT_ASSERT_VALUES_EQUAL(
3069+
FormatResult(runtime.WaitFuture(std::move(upsertFuture2))),
3070+
"ERROR: UNDETERMINED");
3071+
3072+
// Split the second shard, which makes sure it's not stuck
3073+
Cerr << "========= Splitting shard 2 =========" << Endl;
3074+
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
3075+
ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(1), 15);
3076+
Cerr << "... split txId# " << txId << " started" << Endl;
3077+
WaitTxNotification(server, sender, txId);
3078+
Cerr << "... split finished" << Endl;
3079+
}
3080+
29923081
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
29933082

29943083
} // namespace NKikimr

ydb/core/tx/datashard/volatile_tx.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,6 @@ namespace NKikimr::NDataShard {
271271

272272
void TVolatileTxManager::Start(const TActorContext& ctx) {
273273
for (auto& pr : VolatileTxs) {
274-
if (!pr.second->Dependencies.empty()) {
275-
continue;
276-
}
277274
switch (pr.second->State) {
278275
case EVolatileTxState::Waiting:
279276
for (ui64 target : pr.second->Participants) {
@@ -875,7 +872,7 @@ namespace NKikimr::NDataShard {
875872
if (info->AddCommitted) {
876873
RunCommitCallbacks(info);
877874
}
878-
if (info->Dependencies.empty() && ReadyToDbCommit(info)) {
875+
if (ReadyToDbCommit(info)) {
879876
AddPendingCommit(txId);
880877
}
881878
}
@@ -926,7 +923,9 @@ namespace NKikimr::NDataShard {
926923
case EVolatileTxState::Waiting:
927924
break;
928925
case EVolatileTxState::Committed:
929-
AddPendingCommit(dependentTxId);
926+
if (ReadyToDbCommit(dependent)) {
927+
AddPendingCommit(dependentTxId);
928+
}
930929
break;
931930
case EVolatileTxState::Aborting:
932931
Y_ABORT("FIXME: unexpected dependency removed from aborting tx");

0 commit comments

Comments
 (0)