Skip to content

KIKIMR-20572: add tracing to read actors #841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op
limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit * 2, limit);

auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory,
AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
AppData()->FunctionRegistry, settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());

if (optimizeProtoForLocalExecution) {
TVector<google::protobuf::Message*>& taskSourceSettings = static_cast<TKqpComputeActor*>(computeActor)->MutableTaskSourceSettings();
Expand Down
23 changes: 20 additions & 3 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <library/cpp/threading/hot_swap/hot_swap.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/wilson_ids/wilson.h>

#include <util/generic/intrlist.h>

Expand Down Expand Up @@ -399,6 +400,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
, Counters(counters)
, UseFollowers(false)
, PipeCacheId(MainPipeCacheId)
, ReadActorSpan(TWilsonKqp::ReadActor, NWilson::TTraceId(args.TraceId), "ReadActor")
{
Y_ABORT_UNLESS(Arena);
Y_ABORT_UNLESS(settings->GetArena() == Arena->Get());
Expand Down Expand Up @@ -569,6 +571,9 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
ResolveShards[ResolveShardId] = state;
ResolveShardId += 1;

ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}
Expand Down Expand Up @@ -617,9 +622,13 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
}
}

ReadActorStateSpan.EndError(error);

return RuntimeError(error, statusCode);
}

ReadActorStateSpan.EndOk();

auto keyDesc = std::move(request->ResultSet[0].KeyDescription);

if (keyDesc->GetPartitions().size() == 1) {
Expand Down Expand Up @@ -896,10 +905,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
Counters->CreatedIterators->Inc();
ReadIdByTabletId[state->TabletId].push_back(id);

NWilson::TTraceId traceId; // TODO: get traceId from kqp.

Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery, 0, std::move(traceId));
IEventHandle::FlagTrackDelivery, 0, ReadActorSpan.GetTraceId());

if (!FirstShardStarted) {
state->IsFirst = true;
Expand Down Expand Up @@ -1385,6 +1392,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
}
}
TBase::PassAway();

ReadActorSpan.End();
}

void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) {
Expand All @@ -1395,6 +1404,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq

NYql::TIssues issues;
issues.AddIssue(std::move(issue));

if (ReadActorSpan) {
ReadActorSpan.EndError(issues.ToOneLineString());
}

Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}

Expand Down Expand Up @@ -1491,6 +1505,9 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
size_t TotalRetries = 0;

bool FirstShardStarted = false;

NWilson::TSpan ReadActorSpan;
NWilson::TSpan ReadActorStateSpan;
};


Expand Down
71 changes: 43 additions & 28 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
#include "kqp_stream_lookup_actor.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>

#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
#include <ydb/core/protos/kqp_stats.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
#include <ydb/library/wilson_ids/wilson.h>

namespace NKikimr {
namespace NKqp {
Expand All @@ -25,24 +26,22 @@ static constexpr ui64 MAX_SHARD_RETRIES = 10;

class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
public:
TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input,
const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
TKqpStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings,
TIntrusivePtr<TKqpCounters> counters)
: LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
, InputIndex(inputIndex)
, Input(input)
, ComputeActorId(computeActorId)
, TypeEnv(typeEnv)
, Alloc(alloc)
: LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << args.InputIndex << ", CA Id " << args.ComputeActorId)
, InputIndex(args.InputIndex)
, Input(args.TransformInput)
, ComputeActorId(args.ComputeActorId)
, TypeEnv(args.TypeEnv)
, Alloc(args.Alloc)
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc))
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
, Counters(counters)
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
{
IngressStats.Level = statsLevel;
IngressStats.Level = args.StatsLevel;
}

virtual ~TKqpStreamLookupActor() {
Expand Down Expand Up @@ -174,6 +173,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
TActorBootstrapped<TKqpStreamLookupActor>::PassAway();

LookupActorSpan.End();
}

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
Expand Down Expand Up @@ -234,10 +235,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath(), NYql::NDqProto::StatusIds::SCHEME_ERROR);
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath();
LookupActorStateSpan.EndError(errorMsg);

return RuntimeError(errorMsg, NYql::NDqProto::StatusIds::SCHEME_ERROR);
}

LookupActorStateSpan.EndOk();

