Skip to content

Commit 34f54c9

Browse files
authored
Fix datashard read traces (#913)
Fixes #916
1 parent 57a192d commit 34f54c9

File tree

5 files changed

+108
-30
lines changed

5 files changed

+108
-30
lines changed

ydb/core/tablet_flat/shared_sausagecache.cpp

+6-4
Original file line number | Diff line number | Diff line change
@@ -258,9 +258,10 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
258258
};
259259

260260
struct TRequest : public TSimpleRefCount<TRequest> {
261-
TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection)
261+
TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection, NWilson::TTraceId &&traceId)
262262
: Label(pageCollection->Label())
263263
, PageCollection(std::move(pageCollection))
264+
, TraceId(std::move(traceId))
264265
{
265266

266267
}
@@ -275,6 +276,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
275276
ui64 PendingBlocks = 0;
276277
TBlocks ReadyBlocks;
277278
TDeque<ui32> PagesToRequest;
279+
NWilson::TTraceId TraceId;
278280
};
279281

280282
struct TExpectant {
@@ -587,7 +589,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
587589
}
588590
}
589591

590-
auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection));
592+
auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection), std::move(msg->Fetch->TraceId));
591593

592594
waitingRequest->Source = ev->Sender;
593595
waitingRequest->Owner = msg->Owner;
@@ -674,7 +676,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
674676
} else {
675677
AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes);
676678
// fetch cookie -> requested size
677-
auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest));
679+
auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest), std::move(waitingRequest->TraceId));
678680
NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch);
679681
}
680682
}
@@ -758,7 +760,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
758760
AddInFlyPages(toLoad.size(), sizeToLoad);
759761
// fetch cookie -> requested size;
760762
// event cookie -> ptr to queue
761-
auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad));
763+
auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad), std::move(wa.TraceId));
762764
NBlockIO::Start(this, wa.Owner, (ui64)&queue, wa.Priority, fetch);
763765
}
764766
}

ydb/core/tx/datashard/datashard__read_iterator.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -2107,7 +2107,6 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
21072107

21082108
const ui64 tieBreaker = Self->NextTieBreakerIndex++;
21092109
Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev);
2110-
Op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, readSpan.GetTraceId(), "ReadIterator.ReadOperation", NWilson::EFlags::AUTO_END);
21112110

21122111
Op->BuildExecutionPlan(false);
21132112
Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
@@ -2537,7 +2536,8 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
25372536
auto* request = ev->Get();
25382537

25392538
if (!request->ReadSpan) {
2540-
request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "DataShard.Read");
2539+
request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "Datashard.Read", NWilson::EFlags::AUTO_END);
2540+
request->ReadSpan.Attribute("Shard", std::to_string(TabletID()));
25412541
}
25422542

25432543
const auto& record = request->Record;

ydb/core/tx/datashard/datashard_ut_trace.cpp

+97-17
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,93 @@ using namespace NDataShardReadTableTest;
2121

2222
Y_UNIT_TEST_SUITE(TDataShardTrace) {
2323

24+
// Test helper: sends `sql` to the KQP proxy of the runtime's node on behalf of
// `sender`, waits for the TEvQueryResponse and asserts that its YDB status
// equals `code`.
//
// When `traceId` is set, the request is built by hand around a local
// RequestCtx stub whose GetWilsonTraceId() returns a clone of that trace id;
// otherwise the request comes from MakeSQLRequest(sql, true).
void ExecSQL(Tests::TServer::TPtr server,
25+
TActorId sender,
26+
const TString &sql,
27+
Ydb::StatusIds::StatusCode code,
28+
NWilson::TTraceId traceId = {})
29+
{
30+
// NOTE(review): txControl below is allocated on this function-local arena and
// handed to the request as a raw pointer; presumably it is only needed until
// the response is grabbed at the end of this function -- confirm.
google::protobuf::Arena arena;
31+
auto &runtime = *server->GetRuntime();
32+
// NOTE(review): `handle` is not referenced anywhere else in this function.
TAutoPtr<IEventHandle> handle;
33+
34+
THolder<NKqp::TEvKqp::TEvQueryRequest> request;
35+
if (traceId) {
36+
// Minimal request-context stub: every accessor returns an empty/default
// value except GetWilsonTraceId(), which exposes the caller's trace id.
struct RequestCtx : NGRpcService::IRequestCtxMtSafe {
37+
RequestCtx(NWilson::TTraceId &&traceId) : TraceId(std::move(traceId)) {}
38+
39+
NWilson::TTraceId GetWilsonTraceId() const override {
40+
return TraceId.Clone();
41+
}
42+
43+
TMaybe<TString> GetTraceId() const override {
44+
return Nothing();
45+
}
46+
47+
const TMaybe<TString> GetDatabaseName() const override {
48+
return "";
49+
}
50+
51+
const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const override {
52+
return Ptr;
53+
}
54+
55+
const TString& GetSerializedToken() const override {
56+
return Token;
57+
}
58+
59+
bool IsClientLost() const override {
60+
return false;
61+
};
62+
63+
// NOTE(review): returns nullptr; assumes no consumer dereferences the
// request message on this path -- confirm.
virtual const google::protobuf::Message* GetRequest() const override {
64+
return nullptr;
65+
};
66+
67+
const TMaybe<TString> GetRequestType() const override {
68+
return "_document_api_request";
69+
};
70+
71+
// The finish callback is intentionally discarded.
void SetFinishAction(std::function<void()>&& cb) override {
72+
Y_UNUSED(cb);
73+
};
74+
75+
google::protobuf::Arena* GetArena() override {
76+
return nullptr;
77+
};
78+
79+
// Left default-constructed; returned by reference from the token accessors.
TIntrusiveConstPtr<NACLib::TUserToken> Ptr;
80+
TString Token;
81+
NWilson::TTraceId TraceId;
82+
};
83+
84+
// Transaction control: begin a serializable read-write tx and commit it
// together with the query.
auto *txControl = google::protobuf::Arena::CreateMessage<Ydb::Table::TransactionControl>(&arena);
85+
txControl->mutable_begin_tx()->mutable_serializable_read_write();
86+
txControl->set_commit_tx(true);
87+
88+
auto ptr = std::make_shared<RequestCtx>(std::move(traceId));
89+
request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
90+
NKikimrKqp::QUERY_ACTION_EXECUTE,
91+
NKikimrKqp::QUERY_TYPE_SQL_DML,
92+
TActorId(),
93+
ptr,
94+
TString(), //sessionId
95+
TString(sql),
96+
TString(), //queryId
97+
txControl, //tx_control
98+
nullptr, //ydbParameters
99+
Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED, //collectStats
100+
nullptr, // query_cache_policy
101+
nullptr //operationParams
102+
);
103+
} else {
104+
request = MakeSQLRequest(sql, true);
105+
}
106+
// The event itself is sent without a trace id argument; on the traced path
// the id is available only through the RequestCtx stub above.
runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr));
107+
auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
108+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
109+
}
110+
24111
class FakeWilsonUploader : public TActorBootstrapped<FakeWilsonUploader> {
25112
public:
26113
class Span {
@@ -232,7 +319,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
232319
server,
233320
sender,
234321
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
235-
true,
236322
Ydb::StatusIds::SUCCESS,
237323
std::move(traceId)
238324
);
@@ -292,15 +378,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
292378
server,
293379
sender,
294380
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
295-
true,
296381
Ydb::StatusIds::SUCCESS
297382
);
298383

