Skip to content

Commit 5c47c34

Browse files
committed
Fix volatile transactions getting stuck after a restart
1 parent 2414fb7 commit 5c47c34

File tree

2 files changed

+89
-3
lines changed

2 files changed

+89
-3
lines changed

ydb/core/tx/datashard/datashard_ut_volatile.cpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <ydb/core/base/blobstorage.h>
77
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
8+
#include <ydb/core/testlib/actors/block_events.h>
89

910
namespace NKikimr {
1011

@@ -2989,6 +2990,94 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
29892990
"ERROR: ABORTED");
29902991
}
29912992

2993+
Y_UNIT_TEST(UpsertDependenciesShardsRestart) {
2994+
TPortManager pm;
2995+
TServerSettings serverSettings(pm.GetPort(2134));
2996+
serverSettings.SetDomainName("Root")
2997+
.SetUseRealThreads(false)
2998+
.SetEnableDataShardVolatileTransactions(true);
2999+
3000+
Tests::TServer::TPtr server = new TServer(serverSettings);
3001+
auto &runtime = *server->GetRuntime();
3002+
auto sender = runtime.AllocateEdgeActor();
3003+
3004+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3005+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3006+
3007+
InitRoot(server, sender);
3008+
3009+
UNIT_ASSERT_VALUES_EQUAL(
3010+
KqpSchemeExec(runtime, R"(
3011+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3012+
WITH (PARTITION_AT_KEYS = (10));
3013+
)"),
3014+
"SUCCESS");
3015+
3016+
const auto shards = GetTableShards(server, sender, "/Root/table");
3017+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
3018+
3019+
// We need to fill table with some data
3020+
Cerr << "========= Upserting initial values =========" << Endl;
3021+
KqpSimpleExec(runtime, R"(
3022+
UPSERT INTO `/Root/table` (key, subkey, value)
3023+
VALUES (1, 1), (11, 11)
3024+
)");
3025+
3026+
TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
3027+
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
3028+
[actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
3029+
return ev->GetRecipientRewrite() == actor;
3030+
});
3031+
3032+
Cerr << "========= Starting upsert 1 =========" << Endl;
3033+
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
3034+
UPSERT INTO `/Root/table` (key, value)
3035+
VALUES (2, 2), (12, 12);
3036+
)");
3037+
runtime.SimulateSleep(TDuration::Seconds(1));
3038+
3039+
Cerr << "========= Starting upsert 2 =========" << Endl;
3040+
auto upsertFuture2 = KqpSimpleSend(runtime, R"(
3041+
UPSERT INTO `/Root/table` (key, value)
3042+
VALUES (2, 1002), (12, 1012);
3043+
)");
3044+
runtime.SimulateSleep(TDuration::Seconds(1));
3045+
3046+
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
3047+
3048+
// We expect transaction to execute at shards[1]
3049+
// However at shards[0] it didn't even start due to blocked plans
3050+
// Now we need to restart both shards, without giving them a change to communicate
3051+
std::vector<TActorId> shardActors{
3052+
ResolveTablet(runtime, shards.at(0)),
3053+
ResolveTablet(runtime, shards.at(1)),
3054+
};
3055+
for (auto& shardActor : shardActors) {
3056+
Cerr << "... killing actor " << shardActor << Endl;
3057+
// Perform a synchronous send, this makes sure both shards handle TEvPoison
3058+
runtime.Send(new IEventHandle(shardActor, TActorId(), new TEvents::TEvPoison), 0, /* viaActorSystem */ false);
3059+
}
3060+
3061+
blockedPlan.Stop().clear();
3062+
3063+
// Both queries should abort with UNDETERMINED
3064+
Cerr << "... waiting for query results" << Endl;
3065+
UNIT_ASSERT_VALUES_EQUAL(
3066+
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
3067+
"ERROR: UNDETERMINED");
3068+
UNIT_ASSERT_VALUES_EQUAL(
3069+
FormatResult(runtime.WaitFuture(std::move(upsertFuture2))),
3070+
"ERROR: UNDETERMINED");
3071+
3072+
// Split the second shard, which makes sure it's not stuck
3073+
Cerr << "========= Splitting shard 2 =========" << Endl;
3074+
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
3075+
ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(1), 15);
3076+
Cerr << "... split txId# " << txId << " started" << Endl;
3077+
WaitTxNotification(server, sender, txId);
3078+
Cerr << "... split finished" << Endl;
3079+
}
3080+
29923081
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
29933082

29943083
} // namespace NKikimr

ydb/core/tx/datashard/volatile_tx.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,6 @@ namespace NKikimr::NDataShard {
271271

272272
void TVolatileTxManager::Start(const TActorContext& ctx) {
273273
for (auto& pr : VolatileTxs) {
274-
if (!pr.second->Dependencies.empty()) {
275-
continue;
276-
}
277274
switch (pr.second->State) {
278275
case EVolatileTxState::Waiting:
279276
for (ui64 target : pr.second->Participants) {

0 commit comments

Comments
 (0)