Skip to content

KIKIMR-20530 Datashard tracing enhancements #746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
<< ". " << "Send request to target, requestId: " << requestId << ", targetId: " << targetId);
auto status = timerDuration == cancelAfter ? NYql::NDqProto::StatusIds::CANCELLED : NYql::NDqProto::StatusIds::TIMEOUT;
StartQueryTimeout(requestId, timerDuration, status);
Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId, std::move(ev->TraceId));
}

void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) {
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
void MakeNewQueryState(TEvKqp::TEvQueryRequest::TPtr& ev) {
++QueryId;
YQL_ENSURE(!QueryState);
NWilson::TTraceId id;
if (false) { // change to enable Wilson tracing
id = NWilson::TTraceId::NewTraceId(15, 4095);
LOG_I("wilson tracing started, id: " + id.GetHexTraceId());
}
auto selfId = SelfId();
auto as = TActivationContext::ActorSystem();
ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
Settings.TableService, Settings.QueryService, std::move(id), SessionId,
Settings.TableService, Settings.QueryService, std::move(ev->TraceId), SessionId,
AppData()->MonotonicTimeProvider->Now());
if (QueryState->UserRequestContext->TraceId.empty()) {
QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString();
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/tablet_flat/flat_exec_seat.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ namespace NTabletFlatExecutor {

void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept;

// Constructs and returns a new NWilson::TSpan named "Tablet.Transaction.Execute".
// The span is built with TWilsonTablet::Tablet as its first argument and with the
// trace id obtained from this seat's transaction span (Self->TxSpan.GetTraceId()).
NWilson::TSpan CreateExecutionSpan() noexcept {
return NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Execute");
}

// Assigns WaitingSpan a freshly constructed span named "Tablet.Transaction.Enqueued".
// Its trace id comes from Self->TxSpan.GetTraceId(); whatever WaitingSpan held before
// is replaced by the assignment.
void StartEnqueuedSpan() noexcept {
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued");
}
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,7 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons
Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation");

TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self);
seat->Self->SetupTxSpanName();

LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self));

Expand Down Expand Up @@ -1653,16 +1654,16 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
PrivatePageCache->ResetTouchesAndToLoad(true);
TPageCollectionTxEnv env(*Database, *PrivatePageCache);

TTransactionContext txc(*seat, Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId, seat->Self->TxSpan);
txc.NotEnoughMemory(seat->NotEnoughMemoryCount);

Database->Begin(Stamp(), env);

LWTRACK(TransactionExecuteBegin, seat->Self->Orbit, seat->UniqID);

NWilson::TSpan txExecuteSpan = seat->CreateExecutionSpan();
txc.StartExecutionSpan();
const bool done = seat->Self->Execute(txc, ctx.MakeFor(OwnerActorId));
txExecuteSpan.EndOk();
txc.FinishExecutionSpan();

LWTRACK(TransactionExecuteEnd, seat->Self->Orbit, seat->UniqID, done);

