Fix race between table merge and borrowed gc compaction. Fixes #3154. #3164

Merged
81 changes: 48 additions & 33 deletions ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -120,13 +120,11 @@ class TDataShard::TTxInitSplitMergeDestination : public NTabletFlatExecutor::TTr
 class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
 private:
     TEvDataShard::TEvSplitTransferSnapshot::TPtr Ev;
-    bool LastSnapshotReceived;
 
 public:
     TTxSplitTransferSnapshot(TDataShard* ds, TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev)
         : NTabletFlatExecutor::TTransactionBase<TDataShard>(ds)
         , Ev(ev)
-        , LastSnapshotReceived(false)
     {}
 
     TTxType GetTxType() const override { return TXTYPE_SPLIT_TRANSFER_SNAPSHOT; }
@@ -257,8 +255,6 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
         }
 
         if (Self->ReceiveSnapshotsFrom.empty()) {
-            LastSnapshotReceived = true;
-
             const auto minVersion = mvcc ? Self->GetSnapshotManager().GetLowWatermark()
                                          : Self->GetSnapshotManager().GetMinWriteVersion();
 
Expand Down Expand Up @@ -295,6 +291,12 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
// Note: we persist Ready, but keep current state in memory until Complete
Self->SetPersistState(TShardState::Ready, txc);
Self->State = TShardState::SplitDstReceivingSnapshot;

// Schedule a new transaction that will move shard to the Ready state
// and finish initialization. This new transaction is guaranteed to
// wait until async LoanTable above is complete and new parts are
// fully merged into the table.
Self->Execute(new TTxLastSnapshotReceived(Self));
}

return true;
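For intuition, the guarantee the comment above relies on can be pictured as a strictly ordered, single-threaded work queue: a transaction enqueued behind the asynchronous part merge cannot run before that merge has finished. The sketch below is a minimal standalone model of that ordering only; it is not the real NTabletFlatExecutor API, and all names in it are illustrative.

#include <functional>
#include <iostream>
#include <queue>

int main() {
    // Toy single-threaded executor: strictly FIFO, standing in for the
    // per-tablet transaction pipeline this fix relies on.
    std::queue<std::function<void()>> pipeline;
    bool partsMerged = false;

    // The async table-loan completion enqueues the "merge parts" step first...
    pipeline.push([&] { partsMerged = true; std::cout << "parts merged\n"; });

    // ...so a follow-up transaction enqueued after it observes a fully merged
    // table by the time it runs.
    pipeline.push([&] {
        std::cout << (partsMerged ? "finish init: safe\n" : "finish init: BUG\n");
    });

    while (!pipeline.empty()) { pipeline.front()(); pipeline.pop(); }
}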
@@ -307,39 +309,52 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
         LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack snapshot OpId " << opId);
 
         ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));
+    }
 
-        // Note: we skip init in an unlikely event of state resetting between Execute and Complete
-        if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
-            // We have received all the data, finish shard initialization
-            // Note: previously we used TxInit, however received system tables
-            // have been empty for years now, and since pipes are still open we
-            // may receive requests between TxInit loading the Ready state and
-            // its Complete method initializing everything properly. Instead
-            // necessary steps are repeated here.
-            Self->State = TShardState::Ready;
-
-            // We are already in StateWork, but we need to repeat many steps now that we are Ready
-            Self->SwitchToWork(ctx);
-
-            // We can send the registration request now that we are ready
-            Self->SendRegistrationRequestTimeCast(ctx);
-
-            // Initialize snapshot expiration queue with current context time
-            Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
-            if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
-                Self->PlanCleanup(ctx);
-            }
+    class TTxLastSnapshotReceived : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+    public:
+        TTxLastSnapshotReceived(TDataShard* self)
+            : TTransactionBase(self)
+        {}
 
-            // Initialize change senders
-            Self->KillChangeSender(ctx);
-            Self->CreateChangeSender(ctx);
-            Self->MaybeActivateChangeSender(ctx);
-            Self->EmitHeartbeats();
+        bool Execute(TTransactionContext&, const TActorContext&) override {
+            return true;
+        }
 
-            // Switch mvcc state if needed
-            Self->CheckMvccStateChangeCanStart(ctx);
+        void Complete(const TActorContext& ctx) override {
+            // Note: we skip init in an unlikely event of state resetting before reaching Complete
+            if (Self->State == TShardState::SplitDstReceivingSnapshot) {
+                // We have received all the data, finish shard initialization
+                // Note: previously we used TxInit, however received system tables
+                // have been empty for years now, and since pipes are still open we
+                // may receive requests between TxInit loading the Ready state and
+                // its Complete method initializing everything properly. Instead
+                // necessary steps are repeated here.
+                Self->State = TShardState::Ready;
+
+                // We are already in StateWork, but we need to repeat many steps now that we are Ready
+                Self->SwitchToWork(ctx);
+
+                // We can send the registration request now that we are ready
+                Self->SendRegistrationRequestTimeCast(ctx);
+
+                // Initialize snapshot expiration queue with current context time
+                Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
+                if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
+                    Self->PlanCleanup(ctx);
+                }
+
+                // Initialize change senders
+                Self->KillChangeSender(ctx);
+                Self->CreateChangeSender(ctx);
+                Self->MaybeActivateChangeSender(ctx);
+                Self->EmitHeartbeats();
+
+                // Switch mvcc state if needed
+                Self->CheckMvccStateChangeCanStart(ctx);
+            }
         }
-    }
+    };
 };
 
 class TDataShard::TTxSplitReplicationSourceOffsets : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
28 changes: 28 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -11,6 +11,9 @@ namespace NKqpHelpers {
     using TEvExecuteDataQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
         Ydb::Table::ExecuteDataQueryResponse>;
 
+    using TEvExecuteSchemeQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest,
+        Ydb::Table::ExecuteSchemeQueryResponse>;
+
     using TEvCreateSessionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
         Ydb::Table::CreateSessionResponse>;
 
@@ -224,6 +227,31 @@ namespace NKqpHelpers {
         return FormatResult(result);
     }
 
+    inline Ydb::Table::ExecuteSchemeQueryRequest MakeSchemeRequestRPC(
+        const TString& sql, const TString& sessionId)
+    {
+        Ydb::Table::ExecuteSchemeQueryRequest request;
+        request.set_session_id(sessionId);
+        request.set_yql_text(sql);
+        return request;
+    }
+
+    inline NThreading::TFuture<Ydb::Table::ExecuteSchemeQueryResponse> SendRequest(
+        TTestActorRuntime& runtime, Ydb::Table::ExecuteSchemeQueryRequest&& request, const TString& database = {})
+    {
+        return NRpcService::DoLocalRpc<TEvExecuteSchemeQueryRequest>(
+            std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
+    }
+
+    inline TString KqpSchemeExec(TTestActorRuntime& runtime, const TString& query) {
+        TString sessionId = CreateSessionRPC(runtime);
+        auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSchemeRequestRPC(query, sessionId)));
+        if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
+            return TStringBuilder() << "ERROR: " << response.operation().status();
+        }
+        return "SUCCESS";
+    }
+
 } // namespace NKqpHelpers
 } // namespace NDataShard
 } // namespace NKikimr
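For context, a hypothetical call site for the new helper could look like the following; KqpSchemeExec and the assertion macro come from this PR, while the table name and schema are made up for illustration.

    // Hypothetical usage of KqpSchemeExec; the table name and columns are illustrative only.
    UNIT_ASSERT_VALUES_EQUAL(
        KqpSchemeExec(runtime, R"(
            CREATE TABLE `/Root/example` (key int, value bytes, PRIMARY KEY (key));
        )"),
        "SUCCESS");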
124 changes: 124 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -4540,6 +4540,130 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
         }
     }
 
+    void CompactBorrowed(TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) {
+        auto msg = MakeHolder<TEvDataShard::TEvCompactBorrowed>(tableId.PathId);
+        auto sender = runtime.AllocateEdgeActor();
+        runtime.SendToPipe(shardId, sender, msg.Release(), 0, GetPipeConfigWithRetries());
+        runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactBorrowedResult>(sender);
+    }
+
+    Y_UNIT_TEST(PostMergeNotCompactedTooEarly) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(100);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        KqpSchemeExec(runtime, R"(
+            CREATE TABLE `/Root/table` (key int, value bytes, PRIMARY KEY (key))
+            WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+                  PARTITION_AT_KEYS = (5));
+        )");
+
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+        const auto tableId = ResolveTableId(server, sender, "/Root/table");
+
+        for (int i = 0; i < 20; ++i) {
+            Cerr << "... upserting key " << i << Endl;
+            auto query = Sprintf(R"(
+                UPSERT INTO `/Root/table` (key, value) VALUES (%d, '%s');
+            )", i, TString(128 * 1024, 'x').c_str());
+            ExecSQL(server, sender, query);
+            if (i >= 5) {
+                Cerr << "... compacting shard " << shards.at(1) << Endl;
+                CompactTable(runtime, shards.at(1), tableId, false);
+            } else if (i == 4) {
+                Cerr << "... compacting shard " << shards.at(0) << Endl;
+                CompactTable(runtime, shards.at(0), tableId, false);
+            }
+        }
+
+        // Read (and snapshot) current data, so it doesn't go away on compaction
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
+            "{ items { uint64_value: 20 } }");
+
+        // Delete all the data in shard 0; this is small and will stay in memtable,
+        // but once borrowed, dst compaction will have pressure to compact it all
+        ExecSQL(server, sender, "DELETE FROM `/Root/table` WHERE key < 5");
+
+        std::vector<TEvDataShard::TEvSplitTransferSnapshot::TPtr> snapshots;
+        auto captureSnapshots = runtime.AddObserver<TEvDataShard::TEvSplitTransferSnapshot>(
+            [&](TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev) {
+                auto* msg = ev->Get();
+                Cerr << "... captured snapshot from " << msg->Record.GetSrcTabletId() << Endl;
+                snapshots.emplace_back(ev.Release());
+            });
+
+        Cerr << "... merging table" << Endl;
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+        ui64 txId = AsyncMergeTable(server, sender, "/Root/table", shards);
+        Cerr << "... started merge " << txId << Endl;
+        WaitFor(runtime, [&]{ return snapshots.size() >= 2; }, "both src tablet snapshots");
+
+        std::vector<TEvBlobStorage::TEvGet::TPtr> gets;
+        auto captureGets = runtime.AddObserver<TEvBlobStorage::TEvGet>(
+            [&](TEvBlobStorage::TEvGet::TPtr& ev) {
+                auto* msg = ev->Get();
+                if (msg->Queries[0].Id.TabletID() == shards.at(1)) {
+                    Cerr << "... blocking blob get of " << msg->Queries[0].Id << Endl;
+                    gets.emplace_back(ev.Release());
+                }
+            });
+
+        // Release snapshot for shard 0 then shard 1
+        captureSnapshots.Remove();
Cerr << "... unlocking snapshots from tablet " << shards.at(0) << Endl;
+        for (auto& ev : snapshots) {
+            if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(0)) {
+                runtime.Send(ev.Release(), 0, true);
+            }
+        }
+        Cerr << "... unblocking snapshots from tablet " << shards.at(1) << Endl;
+        for (auto& ev : snapshots) {
+            if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(1)) {
+                runtime.Send(ev.Release(), 0, true);
+            }
+        }
+
+        // Let it commit the snapshots above and (incorrectly) compact after the first one is loaded and merged
+        runtime.SimulateSleep(TDuration::Seconds(1));
+        UNIT_ASSERT(gets.size() > 0);
+
Cerr << "... unblocking blob gets" << Endl;
captureGets.Remove();
for (auto& ev : gets) {
runtime.Send(ev.Release(), 0, true);
}

// Let it finish loading the second snapshot
runtime.SimulateSleep(TDuration::Seconds(1));

// Wait for merge to complete and start a borrowed compaction
+        // When the bug is present, the newly compacted part will have a larger epoch than the previously compacted one
+        WaitTxNotification(server, sender, txId);
+        const auto merged = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(merged.size(), 1u);
+        Cerr << "... compacting borrowed parts in shard " << merged.at(0) << Endl;
+        CompactBorrowed(runtime, merged.at(0), tableId);
+
+        // Validate we have an expected number of rows
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
+            "{ items { uint64_value: 15 } }");
+    }
+
 }
 
 } // namespace NKikimr
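A side note on the test technique: the test above relies on a capture-then-release pattern for actor events. The fragment below distills that skeleton using the same TTestActorRuntime calls as the test; it is a sketch, not standalone-runnable code, and the workload-driving step in the middle is elided.

    // Intercept events of one type, then deliver them later under test control.
    std::vector<TEvDataShard::TEvSplitTransferSnapshot::TPtr> captured;
    auto observer = runtime.AddObserver<TEvDataShard::TEvSplitTransferSnapshot>(
        [&](TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev) {
            captured.emplace_back(ev.Release()); // steal the event before delivery
        });

    // ... drive the system until the events of interest are captured ...

    observer.Remove(); // stop intercepting new events
    for (auto& ev : captured) {
        runtime.Send(ev.Release(), 0, true); // deliver the stolen events now
    }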