Skip to content

Commit 89d27b2

Browse files
committed
Investigate heartbeat problems
1 parent 67bd625 commit 89d27b2

File tree

4 files changed

+19
-1
lines changed

4 files changed

+19
-1
lines changed

ydb/core/change_exchange/change_record.h

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class IChangeRecord: public TThrRefBase {
3131
AsyncIndex,
3232
CdcDataChange,
3333
CdcHeartbeat,
34+
CdcHeartbeatPrivate,
3435
};
3536

3637
public:

ydb/core/tx/datashard/cdc_stream_heartbeat.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class TDataShard::TTxCdcStreamEmitHeartbeats: public NTabletFlatExecutor::TTrans
4545
const auto heartbeats = Self->GetCdcStreamHeartbeatManager().EmitHeartbeats(txc.DB, Edge);
4646

4747
for (const auto& [streamPathId, info] : heartbeats) {
48-
auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::CdcHeartbeat)
48+
for (auto kind : {TChangeRecord::EKind::CdcHeartbeatPrivate, TChangeRecord::EKind::CdcHeartbeat}) {
49+
auto recordPtr = TChangeRecordBuilder(kind)
4950
.WithOrder(Self->AllocateChangeRecordOrder(db))
5051
.WithGroup(0)
5152
.WithStep(info.Last.Step)
@@ -69,6 +70,7 @@ class TDataShard::TTxCdcStreamEmitHeartbeats: public NTabletFlatExecutor::TTrans
6970
.SchemaVersion = record.GetSchemaVersion(),
7071
});
7172
}
73+
}
7274

7375
return true;
7476
}

ydb/core/tx/datashard/change_record.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) cons
2525
case EKind::CdcHeartbeat: {
2626
break;
2727
}
28+
case EKind::CdcHeartbeatPrivate: {
29+
break;
30+
}
2831
}
2932
}
3033

@@ -54,6 +57,10 @@ TConstArrayRef<TCell> TChangeRecord::GetKey() const {
5457
case EKind::CdcHeartbeat: {
5558
Y_ABORT("Not supported");
5659
}
60+
61+
case EKind::CdcHeartbeatPrivate: {
62+
Y_ABORT("Not supported");
63+
}
5764
}
5865

5966
Y_ABORT_UNLESS(Key);
@@ -74,6 +81,7 @@ TInstant TChangeRecord::GetApproximateCreationDateTime() const {
7481
bool TChangeRecord::IsBroadcast() const {
7582
switch (Kind) {
7683
case EKind::CdcHeartbeat:
84+
case EKind::CdcHeartbeatPrivate:
7785
return true;
7886
default:
7987
return false;

ydb/core/tx/datashard/change_record_cdc_serializer.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class TBaseSerializer: public IChangeRecordSerializer {
5757
cmd.SetCreateTimeMS(record.GetApproximateCreationDateTime().MilliSeconds());
5858
switch (record.GetKind()) {
5959
case TChangeRecord::EKind::CdcDataChange:
60+
case TChangeRecord::EKind::CdcHeartbeatPrivate:
6061
return SerializeDataChange(cmd, record);
6162
case TChangeRecord::EKind::CdcHeartbeat:
6263
return SerializeHeartbeat(cmd, record);
@@ -294,6 +295,12 @@ class TYdbJsonSerializer: public TJsonSerializer {
294295
return SerializeVirtualTimestamp(json["resolved"], {record.GetStep(), record.GetTxId()});
295296
}
296297

298+
json["shard_id"] = Opts.ShardId;
299+
300+
if (record.GetKind() == TChangeRecord::EKind::CdcHeartbeatPrivate) {
301+
return SerializeVirtualTimestamp(json["resolved_private"], {record.GetStep(), record.GetTxId()});
302+
}
303+
297304
Y_ABORT_UNLESS(record.GetSchema());
298305
const auto body = ParseBody(record.GetBody());
299306

0 commit comments

Comments
 (0)