Skip to content

Commit 9b24426

Browse files
authored
Subscribe to pipe state (#5685)
1 parent 40bcf99 commit 9b24426

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

ydb/core/tx/replication/service/table_writer_impl.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
100100
event->Record.SetSource(source);
101101
}
102102

103-
this->Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false));
103+
this->Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, true, ++SubscribeCookie));
104104
this->Become(&TThis::StateWaitingStatus);
105105
}
106106

@@ -140,7 +140,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
140140
}
141141

142142
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
143-
if (TabletId == ev->Get()->TabletId) {
143+
if (TabletId == ev->Get()->TabletId && ev->Cookie == SubscribeCookie) {
144144
Leave();
145145
}
146146
}
@@ -194,7 +194,9 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
194194
mutable TMaybe<TString> LogPrefix;
195195

196196
TActorId LeaderPipeCache;
197+
ui64 SubscribeCookie = 0;
197198
TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext;
199+
198200
}; // TTablePartitionWriter
199201

200202
template <class TChangeRecord>
@@ -421,7 +423,7 @@ class TLocalTableWriter
421423
return new TTablePartitionWriter<TChangeRecord>(this->SelfId(), partitionId, TTableId(this->PathId, Schema->Version));
422424
}
423425

424-
const TVector<TKeyDesc::TPartitionInfo>& GetPartitions() const override { return KeyDesc->GetPartitions(); };
426+
const TVector<TKeyDesc::TPartitionInfo>& GetPartitions() const override { return KeyDesc->GetPartitions(); }
425427
const TVector<NScheme::TTypeInfo>& GetSchema() const override { return KeyDesc->KeyColumnTypes; }
426428
NKikimrSchemeOp::ECdcStreamFormat GetStreamFormat() const override { return TChangeRecord::StreamType; }
427429

0 commit comments

Comments
 (0)