Skip to content

Commit 7ff1c86

Browse files
authored
24-3: Fix volatile transactions getting stuck after a restart (#10698)
1 parent 7c22bbb commit 7ff1c86

File tree

2 files changed

+93
-5
lines changed

2 files changed

+93
-5
lines changed

ydb/core/tx/datashard/datashard_ut_volatile.cpp

+89
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <ydb/core/base/blobstorage.h>
77
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
8+
#include <ydb/core/testlib/actors/block_events.h>
89

910
namespace NKikimr {
1011

@@ -2988,6 +2989,94 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
29882989
"ERROR: ABORTED");
29892990
}
29902991

2992+
Y_UNIT_TEST(UpsertDependenciesShardsRestart) {
2993+
TPortManager pm;
2994+
TServerSettings serverSettings(pm.GetPort(2134));
2995+
serverSettings.SetDomainName("Root")
2996+
.SetUseRealThreads(false)
2997+
.SetEnableDataShardVolatileTransactions(true);
2998+
2999+
Tests::TServer::TPtr server = new TServer(serverSettings);
3000+
auto &runtime = *server->GetRuntime();
3001+
auto sender = runtime.AllocateEdgeActor();
3002+
3003+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3004+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3005+
3006+
InitRoot(server, sender);
3007+
3008+
UNIT_ASSERT_VALUES_EQUAL(
3009+
KqpSchemeExec(runtime, R"(
3010+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3011+
WITH (PARTITION_AT_KEYS = (10));
3012+
)"),
3013+
"SUCCESS");
3014+
3015+
const auto shards = GetTableShards(server, sender, "/Root/table");
3016+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
3017+
3018+
// We need to fill table with some data
3019+
Cerr << "========= Upserting initial values =========" << Endl;
3020+
KqpSimpleExec(runtime, R"(
3021+
UPSERT INTO `/Root/table` (key, subkey, value)
3022+
VALUES (1, 1), (11, 11)
3023+
)");
3024+
3025+
TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
3026+
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
3027+
[actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
3028+
return ev->GetRecipientRewrite() == actor;
3029+
});
3030+
3031+
Cerr << "========= Starting upsert 1 =========" << Endl;
3032+
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
3033+
UPSERT INTO `/Root/table` (key, value)
3034+
VALUES (2, 2), (12, 12);
3035+
)");
3036+
runtime.SimulateSleep(TDuration::Seconds(1));
3037+
3038+
Cerr << "========= Starting upsert 2 =========" << Endl;
3039+
auto upsertFuture2 = KqpSimpleSend(runtime, R"(
3040+
UPSERT INTO `/Root/table` (key, value)
3041+
VALUES (2, 1002), (12, 1012);
3042+
)");
3043+
runtime.SimulateSleep(TDuration::Seconds(1));
3044+
3045+
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
3046+
3047+
// We expect transaction to execute at shards[1]
3048+
// However at shards[0] it didn't even start due to blocked plans
3049+
// Now we need to restart both shards, without giving them a chance to communicate
3050+
std::vector<TActorId> shardActors{
3051+
ResolveTablet(runtime, shards.at(0)),
3052+
ResolveTablet(runtime, shards.at(1)),
3053+
};
3054+
for (auto& shardActor : shardActors) {
3055+
Cerr << "... killing actor " << shardActor << Endl;
3056+
// Perform a synchronous send, this makes sure both shards handle TEvPoison before anything else
3057+
runtime.Send(new IEventHandle(shardActor, TActorId(), new TEvents::TEvPoison), 0, /* viaActorSystem */ false);
3058+
}
3059+
3060+
blockedPlan.Stop().clear();
3061+
3062+
// Both queries should abort with UNDETERMINED
3063+
Cerr << "... waiting for query results" << Endl;
3064+
UNIT_ASSERT_VALUES_EQUAL(
3065+
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
3066+
"ERROR: UNDETERMINED");
3067+
UNIT_ASSERT_VALUES_EQUAL(
3068+
FormatResult(runtime.WaitFuture(std::move(upsertFuture2))),
3069+
"ERROR: UNDETERMINED");
3070+
3071+
// Split the second shard, which makes sure it's not stuck
3072+
Cerr << "========= Splitting shard 2 =========" << Endl;
3073+
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
3074+
ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(1), 15);
3075+
Cerr << "... split txId# " << txId << " started" << Endl;
3076+
WaitTxNotification(server, sender, txId);
3077+
Cerr << "... split finished" << Endl;
3078+
}
3079+
29913080
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
29923081

29933082
} // namespace NKikimr

ydb/core/tx/datashard/volatile_tx.cpp

+4-5
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,6 @@ namespace NKikimr::NDataShard {
271271

272272
void TVolatileTxManager::Start(const TActorContext& ctx) {
273273
for (auto& pr : VolatileTxs) {
274-
if (!pr.second->Dependencies.empty()) {
275-
continue;
276-
}
277274
switch (pr.second->State) {
278275
case EVolatileTxState::Waiting:
279276
for (ui64 target : pr.second->Participants) {
@@ -875,7 +872,7 @@ namespace NKikimr::NDataShard {
875872
if (info->AddCommitted) {
876873
RunCommitCallbacks(info);
877874
}
878-
if (info->Dependencies.empty() && ReadyToDbCommit(info)) {
875+
if (ReadyToDbCommit(info)) {
879876
AddPendingCommit(txId);
880877
}
881878
}
@@ -926,7 +923,9 @@ namespace NKikimr::NDataShard {
926923
case EVolatileTxState::Waiting:
927924
break;
928925
case EVolatileTxState::Committed:
929-
AddPendingCommit(dependentTxId);
926+
if (ReadyToDbCommit(dependent)) {
927+
AddPendingCommit(dependentTxId);
928+
}
930929
break;
931930
case EVolatileTxState::Aborting:
932931
Y_ABORT("FIXME: unexpected dependency removed from aborting tx");

0 commit comments

Comments
 (0)