Expand Down
25 changes: 18 additions & 7 deletions ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,15 @@ class TTransactionContext : public TTxMemoryProviderBase {
friend class TExecutor;

public:
TTransactionContext(TSeat &seat, ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
ui64 memoryLimit, ui64 taskId)
TTransactionContext(ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
ui64 memoryLimit, ui64 taskId, NWilson::TSpan &transactionSpan)
: TTxMemoryProviderBase(memoryLimit, taskId)
, Seat(seat)
, Tablet(tablet)
, Generation(gen)
, Step(step)
, Env(env)
, DB(db)
, TransactionSpan(transactionSpan)
{}

~TTransactionContext() {}
Expand All @@ -228,13 +228,22 @@ class TTransactionContext : public TTxMemoryProviderBase {
return Rescheduled_;
}

// Replaces TransactionExecutionSpan with a new span named "Tablet.Transaction.Execute".
// The trace id is taken from the referenced TransactionSpan via GetTraceId().
void StartExecutionSpan() noexcept {
TransactionExecutionSpan = NWilson::TSpan(TWilsonTablet::Tablet, TransactionSpan.GetTraceId(), "Tablet.Transaction.Execute");
}

// Calls EndOk() on TransactionExecutionSpan (the member that StartExecutionSpan() assigns).
void FinishExecutionSpan() noexcept {
TransactionExecutionSpan.EndOk();
}

public:
TSeat& Seat;
const ui64 Tablet = Max<ui32>();
const ui32 Generation = Max<ui32>();
const ui32 Step = Max<ui32>();
IExecuting &Env;
NTable::TDatabase &DB;
NWilson::TSpan &TransactionSpan;
NWilson::TSpan TransactionExecutionSpan;

private:
bool Rescheduled_ = false;
Expand Down Expand Up @@ -281,9 +290,7 @@ class ITransaction : TNonCopyable {

ITransaction(NWilson::TTraceId &&traceId)
: TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction"))
{
TxSpan.Attribute("Type", TypeName(*this));
}
{ }

virtual ~ITransaction() = default;
/// @return true if execution complete and transaction is ready for commit
Expand All @@ -300,6 +307,10 @@ class ITransaction : TNonCopyable {
out << TypeName(*this);
}

// Sets the "Type" attribute of TxSpan to TypeName(*this).
// Declared virtual, so derived transactions can override what gets recorded.
// NOTE(review): presumably this lives in a separate method rather than the constructor
// because *this does not yet have its most-derived dynamic type during base-class
// construction — confirm against TypeName's implementation.
virtual void SetupTxSpanName() noexcept {
TxSpan.Attribute("Type", TypeName(*this));
}

void SetupTxSpan(NWilson::TTraceId traceId) noexcept {
TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction");
TxSpan.Attribute("Type", TypeName(*this));
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/testlib/actors/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,22 @@ namespace NActors {
GrabEdgeEventRethrow<TEvents::TEvWakeup>(SleepEdgeActor);
}

void TTestActorRuntime::SendToPipe(ui64 tabletId, const TActorId& sender, IEventBase* payload, ui32 nodeIndex, const NKikimr::NTabletPipe::TClientConfig& pipeConfig, TActorId clientId, ui64 cookie) {
void TTestActorRuntime::SendToPipe(ui64 tabletId, const TActorId& sender, IEventBase* payload, ui32 nodeIndex, const NKikimr::NTabletPipe::TClientConfig& pipeConfig, TActorId clientId, ui64 cookie, NWilson::TTraceId traceId) {
bool newPipe = (clientId == TActorId());
if (newPipe) {
clientId = ConnectToPipe(tabletId, sender, nodeIndex, pipeConfig);
}

SendToPipe(clientId, sender, payload, nodeIndex, cookie);
SendToPipe(clientId, sender, payload, nodeIndex, cookie, std::move(traceId));

if (newPipe) {
ClosePipe(clientId, sender, nodeIndex);
}
}

void TTestActorRuntime::SendToPipe(TActorId clientId, const TActorId& sender, IEventBase* payload,
ui32 nodeIndex, ui64 cookie) {
auto pipeEv = new IEventHandle(clientId, sender, payload, 0, cookie);
ui32 nodeIndex, ui64 cookie, NWilson::TTraceId traceId) {
auto pipeEv = new IEventHandle(clientId, sender, payload, 0, cookie, nullptr, std::move(traceId));
pipeEv->Rewrite(NKikimr::TEvTabletPipe::EvSend, clientId);
Send(pipeEv, nodeIndex, true);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/testlib/actors/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ namespace NActors {
}

void SendToPipe(ui64 tabletId, const TActorId& sender, IEventBase* payload, ui32 nodeIndex = 0,
const NKikimr::NTabletPipe::TClientConfig& pipeConfig = NKikimr::NTabletPipe::TClientConfig(), TActorId clientId = TActorId(), ui64 cookie = 0);
const NKikimr::NTabletPipe::TClientConfig& pipeConfig = NKikimr::NTabletPipe::TClientConfig(), TActorId clientId = TActorId(), ui64 cookie = 0, NWilson::TTraceId traceId = {});
void SendToPipe(TActorId clientId, const TActorId& sender, IEventBase* payload,
ui32 nodeIndex = 0, ui64 cookie = 0);
ui32 nodeIndex = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {});
TActorId ConnectToPipe(ui64 tabletId, const TActorId& sender, ui32 nodeIndex, const NKikimr::NTabletPipe::TClientConfig& pipeConfig);
void ClosePipe(TActorId clientId, const TActorId& sender, ui32 nodeIndex);
void DisconnectNodes(ui32 fromNodeIndex, ui32 toNodeIndex, bool async = true);
Expand Down
20 changes: 16 additions & 4 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2818,7 +2818,10 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&
UpdateProposeQueueSize();
} else {
// Prepare planned transactions as soon as possible
Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
NWilson::TSpan datashardTransactionSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "Datashard.Transaction", NWilson::EFlags::AUTO_END);
datashardTransactionSpan.Attribute("Shard", std::to_string(TabletID()));

Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false, std::move(datashardTransactionSpan)), ctx);
}
}

