Skip to content

Commit 7d7c53e

Browse files
committed
feat(data_integrity_trails): add locks to logs
1 parent e464f3d commit 7d7c53e

File tree

3 files changed

+107
-51
lines changed

3 files changed

+107
-51
lines changed

ydb/core/kqp/common/kqp_data_integrity_trails.h

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include <library/cpp/string_utils/base64/base64.h>
66

77
#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
8+
#include <ydb/core/tx/data_events/events.h>
9+
#include <ydb/core/tx/datashard/datashard.h>
810

911
namespace NKikimr {
1012
namespace NDataIntegrity {
@@ -96,23 +98,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction
9698
}
9799

98100
// DataExecuter
99-
inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
100-
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
101+
inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
102+
auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) {
101103
TStringStream ss;
102104
LogKeyValue("Component", "Executer", ss);
105+
LogKeyValue("Type", "Request", ss);
103106
LogKeyValue("TraceId", traceId, ss);
104107
LogKeyValue("PhyTxId", ToString(txId), ss);
108+
LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss);
105109

106110
if (shardId) {
107111
LogKeyValue("ShardId", ToString(*shardId), ss);
108112
}
109113

110-
LogKeyValue("Type", type, ss, /*last*/ true);
114+
LogKeyValue("TxType", type, ss, /*last*/ true);
115+
116+
return ss.Str();
117+
};
118+
119+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId));
120+
}
121+
122+
inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) {
123+
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
124+
const auto& record = ev->Get()->Record;
125+
126+
TStringStream ss;
127+
LogKeyValue("Component", "Executer", ss);
128+
LogKeyValue("Type", "Response", ss);
129+
LogKeyValue("State", state, ss);
130+
LogKeyValue("TraceId", traceId, ss);
131+
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
132+
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
133+
134+
TStringBuilder locksDebugStr;
135+
locksDebugStr << "[";
136+
for (const auto& lock : record.GetTxLocks()) {
137+
locksDebugStr << lock.ShortDebugString();
138+
}
139+
locksDebugStr << "]";
140+
141+
LogKeyValue("Locks", locksDebugStr, ss);
142+
LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss);
143+
144+
NYql::TIssues issues;
145+
NYql::IssuesFromMessage(record.GetIssues(), issues);
146+
LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true);
147+
148+
return ss.Str();
149+
};
150+
151+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
152+
}
153+
154+
inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) {
155+
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
156+
const auto& record = ev->Get()->Record;
157+
158+
TStringStream ss;
159+
LogKeyValue("Component", "Executer", ss);
160+
LogKeyValue("Type", "Response", ss);
161+
LogKeyValue("State", state, ss);
162+
LogKeyValue("TraceId", traceId, ss);
163+
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
164+
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
165+
166+
TStringBuilder locksDebugStr;
167+
locksDebugStr << "[";
168+
for (const auto& lock : record.GetTxLocks()) {
169+
locksDebugStr << lock.ShortDebugString();
170+
}
171+
locksDebugStr << "]";
172+
173+
LogKeyValue("Locks", locksDebugStr, ss);
174+
LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss);
175+
LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true);
176+
177+
return ss.Str();
178+
};
179+
180+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
181+
}
182+
183+
template <typename TActorResultInfo>
184+
inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) {
185+
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) {
186+
TStringStream ss;
187+
LogKeyValue("Component", "Executer", ss);
188+
LogKeyValue("Type", type, ss);
189+
LogKeyValue("TraceId", traceId, ss);
190+
LogKeyValue("PhyTxId", ToString(txId), ss);
191+
192+
TStringBuilder locksDebugStr;
193+
locksDebugStr << "[";
194+
for (const auto& lock : info.GetLocks()) {
195+
locksDebugStr << lock.ShortDebugString();
196+
}
197+
locksDebugStr << "]";
198+
199+
LogKeyValue("Locks", locksDebugStr, ss);
111200

112201
return ss.Str();
113202
};
114203