auto& resultSet = ev->Get()->Request->ResultSet;
YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)");
Partitioning = resultSet[0].KeyDescription->Partitioning;
Expand Down Expand Up @@ -342,8 +348,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
<< " was resolved: " << !!Partitioning);

if (!Partitioning) {
RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath()
<< " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
TString errorMsg = TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath()
<< " (request timeout exceeded)";
LookupActorStateSpan.EndError(errorMsg);

RuntimeError(errorMsg, NYql::NDqProto::StatusIds::TIMEOUT);
}
}

Expand Down Expand Up @@ -392,7 +401,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);

Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true),
IEventHandle::FlagTrackDelivery);
IEventHandle::FlagTrackDelivery, 0, LookupActorSpan.GetTraceId());

read.State = EReadState::Running;

Expand Down Expand Up @@ -438,6 +447,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));

Counters->IteratorsShardResolve->Inc();
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));

Expand Down Expand Up @@ -467,6 +479,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

NYql::TIssues issues;
issues.AddIssue(std::move(issue));

if (LookupActorSpan) {
LookupActorSpan.EndError(issues.ToOneLineString());
}

Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}

Expand Down Expand Up @@ -495,17 +512,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
ui64 ReadBytesCount = 0;

TIntrusivePtr<TKqpCounters> Counters;
NWilson::TSpan LookupActorSpan;
NWilson::TSpan LookupActorStateSpan;
};

} // namespace

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) {
auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory,
alloc, inputDesc, std::move(settings), counters);
auto actor = new TKqpStreamLookupActor(std::move(args), std::move(settings), counters);
return {actor, actor};
}

Expand Down
5 changes: 1 addition & 4 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
namespace NKikimr {
namespace NKqp {

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>);

} // namespace NKqp
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ namespace NKqp {
void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {
factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings,
NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv,
args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters);
return CreateStreamLookupActor(std::move(args), std::move(settings), counters);
});
}

Expand Down
64 changes: 63 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
}

std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
"(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
Expand All @@ -377,6 +377,68 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Y_UNIT_TEST(TestTraceDistributedSelectViaReadActors) {
auto [runtime, server, sender] = TestCreateServer();

CreateShardedTable(server, sender, "/Root", "table-1", 1, false);

FakeWilsonUploader* uploader = new FakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

SplitTable(runtime, server, 5);

ExecSQL(
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
true,
Ydb::StatusIds::SUCCESS
);

ExecSQL(
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
true,
Ydb::StatusIds::SUCCESS
);

NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);

ExecSQL(
server,
sender,
"SELECT * FROM `/Root/table-1`;",
true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);

uploader->BuildTraceTrees();

UNIT_ASSERT_EQUAL(1, uploader->Traces.size());

FakeWilsonUploader::Trace& trace = uploader->Traces.begin()->second;

auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
UNIT_ASSERT(readActorSpan);

auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard.
UNIT_ASSERT_EQUAL(dsReads.size(), 2);

std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , "
"(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])";
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Y_UNIT_TEST(TestTraceWriteImmediateOnShard) {
auto [runtime, server, sender] = TestCreateServer();

Expand Down
6 changes: 6 additions & 0 deletions ydb/library/wilson_ids/wilson.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ namespace NKikimr {
ProposeTransaction = 9,

ComputeActor = 9,

ReadActor = 9,
ReadActorShardsResolve = 10,

LookupActor = 9,
LookupActorShardsResolve = 10,
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
IMemoryQuotaManager::TPtr MemoryQuotaManager;
const google::protobuf::Message* SourceSettings = nullptr; // used only in case if we execute compute actor locally
TIntrusivePtr<NActors::TProtoArenaHolder> Arena; // Arena for SourceSettings
NWilson::TTraceId TraceId;
};

struct TSinkArguments {
Expand Down Expand Up @@ -247,6 +248,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NWilson::TTraceId TraceId;
};

struct TOutputTransformArguments {
Expand Down
6 changes: 4 additions & 2 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr,
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
.Arena = Task.GetArena()
.Arena = Task.GetArena(),
.TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();
Expand Down Expand Up @@ -1623,7 +1624,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.ProgramBuilder = *transform.ProgramBuilder,
.Alloc = TaskRunner->GetAllocatorPtr()
.Alloc = TaskRunner->GetAllocatorPtr(),
.TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();
Expand Down