Skip to content

Commit 0c5b698

Browse files
authored
24-2: Always show table name in locks broken error (#8799)
1 parent 7b489c6 commit 0c5b698

File tree

8 files changed

+75
-36
lines changed

8 files changed

+75
-36
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
238238

239239
void Finalize() {
240240
if (LocksBroken) {
241-
TString message = "Transaction locks invalidated.";
242-
243-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
244-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
241+
return ReplyErrorAndDie(
242+
Ydb::StatusIds::ABORTED,
243+
YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated. Unknown table."));
245244
}
246245

247246
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1100,29 +1099,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11001099
shardState->State = TShardState::EState::Finished;
11011100

11021101
Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
1103-
11041102
LocksBroken = true;
11051103

1106-
TMaybe<TString> tableName;
11071104
if (!res->Record.GetTxLocks().empty()) {
1108-
auto& lock = res->Record.GetTxLocks(0);
1109-
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
1110-
auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
1111-
if (it != TasksGraph.GetStagesInfo().end()) {
1112-
tableName = it->second.Meta.TableConstInfo->Path;
1113-
}
1114-
}
1115-
1116-
// Reply as soon as we know which table had locks invalidated
1117-
if (tableName) {
1118-
auto message = TStringBuilder()
1119-
<< "Transaction locks invalidated. Table: " << *tableName;
1120-
1121-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
1122-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
1105+
ResponseEv->BrokenLockPathId = TKikimrPathId(
1106+
res->Record.GetTxLocks(0).GetSchemeShard(),
1107+
res->Record.GetTxLocks(0).GetPathId());
1108+
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
11231109
}
11241110

1125-
// Receive more replies from other shards
11261111
CheckExecutionComplete();
11271112
return;
11281113
}

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ struct TEvKqpExecuter {
2525

2626
NLWTrace::TOrbit Orbit;
2727
IKqpGateway::TKqpSnapshot Snapshot;
28+
std::optional<NYql::TKikimrPathId> BrokenLockPathId;
2829
ui64 ResultRowsCount = 0;
2930
ui64 ResultRowsBytes = 0;
3031

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1722,7 +1722,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17221722
auto& response = *ResponseEv->Record.MutableResponse();
17231723

17241724
response.SetStatus(status);
1725-
response.MutableIssues()->Swap(issues);
1725+
if (issues) {
1726+
response.MutableIssues()->Swap(issues);
1727+
}
17261728

17271729
LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
17281730
<< ", to ActorId: " << Target);

ydb/core/kqp/provider/yql_kikimr_provider.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
316316

317317
for (const auto& info : tableInfos) {
318318
tableInfoMap.emplace(info.GetTableName(), &info);
319-
320319
TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
321320
TableByIdMap.emplace(pathId, info.GetTableName());
322321
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,6 +1279,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12791279

12801280
// Invalidate query cache on scheme/internal errors
12811281
switch (status) {
1282+
case Ydb::StatusIds::ABORTED: {
1283+
if (ev->BrokenLockPathId) {
1284+
issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId));
1285+
}
1286+
break;
1287+
}
12821288
case Ydb::StatusIds::SCHEME_ERROR:
12831289
case Ydb::StatusIds::INTERNAL_ERROR:
12841290
InvalidateQuery();

ydb/core/kqp/session_actor/kqp_tx.cpp

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,33 @@ namespace NKqp {
55

66
using namespace NYql;
77

8-
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
8+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKikimrPathId& pathId) {
99
TStringBuilder message;
1010
message << "Transaction locks invalidated.";
1111

12-
TMaybe<TString> tableName;
13-
if (invalidatedLock) {
14-
TKikimrPathId id(invalidatedLock->GetSchemeShard(), invalidatedLock->GetPathId());
15-
auto table = txCtx.TableByIdMap.FindPtr(id);
16-
if (table) {
17-
tableName = *table;
12+
if (pathId.OwnerId() != 0) {
13+
auto table = txCtx.TableByIdMap.FindPtr(pathId);
14+
if (!table) {
15+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
1816
}
17+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
18+
} else {
19+
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
20+
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
21+
if (pathId.TableId() == pathId.TableId()) {
22+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
23+
}
24+
}
25+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
1926
}
27+
}
2028

21-
if (tableName) {
22-
message << " Table: " << *tableName;
23-
}
24-
25-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
29+
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpTxLock& invalidatedLock) {
30+
return GetLocksInvalidatedIssue(
31+
txCtx,
32+
TKikimrPathId(
33+
invalidatedLock.GetSchemeShard(),
34+
invalidatedLock.GetPathId()));
2635
}
2736

2837
std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,

ydb/core/kqp/session_actor/kqp_tx.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ class TTransactionsCache {
434434
}
435435
};
436436

437+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
437438
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
438439
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);
439440

ydb/core/kqp/ut/tx/kqp_locks_ut.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,42 @@ Y_UNIT_TEST_SUITE(KqpLocks) {
204204
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
205205
CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0)));
206206
}
207+
208+
Y_UNIT_TEST(TwoPhaseTx) {
209+
TKikimrRunner kikimr;
210+
auto db = kikimr.GetTableClient();
211+
212+
auto session1 = db.CreateSession().GetValueSync().GetSession();
213+
auto session2 = db.CreateSession().GetValueSync().GetSession();
214+
215+
auto result = session1.ExecuteDataQuery(Q_(R"(
216+
REPLACE INTO `/Root/Test` (Group, Name, Comment) VALUES (1U, "Paul", "Changed");
217+
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
218+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync();
219+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
220+
221+
auto tx1 = result.GetTransaction();
222+
UNIT_ASSERT(tx1);
223+
224+
result = session2.ExecuteDataQuery(Q_(R"(
225+
REPLACE INTO `/Root/Test` (Group, Name, Comment)
226+
VALUES (1U, "Paul", "Changed");
227+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
228+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
229+
230+
result = session1.ExecuteDataQuery(Q_(R"(
231+
SELECT * FROM `KeyValue`;
232+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
233+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
234+
235+
auto commitResult = tx1->Commit().GetValueSync();
236+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
237+
commitResult.GetIssues().PrintTo(Cerr);
238+
UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
239+
[] (const NYql::TIssue& issue) {
240+
return issue.GetMessage().Contains("/Root/Test");
241+
}), commitResult.GetIssues().ToString());
242+
}
207243
}
208244

209245
} // namespace NKqp

0 commit comments

Comments
 (0)