Skip to content

Commit 6bd2a85

Browse files
authored
TestShardRestartPlannedCommitShouldSucceed + EvWrite (#1969)
1 parent 11e4668 commit 6bd2a85

15 files changed

+318
-88
lines changed

ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
7878

7979
try {
8080
bool useGenericReadSets = dataTx->GetUseGenericReadSets();
81-
const auto& kqpLocks = dataTx->GetKqpLocks();
81+
const auto& kqpLocks = dataTx->HasKqpLocks() ? dataTx->GetKqpLocks() : NKikimrDataEvents::TKqpLocks{};
8282
auto& tasksRunner = dataTx->GetKqpTasksRunner();
8383

8484
auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - dataTx->GetTxSize());
@@ -111,8 +111,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
111111
}
112112
}
113113

114-
KqpFillOutReadSets(op->OutReadSets(), kqpLocks,
115-
dataTx->HasKqpLocks(), useGenericReadSets, tasksRunner, DataShard.SysLocksTable(), tabletId);
114+
KqpFillOutReadSets(op->OutReadSets(), kqpLocks, useGenericReadSets, &tasksRunner, DataShard.SysLocksTable(), tabletId);
116115
} catch (const TMemoryLimitExceededException&) {
117116
LOG_T("Operation " << *op << " at " << tabletId
118117
<< " exceeded memory limit " << txc.GetMemoryLimit()
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#include "datashard_impl.h"
2+
#include "datashard_kqp.h"
3+
#include "datashard_pipeline.h"
4+
#include "execution_unit_ctors.h"
5+
#include "setup_sys_locks.h"
6+
#include "datashard_locks_db.h"
7+
8+
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
9+
10+
namespace NKikimr {
11+
namespace NDataShard {
12+
13+
using namespace NMiniKQL;
14+
15+
#define LOG_T(stream) LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, stream)
16+
#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, stream)
17+
#define LOG_E(stream) LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, stream)
18+
#define LOG_C(stream) LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, stream)
19+
#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, stream)
20+
21+
class TBuildWriteOutRSUnit : public TExecutionUnit {
22+
public:
23+
TBuildWriteOutRSUnit(TDataShard& dataShard, TPipeline& pipeline);
24+
~TBuildWriteOutRSUnit() override;
25+
26+
bool IsReadyToExecute(TOperation::TPtr op) const override;
27+
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override;
28+
void Complete(TOperation::TPtr op, const TActorContext& ctx) override;
29+
30+
private:
31+
EExecutionStatus OnTabletNotReady(TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx);
32+
};
33+
34+
TBuildWriteOutRSUnit::TBuildWriteOutRSUnit(TDataShard& dataShard, TPipeline& pipeline)
35+
: TExecutionUnit(EExecutionUnitKind::BuildWriteOutRS, true, dataShard, pipeline) {}
36+
37+
TBuildWriteOutRSUnit::~TBuildWriteOutRSUnit() {}
38+
39+
bool TBuildWriteOutRSUnit::IsReadyToExecute(TOperation::TPtr) const {
40+
return true;
41+
}
42+
43+
EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransactionContext& txc,
44+
const TActorContext& ctx)
45+
{
46+
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
47+
auto writeTx = writeOp->GetWriteTx();
48+
Y_ABORT_UNLESS(writeTx);
49+
50+
DataShard.ReleaseCache(*writeOp);
51+
52+
if (writeOp->IsTxDataReleased()) {
53+
switch (Pipeline.RestoreDataTx(writeOp, txc)) {
54+
case ERestoreDataStatus::Ok:
55+
break;
56+
case ERestoreDataStatus::Restart:
57+
return EExecutionStatus::Restart;
58+
case ERestoreDataStatus::Error:
59+
Y_ABORT("Failed to restore writeOp data: %s", writeTx->GetErrStr().c_str());
60+
}
61+
}
62+
63+
TDataShardLocksDb locksDb(DataShard, txc);
64+
TSetupSysLocks guardLocks(op, DataShard, &locksDb);
65+
66+
ui64 tabletId = DataShard.TabletID();
67+
68+
if (writeTx->CheckCancelled()) {
69+
writeOp->ReleaseTxData(txc);
70+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Tx was cancelled");
71+
DataShard.IncCounter(COUNTER_WRITE_CANCELLED);
72+
return EExecutionStatus::Executed;
73+
}
74+
75+
writeTx->SetReadVersion(DataShard.GetReadWriteVersions(writeOp).ReadVersion);
76+
77+
try {
78+
const auto& kqpLocks = writeTx->GetKqpLocks() ? writeTx->GetKqpLocks().value() : NKikimrDataEvents::TKqpLocks{};
79+
KqpFillOutReadSets(op->OutReadSets(), kqpLocks, true, nullptr, DataShard.SysLocksTable(), tabletId);
80+
} catch (const TNotReadyTabletException&) {
81+
LOG_C("Unexpected TNotReadyTabletException exception at build out rs");
82+
return OnTabletNotReady(*writeOp, txc, ctx);
83+
} catch (const yexception& e) {
84+
LOG_C("Exception while preparing out-readsets for KQP transaction " << *op << " at " << DataShard.TabletID() << ": " << e.what());
85+
if (op->IsImmediate()) {
86+
writeOp->ReleaseTxData(txc);
87+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Tx was terminated: " << e.what());
88+
return EExecutionStatus::Executed;
89+
} else {
90+
Y_FAIL_S("Unexpected exception in KQP out-readsets prepare: " << e.what());
91+
}
92+
}
93+
94+
return EExecutionStatus::Executed;
95+
}
96+
97+
void TBuildWriteOutRSUnit::Complete(TOperation::TPtr, const TActorContext&) {}
98+
99+
EExecutionStatus TBuildWriteOutRSUnit::OnTabletNotReady(TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx)
100+
{
101+
LOG_T("Tablet " << DataShard.TabletID() << " is not ready for " << writeOp << " execution");
102+
103+
DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
104+
105+
writeOp.ReleaseTxData(txc);
106+
return EExecutionStatus::Restart;
107+
}
108+
109+
THolder<TExecutionUnit> CreateBuildWriteOutRSUnit(TDataShard& dataShard, TPipeline& pipeline) {
110+
return THolder(new TBuildWriteOutRSUnit(dataShard, pipeline));
111+
}
112+
113+
} // namespace NDataShard
114+
} // namespace NKikimr

ydb/core/tx/datashard/check_write_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
122122
DataShard.GetProcessingParams() ? DataShard.GetProcessingParams()->GetCoordinators() : google::protobuf::RepeatedField<ui64>{}
123123
}
124124
));
125-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID());
125+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared write transaction " << *op << " at tablet " << DataShard.TabletID());
126126
}
127127

