
Commit 7d3baf9

Fix race between table merge and borrowed gc compaction. Fixes #3154. (#3164)
1 parent bfd2866 commit 7d3baf9

File tree: 3 files changed, +200 −33 lines


ydb/core/tx/datashard/datashard_split_dst.cpp (+48 −33)

@@ -120,13 +120,11 @@ class TDataShard::TTxInitSplitMergeDestination : public NTabletFlatExecutor::TTr
 class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
 private:
     TEvDataShard::TEvSplitTransferSnapshot::TPtr Ev;
-    bool LastSnapshotReceived;
 
 public:
     TTxSplitTransferSnapshot(TDataShard* ds, TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev)
         : NTabletFlatExecutor::TTransactionBase<TDataShard>(ds)
         , Ev(ev)
-        , LastSnapshotReceived(false)
     {}
 
     TTxType GetTxType() const override { return TXTYPE_SPLIT_TRANSFER_SNAPSHOT; }

@@ -257,8 +255,6 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
         }
 
         if (Self->ReceiveSnapshotsFrom.empty()) {
-            LastSnapshotReceived = true;
-
             const auto minVersion = mvcc ? Self->GetSnapshotManager().GetLowWatermark()
                                          : Self->GetSnapshotManager().GetMinWriteVersion();
 

@@ -295,6 +291,12 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
             // Note: we persist Ready, but keep current state in memory until Complete
             Self->SetPersistState(TShardState::Ready, txc);
             Self->State = TShardState::SplitDstReceivingSnapshot;
+
+            // Schedule a new transaction that will move shard to the Ready state
+            // and finish initialization. This new transaction is guaranteed to
+            // wait until async LoanTable above is complete and new parts are
+            // fully merged into the table.
+            Self->Execute(new TTxLastSnapshotReceived(Self));
         }
 
         return true;

@@ -307,39 +309,52 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
         LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack snapshot OpId " << opId);
 
         ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));
+    }
 
-        // Note: we skip init in an unlikely event of state resetting between Execute and Complete
-        if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
-            // We have received all the data, finish shard initialization
-            // Note: previously we used TxInit, however received system tables
-            // have been empty for years now, and since pipes are still open we
-            // may receive requests between TxInit loading the Ready state and
-            // its Complete method initializing everything properly. Instead
-            // necessary steps are repeated here.
-            Self->State = TShardState::Ready;
-
-            // We are already in StateWork, but we need to repeat many steps now that we are Ready
-            Self->SwitchToWork(ctx);
-
-            // We can send the registration request now that we are ready
-            Self->SendRegistrationRequestTimeCast(ctx);
-
-            // Initialize snapshot expiration queue with current context time
-            Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
-            if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
-                Self->PlanCleanup(ctx);
-            }
-
-            // Initialize change senders
-            Self->KillChangeSender(ctx);
-            Self->CreateChangeSender(ctx);
-            Self->MaybeActivateChangeSender(ctx);
-            Self->EmitHeartbeats();
-
-            // Switch mvcc state if needed
-            Self->CheckMvccStateChangeCanStart(ctx);
-        }
-    }
+    class TTxLastSnapshotReceived : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+    public:
+        TTxLastSnapshotReceived(TDataShard* self)
+            : TTransactionBase(self)
+        {}
+
+        bool Execute(TTransactionContext&, const TActorContext&) override {
+            return true;
+        }
+
+        void Complete(const TActorContext& ctx) override {
+            // Note: we skip init in an unlikely event of state resetting before reaching Complete
+            if (Self->State == TShardState::SplitDstReceivingSnapshot) {
+                // We have received all the data, finish shard initialization
+                // Note: previously we used TxInit, however received system tables
+                // have been empty for years now, and since pipes are still open we
+                // may receive requests between TxInit loading the Ready state and
+                // its Complete method initializing everything properly. Instead
+                // necessary steps are repeated here.
+                Self->State = TShardState::Ready;
+
+                // We are already in StateWork, but we need to repeat many steps now that we are Ready
+                Self->SwitchToWork(ctx);
+
+                // We can send the registration request now that we are ready
+                Self->SendRegistrationRequestTimeCast(ctx);
+
+                // Initialize snapshot expiration queue with current context time
+                Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
+                if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
+                    Self->PlanCleanup(ctx);
+                }
+
+                // Initialize change senders
+                Self->KillChangeSender(ctx);
+                Self->CreateChangeSender(ctx);
+                Self->MaybeActivateChangeSender(ctx);
+                Self->EmitHeartbeats();
+
+                // Switch mvcc state if needed
+                Self->CheckMvccStateChangeCanStart(ctx);
+            }
+        }
+    };
 };
 
 class TDataShard::TTxSplitReplicationSourceOffsets : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
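
The removed LastSnapshotReceived flag was the source of the race: Complete() of the last snapshot transaction finished initialization (and thereby allowed compactions) without waiting for the asynchronous LoanTable merge scheduled in Execute(). The fix relies on the executor running scheduled transactions in order, so the otherwise no-op TTxLastSnapshotReceived acts as a completion barrier: by the time its Complete() runs, the loaned parts are fully merged. Below is a loose, self-contained sketch of that ordering argument in plain C++; it is not the NTabletFlatExecutor API, and TMiniExecutor and both lambdas are illustrative only.

    // A minimal stand-in for the ordering guarantee the diff comment relies on:
    // work items drain strictly in submission order, so a "barrier" item enqueued
    // after the merge cannot run before the merge has finished.
    #include <functional>
    #include <iostream>
    #include <queue>

    class TMiniExecutor {
        std::queue<std::function<void()>> Work;
    public:
        void Enqueue(std::function<void()> fn) { Work.push(std::move(fn)); }
        void Run() {
            while (!Work.empty()) {
                auto fn = std::move(Work.front());
                Work.pop();
                fn();  // strictly ordered, one item at a time
            }
        }
    };

    int main() {
        TMiniExecutor executor;
        bool merged = false;

        // The last snapshot transaction enqueues the async merge of loaned parts...
        executor.Enqueue([&] { merged = true; });

        // ...and then the barrier transaction; it always observes merged == true,
        // so switching the shard to Ready (allowing compactions) here is safe.
        executor.Enqueue([&] {
            std::cout << "barrier: merged=" << merged << ", switching to Ready\n";
        });

        executor.Run();
        return 0;
    }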

ydb/core/tx/datashard/datashard_ut_common_kqp.h (+28)

@@ -11,6 +11,9 @@ namespace NKqpHelpers {
     using TEvExecuteDataQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
         Ydb::Table::ExecuteDataQueryResponse>;
 
+    using TEvExecuteSchemeQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest,
+        Ydb::Table::ExecuteSchemeQueryResponse>;
+
     using TEvCreateSessionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
         Ydb::Table::CreateSessionResponse>;
 

@@ -224,6 +227,31 @@ namespace NKqpHelpers {
         return FormatResult(result);
     }
 
+    inline Ydb::Table::ExecuteSchemeQueryRequest MakeSchemeRequestRPC(
+            const TString& sql, const TString& sessionId)
+    {
+        Ydb::Table::ExecuteSchemeQueryRequest request;
+        request.set_session_id(sessionId);
+        request.set_yql_text(sql);
+        return request;
+    }
+
+    inline NThreading::TFuture<Ydb::Table::ExecuteSchemeQueryResponse> SendRequest(
+            TTestActorRuntime& runtime, Ydb::Table::ExecuteSchemeQueryRequest&& request, const TString& database = {})
+    {
+        return NRpcService::DoLocalRpc<TEvExecuteSchemeQueryRequest>(
+            std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
+    }
+
+    inline TString KqpSchemeExec(TTestActorRuntime& runtime, const TString& query) {
+        TString sessionId = CreateSessionRPC(runtime);
+        auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSchemeRequestRPC(query, sessionId)));
+        if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
+            return TStringBuilder() << "ERROR: " << response.operation().status();
+        }
+        return "SUCCESS";
+    }
+
 } // namespace NKqpHelpers
 } // namespace NDataShard
 } // namespace NKikimr
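
DDL cannot go through the existing data-query helpers because it uses a different RPC (Ydb::Table::ExecuteSchemeQuery rather than ExecuteDataQuery), so this change adds a parallel local-RPC path. KqpSchemeExec bundles session creation, the local RPC call, and status checking into a single "SUCCESS" or "ERROR: <status>" string. A hypothetical call site, mirroring how the new test below uses the helper (the `/Root/example` table name is illustrative, not from this commit):

    // Hypothetical usage in a datashard unit test; `/Root/example` is made up.
    UNIT_ASSERT_VALUES_EQUAL(
        KqpSchemeExec(runtime, R"(
            CREATE TABLE `/Root/example` (key int, value bytes, PRIMARY KEY (key));
        )"),
        "SUCCESS");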

ydb/core/tx/datashard/datashard_ut_snapshot.cpp (+124)

@@ -4540,6 +4540,130 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
         }
     }
 
+    void CompactBorrowed(TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) {
+        auto msg = MakeHolder<TEvDataShard::TEvCompactBorrowed>(tableId.PathId);
+        auto sender = runtime.AllocateEdgeActor();
+        runtime.SendToPipe(shardId, sender, msg.Release(), 0, GetPipeConfigWithRetries());
+        runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactBorrowedResult>(sender);
+    }
+
+    Y_UNIT_TEST(PostMergeNotCompactedTooEarly) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(100);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        KqpSchemeExec(runtime, R"(
+            CREATE TABLE `/Root/table` (key int, value bytes, PRIMARY KEY (key))
+            WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+                  PARTITION_AT_KEYS = (5));
+        )");
+
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+        const auto tableId = ResolveTableId(server, sender, "/Root/table");
+
+        for (int i = 0; i < 20; ++i) {
+            Cerr << "... upserting key " << i << Endl;
+            auto query = Sprintf(R"(
+                UPSERT INTO `/Root/table` (key, value) VALUES (%d, '%s');
+            )", i, TString(128 * 1024, 'x').c_str());
+            ExecSQL(server, sender, query);
+            if (i >= 5) {
+                Cerr << "... compacting shard " << shards.at(1) << Endl;
+                CompactTable(runtime, shards.at(1), tableId, false);
+            } else if (i == 4) {
+                Cerr << "... compacting shard " << shards.at(0) << Endl;
+                CompactTable(runtime, shards.at(0), tableId, false);
+            }
+        }
+
+        // Read (and snapshot) current data, so it doesn't go away on compaction
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
+            "{ items { uint64_value: 20 } }");
+
+        // Delete all the data in shard 0: the delta is small and will stay in the
+        // memtable, but borrowed compaction at the destination will be pressured
+        // to compact it all
+        ExecSQL(server, sender, "DELETE FROM `/Root/table` WHERE key < 5");
+
+        std::vector<TEvDataShard::TEvSplitTransferSnapshot::TPtr> snapshots;
+        auto captureSnapshots = runtime.AddObserver<TEvDataShard::TEvSplitTransferSnapshot>(
+            [&](TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev) {
+                auto* msg = ev->Get();
+                Cerr << "... captured snapshot from " << msg->Record.GetSrcTabletId() << Endl;
+                snapshots.emplace_back(ev.Release());
+            });
+
+        Cerr << "... merging table" << Endl;
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+        ui64 txId = AsyncMergeTable(server, sender, "/Root/table", shards);
+        Cerr << "... started merge " << txId << Endl;
+        WaitFor(runtime, [&]{ return snapshots.size() >= 2; }, "both src tablet snapshots");
+
+        std::vector<TEvBlobStorage::TEvGet::TPtr> gets;
+        auto captureGets = runtime.AddObserver<TEvBlobStorage::TEvGet>(
+            [&](TEvBlobStorage::TEvGet::TPtr& ev) {
+                auto* msg = ev->Get();
+                if (msg->Queries[0].Id.TabletID() == shards.at(1)) {
+                    Cerr << "... blocking blob get of " << msg->Queries[0].Id << Endl;
+                    gets.emplace_back(ev.Release());
+                }
+            });
+
+        // Release the snapshot for shard 0, then for shard 1
+        captureSnapshots.Remove();
+        Cerr << "... unblocking snapshots from tablet " << shards.at(0) << Endl;
+        for (auto& ev : snapshots) {
+            if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(0)) {
+                runtime.Send(ev.Release(), 0, true);
+            }
+        }
+        Cerr << "... unblocking snapshots from tablet " << shards.at(1) << Endl;
+        for (auto& ev : snapshots) {
+            if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(1)) {
+                runtime.Send(ev.Release(), 0, true);
+            }
+        }
+
+        // Let it commit the snapshots above and incorrectly compact after the
+        // first one is loaded and merged
+        runtime.SimulateSleep(TDuration::Seconds(1));
+        UNIT_ASSERT(gets.size() > 0);
+
+        Cerr << "... unblocking blob gets" << Endl;
+        captureGets.Remove();
+        for (auto& ev : gets) {
+            runtime.Send(ev.Release(), 0, true);
+        }
+
+        // Let it finish loading the second snapshot
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        // Wait for the merge to complete, then start a borrowed compaction.
+        // When the bug is present, the newly compacted part gets an epoch
+        // larger than the previously compacted part.
+        WaitTxNotification(server, sender, txId);
+        const auto merged = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(merged.size(), 1u);
+        Cerr << "... compacting borrowed parts in shard " << merged.at(0) << Endl;
+        CompactBorrowed(runtime, merged.at(0), tableId);
+
+        // Validate we have the expected number of rows
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
+            "{ items { uint64_value: 15 } }");
+    }
+
 }
 
 } // namespace NKikimr
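
Both interception steps in the test follow the same capture-and-release pattern: an observer steals matching in-flight events into a vector, and the test later removes the observer and re-injects the events in whatever order reproduces the race. A skeleton of the pattern, assuming the same TTestActorRuntime helpers used above (TEvSomething is a placeholder for the intercepted event type, not a real YDB event):

    // Skeleton only; TEvSomething stands in for e.g. TEvDataShard::TEvSplitTransferSnapshot.
    std::vector<TEvSomething::TPtr> captured;
    auto observer = runtime.AddObserver<TEvSomething>(
        [&](TEvSomething::TPtr& ev) {
            captured.emplace_back(ev.Release());  // steal the event in flight
        });

    // ... drive the system until the interesting events are captured ...

    observer.Remove();                            // stop intercepting new events
    for (auto& ev : captured) {
        runtime.Send(ev.Release(), 0, true);      // re-inject in the chosen order
    }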
