Skip to content

Commit 0c2f282

Browse files
authored
Fix readset acks sent too early in volatile transactions (#1961)
1 parent c076edf commit 0c2f282

File tree

2 files changed

+110
-2
lines changed

2 files changed

+110
-2
lines changed

ydb/core/tx/datashard/datashard_ut_volatile.cpp

+108
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "datashard_ut_common_pq.h"
44
#include "datashard_active_transaction.h"
55

6+
#include <ydb/core/base/blobstorage.h>
67
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
78

89
namespace NKikimr {
@@ -2106,6 +2107,113 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
21062107
"{ items { uint32_value: 6 } items { uint32_value: 6 } }");
21072108
}
21082109

2110+
// Regression test for KIKIMR-21060
//
// Scenario exercised below, with volatile transactions enabled:
//   1. Create table-1 and table-2 and seed one row into each.
//   2. Capture readsets while a distributed upsert to both tables is started.
//   3. Intercept blob storage puts issued by the table-1 shard, then re-send
//      the captured readsets.
//   4. Expect the upsert to report success, then restart the table-1 shard
//      and verify that all four rows are still visible.
// The bug being guarded against (per the comment further down) was readsets
// being acknowledged before the tx state is fully persisted.
2111+
Y_UNIT_TEST(DistributedWriteRSNotAckedBeforeCommit) {
2112+
TPortManager pm;
2113+
TServerSettings serverSettings(pm.GetPort(2134));
2114+
serverSettings.SetDomainName("Root")
2115+
.SetUseRealThreads(false)
2116+
.SetDomainPlanResolution(1000)
2117+
.SetEnableDataShardVolatileTransactions(true);
2118+
2119+
Tests::TServer::TPtr server = new TServer(serverSettings);
2120+
auto &runtime = *server->GetRuntime();
2121+
auto sender = runtime.AllocateEdgeActor();
2122+
2123+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
2124+
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
2125+
2126+
InitRoot(server, sender);
2127+
// NOTE(review): the trailing 1 is presumably the shard count; table-1 is
// asserted to have exactly one shard further down.
2128+
CreateShardedTable(server, sender, "/Root", "table-1", 1);
2129+
CreateShardedTable(server, sender, "/Root", "table-2", 1);
2130+
2131+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);");
2132+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);");
2133+
2134+
// Block readset exchange
// Each observed readset is moved out of the event pointer into readSets
// so that it can be re-sent later in the test.
2135+
std::vector<std::unique_ptr<IEventHandle>> readSets;
2136+
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
2137+
Cerr << "... blocking readset" << Endl;
2138+
readSets.emplace_back(ev.Release());
2139+
});
2140+
2141+
// Start a distributed write to both tables
2142+
TString sessionId = CreateSessionRPC(runtime, "/Root");
2143+
auto upsertResult = SendRequest(
2144+
runtime,
2145+
MakeSimpleRequestRPC(R"(
2146+
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
2147+
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
2148+
)", sessionId, /* txId */ "", /* commitTx */ true),
2149+
"/Root");
// Proceed only once at least 4 readsets have been captured by the observer
2150+
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");
2151+
2152+
// Stop blocking further readsets
2153+
blockReadSets.Remove();
2154+
2155+
// Sleep a little to make sure everything so far is fully committed
2156+
runtime.SimulateSleep(TDuration::Seconds(1));
2157+
2158+
// Start blocking commits for table-1
2159+
const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
2160+
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
2161+
std::vector<std::unique_ptr<IEventHandle>> putResponses;
2162+
auto blockCommits = runtime.AddObserver<TEvBlobStorage::TEvPut>([&](TEvBlobStorage::TEvPut::TPtr& ev) {
2163+
auto* msg = ev->Get();
2164+
// Drop all put requests for table-1
2165+
if (msg->Id.TabletID() == shards1.at(0)) {
2166+
// We can't just drop requests, we must reply to them later
// The stashed reply is a BLOCKED error response built from the original
// put and carrying its cookie.
// NOTE(review): presumably it is addressed back to the put's sender;
// confirm the IEventHandle argument order.
2167+
putResponses.emplace_back(new IEventHandle(
2168+
ev->Sender,
2169+
ev->GetRecipientRewrite(),
2170+
msg->MakeErrorResponse(NKikimrProto::BLOCKED, "Fake blocked response", 0).release(),
2171+
0,
2172+
ev->Cookie));
2173+
Cerr << "... dropping put " << msg->Id << Endl;
// Clear the event pointer now that a reply has been stashed for it
2174+
ev.Reset();
2175+
}
2176+
});
2177+
2178+
// Unblock readsets
// Ownership of every captured event is handed back to the runtime
2179+
for (auto& ev : readSets) {
2180+
runtime.Send(ev.release(), 0, true);
2181+
}
2182+
readSets.clear();
2183+
2184+
// Sleep to make sure those readsets are fully processed
2185+
// Bug was acknowledging readsets before tx state is fully persisted
2186+
runtime.SimulateSleep(TDuration::Seconds(1));
2187+
2188+
// Transaction will return success even when commits are blocked at this point
2189+
Cerr << "... awaiting upsert result" << Endl;
2190+
UNIT_ASSERT_VALUES_EQUAL(
2191+
FormatResult(AwaitResponse(runtime, std::move(upsertResult))),
2192+
"<empty>");
2193+
2194+
// Now we stop blocking commits and gracefully restart the tablet, all pending commits will be lost
// The stashed fake BLOCKED replies are sent only after the put observer
// has been removed.
2195+
blockCommits.Remove();
2196+
for (auto& ev : putResponses) {
2197+
runtime.Send(ev.release(), 0, true);
2198+
}
2199+
Cerr << "... restarting tablet " << shards1.at(0) << Endl;
2200+
GracefulRestartTablet(runtime, shards1.at(0), sender);
2201+
2202+
// We must see all rows as committed, i.e. nothing should be lost
// Expected rows: the two seeded rows plus the two rows written by the
// distributed upsert.
2203+
Cerr << "... reading final result" << Endl;
2204+
UNIT_ASSERT_VALUES_EQUAL(
2205+
KqpSimpleExec(runtime, R"(
2206+
SELECT key, value FROM `/Root/table-1`
2207+
UNION ALL
2208+
SELECT key, value FROM `/Root/table-2`
2209+
ORDER BY key
2210+
)"),
2211+
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
2212+
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
2213+
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
2214+
"{ items { uint32_value: 4 } items { uint32_value: 40 } }");
2215+
}
2216+
21092217
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
21102218

21112219
} // namespace NKikimr

ydb/core/tx/datashard/volatile_tx.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,7 @@ namespace NKikimr::NDataShard {
824824
}
825825
info->DelayedConfirmations.clear();
826826

827-
// Send delayed acks on commit
827+
// Send delayed acks when changes are persisted
828828
// TODO: maybe move it into a parameter?
829829
struct TDelayedAcksState : public TThrRefBase {
830830
TVector<THolder<IEventHandle>> DelayedAcks;
@@ -833,7 +833,7 @@ namespace NKikimr::NDataShard {
833833
: DelayedAcks(std::move(info->DelayedAcks))
834834
{}
835835
};
836-
txc.DB.OnCommit([state = MakeIntrusive<TDelayedAcksState>(info)]() {
836+
txc.DB.OnPersistent([state = MakeIntrusive<TDelayedAcksState>(info)]() {
837837
for (auto& ev : state->DelayedAcks) {
838838
TActivationContext::Send(ev.Release());
839839
}

0 commit comments

Comments
 (0)