128128
return EExecutionStatus::Executed;

ydb/core/tx/datashard/datashard_kqp.cpp

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -534,30 +534,32 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
534534
return result;
535535
}
536536

537-
void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrDataEvents::TKqpLocks& kqpLocks, bool hasKqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner, TSysLocks& sysLocks, ui64 tabletId)
537+
void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrDataEvents::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner* tasksRunner, TSysLocks& sysLocks, ui64 tabletId)
538538
{
539539
TMap<std::pair<ui64, ui64>, NKikimrTxDataShard::TKqpReadset> readsetData;
540540

541-
for (auto& [taskId, task] : tasksRunner.GetTasks()) {
542-
auto& taskRunner = tasksRunner.GetTaskRunner(task.GetId());
541+
if (tasksRunner) {
542+
for (auto& [taskId, task] : tasksRunner->GetTasks()) {
543+
auto& taskRunner = tasksRunner->GetTaskRunner(task.GetId());
543544

544-
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
545-
for (auto& channel : task.GetOutputs(i).GetChannels()) {
546-
if (channel.GetIsPersistent()) {
547-
MKQL_ENSURE_S(channel.GetSrcEndpoint().HasTabletId());
548-
MKQL_ENSURE_S(channel.GetDstEndpoint().HasTabletId());
545+
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
546+
for (auto& channel : task.GetOutputs(i).GetChannels()) {
547+
if (channel.GetIsPersistent()) {
548+
MKQL_ENSURE_S(channel.GetSrcEndpoint().HasTabletId());
549+
MKQL_ENSURE_S(channel.GetDstEndpoint().HasTabletId());
549550

550-
NDq::TDqSerializedBatch outputData;
551-
auto fetchStatus = FetchAllOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
552-
MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
553-
MKQL_ENSURE(!outputData.IsOOB(), "Out-of-band data transport is not yet supported");
551+
NDq::TDqSerializedBatch outputData;
552+
auto fetchStatus = FetchAllOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
553+
MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
554+
MKQL_ENSURE(!outputData.IsOOB(), "Out-of-band data transport is not yet supported");
554555

555-
auto key = std::make_pair(channel.GetSrcEndpoint().GetTabletId(), channel.GetDstEndpoint().GetTabletId());
556-
auto& channelData = *readsetData[key].AddOutputs();
556+
auto key = std::make_pair(channel.GetSrcEndpoint().GetTabletId(), channel.GetDstEndpoint().GetTabletId());
557+
auto& channelData = *readsetData[key].AddOutputs();
557558

558-
channelData.SetChannelId(channel.GetId());
559-
channelData.SetFinished(true);
560-
channelData.MutableData()->Swap(&outputData.Proto);
559+
channelData.SetChannelId(channel.GetId());
560+
channelData.SetFinished(true);
561+
channelData.MutableData()->Swap(&outputData.Proto);
562+
}
561563
}
562564
}
563565
}
@@ -566,7 +568,7 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrD
566568
NKikimrTx::TReadSetData::EDecision decision = NKikimrTx::TReadSetData::DECISION_COMMIT;
567569
TMap<std::pair<ui64, ui64>, NKikimrTx::TReadSetData> genericData;
568570

569-
if (hasKqpLocks && NeedValidateLocks(kqpLocks.GetOp())) {
571+
if (kqpLocks.HasOp() && NeedValidateLocks(kqpLocks.GetOp())) {
570572
bool sendLocks = SendLocks(kqpLocks, tabletId);
571573
YQL_ENSURE(sendLocks == !kqpLocks.GetLocks().empty());
572574

ydb/core/tx/datashard/datashard_kqp.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
3232
const NMiniKQL::TKqpDatashardComputeContext& computeCtx);
3333

3434
void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrDataEvents::TKqpLocks& kqpLocks,
35-
bool hasKqpLocks, bool useGenericReadSets,
36-
NKqp::TKqpTasksRunner& tasksRunner, TSysLocks& sysLocks, ui64 tabletId);
35+
bool useGenericReadSets, NKqp::TKqpTasksRunner* tasksRunner, TSysLocks& sysLocks, ui64 tabletId);
3736

3837
void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
3938
const NKikimrDataEvents::TKqpLocks& kqpLocks, const NKqp::TKqpTasksRunner* tasksRunner, ui64 tabletId);

ydb/core/tx/datashard/datashard_ut_order.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3166,7 +3166,7 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, StreamLookup) {
31663166
}
31673167
}
31683168