115-
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
204+
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info));
116205
}
117206

118207
}

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
232232
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
233233
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
234234
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
235+
NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
235236
for (auto& lock : info.GetLocks()) {
236237
if (!TxManager) {
237238
Locks.push_back(lock);
@@ -250,6 +251,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
250251
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
251252
NKikimrKqp::TEvKqpOutputActorResultInfo info;
252253
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
254+
NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
253255
for (auto& lock : info.GetLocks()) {
254256
if (!TxManager) {
255257
Locks.push_back(lock);
@@ -501,6 +503,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
501503
TShardState* shardState = ShardStates.FindPtr(shardId);
502504
YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId);
503505

506+
NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
504507
LOG_D("Got propose result, shard: " << shardId << ", status: "
505508
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
506509
<< ", error: " << res->GetError());
@@ -570,6 +573,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
570573
NYql::TIssues issues;
571574
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
572575

576+
NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
573577
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
574578
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
575579
<< ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1100,7 +1104,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11001104
transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
11011105
}
11021106

1103-
NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
1107+
NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
11041108

11051109
LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards");
11061110
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
@@ -1241,6 +1245,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12411245
NYql::TIssues issues;
12421246
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
12431247

1248+
NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
12441249
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
12451250
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
12461251
<< ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1310,6 +1315,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
13101315
TShardState* shardState = ShardStates.FindPtr(shardId);
13111316
YQL_ENSURE(shardState);
13121317

1318+
NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
13131319
LOG_D("Got propose result, shard: " << shardId << ", status: "
13141320
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
13151321
<< ", error: " << res->GetError());
@@ -1793,7 +1799,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
17931799
flags));
17941800
}
17951801

1796-
NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
1802+
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
1803+
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
17971804

17981805
ResponseEv->Orbit.Fork(evData->Orbit);
17991806
ev = std::move(evData);
@@ -1829,7 +1836,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18291836

18301837
auto traceId = ExecuterSpan.GetTraceId();
18311838

1832-
NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
1839+
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
1840+
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
18331841

18341842
auto shardsToString = [](const auto& shards) {
18351843
TStringBuilder builder;

ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp

Lines changed: 3 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
4141
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
4242

4343
// check executer logs
44-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0);
44+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
4545
// check session actor logs
4646
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0);
4747
// check grpc logs
@@ -72,14 +72,12 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
7272
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
7373
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
7474

75-
// check executer logs
76-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 2);
7775
// check session actor logs
7876
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
7977
// check grpc logs
8078
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
8179
// check datashard logs
82-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 4);
80+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
8381
}
8482

8583
Y_UNIT_TEST(Ddl) {
@@ -129,53 +127,14 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
129127
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
130128

131129
// check executer logs (should be empty, because executer only logs modification operations)
132-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0);
130+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
133131
// check session actor logs
134132
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
135133
// check grpc logs
136134
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
137135
// check datashard logs (should be empty, because DataShard only logs modification operations)
138136
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
139137
}
140-
141-
Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
142-
TKikimrSettings serverSettings;
143-
TStringStream ss;
144-
serverSettings.LogStream = &ss;
145-
TKikimrRunner kikimr(serverSettings);
146-
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
147-
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
148-
149-
150-
const auto query = R"(
151-
--!syntax_v1
152-
153-
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
154-
(3u, "Value3"),
155-
(101u, "Value101"),
156-
(201u, "Value201");
157-
)";
158-
159-
if (Streaming) {
160-
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
161-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
162-
CollectStreamResult(result);
163-
} else {
164-
auto result = client.ExecuteYqlScript(query).GetValueSync();
165-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
166-
}
167-
168-
// check executer logs
169-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
170-
// check session actor logs (should contain double logs because this query was executed via worker actor)
171-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
172-
// check grpc logs
173-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
174-
// check datashard logs
175-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
176-
177-
Cout << ss.Str() << Endl;
178-
}
179138
}
180139

181140
} // namespace NKqp

0 commit comments

Comments
 (0)