299384
ExecSQL(
300385
server,
301386
sender,
302387
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
303-
true,
304388
Ydb::StatusIds::SUCCESS
305389
);
306390

@@ -326,7 +410,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
326410
server,
327411
sender,
328412
"SELECT * FROM `/Root/table-1` WHERE key = 1 OR key = 3 OR key = 5 OR key = 7 OR key = 9;",
329-
true,
330413
Ydb::StatusIds::SUCCESS,
331414
std::move(traceId)
332415
);
@@ -342,23 +425,23 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
342425
auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor");
343426
UNIT_ASSERT(lookupActorSpan);
344427

345-
auto dsReads = lookupActorSpan->get().FindAll("DataShard.Read"); // Lookup actor sends EvRead to each shard.
428+
auto dsReads = lookupActorSpan->get().FindAll("Datashard.Read"); // Lookup actor sends EvRead to each shard.
346429
UNIT_ASSERT_EQUAL(dsReads.size(), 2);
347430

348431
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
349432
", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) "
350-
", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (DataShard.Read "
433+
", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (Datashard.Read "
351434
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) "
352435
", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
353436
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
354437
", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
355-
"-> [(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)]) , (DataShard.Read "
438+
"-> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read "
356439
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) "
357440
", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
358441
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
359442
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
360-
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) "
361-
", (ReadIterator.ReadOperation)])])]) , (ComputeActor) , (RunTasks)])])";
443+
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
444+
"])])]) , (ComputeActor) , (RunTasks)])])";
362445
} else {
363446
auto deSpan = trace.Root.BFSFindOne("DataExecuter");
364447
UNIT_ASSERT(deSpan);
@@ -419,15 +502,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
419502
server,
420503
sender,
421504
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
422-
true,
423505
Ydb::StatusIds::SUCCESS
424506
);
425507

426508
ExecSQL(
427509
server,
428510
sender,
429511
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
430-
true,
431512
Ydb::StatusIds::SUCCESS
432513
);
433514

@@ -437,7 +518,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
437518
server,
438519
sender,
439520
"SELECT * FROM `/Root/table-1`;",
440-
true,
441521
Ydb::StatusIds::SUCCESS,
442522
std::move(traceId)
443523
);
@@ -451,17 +531,17 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
451531
auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
452532
UNIT_ASSERT(readActorSpan);
453533

454-
auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard.
534+
auto dsReads = readActorSpan->get().FindAll("Datashard.Read"); // Read actor sends EvRead to each shard.
455535
UNIT_ASSERT_EQUAL(dsReads.size(), 2);
456536

457537
std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
458538
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
459539
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
460-
"(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
461-
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , "
462-
"(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
540+
"(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
541+
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
542+
"]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
463543
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
464-
"[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])";
544+
"[(Tablet.WriteLog.LogEntry)])])])])])])])";
465545
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
466546
}
467547

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

+2-5
Original file line numberDiff line numberDiff line change
@@ -1796,14 +1796,11 @@ void ExecSQL(Tests::TServer::TPtr server,
17961796
TActorId sender,
17971797
const TString &sql,
17981798
bool dml,
1799-
Ydb::StatusIds::StatusCode code,
1800-
NWilson::TTraceId traceId)
1799+
Ydb::StatusIds::StatusCode code)
18011800
{
18021801
auto &runtime = *server->GetRuntime();
1803-
TAutoPtr<IEventHandle> handle;
1804-
18051802
auto request = MakeSQLRequest(sql, dml);
1806-
runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr, std::move(traceId)));
1803+
runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr));
18071804
auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
18081805
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
18091806
}

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -708,8 +708,7 @@ void ExecSQL(Tests::TServer::TPtr server,
708708
TActorId sender,
709709
const TString &sql,
710710
bool dml = true,
711-
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS,
712-
NWilson::TTraceId traceId = {});
711+
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
713712

714713
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
715714
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});

0 commit comments

Comments
 (0)