Skip to content

Commit 68c8f3f

Browse files
authored
Fix locks tests (#8733)
1 parent e546733 commit 68c8f3f

File tree

9 files changed

+165
-52
lines changed

9 files changed

+165
-52
lines changed

ydb/core/grpc_services/query/rpc_kqp_tx.cpp

+8-5
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class TBeginTransactionRPC : public TActorBootstrapped<TBeginTransactionRPC> {
124124
if (kqpResponse.HasTxMeta()) {
125125
beginTxResult->mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
126126
}
127+
*beginTxResult->mutable_issues() = issueMessage;
127128
}
128129

129130
Reply(record.GetYdbStatus(), beginTxResult);
@@ -168,7 +169,7 @@ class TFinishTransactionRPC : public TActorBootstrapped<TFinishTransactionRPC>
168169
private:
169170
virtual std::pair<TString, TString> GetReqData() const = 0;
170171
virtual void Fill(NKikimrKqp::TQueryRequest* req) const = 0;
171-
virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const = 0;
172+
virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const = 0;
172173

173174
void StateWork(TAutoPtr<IEventHandle>& ev) {
174175
try {
@@ -218,15 +219,15 @@ class TFinishTransactionRPC : public TActorBootstrapped<TFinishTransactionRPC>
218219
const auto& record = ev->Get()->Record.GetRef();
219220
FillCommonKqpRespFields(record, Request.get());
220221

222+
NYql::TIssues issues;
221223
if (record.HasResponse()) {
222224
const auto& kqpResponse = record.GetResponse();
223225
const auto& issueMessage = kqpResponse.GetQueryIssues();
224-
NYql::TIssues issues;
225226
NYql::IssuesFromMessage(issueMessage, issues);
226227
Request->RaiseIssues(issues);
227228
}
228229

229-
Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus()));
230+
Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus(), issues));
230231
}
231232

232233
void InternalError(const TString& message) {
@@ -271,9 +272,10 @@ class TCommitTransactionRPC : public TFinishTransactionRPC {
271272
req->MutableTxControl()->set_commit_tx(true);
272273
}
273274

274-
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override {
275+
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override {
275276
auto result = TEvCommitTransactionRequest::AllocateResult<Ydb::Query::CommitTransactionResponse>(Request);
276277
result->set_status(status);
278+
NYql::IssuesToMessage(issues, result->mutable_issues());
277279
return result;
278280
}
279281
};
@@ -293,9 +295,10 @@ class TRollbackTransactionRPC : public TFinishTransactionRPC {
293295
req->SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX);
294296
}
295297

296-
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override {
298+
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override {
297299
auto result = TEvRollbackTransactionRequest::AllocateResult<Ydb::Query::RollbackTransactionResponse>(Request);
298300
result->set_status(status);
301+
NYql::IssuesToMessage(issues, result->mutable_issues());
299302
return result;
300303
}
301304
};

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,25 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
125125
}
126126
AFL_ENSURE(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender);
127127

128+
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
129+
("Recv TEvScanData from ShardID=", ev->Sender)
130+
("ScanId", ev->Get()->ScanId)
131+
("Finished", ev->Get()->Finished)
132+
("Lock", [&]() {
133+
TStringBuilder builder;
134+
for (const auto& lock : ev->Get()->LocksInfo.Locks) {
135+
builder << lock.ShortDebugString();
136+
}
137+
return builder;
138+
}())
139+
("BrokenLocks", [&]() {
140+
TStringBuilder builder;
141+
for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
142+
builder << lock.ShortDebugString();
143+
}
144+
return builder;
145+
}());
146+
128147
TInstant startTime = TActivationContext::Now();
129148
if (ev->Get()->Finished) {
130149
state->State = EShardState::PostRunning;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+42-15
Original file line numberDiff line numberDiff line change
@@ -462,10 +462,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
462462
NYql::TIssues issues;
463463
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
464464

465-
LOG_D("Recv EvWriteResult from ShardID=" << shardId
465+
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
466466
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
467467
<< ", TxId=" << ev->Get()->Record.GetTxId()
468-
<< ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
468+
<< ", Locks= " << [&]() {
469+
TStringBuilder builder;
470+
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
471+
builder << lock.ShortDebugString();
472+
}
473+
return builder;
474+
}()
469475
<< ", Cookie=" << ev->Cookie
470476
<< ", error=" << issues.ToString());
471477