3169-
Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
3169+
Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWrite) {
31703170
TPortManager pm;
31713171
NKikimrConfig::TAppConfig app;
31723172
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
@@ -3187,17 +3187,25 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
31873187

31883188
InitRoot(server, sender);
31893189

3190-
CreateShardedTable(server, sender, "/Root", "table-1", 1);
3191-
CreateShardedTable(server, sender, "/Root", "table-2", 1);
3192-
auto table1shards = GetTableShards(server, sender, "/Root/table-1");
3190+
auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
3191+
auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1);
31933192

3194-
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
3195-
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1)"));
3193+
{
3194+
auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{};
3195+
auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
3196+
3197+
Cerr << "===== UPSERT initial rows" << Endl;
3198+
3199+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
3200+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1)"));
3201+
}
31963202

31973203
TString sessionId = CreateSessionRPC(runtime);
31983204

31993205
TString txId;
32003206
{
3207+
Cerr << "===== Begin SELECT" << Endl;
3208+
32013209
auto result = KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
32023210
SELECT * FROM `/Root/table-1`
32033211
UNION ALL
@@ -3235,6 +3243,11 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
32353243
};
32363244
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);
32373245

3246+
auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{};
3247+
auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
3248+
3249+
Cerr << "===== UPSERT and commit" << Endl;
3250+
32383251
// Send a commit request, it would block on readset exchange
32393252
auto myCommit2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
32403253
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
@@ -3246,18 +3259,24 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
32463259
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), expectedReadSets);
32473260

32483261
// Remove observer and gracefully restart the shard
3249-
Cerr << "... restarting tablet" << Endl;
3262+
Cerr << "===== restarting tablet" << Endl;
32503263
runtime.SetObserverFunc(prevObserverFunc);
3251-
GracefulRestartTablet(runtime, table1shards[0], sender);
3264+
GracefulRestartTablet(runtime, shards1[0], sender);
32523265

32533266
// The result of commit should be SUCCESS
32543267
{
3268+
Cerr << "===== Waiting for commit response" << Endl;
3269+
32553270
auto response = AwaitResponse(runtime, myCommit2);
32563271
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
32573272
}
32583273

3274+
evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};
3275+
32593276
// Select key 3 and verify its value was updated
32603277
{
3278+
Cerr << "===== Last SELECT" << Endl;
3279+
32613280
auto result = KqpSimpleExec(runtime, Q_(R"(SELECT key, value FROM `/Root/table-1` WHERE key = 3 ORDER BY key)"));
32623281
UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 3 } items { uint32_value: 2 } }");
32633282
}

ydb/core/tx/datashard/datashard_write_operation.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,8 @@ void TWriteOperation::BuildExecutionPlan(bool loaded)
575575

576576
plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
577577

