Skip to content

Commit 618f5cf

Browse files
correct EvWrite locks processing (#8931)
1 parent 9b1e630 commit 618f5cf

File tree

3 files changed

+11
-5
lines changed

3 files changed

+11
-5
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -909,7 +909,13 @@ void TColumnShard::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorCon
909909
}
910910

911911
void TColumnShard::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx) {
912-
auto op = GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSyncTransactionOperator>(ev->Get()->Record.GetTxId());
912+
auto opPtr = GetProgressTxController().GetTxOperatorOptional(ev->Get()->Record.GetTxId());
913+
if (!opPtr) {
914+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "missed_read_set_ack")("proto", ev->Get()->Record.DebugString())(
915+
"tx_id", ev->Get()->Record.GetTxId());
916+
return;
917+
}
918+
auto op = TValidator::CheckNotNull(dynamic_pointer_cast<TEvWriteCommitSyncTransactionOperator>(opPtr));
913919
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "read_set_ack")("proto", ev->Get()->Record.DebugString())("lock_id", op->GetLockId());
914920
auto tx = op->CreateReceiveResultAckTx(*this, ev->Get()->Record.GetTabletConsumer());
915921
Execute(tx.release(), ctx);

ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
188188
if (tabletId && *tabletId != i) {
189189
continue;
190190
}
191-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
191+
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
192192
new TEvPipeCache::TEvForward(
193193
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), 0), i, true),
194194
IEventHandle::FlagTrackDelivery, GetTxId());
@@ -202,7 +202,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
202202
readSetData.SetDecision(*TxBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
203203
for (auto&& i : ReceivingShards) {
204204
if (WaitShardsResultAck.contains(i)) {
205-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
205+
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
206206
new TEvPipeCache::TEvForward(
207207
new TEvTxProcessing::TEvReadSet(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), readSetData.SerializeAsString()), i,
208208
true),

ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
136136
}
137137

138138
void SendBrokenFlagAck(TColumnShard& owner) {
139-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
139+
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
140140
new TEvPipeCache::TEvForward(
141141
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), 0), ArbiterTabletId, true),
142142
IEventHandle::FlagTrackDelivery, GetTxId());
@@ -145,7 +145,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
145145
void SendResult(TColumnShard& owner) {
146146
NKikimrTx::TReadSetData readSetData;
147147
readSetData.SetDecision(SelfBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
148-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
148+
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
149149
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(
150150
0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), readSetData.SerializeAsString()),
151151
ArbiterTabletId, true),

0 commit comments

Comments
 (0)