@@ -486,6 +492,18 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
486492
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED: {
487493
YQL_ENSURE(false);
488494
}
495+
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
496+
LOG_D("Broken locks: " << res->Record.DebugString());
497+
YQL_ENSURE(shardState->State == TShardState::EState::Preparing);
498+
Counters->TxProxyMon->TxResultAborted->Inc();
499+
LocksBroken = true;
500+
501+
YQL_ENSURE(!res->Record.GetTxLocks().empty());
502+
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
503+
res->Record.GetTxLocks(0).GetSchemeShard(),
504+
res->Record.GetTxLocks(0).GetPathId());
505+
ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
506+
}
489507
default:
490508
{
491509
return ShardError(res->Record);
@@ -863,6 +881,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
863881
return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, issues);
864882
}
865883
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
884+
issues.AddIssue(NYql::YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated."));
866885
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
867886
}
868887
}
@@ -923,6 +942,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
923942
}
924943

925944
void ExecutePlanned() {
945+
YQL_ENSURE(!LocksBroken);
926946
YQL_ENSURE(TxCoordinator);
927947
auto ev = MakeHolder<TEvTxProxy::TEvProposeTransaction>();
928948
ev->Record.SetCoordinatorID(TxCoordinator);
@@ -1133,10 +1153,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11331153
NYql::TIssues issues;
11341154
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
11351155

1136-
LOG_D("Recv EvWriteResult from ShardID=" << shardId
1156+
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
11371157
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
11381158
<< ", TxId=" << ev->Get()->Record.GetTxId()
1139-
<< ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
1159+
<< ", Locks= " << [&]() {
1160+
TStringBuilder builder;
1161+
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
1162+
builder << lock.ShortDebugString();
1163+
}
1164+
return builder;
1165+
}()
11401166
<< ", Cookie=" << ev->Cookie
11411167
<< ", error=" << issues.ToString());
11421168

@@ -1167,16 +1193,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11671193
shardState->State = TShardState::EState::Finished;
11681194
Counters->TxProxyMon->TxResultAborted->Inc();
11691195
LocksBroken = true;
1170-
1171-
if (!res->Record.GetTxLocks().empty()) {
1172-
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
1173-
res->Record.GetTxLocks(0).GetSchemeShard(),
1174-
res->Record.GetTxLocks(0).GetPathId());
1175-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
1176-
}
1177-
1178-
CheckExecutionComplete();
1179-
return;
1196+
YQL_ENSURE(!res->Record.GetTxLocks().empty());
1197+
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
1198+
res->Record.GetTxLocks(0).GetSchemeShard(),
1199+
res->Record.GetTxLocks(0).GetPathId());
1200+
ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
11801201
}
11811202
default:
11821203
{
@@ -1722,7 +1743,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
17221743
<< ", LocksOp=" << NKikimrDataEvents::TKqpLocks::ELocksOp_Name(evWriteTransaction->Record.GetLocks().GetOp())
17231744
<< ", SendingShards=" << shardsToString(evWriteTransaction->Record.GetLocks().GetSendingShards())
17241745
<< ", ReceivingShards=" << shardsToString(evWriteTransaction->Record.GetLocks().GetReceivingShards())
1725-
<< ", LocksCount= " << evWriteTransaction->Record.GetLocks().LocksSize());
1746+
<< ", Locks= " << [&]() {
1747+
TStringBuilder builder;
1748+
for (const auto& lock : evWriteTransaction->Record.GetLocks().GetLocks()) {
1749+
builder << lock.ShortDebugString();
1750+
}
1751+
return builder;
1752+
}());
17261753