Expand All @@ -2836,7 +2839,10 @@ void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, c
UpdateProposeQueueSize();
} else {
// Prepare planned transactions as soon as possible
Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
NWilson::TSpan datashardTransactionSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "Datashard.WriteTransaction", NWilson::EFlags::AUTO_END);
datashardTransactionSpan.Attribute("Shard", std::to_string(TabletID()));

Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false, std::move(datashardTransactionSpan)), ctx);
}
}

Expand Down Expand Up @@ -2901,12 +2907,18 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons
switch (item.Event->GetTypeRewrite()) {
case TEvDataShard::TEvProposeTransaction::EventType: {
auto event = IEventHandle::Downcast<TEvDataShard::TEvProposeTransaction>(std::move(item.Event));
Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
NWilson::TSpan datashardTransactionSpan(TWilsonTablet::Tablet, std::move(event->TraceId), "Datashard.Transaction", NWilson::EFlags::AUTO_END);
datashardTransactionSpan.Attribute("Shard", std::to_string(TabletID()));

Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true, std::move(datashardTransactionSpan)), ctx);
return;
}
case NEvents::TDataEvents::TEvWrite::EventType: {
auto event = IEventHandle::Downcast<NEvents::TDataEvents::TEvWrite>(std::move(item.Event));
Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
NWilson::TSpan datashardTransactionSpan(TWilsonTablet::Tablet, std::move(event->TraceId), "Datashard.WriteTransaction", NWilson::EFlags::AUTO_END);
datashardTransactionSpan.Attribute("Shard", std::to_string(TabletID()));

Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true, std::move(datashardTransactionSpan)), ctx);
return;
}
default:
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/tx/datashard/datashard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
return true;
}

NWilson::TSpan auxExecuteSpan;

if (!ActiveOp) {
const bool expireSnapshotsAllowed = (
Self->State == TShardState::Ready ||
Expand Down Expand Up @@ -66,7 +64,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
// it won't have a span, because we choose what operation to run in the transaction itself.
// We create transaction span and transaction execution spans here instead.
SetupTxSpan(ActiveOp->GetTraceId());
auxExecuteSpan = txc.Seat.CreateExecutionSpan();
txc.StartExecutionSpan();
}
}
}
Expand All @@ -81,7 +79,6 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
case EExecutionStatus::Restart:
// Restart even if current CompleteList is not empty
// It will be extended in subsequent iterations
auxExecuteSpan.EndOk();
return false;

case EExecutionStatus::Reschedule:
Expand Down Expand Up @@ -117,7 +114,6 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
}

// Commit all side effects
auxExecuteSpan.EndOk();
return true;
} catch (...) {
Y_ABORT("there must be no leaked exceptions");
Expand Down
28 changes: 9 additions & 19 deletions ydb/core/tx/datashard/datashard__propose_tx_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ namespace NDataShard {
TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *self,
TEvDataShard::TEvProposeTransaction::TPtr &&ev,
TInstant receivedAt, ui64 tieBreakerIndex,
bool delayed)
: TBase(self, std::move(ev->TraceId))
bool delayed,
NWilson::TSpan &&datashardTransactionSpan)
: TBase(self, datashardTransactionSpan.GetTraceId())
, Ev(std::move(ev))
, ReceivedAt(receivedAt)
, TieBreakerIndex(tieBreakerIndex)
, Kind(static_cast<EOperationKind>(Ev->Get()->GetTxKind()))
, TxId(Ev->Get()->GetTxId())
, Acked(!delayed)
, ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, TxSpan.GetTraceId(), "ProposeTransaction", NWilson::EFlags::AUTO_END)
{
ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID()));
}
, DatashardTransactionSpan(std::move(datashardTransactionSpan))
{ }

bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransactionContext &txc,
const TActorContext &ctx)
Expand Down Expand Up @@ -60,9 +59,7 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
TActorId target = Op ? Op->GetTarget() : Ev->Sender;
ui64 cookie = Op ? Op->GetCookie() : Ev->Cookie;

if (ProposeTransactionSpan) {
ProposeTransactionSpan.EndOk();
}
DatashardTransactionSpan.EndOk();
ctx.Send(target, result.Release(), 0, cookie);

return true;
Expand All @@ -76,17 +73,14 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
Ev = nullptr;
return true;
}

TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId());
TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, std::move(DatashardTransactionSpan));

// Unsuccessful operation parse.
if (op->IsAborted()) {
LWTRACK(ProposeTransactionParsed, op->Orbit, false);
Y_ABORT_UNLESS(op->Result());

if (ProposeTransactionSpan) {
ProposeTransactionSpan.EndError("Unsuccessful operation parse");
}
op->OperationSpan.EndError("Unsuccessful operation parse");
ctx.Send(op->GetTarget(), op->Result().Release());
return true;
}
Expand Down Expand Up @@ -166,10 +160,6 @@ void TDataShard::TTxProposeTransactionBase::Complete(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TTxProposeTransactionBase::Complete at " << Self->TabletID());

if (ProposeTransactionSpan) {
ProposeTransactionSpan.End();
}

if (Op) {
Y_ABORT_UNLESS(!Op->GetExecutionPlan().empty());
if (!CompleteList.empty()) {
Expand Down
30 changes: 12 additions & 18 deletions ydb/core/tx/datashard/datashard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ LWTRACE_USING(DATASHARD_PROVIDER)

namespace NKikimr::NDataShard {

TDataShard::TTxWrite::TTxWrite(TDataShard* self, NEvents::TDataEvents::TEvWrite::TPtr ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed)
: TBase(self, std::move(ev->TraceId))
TDataShard::TTxWrite::TTxWrite(TDataShard* self,
NEvents::TDataEvents::TEvWrite::TPtr ev,
TInstant receivedAt,
ui64 tieBreakerIndex,
bool delayed,
NWilson::TSpan &&datashardTransactionSpan)
: TBase(self, datashardTransactionSpan.GetTraceId())
, Ev(std::move(ev))
, ReceivedAt(receivedAt)
, TieBreakerIndex(tieBreakerIndex)
, TxId(Ev->Get()->GetTxId())
, Acked(!delayed)
, ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, TxSpan.GetTraceId(), "ProposeTransaction", NWilson::EFlags::AUTO_END)
{
ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID()));
}
, DatashardTransactionSpan(std::move(datashardTransactionSpan))
{ }

bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxWrite:: execute at tablet# " << Self->TabletID());
Expand Down Expand Up @@ -54,9 +57,7 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
TActorId target = Op ? Op->GetTarget() : Ev->Sender;
ui64 cookie = Op ? Op->GetCookie() : Ev->Cookie;

if (ProposeTransactionSpan) {
ProposeTransactionSpan.EndOk();
}
DatashardTransactionSpan.EndOk();
ctx.Send(target, result.release(), 0, cookie);

return true;
Expand All @@ -71,17 +72,14 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
return true;
}

TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId());
TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, std::move(DatashardTransactionSpan));
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

// Unsuccessful operation parse.
if (op->IsAborted()) {
LWTRACK(ProposeTransactionParsed, op->Orbit, false);
Y_ABORT_UNLESS(writeOp->GetWriteResult());

if (ProposeTransactionSpan) {
ProposeTransactionSpan.EndError("TTxWrite:: unsuccessful operation parse");
}
op->OperationSpan.EndError("Unsuccessful operation parse");
ctx.Send(op->GetTarget(), writeOp->ReleaseWriteResult().release());
return true;
}
Expand Down Expand Up @@ -159,10 +157,6 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
void TDataShard::TTxWrite::Complete(const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxWrite complete: at tablet# " << Self->TabletID());

if (ProposeTransactionSpan) {
ProposeTransactionSpan.End();
}

if (Op) {
Y_ABORT_UNLESS(!Op->GetExecutionPlan().empty());
if (!CompleteList.empty()) {
Expand Down
Loading