Skip to content

Commit 4d22497

Browse files
authored
24-1 Correctly trigger borrow compaction for shadow data (#4978)
1 parent 178cf6e commit 4d22497

19 files changed

+289
-180
lines changed

ydb/core/tx/datashard/build_index.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ class TBuildIndexScan : public TActor<TBuildIndexScan>, public NTable::IScan {
328328

329329
EScan Seek(TLead& lead, ui64 seq) noexcept override {
330330
auto ctx = TActivationContext::AsActorContext().MakeFor(SelfId());
331-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
331+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
332332
"Seek no " << seq << " " << Debug());
333333
if (seq) {
334334
if (!WriteBuf.IsEmpty()) {
@@ -367,7 +367,7 @@ class TBuildIndexScan : public TActor<TBuildIndexScan>, public NTable::IScan {
367367

368368
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept override {
369369
auto ctx = TActivationContext::AsActorContext().MakeFor(SelfId());
370-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
370+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
371371
"Feed key " << DebugPrintPoint(KeyTypes, key, *AppData()->TypeRegistry)
372372
<< " " << Debug());
373373

ydb/core/tx/datashard/datashard__compact_borrowed.cpp

+39-25
Original file line numberDiff line numberDiff line change
@@ -21,45 +21,59 @@ class TDataShard::TTxCompactBorrowed : public NTabletFlatExecutor::TTransactionB
2121
<< " for table " << pathId
2222
<< " at tablet " << Self->TabletID());
2323

24-
auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(Self->TabletID(), pathId);
24+
auto nothingToCompactResult = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(Self->TabletID(), pathId);
2525

26-
if (pathId.OwnerId != Self->GetPathOwnerId()) {
27-
// Ignore unexpected owner
28-
ctx.Send(Ev->Sender, std::move(response));
26+
if (pathId.OwnerId != Self->GetPathOwnerId()) { // ignore unexpected owner
27+
ctx.Send(Ev->Sender, std::move(nothingToCompactResult));
2928
return true;
3029
}
31-
3230
auto it = Self->TableInfos.find(pathId.LocalPathId);
33-
if (it == Self->TableInfos.end()) {
34-
// Ignore unexpected table (may normally happen with races)
35-
ctx.Send(Ev->Sender, std::move(response));
31+
if (it == Self->TableInfos.end()) { // ignore unexpected table (may normally happen with races)
32+
ctx.Send(Ev->Sender, std::move(nothingToCompactResult));
3633
return true;
3734
}
38-
3935
const TUserTable& tableInfo = *it->second;
36+
37+
THashSet<ui32> tablesToCompact;
38+
if (txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID())) {
39+
tablesToCompact.insert(tableInfo.LocalTid);
40+
}
41+
if (tableInfo.ShadowTid && txc.DB.HasBorrowed(tableInfo.ShadowTid, Self->TabletID())) {
42+
tablesToCompact.insert(tableInfo.ShadowTid);
43+
}
44+
45+
auto waiter = MakeIntrusive<TCompactBorrowedWaiter>(Ev->Sender, pathId.LocalPathId);
4046

41-
bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID());
42-
if (!hasBorrowed) {
47+
for (auto tableToCompact : tablesToCompact) {
4348
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
4449
"TEvCompactBorrowed request from " << Ev->Sender
4550
<< " for table " << pathId
46-
<< " has no borrowed parts"
51+
<< " starting compaction for local table " << tableToCompact
4752
<< " at tablet " << Self->TabletID());
48-
ctx.Send(Ev->Sender, std::move(response));
49-
return true;
50-
}
5153

52-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
53-
"TEvCompactBorrowed request from " << Ev->Sender
54-
<< " for table " << pathId
55-
<< " starting compaction for local table " << tableInfo.LocalTid
56-
<< " at tablet " << Self->TabletID());
57-
58-
Self->Executor()->CompactBorrowed(tableInfo.LocalTid);
59-
Self->IncCounter(COUNTER_TX_COMPACT_BORROWED);
60-
++tableInfo.Stats.CompactBorrowedCount;
54+
if (Self->Executor()->CompactBorrowed(tableToCompact)) {
55+
Self->IncCounter(COUNTER_TX_COMPACT_BORROWED);
56+
++tableInfo.Stats.CompactBorrowedCount;
57+
58+
waiter->CompactingTables.insert(tableToCompact);
59+
Self->CompactBorrowedWaiters[tableToCompact].push_back(waiter);
60+
} else {
61+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
62+
"TEvCompactBorrowed request from " << Ev->Sender
63+
<< " for table " << pathId
64+
<< " can not be compacted"
65+
<< " at tablet " << Self->TabletID());
66+
}
67+
}
6168

62-
Self->CompactBorrowedWaiters[tableInfo.LocalTid].emplace_back(Ev->Sender);
69+
if (waiter->CompactingTables.empty()) { // none has been triggered
70+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
71+
"TEvCompactBorrowed request from " << Ev->Sender
72+
<< " for table " << pathId
73+
<< " has no parts for borrowed compaction"
74+
<< " at tablet " << Self->TabletID());
75+
ctx.Send(Ev->Sender, std::move(nothingToCompactResult));
76+
}
6377