17271754
LOG_D("ExecuteEvWriteTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
17281755

ydb/core/kqp/runtime/kqp_read_actor.cpp

+20
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,26 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
914914
return;
915915
}
916916

917+
CA_LOG_D("Recv TEvReadResult from ShardID=" << Reads[id].Shard->TabletId
918+
<< ", ReadId=" << id
919+
<< ", Status=" << Ydb::StatusIds::StatusCode_Name(record.GetStatus().GetCode())
920+
<< ", Finished=" << record.GetFinished()
921+
<< ", RowCount=" << record.GetRowCount()
922+
<< ", TxLocks= " << [&]() {
923+
TStringBuilder builder;
924+
for (const auto& lock : record.GetTxLocks()) {
925+
builder << lock.ShortDebugString();
926+
}
927+
return builder;
928+
}()
929+
<< ", BrokenTxLocks= " << [&]() {
930+
TStringBuilder builder;
931+
for (const auto& lock : record.GetBrokenTxLocks()) {
932+
builder << lock.ShortDebugString();
933+
}
934+
return builder;
935+
}());
936+
917937
if (!record.HasNodeId()) {
918938
Counters->ReadActorAbsentNodeId->Inc();
919939
} else if (record.GetNodeId() != SelfId().NodeId()) {

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

+21-2
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
277277
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
278278
const auto& record = ev->Get()->Record;
279279

280-
CA_LOG_D("TEvReadResult was received for table: " << StreamLookupWorker->GetTablePath() <<
281-
", readId: " << record.GetReadId() << ", finished: " << record.GetFinished());
282280

283281
auto readIt = Reads.find(record.GetReadId());
284282
if (readIt == Reads.end() || readIt->second.State != EReadState::Running) {
@@ -288,6 +286,27 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
288286

289287
auto& read = readIt->second;
290288

289+
CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId
290+
<< ", Table = " << StreamLookupWorker->GetTablePath()
291+
<< ", ReadId=" << record.GetReadId()
292+
<< ", Status=" << Ydb::StatusIds::StatusCode_Name(record.GetStatus().GetCode())
293+
<< ", Finished=" << record.GetFinished()
294+
<< ", RowCount=" << record.GetRowCount()
295+
<< ", TxLocks= " << [&]() {
296+
TStringBuilder builder;
297+
for (const auto& lock : record.GetTxLocks()) {
298+
builder << lock.ShortDebugString();
299+
}
300+
return builder;
301+
}()
302+
<< ", BrokenTxLocks= " << [&]() {
303+
TStringBuilder builder;
304+
for (const auto& lock : record.GetBrokenTxLocks()) {
305+
builder << lock.ShortDebugString();
306+
}
307+
return builder;
308+
}());
309+
291310
for (auto& lock : record.GetBrokenTxLocks()) {
292311
BrokenLocks.push_back(lock);
293312
}

ydb/core/kqp/runtime/kqp_write_actor.cpp

+21-3
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
374374
CA_LOG_D("Recv EvWriteResult from ShardID=" << ev->Get()->Record.GetOrigin()
375375
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
376376
<< ", TxId=" << ev->Get()->Record.GetTxId()
377-
<< ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
377+
<< ", Locks= " << [&]() {
378+
TStringBuilder builder;
379+
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
380+
builder << lock.ShortDebugString();
381+
}
382+
return builder;
383+
}()
378384
<< ", Cookie=" << ev->Cookie);
379385

380386
switch (ev->Get()->GetStatus()) {
@@ -526,7 +532,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
526532
CA_LOG_D("Got completed result TxId=" << ev->Get()->Record.GetTxId()
527533
<< ", TabletId=" << ev->Get()->Record.GetOrigin()
528534
<< ", Cookie=" << ev->Cookie
529-
<< ", LocksCount=" << ev->Get()->Record.GetTxLocks().size());
535+
<< ", Locks=" << [&]() {
536+
TStringBuilder builder;
537+
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
538+
builder << lock.ShortDebugString();
539+
}
540+
return builder;
541+
}());
530542

