Skip to content

Fix datashard read traces #913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ydb/core/tablet_flat/shared_sausagecache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,10 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
};

struct TRequest : public TSimpleRefCount<TRequest> {
TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection)
TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection, NWilson::TTraceId &&traceId)
: Label(pageCollection->Label())
, PageCollection(std::move(pageCollection))
, TraceId(std::move(traceId))
{

}
Expand All @@ -275,6 +276,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
ui64 PendingBlocks = 0;
TBlocks ReadyBlocks;
TDeque<ui32> PagesToRequest;
NWilson::TTraceId TraceId;
};

struct TExpectant {
Expand Down Expand Up @@ -587,7 +589,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
}
}

auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection));
auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection), std::move(msg->Fetch->TraceId));

waitingRequest->Source = ev->Sender;
waitingRequest->Owner = msg->Owner;
Expand Down Expand Up @@ -674,7 +676,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
} else {
AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes);
// fetch cookie -> requested size
auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest));
auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest), std::move(waitingRequest->TraceId));
NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch);
}
}
Expand Down Expand Up @@ -758,7 +760,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
AddInFlyPages(toLoad.size(), sizeToLoad);
// fetch cookie -> requested size;
// event cookie -> ptr to queue
auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad));
auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad), std::move(wa.TraceId));
NBlockIO::Start(this, wa.Owner, (ui64)&queue, wa.Priority, fetch);
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,6 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB

const ui64 tieBreaker = Self->NextTieBreakerIndex++;
Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev);
Op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, readSpan.GetTraceId(), "ReadIterator.ReadOperation", NWilson::EFlags::AUTO_END);

Op->BuildExecutionPlan(false);
Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
Expand Down Expand Up @@ -2537,7 +2536,8 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
auto* request = ev->Get();

if (!request->ReadSpan) {
request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "DataShard.Read");
request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "Datashard.Read", NWilson::EFlags::AUTO_END);
request->ReadSpan.Attribute("Shard", std::to_string(TabletID()));
}

const auto& record = request->Record;
Expand Down
114 changes: 97 additions & 17 deletions ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,93 @@ using namespace NDataShardReadTableTest;

Y_UNIT_TEST_SUITE(TDataShardTrace) {

// Test helper: sends `sql` to the KQP proxy on behalf of `sender`, waits for
// the TEvQueryResponse on that edge actor and asserts that the YDB status of
// the response equals `code`.
//
// When `traceId` is set, the query request is assembled by hand around a
// request context whose GetWilsonTraceId() hands out clones of that trace id;
// its transaction control asks to begin a serializable read-write transaction
// and sets commit_tx. When `traceId` is empty, the request is produced by
// MakeSQLRequest(sql, true) instead.
void ExecSQL(Tests::TServer::TPtr server,
             TActorId sender,
             const TString &sql,
             Ydb::StatusIds::StatusCode code,
             NWilson::TTraceId traceId = {})
{
    // Stub request context: the only state it carries besides empty
    // token fields is the Wilson trace id. Every other accessor returns a
    // fixed placeholder value.
    struct RequestCtx : NGRpcService::IRequestCtxMtSafe {
        explicit RequestCtx(NWilson::TTraceId &&traceId) : TraceId(std::move(traceId)) {}

        // Returns a clone, so the stored trace id stays intact across calls.
        NWilson::TTraceId GetWilsonTraceId() const override {
            return TraceId.Clone();
        }

        TMaybe<TString> GetTraceId() const override {
            return Nothing();
        }

        const TMaybe<TString> GetDatabaseName() const override {
            return "";
        }

        const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const override {
            return Ptr;
        }

        const TString& GetSerializedToken() const override {
            return Token;
        }

        bool IsClientLost() const override {
            return false;
        }

        const google::protobuf::Message* GetRequest() const override {
            return nullptr;
        }

        const TMaybe<TString> GetRequestType() const override {
            return "_document_api_request";
        }

        void SetFinishAction(std::function<void()>&& cb) override {
            Y_UNUSED(cb);
        }

        google::protobuf::Arena* GetArena() override {
            return nullptr;
        }

        TIntrusiveConstPtr<NACLib::TUserToken> Ptr;
        TString Token;
        NWilson::TTraceId TraceId;
    };

    // Function-local arena: messages created on it are released when this
    // function returns, i.e. after the response has been grabbed below.
    google::protobuf::Arena arena;
    auto &runtime = *server->GetRuntime();

    THolder<NKqp::TEvKqp::TEvQueryRequest> request;
    if (traceId) {
        auto *txControl = google::protobuf::Arena::CreateMessage<Ydb::Table::TransactionControl>(&arena);
        txControl->mutable_begin_tx()->mutable_serializable_read_write();
        txControl->set_commit_tx(true);

        auto requestCtx = std::make_shared<RequestCtx>(std::move(traceId));
        request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
            NKikimrKqp::QUERY_ACTION_EXECUTE,
            NKikimrKqp::QUERY_TYPE_SQL_DML,
            TActorId(),
            requestCtx,
            TString(), // sessionId
            TString(sql),
            TString(), // queryId
            txControl, // tx_control
            nullptr, // ydbParameters
            Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED, // collectStats
            nullptr, // query_cache_policy
            nullptr // operationParams
        );
    } else {
        request = MakeSQLRequest(sql, true);
    }

    runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr));
    auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
    UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
}

