Skip to content

Commit 8791163

Browse files
committed
Always show table with broken locks (#8763)
1 parent 84f2f0e commit 8791163

File tree

8 files changed

+88
-40
lines changed

8 files changed

+88
-40
lines changed

ydb/core/kqp/common/kqp_tx.cpp

+19-12
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,31 @@ namespace NKqp {
77

88
using namespace NYql;
99

10-
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
10+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKikimrPathId& pathId) {
1111
TStringBuilder message;
1212
message << "Transaction locks invalidated.";
1313

14-
TMaybe<TString> tableName;
15-
if (invalidatedLock) {
16-
TKikimrPathId id(invalidatedLock->GetSchemeShard(), invalidatedLock->GetPathId());
17-
auto table = txCtx.TableByIdMap.FindPtr(id);
18-
if (table) {
19-
tableName = *table;
14+
if (pathId.OwnerId() != 0) {
15+
auto table = txCtx.TableByIdMap.FindPtr(pathId);
16+
YQL_ENSURE(table);
17+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
18+
} else {
19+
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
20+
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
21+
if (pathId.TableId() == pathId.TableId()) {
22+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
23+
}
2024
}
25+
YQL_ENSURE(false);
2126
}
27+
}
2228

23-
if (tableName) {
24-
message << " Table: " << *tableName;
25-
}
26-
27-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
29+
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpTxLock& invalidatedLock) {
30+
return GetLocksInvalidatedIssue(
31+
txCtx,
32+
TKikimrPathId(
33+
invalidatedLock.GetSchemeShard(),
34+
invalidatedLock.GetPathId()));
2835
}
2936

3037
std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,

ydb/core/kqp/common/kqp_tx.h

+1
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ class TTransactionsCache {
434434
}
435435
};
436436

437+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
437438
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
438439
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);
439440

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+22-26
Original file line numberDiff line numberDiff line change
@@ -241,12 +241,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
241241
}
242242

243243
void Finalize() {
244-
if (LocksBroken) {
245-
TString message = "Transaction locks invalidated.";
246-
247-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
248-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
249-
}
244+
YQL_ENSURE(!LocksBroken);
250245

251246
auto& response = *ResponseEv->Record.MutableResponse();
252247

@@ -1183,6 +1178,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11831178
CheckExecutionComplete();
11841179
return;
11851180
}
1181+
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
1182+
LOG_D("Broken locks: " << res->Record.DebugString());
1183+
YQL_ENSURE(shardState->State == TShardState::EState::Executing);
1184+
shardState->State = TShardState::EState::Finished;
1185+
Counters->TxProxyMon->TxResultAborted->Inc();
1186+
LocksBroken = true;
1187+
1188+
if (!res->Record.GetTxLocks().empty()) {
1189+
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
1190+
res->Record.GetTxLocks(0).GetSchemeShard(),
1191+
res->Record.GetTxLocks(0).GetPathId());
1192+
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
1193+
}
1194+
1195+
CheckExecutionComplete();
1196+
return;
1197+
}
11861198
default:
11871199
{
11881200
return ShardError(res->Record);
@@ -1235,31 +1247,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12351247
shardState->State = TShardState::EState::Finished;
12361248

12371249
Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
1238-
12391250
LocksBroken = true;
12401251

1241-
TMaybe<TString> tableName;
12421252
if (!res->Record.GetTxLocks().empty()) {
1243-
auto& lock = res->Record.GetTxLocks(0);
1244-
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
1245-
auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
1246-
if (it != TasksGraph.GetStagesInfo().end()) {
1247-
tableName = it->second.Meta.TableConstInfo->Path;
1248-
}
1253+
ResponseEv->BrokenLockPathId = TKikimrPathId(
1254+
res->Record.GetTxLocks(0).GetSchemeShard(),
1255+
res->Record.GetTxLocks(0).GetPathId());
12491256
}
12501257

1251-
// Reply as soon as we know which table had locks invalidated
1252-
if (tableName) {
1253-
auto message = TStringBuilder()
1254-
<< "Transaction locks invalidated. Table: " << *tableName;
1255-
1256-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
1257-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
1258-
}
1259-
1260-
// Receive more replies from other shards
1261-
CheckExecutionComplete();
1262-
return;
1258+
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
12631259
}
12641260
case NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED: {
12651261
YQL_ENSURE(false);

ydb/core/kqp/executer_actor/kqp_executer.h

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct TEvKqpExecuter {
2626

2727
NLWTrace::TOrbit Orbit;
2828
IKqpGateway::TKqpSnapshot Snapshot;
29+
std::optional<NYql::TKikimrPathId> BrokenLockPathId;
2930
ui64 ResultRowsCount = 0;
3031
ui64 ResultRowsBytes = 0;
3132

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -1768,7 +1768,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17681768
auto& response = *ResponseEv->Record.MutableResponse();
17691769

17701770
response.SetStatus(status);
1771-
response.MutableIssues()->Swap(issues);
1771+
if (issues) {
1772+
response.MutableIssues()->Swap(issues);
1773+
}
17721774

17731775
LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
17741776
<< ", to ActorId: " << Target);

ydb/core/kqp/provider/yql_kikimr_provider.h

-1
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
323323

324324
for (const auto& info : tableInfos) {
325325
tableInfoMap.emplace(info.GetTableName(), &info);
326-
327326
TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
328327
TableByIdMap.emplace(pathId, info.GetTableName());
329328
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -1439,6 +1439,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
14391439

14401440
// Invalidate query cache on scheme/internal errors
14411441
switch (status) {
1442+
case Ydb::StatusIds::ABORTED: {
1443+
if (ev->BrokenLockPathId) {
1444+
issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId));
1445+
}
1446+
break;
1447+
}
14421448
case Ydb::StatusIds::SCHEME_ERROR:
14431449
case Ydb::StatusIds::INTERNAL_ERROR:
14441450
InvalidateQuery();

ydb/core/kqp/ut/tx/kqp_locks_ut.cpp

+36
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,42 @@ Y_UNIT_TEST_SUITE(KqpLocks) {
204204
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
205205
CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0)));
206206
}
207+
208+
Y_UNIT_TEST(TwoPhaseTx) {
209+
TKikimrRunner kikimr;
210+
auto db = kikimr.GetTableClient();
211+
212+
auto session1 = db.CreateSession().GetValueSync().GetSession();
213+
auto session2 = db.CreateSession().GetValueSync().GetSession();
214+
215+
auto result = session1.ExecuteDataQuery(Q_(R"(
216+
REPLACE INTO `/Root/Test` (Group, Name, Comment) VALUES (1U, "Paul", "Changed");
217+
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
218+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync();
219+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
220+
221+
auto tx1 = result.GetTransaction();
222+
UNIT_ASSERT(tx1);
223+
224+
result = session2.ExecuteDataQuery(Q_(R"(
225+
REPLACE INTO `/Root/Test` (Group, Name, Comment)
226+
VALUES (1U, "Paul", "Changed");
227+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
228+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
229+
230+
result = session1.ExecuteDataQuery(Q_(R"(
231+
SELECT * FROM `KeyValue`;
232+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
233+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
234+
235+
auto commitResult = tx1->Commit().GetValueSync();
236+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
237+
commitResult.GetIssues().PrintTo(Cerr);
238+
UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
239+
[] (const NYql::TIssue& issue) {
240+
return issue.GetMessage().Contains("/Root/Test");
241+
}), commitResult.GetIssues().ToString());
242+
}
207243
}
208244

209245
} // namespace NKqp

0 commit comments

Comments
 (0)