531543
OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie);
532544

@@ -625,7 +637,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
625637
CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << evWrite->Record.GetTxId()
626638
<< ", TxMode=" << evWrite->Record.GetTxMode()
627639
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
628-
<< ", LocksCount= " << evWrite->Record.GetLocks().LocksSize()
640+
<< ", Locks= " << [&]() {
641+
TStringBuilder builder;
642+
for (const auto& lock : evWrite->Record.GetLocks().GetLocks()) {
643+
builder << lock.ShortDebugString();
644+
}
645+
return builder;
646+
}()
629647
<< ", Size=" << serializationResult.TotalDataSize << ", Cookie=" << metadata->Cookie
630648
<< ", OperationsCount=" << metadata->OperationsCount << ", IsFinal=" << metadata->IsFinal
631649
<< ", Attempts=" << metadata->SendAttempts);

ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp

+25-21
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
4343
)"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
4444
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
4545
result.GetIssues().PrintTo(Cerr);
46-
// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
47-
// [] (const NYql::TIssue& issue) {
48-
// return issue.GetMessage().Contains("/Root/Test");
49-
// }));
46+
if (!GetIsOlap()) {
47+
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
48+
[] (const NYql::TIssue& issue) {
49+
return issue.GetMessage().Contains("/Root/Test");
50+
}), result.GetIssues().ToString());
51+
}
5052

5153
result = session2.ExecuteQuery(Q_(R"(
5254
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
@@ -96,13 +98,13 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
9698
auto commitResult = tx1->Commit().GetValueSync();
9799
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
98100
commitResult.GetIssues().PrintTo(Cerr);
99-
// TODO:
100-
//UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
101-
// [] (const NYql::TIssue& issue) {
102-
// Y_UNUSED(issue);
103-
// return issue.GetMessage().Contains("/Root/Test");
104-
// return true;
105-
// }), commitResult.GetIssues().ToString());
101+
UNIT_ASSERT_C(commitResult.GetIssues().Size() != 0, commitResult.GetIssues().ToString());
102+
if (!GetIsOlap()) {
103+
UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
104+
[] (const NYql::TIssue& issue) {
105+
return issue.GetMessage().Contains("/Root/Test");
106+
}), commitResult.GetIssues().ToString());
107+
}
106108

107109
result = session2.ExecuteQuery(Q_(R"(
108110
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
@@ -196,10 +198,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
196198
)"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
197199
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
198200
result.GetIssues().PrintTo(Cerr);
199-
//UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
200-
// [] (const NYql::TIssue& issue) {
201-
// return issue.GetMessage().Contains("/Root/Test");
202-
// }));
201+
if (!GetIsOlap()) {
202+
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
203+
[] (const NYql::TIssue& issue) {
204+
return issue.GetMessage().Contains("/Root/Test");
205+
}), result.GetIssues().ToString());
206+
}
203207

204208
result = session1.ExecuteQuery(Q1_(R"(
205209
SELECT * FROM Test WHERE Group = 11;
@@ -255,12 +259,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
255259
)"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
256260
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
257261
result.GetIssues().PrintTo(Cerr);
258-
// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));
259-
260-
// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
261-
// [] (const NYql::TIssue& issue) {
262-
// return issue.GetMessage().Contains("/Root/Test");
263-
// }));
262+
if (!GetIsOlap()) {
263+
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
264+
[] (const NYql::TIssue& issue) {
265+
return issue.GetMessage().Contains("/Root/Test");
266+
}), result.GetIssues().ToString());
267+
}
264268

265269
result = session1.ExecuteQuery(Q1_(R"(
266270
SELECT * FROM Test WHERE Group = 11;

ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,9 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) {
227227
)"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync();
228228

229229
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
230-
// UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString());
230+
if (!GetIsOlap()) {
231+
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString());
232+
}
231233
}
232234
};
233235

0 commit comments

Comments
 (0)