578+
plan.push_back(EExecutionUnitKind::BuildWriteOutRS);
579+
plan.push_back(EExecutionUnitKind::StoreAndSendWriteOutRS);
578580
plan.push_back(EExecutionUnitKind::PrepareWriteTxInRS);
579581
plan.push_back(EExecutionUnitKind::LoadAndWaitInRS);
580582
plan.push_back(EExecutionUnitKind::ExecuteWrite);
@@ -612,6 +614,12 @@ bool TWriteOperation::OnStopping(TDataShard& self, const TActorContext& ctx) {
612614
// Immediate ops become ready when stopping flag is set
613615
return true;
614616
} else {
617+
// Distributed operations send notification when proposed
618+
if (GetTarget() && !HasCompletedFlag()) {
619+
auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(self.TabletID(), GetTxId());
620+
ctx.Send(GetTarget(), notify.Release(), 0, GetCookie());
621+
}
622+
615623
// Distributed ops avoid doing new work when stopping
616624
return false;
617625
}

ydb/core/tx/datashard/execution_unit.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,12 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind,
6060
return CreateBuildDistributedEraseTxOutRSUnit(dataShard, pipeline);
6161
case EExecutionUnitKind::BuildKqpDataTxOutRS:
6262
return CreateBuildKqpDataTxOutRSUnit(dataShard, pipeline);
63+
case EExecutionUnitKind::BuildWriteOutRS:
64+
return CreateBuildWriteOutRSUnit(dataShard, pipeline);
6365
case EExecutionUnitKind::StoreAndSendOutRS:
6466
return CreateStoreAndSendOutRSUnit(dataShard, pipeline);
67+
case EExecutionUnitKind::StoreAndSendWriteOutRS:
68+
return CreateStoreAndSendWriteOutRSUnit(dataShard, pipeline);
6569
case EExecutionUnitKind::PrepareDataTxInRS:
6670
return CreatePrepareDataTxInRSUnit(dataShard, pipeline);
6771
case EExecutionUnitKind::PrepareKqpDataTxInRS:

ydb/core/tx/datashard/execution_unit_ctors.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ THolder<TExecutionUnit> CreateProtectSchemeEchoesUnit(TDataShard& dataShard, TPi
3030
THolder<TExecutionUnit> CreateBuildDataTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline);
3131
THolder<TExecutionUnit> CreateBuildDistributedEraseTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline);
3232
THolder<TExecutionUnit> CreateBuildKqpDataTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline);
33-
THolder<TExecutionUnit> CreateStoreAndSendOutRSUnit(TDataShard &dataShard, TPipeline &pipeline);
34-
THolder<TExecutionUnit> CreatePrepareDataTxInRSUnit(TDataShard &dataShard, TPipeline &pipeline);
33+
THolder<TExecutionUnit> CreateBuildWriteOutRSUnit(TDataShard& dataShard, TPipeline& pipeline);
34+
THolder<TExecutionUnit> CreateStoreAndSendOutRSUnit(TDataShard& dataShard, TPipeline& pipeline);
35+
THolder<TExecutionUnit> CreateStoreAndSendWriteOutRSUnit(TDataShard& dataShard, TPipeline& pipeline);
36+
THolder<TExecutionUnit> CreatePrepareDataTxInRSUnit(TDataShard& dataShard, TPipeline& pipeline);
3537
THolder<TExecutionUnit> CreatePrepareKqpDataTxInRSUnit(TDataShard &dataShard, TPipeline &pipeline);
3638
THolder<TExecutionUnit> CreatePrepareWriteTxInRSUnit(TDataShard& dataShard, TPipeline& pipeline);
3739
THolder<TExecutionUnit> CreatePrepareDistributedEraseTxInRSUnit(TDataShard& dataShard, TPipeline& pipeline);

ydb/core/tx/datashard/execution_unit_kind.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ enum class EExecutionUnitKind: ui32 {
3030
ProtectSchemeEchoes,
3131
BuildDataTxOutRS,
3232
BuildKqpDataTxOutRS,
33+
BuildWriteOutRS,
3334
BuildDistributedEraseTxOutRS,
3435
StoreAndSendOutRS,
36+
StoreAndSendWriteOutRS,
3537
PrepareDataTxInRS,
3638
PrepareKqpDataTxInRS,
3739
PrepareWriteTxInRS,

ydb/core/tx/datashard/prepare_write_tx_in_rs_unit.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,8 @@ EExecutionStatus TPrepareWriteTxInRSUnit::Execute(TOperation::TPtr op, TTransact
5050

5151
if (writeTx->CheckCancelled()) {
5252
writeOp->ReleaseTxData(txc);
53-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)
54-
->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled");
55-
56-
DataShard.IncCounter(op->IsImmediate() ? COUNTER_IMMEDIATE_TX_CANCELLED : COUNTER_PLANNED_TX_CANCELLED);
57-
53+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Tx was cancelled");
54+
DataShard.IncCounter(COUNTER_WRITE_CANCELLED);
5855
return EExecutionStatus::Executed;
5956
}
6057

0 commit comments

Comments
 (0)