6478
return true;
6579
}

ydb/core/tx/datashard/datashard__compaction.cpp

+31-21
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,12 @@ void TDataShard::Handle(TEvDataShard::TEvCompactTable::TPtr& ev, const TActorCon
196196
void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) {
197197
auto finishedInfo = Executor()->GetFinishedCompactionInfo(tableId);
198198

199+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
200+
"CompactionComplete of tablet# "<< TabletID()
201+
<< ", table# " << tableId
202+
<< ", finished edge# " << finishedInfo.Edge
203+
<< ", ts " << finishedInfo.FullCompactionTs);
204+
199205
TLocalPathId localPathId = InvalidLocalPathId;
200206
if (tableId >= Schema::MinLocalTid) {
201207
for (auto& ti : TableInfos) {
@@ -234,11 +240,12 @@ void TDataShard::ReplyCompactionWaiters(
234240
<< ", finished edge# " << compactionInfo.Edge
235241
<< ", front# " << (CompactionWaiters[tableId].empty() ? 0UL : std::get<0>(CompactionWaiters[tableId].front())));
236242

237-
auto& fullCompactionQueue = CompactionWaiters[tableId];
238-
while (!fullCompactionQueue.empty()) {
239-
const auto& waiter = CompactionWaiters[tableId].front();
240-
if (std::get<0>(waiter) > compactionInfo.Edge)
243+
auto fullCompactionQueue = CompactionWaiters.FindPtr(tableId);
244+
while (fullCompactionQueue && !fullCompactionQueue->empty()) {
245+
const auto& waiter = fullCompactionQueue->front();
246+
if (std::get<0>(waiter) > compactionInfo.Edge) {
241247
break;
248+
}
242249

243250
const auto& sender = std::get<1>(waiter);
244251
auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>(
@@ -252,27 +259,30 @@ void TDataShard::ReplyCompactionWaiters(
252259
"Sending TEvCompactTableResult to# " << sender
253260
<< "pathId# " << TPathId(GetPathOwnerId(), localPathId));
254261

255-
fullCompactionQueue.pop_front();
262+
fullCompactionQueue->pop_front();
256263
}
257264

258-
auto& compactBorrowedQueue = CompactBorrowedWaiters[tableId];
259-
if (!compactBorrowedQueue.empty()) {
265+
auto compactBorrowedQueue = CompactBorrowedWaiters.FindPtr(tableId);
266+
if (compactBorrowedQueue && !compactBorrowedQueue->empty()) {
260267
const bool hasBorrowed = Executor()->HasBorrowed(tableId, TabletID());
261268
if (!hasBorrowed) {
262-
while (!compactBorrowedQueue.empty()) {
263-
const auto& waiter = compactBorrowedQueue.front();
264-
265-
auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(
266-
TabletID(),
267-
GetPathOwnerId(),
268-
localPathId);
269-
ctx.Send(waiter, std::move(response));
270-
271-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
272-
"Sending TEvCompactBorrowedResult to# " << waiter
273-
<< "pathId# " << TPathId(GetPathOwnerId(), localPathId));
274-
275-
compactBorrowedQueue.pop_front();
269+
while (!compactBorrowedQueue->empty()) {
270+
const auto& waiter = compactBorrowedQueue->front();
271+
waiter->CompactingTables.erase(tableId);
272+
273+
if (waiter->CompactingTables.empty()) { // all requested tables have been compacted
274+
auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(
275+
TabletID(),
276+
GetPathOwnerId(),
277+
waiter->RequestedTable);
278+
ctx.Send(waiter->ActorId, std::move(response));
279+
280+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
281+
"Sending TEvCompactBorrowedResult to# " << waiter->ActorId
282+
<< "pathId# " << TPathId(GetPathOwnerId(), waiter->RequestedTable));
283+
}
284+
285+
compactBorrowedQueue->pop_front();
276286
}
277287
}
278288
}

ydb/core/tx/datashard/datashard__stats.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,9 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo
361361
Actors.erase(ev->Sender);
362362

363363
ui64 tableId = ev->Get()->TableId;
364-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Stats rebuilt at datashard " << TabletID() << ", for tableId " << tableId);
364+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Stats rebuilt at datashard " << TabletID() << ", for tableId " << tableId
365+
<< ": RowCount " << ev->Get()->Stats.RowCount << ", DataSize " << ev->Get()->Stats.DataSize.Size
366+
<< (ev->Get()->PartOwners.size() > 1 || ev->Get()->PartOwners.size() == 1 && *ev->Get()->PartOwners.begin() != TabletID() ? ", with borrowed parts" : ""));
365367

366368
i64 dataSize = 0;
367369
if (TableInfos.contains(tableId)) {

ydb/core/tx/datashard/datashard_impl.h

+11-2
Original file line numberDiff line numberDiff line change
@@ -2822,10 +2822,19 @@ class TDataShard
28222822
// from the front
28232823
THashMap<ui32, TCompactionWaiterList> CompactionWaiters;
28242824

2825-
using TCompactBorrowedWaiterList = TList<TActorId>;
2825+
// One pending TEvCompactBorrowed request.
//
// A single request may start borrowed compaction on more than one local table
// (the main table and its shadow table), so the same waiter object is created
// with MakeIntrusive and pushed into the CompactBorrowedWaiters list of every
// local table it waits for. It derives from TThrRefBase so that it can be held
// through TIntrusivePtr from several lists at once.
struct TCompactBorrowedWaiter : public TThrRefBase {
2826+
TCompactBorrowedWaiter(TActorId actorId, TLocalPathId requestedTable)
2827+
: ActorId(actorId)
2828+
, RequestedTable(requestedTable)
2829+
{ }
2830+
2831+
// Sender of the request; TEvCompactBorrowedResult is sent back to this actor.
TActorId ActorId;
2832+
// Local path id taken from the request; echoed back in the result's path id.
TLocalPathId RequestedTable;
2833+
// Local table ids for which compaction was started on behalf of this request
// and has not been reported complete yet. An id is erased when that table's
// waiter queue is drained; the result is sent once the set becomes empty.
THashSet<ui32> CompactingTables;
2834+
};
28262835

28272836
// tableLocalTid -> waiters, similar to CompactionWaiters
2828-
THashMap<ui32, TCompactBorrowedWaiterList> CompactBorrowedWaiters;
2837+
THashMap<ui32, TList<TIntrusivePtr<TCompactBorrowedWaiter>>> CompactBorrowedWaiters;
28292838

28302839
struct TReplicationSourceOffsetsReceiveState {
28312840
// A set of tables for which we already received offsets

ydb/core/tx/datashard/datashard_loans.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ NTabletFlatExecutor::ITransaction* TDataShard::CreateTxInitiateBorrowedPartsRetu
5050
}
5151

5252
void TDataShard::CompletedLoansChanged(const TActorContext &ctx) {
53+
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CompletedLoansChanged");
5354
Y_ABORT_UNLESS(Executor()->GetStats().CompactedPartLoans);
5455

5556
CheckInitiateBorrowedPartsReturn(ctx);

ydb/core/tx/datashard/datashard_ut_build_index.cpp

+98-3
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
9595
CreateShardedTable(server, sender, root, name, opts);
9696
}
9797

98-
Y_UNIT_TEST(TestRunScan) {
98+
Y_UNIT_TEST(RunScan) {
9999
TPortManager pm;
100100
TServerSettings serverSettings(pm.GetPort(2134));
101101
serverSettings.SetDomainName("Root")
@@ -130,8 +130,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
130130
// Alter table: disable shadow data and change compaction policy
131131
auto policy = NLocalDb::CreateDefaultUserTablePolicy();
132132
policy->KeepEraseMarkers = false;
133-
WaitTxNotification(server,
134-
AsyncAlterAndDisableShadow(server, "/Root", "table-2", policy.Get()));
133+
WaitTxNotification(server, AsyncAlterAndDisableShadow(server, "/Root", "table-2", policy.Get()));
135134

136135
// Shadow data must be visible now
137136
auto data2 = ReadShardedTable(server, "/Root/table-2");
@@ -140,6 +139,102 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
140139
"value = 300, key = 3\n"
141140
"value = 500, key = 5\n");
142141
}
142+
143+
// Scenario: build an index from table-1 into table-2 (the rows land in shadow
// data), split table-2 into two shards, then request CompactBorrowed on each
// new shard and check that the part owners reported in table stats shrink to
// the shard itself. Finally shadow data is disabled and all five rows must be
// readable from table-2.
Y_UNIT_TEST(ShadowBorrowCompaction) {
144+
TPortManager pm;
145+
TServerSettings serverSettings(pm.GetPort(2134));
146+
serverSettings
147+
.SetDomainName("Root")
148+
.SetUseRealThreads(false);
149+
150+
Tests::TServer::TPtr server = new TServer(serverSettings);
151+
auto &runtime = *server->GetRuntime();
152+
auto sender = runtime.AllocateEdgeActor();
153+
154+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
155+
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_TRACE);
156+
runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
157+
158+
// Allow manipulating shadow data using normal schemeshard operations
159+
runtime.GetAppData().AllowShadowDataInSchemeShardForTests = true;
160+
161+
InitRoot(server, sender);
162+
163+
CreateShardedTable(server, sender, "/Root", "table-1", 1, false);
164+
165+
// Upsert some initial values
166+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (2, 200), (3, 300), (4, 400), (5, 500);");
167+
168+
CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false);
169+
170+
// Log every TEvCompactBorrowed and reset the ones whose sender is named
// FLAT_SCHEMESHARD_ACTOR.
// NOTE(review): presumably resetting the event drops it, so that only the
// explicit CompactBorrowed call below reaches the shards - confirm against
// the test runtime's observer semantics.
auto observer = runtime.AddObserver<TEvDataShard::TEvCompactBorrowed>([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) {
171+
Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
172+
if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") {
173+
event.Reset();
174+
}
175+
});
176+
177+
auto snapshot = CreateVolatileSnapshot(server, { "/Root/table-1" });
178+
179+
DoBuildIndex(server, sender, "/Root/table-1", "/Root/table-2", snapshot, NKikimrTxDataShard::TEvBuildIndexProgressResponse::DONE);
180+
181+
// Writes to shadow data should not be visible yet
182+
auto data = ReadShardedTable(server, "/Root/table-2");
183+
UNIT_ASSERT_VALUES_EQUAL(data, "");
184+
185+
// Split index
186+
auto shards1 = GetTableShards(server, sender, "/Root/table-2");
187+
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
188+
189+
// Split would fail otherwise :(
190+
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
191+
192+
// Split the single table-2 shard at boundary 300; the row-count assertion
// below expects 2 rows in the first resulting shard and 3 in the second.
auto senderSplit = runtime.AllocateEdgeActor();
193+
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/table-2", shards1.at(0), 300);
194+
WaitTxNotification(server, senderSplit, txId);
195+
196+
auto shards2 = GetTableShards(server, sender, "/Root/table-2");
197+
UNIT_ASSERT_VALUES_EQUAL(shards2.size(), 2u);
198+
199+
for (auto shardIndex : xrange(2u)) {
200+
auto stats = WaitTableStats(runtime, shards2.at(shardIndex));
201+
// Cerr << "Received shard stats:" << Endl << stats.DebugString() << Endl;
202+
203+
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), shardIndex == 0 ? 2 : 3);
204+
205+
// Before compaction the new shard must still list the pre-split shard
// as a part owner.
THashSet<ui64> owners(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
206+
// Note: datashard always adds current shard to part owners, even if there are no parts
207+
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards1.at(0), shards2.at(shardIndex)}));
208+
209+
auto tableId = ResolveTableId(server, sender, "/Root/table-2");
210+
auto result = CompactBorrowed(runtime, shards2.at(shardIndex), tableId);
211+
// Cerr << "Compact result " << result.DebugString() << Endl;
212+
UNIT_ASSERT_VALUES_EQUAL(result.GetTabletId(), shards2.at(shardIndex));
213+
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetOwnerId(), tableId.PathId.OwnerId);
214+
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetLocalId(), tableId.PathId.LocalPathId);
215+
216+
// Re-read stats at most 5 times while the pre-split shard is still
// reported as an owner.
for (int i = 0; i < 5 && (owners.size() > 1 || owners.contains(shards1.at(0))); ++i) {
217+
auto stats = WaitTableStats(runtime, shards2.at(shardIndex));
218+
owners = THashSet<ui64>(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
219+
}
220+
221+
// After borrowed compaction only the shard itself may remain as owner.
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards2.at(shardIndex)}));
222+
}
223+
224+
// Alter table: disable shadow data and change compaction policy
225+
auto policy = NLocalDb::CreateDefaultUserTablePolicy();
226+
policy->KeepEraseMarkers = false;
227+
WaitTxNotification(server, AsyncAlterAndDisableShadow(server, "/Root", "table-2", policy.Get()));
228+
229+
// Shadow data must be visible now
230+
auto data2 = ReadShardedTable(server, "/Root/table-2");
231+
UNIT_ASSERT_VALUES_EQUAL(data2,
232+
"value = 100, key = 1\n"
233+
"value = 200, key = 2\n"
234+
"value = 300, key = 3\n"
235+
"value = 400, key = 4\n"
236+
"value = 500, key = 5\n");
237+
}
143238
}
144239

145240
} // namespace NKikimr

0 commit comments

Comments
 (0)