Skip to content

Commit 9eea4e3

Browse files
feat(data_integrity_trails): add logging of acquired and broken locks (#13164)
1 parent 18f45b9 commit 9eea4e3

11 files changed

+282
-47
lines changed

ydb/core/kqp/common/kqp_data_integrity_trails.h

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <library/cpp/string_utils/base64/base64.h>
77

88
#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
9+
#include <ydb/core/tx/data_events/events.h>
10+
#include <ydb/core/tx/datashard/datashard.h>
911

1012
namespace NKikimr {
1113
namespace NDataIntegrity {
@@ -97,23 +99,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction
9799
}
98100

99101
// DataExecuter
100-
inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
101-
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
102+
inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
103+
auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) {
102104
TStringStream ss;
103105
LogKeyValue("Component", "Executer", ss);
106+
LogKeyValue("Type", "Request", ss);
104107
LogKeyValue("TraceId", traceId, ss);
105108
LogKeyValue("PhyTxId", ToString(txId), ss);
109+
LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss);
106110

107111
if (shardId) {
108112
LogKeyValue("ShardId", ToString(*shardId), ss);
109113
}
110114

111-
LogKeyValue("Type", type, ss, /*last*/ true);
115+
LogKeyValue("TxType", type, ss, /*last*/ true);
116+
117+
return ss.Str();
118+
};
119+
120+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId));
121+
}
122+
123+
inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) {
124+
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
125+
const auto& record = ev->Get()->Record;
126+
127+
TStringStream ss;
128+
LogKeyValue("Component", "Executer", ss);
129+
LogKeyValue("Type", "Response", ss);
130+
LogKeyValue("State", state, ss);
131+
LogKeyValue("TraceId", traceId, ss);
132+
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
133+
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
134+
135+
TStringBuilder locksDebugStr;
136+
locksDebugStr << "[";
137+
for (const auto& lock : record.GetTxLocks()) {
138+
locksDebugStr << lock.ShortDebugString() << " ";
139+
}
140+
locksDebugStr << "]";
141+
142+
LogKeyValue("Locks", locksDebugStr, ss);
143+
LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss);
144+
145+
NYql::TIssues issues;
146+
NYql::IssuesFromMessage(record.GetIssues(), issues);
147+
LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true);
148+
149+
return ss.Str();
150+
};
151+
152+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
153+
}
154+
155+
inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) {
156+
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
157+
const auto& record = ev->Get()->Record;
158+
159+
TStringStream ss;
160+
LogKeyValue("Component", "Executer", ss);
161+
LogKeyValue("Type", "Response", ss);
162+
LogKeyValue("State", state, ss);
163+
LogKeyValue("TraceId", traceId, ss);
164+
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
165+
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
166+
167+
TStringBuilder locksDebugStr;
168+
locksDebugStr << "[";
169+
for (const auto& lock : record.GetTxLocks()) {
170+
locksDebugStr << lock.ShortDebugString() << " ";
171+
}
172+
locksDebugStr << "]";
173+
174+
LogKeyValue("Locks", locksDebugStr, ss);
175+
LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss);
176+
LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true);
177+
178+
return ss.Str();
179+
};
180+
181+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
182+
}
183+
184+
template <typename TActorResultInfo>
185+
inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) {
186+
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) {
187+
TStringStream ss;
188+
LogKeyValue("Component", "Executer", ss);
189+
LogKeyValue("Type", type, ss);
190+
LogKeyValue("TraceId", traceId, ss);
191+
LogKeyValue("PhyTxId", ToString(txId), ss);
192+
193+
TStringBuilder locksDebugStr;
194+
locksDebugStr << "[";
195+
for (const auto& lock : info.GetLocks()) {
196+
locksDebugStr << lock.ShortDebugString() << " ";
197+
}
198+
locksDebugStr << "]";
199+
200+
LogKeyValue("Locks", locksDebugStr, ss);
112201

113202
return ss.Str();
114203
};
115204

116-
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
205+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info));
117206
}
118207

119208
// WriteActor,BufferActor

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
198198
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
199199
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
200200
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
201+
NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
201202
for (auto& lock : info.GetLocks()) {
202203
if (!TxManager) {
203204
Locks.push_back(lock);
@@ -216,6 +217,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
216217
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
217218
NKikimrKqp::TEvKqpOutputActorResultInfo info;
218219
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
220+
NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
219221
for (auto& lock : info.GetLocks()) {
220222
if (!TxManager) {
221223
Locks.push_back(lock);
@@ -506,6 +508,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
506508
TShardState* shardState = ShardStates.FindPtr(shardId);
507509
YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId);
508510

511+
NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
509512
LOG_D("Got propose result, shard: " << shardId << ", status: "
510513
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
511514
<< ", error: " << res->GetError());
@@ -575,6 +578,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
575578
NYql::TIssues issues;
576579
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
577580

581+
NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
578582
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
579583
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
580584
<< ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1105,7 +1109,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11051109
transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
11061110
}
11071111

1108-
NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
1112+
NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
11091113

11101114
LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards");
11111115
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
@@ -1246,6 +1250,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12461250
NYql::TIssues issues;
12471251
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
12481252

1253+
NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
12491254
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
12501255
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
12511256
<< ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1317,6 +1322,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
13171322
TShardState* shardState = ShardStates.FindPtr(shardId);
13181323
YQL_ENSURE(shardState);
13191324

1325+
NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
13201326
LOG_D("Got propose result, shard: " << shardId << ", status: "
13211327
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
13221328
<< ", error: " << res->GetError());
@@ -1807,7 +1813,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18071813
flags));
18081814
}
18091815

1810-
NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
1816+
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
1817+
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
18111818

18121819
ResponseEv->Orbit.Fork(evData->Orbit);
18131820
ev = std::move(evData);
@@ -1843,7 +1850,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18431850

18441851
auto traceId = ExecuterSpan.GetTraceId();
18451852

1846-
NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
1853+
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
1854+
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
18471855

18481856
auto shardsToString = [](const auto& shards) {
18491857
TStringBuilder builder;

0 commit comments

Comments
 (0)