class FakeWilsonUploader : public TActorBootstrapped<FakeWilsonUploader> {
public:
class Span {
Expand Down Expand Up @@ -232,7 +319,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);
Expand Down Expand Up @@ -292,15 +378,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
true,
Ydb::StatusIds::SUCCESS
);

ExecSQL(
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
true,
Ydb::StatusIds::SUCCESS
);

Expand All @@ -326,7 +410,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"SELECT * FROM `/Root/table-1` WHERE key = 1 OR key = 3 OR key = 5 OR key = 7 OR key = 9;",
true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);
Expand All @@ -342,23 +425,23 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor");
UNIT_ASSERT(lookupActorSpan);

auto dsReads = lookupActorSpan->get().FindAll("DataShard.Read"); // Lookup actor sends EvRead to each shard.
auto dsReads = lookupActorSpan->get().FindAll("Datashard.Read"); // Lookup actor sends EvRead to each shard.
UNIT_ASSERT_EQUAL(dsReads.size(), 2);

canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) "
", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (DataShard.Read "
", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)]) , (DataShard.Read "
"-> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) "
", (ReadIterator.ReadOperation)])])]) , (ComputeActor) , (RunTasks)])])";
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
"])])]) , (ComputeActor) , (RunTasks)])])";
} else {
auto deSpan = trace.Root.BFSFindOne("DataExecuter");
UNIT_ASSERT(deSpan);
Expand Down Expand Up @@ -419,15 +502,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
true,
Ydb::StatusIds::SUCCESS
);

ExecSQL(
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
true,
Ydb::StatusIds::SUCCESS
);

Expand All @@ -437,7 +518,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"SELECT * FROM `/Root/table-1`;",
true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);
Expand All @@ -451,17 +531,17 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
UNIT_ASSERT(readActorSpan);

auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard.
auto dsReads = readActorSpan->get().FindAll("Datashard.Read"); // Read actor sends EvRead to each shard.
UNIT_ASSERT_EQUAL(dsReads.size(), 2);

std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , "
"(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
"]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])";
"[(Tablet.WriteLog.LogEntry)])])])])])])])";
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Expand Down
7 changes: 2 additions & 5 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1796,14 +1796,11 @@ void ExecSQL(Tests::TServer::TPtr server,
TActorId sender,
const TString &sql,
bool dml,
Ydb::StatusIds::StatusCode code,
NWilson::TTraceId traceId)
Ydb::StatusIds::StatusCode code)
{
auto &runtime = *server->GetRuntime();
TAutoPtr<IEventHandle> handle;

auto request = MakeSQLRequest(sql, dml);
runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr, std::move(traceId)));
runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr));
auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,7 @@ void ExecSQL(Tests::TServer::TPtr server,
TActorId sender,
const TString &sql,
bool dml = true,
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS,
NWilson::TTraceId traceId = {});
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);

NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
Expand Down