@@ -20,9 +20,6 @@ using namespace Tests;
20
20
Y_UNIT_TEST_SUITE (TDataShardTracing) {
21
21
22
22
class FakeWilsonUploader : public TActorBootstrapped <FakeWilsonUploader> {
23
-
24
- using TTrace = std::array<ui64, 2 >;
25
-
26
23
public:
27
24
class Span {
28
25
public:
@@ -170,7 +167,7 @@ Y_UNIT_TEST_SUITE(TDataShardTracing) {
170
167
std::unordered_map<TString, Trace> Traces;
171
168
};
172
169
173
- Y_UNIT_TEST (TestTrace ) {
170
+ Y_UNIT_TEST (TestTraceDistributedUpsert ) {
174
171
TPortManager pm;
175
172
TServerSettings serverSettings (pm.GetPort (2134 ));
176
173
serverSettings.SetDomainName (" Root" )
@@ -181,8 +178,6 @@ Y_UNIT_TEST_SUITE(TDataShardTracing) {
181
178
182
179
auto sender = runtime.AllocateEdgeActor ();
183
180
184
- runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
185
-
186
181
InitRoot (server, sender);
187
182
188
183
auto policy = NLocalDb::CreateDefaultUserTablePolicy ();
@@ -192,9 +187,7 @@ Y_UNIT_TEST_SUITE(TDataShardTracing) {
192
187
193
188
FakeWilsonUploader *uploader = new FakeWilsonUploader ();
194
189
TActorId uploaderId = runtime.Register (uploader, 0 );
195
- runtime.RegisterService (NWilson::MakeWilsonUploaderId (),
196
- uploaderId,
197
- 0 );
190
+ runtime.RegisterService (NWilson::MakeWilsonUploaderId (), uploaderId, 0 );
198
191
runtime.SimulateSleep (TDuration::Seconds (10 ));
199
192
200
193
// Split shard at key 5
@@ -209,7 +202,15 @@ Y_UNIT_TEST_SUITE(TDataShardTracing) {
209
202
UNIT_ASSERT (tablets.size () == 2 );
210
203
}
211
204
212
- ExecSQL (server, sender, " UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);" );
205
+ NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId (15 , 4095 );
206
+ ExecSQL (
207
+ server,
208
+ sender,
209
+ " UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);" ,
210
+ true ,
211
+ Ydb::StatusIds::SUCCESS,
212
+ std::move (traceId)
213
+ );
213
214
214
215
uploader->BuildTraceTrees ();
215
216
@@ -230,14 +231,153 @@ Y_UNIT_TEST_SUITE(TDataShardTracing) {
230
231
UNIT_ASSERT (writeLogEntrySpan);
231
232
};
232
233
234
+ auto checkTxHasDatashardUnits = [](std::reference_wrapper<FakeWilsonUploader::Span> txSpan, ui8 count) {
235
+ auto executeSpan = txSpan.get ().FindOne (" Tablet.Transaction.Execute" );
236
+ UNIT_ASSERT (executeSpan);
237
+ auto unitSpans = executeSpan->get ().FindAll (" Datashard.Unit" );
238
+ UNIT_ASSERT_EQUAL (count, unitSpans.size ());
239
+ };
240
+
233
241
for (auto dsTxSpan : dsTxSpans) {
234
242
auto tabletTxs = dsTxSpan.get ().FindAll (" Tablet.Transaction" );
235
243
UNIT_ASSERT_EQUAL (2 , tabletTxs.size ()); // Each shard executes a proposal tablet tx and a progress tablet tx.
236
244
237
245
auto propose = tabletTxs[0 ];
238
246
checkTxHasWriteLog (propose);
247
+ checkTxHasDatashardUnits (propose, 3 );
248
+
239
249
auto progress = tabletTxs[1 ];
240
250
checkTxHasWriteLog (progress);
251
+ checkTxHasDatashardUnits (progress, 11 );
252
+ }
253
+ }
254
+
255
+ void SplitTable () {
256
+
257
+ }
258
+
259
+ Y_UNIT_TEST (TestTraceDestributedSelect) {
260
+ TPortManager pm;
261
+ TServerSettings serverSettings (pm.GetPort (2134 ));
262
+ serverSettings.SetDomainName (" Root" )
263
+ .SetUseRealThreads (false );
264
+
265
+ Tests::TServer::TPtr server = new TServer (serverSettings);
266
+ auto &runtime = *server->GetRuntime ();
267
+
268
+ auto sender = runtime.AllocateEdgeActor ();
269
+
270
+ InitRoot (server, sender);
271
+
272
+ auto policy = NLocalDb::CreateDefaultUserTablePolicy ();
273
+ policy->KeepEraseMarkers = true ;
274
+
275
+ CreateShardedTable (server, sender, " /Root" , " table-1" , 1 , false , policy.Get ());
276
+
277
+ FakeWilsonUploader *uploader = new FakeWilsonUploader ();
278
+ TActorId uploaderId = runtime.Register (uploader, 0 );
279
+ runtime.RegisterService (NWilson::MakeWilsonUploaderId (), uploaderId, 0 );
280
+ runtime.SimulateSleep (TDuration::Seconds (10 ));
281
+
282
+ // Split shard at key 5
283
+ SetSplitMergePartCountLimit (server->GetRuntime (), -1 );
284
+ {
285
+ auto senderSplit = runtime.AllocateEdgeActor ();
286
+ auto tablets = GetTableShards (server, senderSplit, " /Root/table-1" );
287
+ UNIT_ASSERT (tablets.size () == 1 );
288
+ ui64 txId = AsyncSplitTable (server, senderSplit, " /Root/table-1" , tablets.at (0 ), 5 );
289
+ WaitTxNotification (server, senderSplit, txId);
290
+ tablets = GetTableShards (server, senderSplit, " /Root/table-1" );
291
+ UNIT_ASSERT (tablets.size () == 2 );
292
+ }
293
+
294
+ ExecSQL (
295
+ server,
296
+ sender,
297
+ " UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);" ,
298
+ true ,
299
+ Ydb::StatusIds::SUCCESS
300
+ );
301
+
302
+ ExecSQL (
303
+ server,
304
+ sender,
305
+ " UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);" ,
306
+ true ,
307
+ Ydb::StatusIds::SUCCESS
308
+ );
309
+
310
+ {
311
+ // Compact and restart, so that upon SELECT we will go and load data from BS.
312
+ auto senderCompact = runtime.AllocateEdgeActor ();
313
+ auto shards = GetTableShards (server, senderCompact, " /Root/table-1" );
314
+ for (auto shard: shards) {
315
+ auto [tables, ownerId] = GetTables (server, shard);
316
+ auto compactionResult = CompactTable (runtime, shard, TTableId (ownerId, tables[" table-1" ].GetPathId ()), true );
317
+ UNIT_ASSERT_VALUES_EQUAL (compactionResult.GetStatus (), NKikimrTxDataShard::TEvCompactTableResult::OK);
318
+ }
319
+
320
+ for (auto shard: shards) {
321
+ TActorId sender = runtime.AllocateEdgeActor ();
322
+ GracefulRestartTablet (runtime, shard, sender);
323
+ }
324
+ }
325
+
326
+ NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId (15 , 4095 );
327
+
328
+ ExecSQL (
329
+ server,
330
+ sender,
331
+ " SELECT * FROM `/Root/table-1` WHERE key = 1 OR key = 3 OR key = 5 OR key = 7 OR key = 9;" ,
332
+ true ,
333
+ Ydb::StatusIds::SUCCESS,
334
+ std::move (traceId)
335
+ );
336
+
337
+ uploader->BuildTraceTrees ();
338
+
339
+ UNIT_ASSERT_EQUAL (1 , uploader->Traces .size ());
340
+
341
+ FakeWilsonUploader::Trace &trace = uploader->Traces .begin ()->second ;
342
+
343
+ Cout << trace.ToString () << Endl;
344
+
345
+ auto deSpan = trace.Root .BFSFindOne (" DataExecuter" );
346
+ UNIT_ASSERT (deSpan);
347
+
348
+ auto dsTxSpans = deSpan->get ().FindAll (" Datashard.Transaction" );
349
+ UNIT_ASSERT_EQUAL (2 , dsTxSpans.size ()); // Two shards, each executes a user transaction.
350
+
351
+ auto checkTxHasWriteLog = [](std::reference_wrapper<FakeWilsonUploader::Span> txSpan) {
352
+ auto writeLogSpan = txSpan.get ().FindOne (" Tablet.WriteLog" );
353
+ UNIT_ASSERT (writeLogSpan);
354
+ auto writeLogEntrySpan = writeLogSpan->get ().FindOne (" Tablet.WriteLog.LogEntry" );
355
+ UNIT_ASSERT (writeLogEntrySpan);
356
+ };
357
+
358
+ auto checkExecuteHasDatashardUnits = [](std::reference_wrapper<FakeWilsonUploader::Span> executeSpan, ui8 count) {
359
+ auto unitSpans = executeSpan.get ().FindAll (" Datashard.Unit" );
360
+ UNIT_ASSERT_EQUAL (count, unitSpans.size ());
361
+ };
362
+
363
+ for (auto dsTxSpan : dsTxSpans) {
364
+ auto tabletTxs = dsTxSpan.get ().FindAll (" Tablet.Transaction" );
365
+ UNIT_ASSERT_EQUAL (1 , tabletTxs.size ());
366
+
367
+ auto propose = tabletTxs[0 ];
368
+ checkTxHasWriteLog (propose);
369
+
370
+ // Blobs are loaded from BS.
371
+ UNIT_ASSERT_EQUAL (2 , propose.get ().FindAll (" Tablet.Transaction.Wait" ).size ());
372
+ UNIT_ASSERT_EQUAL (2 , propose.get ().FindAll (" Tablet.Transaction.Enqueued" ).size ());
373
+
374
+ // We execute tx multiple times, because we have to load data for it to execute.
375
+ auto executeSpans = propose.get ().FindAll (" Tablet.Transaction.Execute" );
376
+ UNIT_ASSERT_EQUAL (3 , executeSpans.size ());
377
+
378
+ checkExecuteHasDatashardUnits (executeSpans[0 ], 3 );
379
+ checkExecuteHasDatashardUnits (executeSpans[1 ], 1 );
380
+ checkExecuteHasDatashardUnits (executeSpans[2 ], 3 );
241
381
}
242
382
}
243
383
}
0 commit comments