Skip to content

Commit b07f192

Browse files
authored
Merge 6a917f9 into 1bb608a
2 parents 1bb608a + 6a917f9 commit b07f192

File tree

8 files changed

+93
-36
lines changed

8 files changed

+93
-36
lines changed

ydb/core/kqp/common/kqp_tx.cpp

+21-12
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,33 @@ namespace NKqp {
77

88
using namespace NYql;
99

10-
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
10+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKikimrPathId& pathId) {
1111
TStringBuilder message;
1212
message << "Transaction locks invalidated.";
1313

14-
TMaybe<TString> tableName;
15-
if (invalidatedLock) {
16-
TKikimrPathId id(invalidatedLock->GetSchemeShard(), invalidatedLock->GetPathId());
17-
auto table = txCtx.TableByIdMap.FindPtr(id);
18-
if (table) {
19-
tableName = *table;
14+
if (pathId.OwnerId() != 0) {
15+
auto table = txCtx.TableByIdMap.FindPtr(pathId);
16+
if (!table) {
17+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
2018
}
19+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
20+
} else {
21+
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
22+
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
23+
if (pathId.TableId() == pathId.TableId()) {
24+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
25+
}
26+
}
27+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
2128
}
29+
}
2230

23-
if (tableName) {
24-
message << " Table: " << *tableName;
25-
}
26-
27-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
31+
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpTxLock& invalidatedLock) {
32+
return GetLocksInvalidatedIssue(
33+
txCtx,
34+
TKikimrPathId(
35+
invalidatedLock.GetSchemeShard(),
36+
invalidatedLock.GetPathId()));
2837
}
2938

3039
std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,

ydb/core/kqp/common/kqp_tx.h

+1
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ class TTransactionsCache {
434434
}
435435
};
436436

437+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
437438
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
438439
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);
439440

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+25-22
Original file line numberDiff line numberDiff line change
@@ -241,11 +241,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
241241
}
242242

243243
void Finalize() {
244+
YQL_ENSURE(!AlreadyReplied);
244245
if (LocksBroken) {
245-
TString message = "Transaction locks invalidated.";
246-
247-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
248-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
246+
return ReplyErrorAndDie(
247+
Ydb::StatusIds::ABORTED,
248+
YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated. Unknown table."));
249249
}
250250

251251
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1183,6 +1183,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11831183
CheckExecutionComplete();
11841184
return;
11851185
}
1186+
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
1187+
LOG_D("Broken locks: " << res->Record.DebugString());
1188+
YQL_ENSURE(shardState->State == TShardState::EState::Executing);
1189+
shardState->State = TShardState::EState::Finished;
1190+
Counters->TxProxyMon->TxResultAborted->Inc();
1191+
LocksBroken = true;
1192+
1193+
if (!res->Record.GetTxLocks().empty()) {
1194+
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
1195+
res->Record.GetTxLocks(0).GetSchemeShard(),
1196+
res->Record.GetTxLocks(0).GetPathId());
1197+
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
1198+
}
1199+
1200+
CheckExecutionComplete();
1201+
return;
1202+
}
11861203
default:
11871204
{
11881205
return ShardError(res->Record);
@@ -1235,29 +1252,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12351252
shardState->State = TShardState::EState::Finished;
12361253

12371254
Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
1238-
12391255
LocksBroken = true;
12401256

1241-
TMaybe<TString> tableName;
12421257
if (!res->Record.GetTxLocks().empty()) {
1243-
auto& lock = res->Record.GetTxLocks(0);
1244-
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
1245-
auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
1246-
if (it != TasksGraph.GetStagesInfo().end()) {
1247-
tableName = it->second.Meta.TableConstInfo->Path;
1248-
}
1249-
}
1250-
1251-
// Reply as soon as we know which table had locks invalidated
1252-
if (tableName) {
1253-
auto message = TStringBuilder()
1254-
<< "Transaction locks invalidated. Table: " << *tableName;
1255-
1256-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
1257-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
1258+
ResponseEv->BrokenLockPathId = TKikimrPathId(
1259+
res->Record.GetTxLocks(0).GetSchemeShard(),
1260+
res->Record.GetTxLocks(0).GetPathId());
1261+
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
12581262
}
12591263

1260-
// Receive more replies from other shards
12611264
CheckExecutionComplete();
12621265
return;
12631266
}

ydb/core/kqp/executer_actor/kqp_executer.h

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct TEvKqpExecuter {
2626

2727
NLWTrace::TOrbit Orbit;
2828
IKqpGateway::TKqpSnapshot Snapshot;
29+
std::optional<NYql::TKikimrPathId> BrokenLockPathId;
2930
ui64 ResultRowsCount = 0;
3031
ui64 ResultRowsBytes = 0;
3132

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -1768,7 +1768,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17681768
auto& response = *ResponseEv->Record.MutableResponse();
17691769

17701770
response.SetStatus(status);
1771-
response.MutableIssues()->Swap(issues);
1771+
if (issues) {
1772+
response.MutableIssues()->Swap(issues);
1773+
}
17721774

17731775
LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
17741776
<< ", to ActorId: " << Target);

ydb/core/kqp/provider/yql_kikimr_provider.h

-1
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
323323

324324
for (const auto& info : tableInfos) {
325325
tableInfoMap.emplace(info.GetTableName(), &info);
326-
327326
TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
328327
TableByIdMap.emplace(pathId, info.GetTableName());
329328
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -1439,6 +1439,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
14391439

14401440
// Invalidate query cache on scheme/internal errors
14411441
switch (status) {
1442+
case Ydb::StatusIds::ABORTED: {
1443+
if (ev->BrokenLockPathId) {
1444+
issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId));
1445+
}
1446+
break;
1447+
}
14421448
case Ydb::StatusIds::SCHEME_ERROR:
14431449
case Ydb::StatusIds::INTERNAL_ERROR:
14441450
InvalidateQuery();

ydb/core/kqp/ut/tx/kqp_locks_ut.cpp

+36
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,42 @@ Y_UNIT_TEST_SUITE(KqpLocks) {
204204
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
205205
CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0)));
206206
}
207+
208+
Y_UNIT_TEST(TwoPhaseTx) {
209+
TKikimrRunner kikimr;
210+
auto db = kikimr.GetTableClient();
211+
212+
auto session1 = db.CreateSession().GetValueSync().GetSession();
213+
auto session2 = db.CreateSession().GetValueSync().GetSession();
214+
215+
auto result = session1.ExecuteDataQuery(Q_(R"(
216+
REPLACE INTO `/Root/Test` (Group, Name, Comment) VALUES (1U, "Paul", "Changed");
217+
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
218+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync();
219+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
220+
221+
auto tx1 = result.GetTransaction();
222+
UNIT_ASSERT(tx1);
223+
224+
result = session2.ExecuteDataQuery(Q_(R"(
225+
REPLACE INTO `/Root/Test` (Group, Name, Comment)
226+
VALUES (1U, "Paul", "Changed");
227+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
228+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
229+
230+
result = session1.ExecuteDataQuery(Q_(R"(
231+
SELECT * FROM `KeyValue`;
232+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
233+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
234+
235+
auto commitResult = tx1->Commit().GetValueSync();
236+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
237+
commitResult.GetIssues().PrintTo(Cerr);
238+
UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
239+
[] (const NYql::TIssue& issue) {
240+
return issue.GetMessage().Contains("/Root/Test");
241+
}), commitResult.GetIssues().ToString());
242+
}
207243
}
208244

209245
} // namespace NKqp

0 commit comments

Comments
 (0)