Skip to content

Commit c5b9fe2

Browse files
authored
Optimize heartbeats emission KIKIMR-20392 (#557)
1 parent 00eda0e commit c5b9fe2

8 files changed

+262
-79
lines changed

ydb/core/persqueue/partition.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui
187187
, NumChannels(numChannels)
188188
, WriteBufferIsFullCounter(nullptr)
189189
, WriteLagMs(TDuration::Minutes(1), 100)
190+
, LastEmittedHeartbeat(TRowVersion::Min())
190191
{
191192
TabletCounters.Populate(Counters);
192193

ydb/core/persqueue/partition.h

+1
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
755755
TInstant LastUsedStorageMeterTimestamp;
756756

757757
TDeque<std::unique_ptr<IEventBase>> PendingEvents;
758+
TRowVersion LastEmittedHeartbeat;
758759
};
759760

760761
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_sourcemanager.cpp

+3-8
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ TPartitionSourceManager::TModificationBatch::~TModificationBatch() {
236236
}
237237
}
238238

239-
TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const {
239+
TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmitHeartbeat() const {
240240
return HeartbeatEmitter.CanEmit();
241241
}
242242

@@ -331,13 +331,8 @@ void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TI
331331
}
332332
}
333333

334-
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) {
335-
Batch.HeartbeatEmitter.Process(SourceId, heartbeat);
336-
if (InMemory == MemoryStorage().end()) {
337-
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, std::move(heartbeat));
338-
} else {
339-
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat)));
340-
}
334+
void TPartitionSourceManager::TSourceManager::Update(THeartbeat&& heartbeat) {
335+
Batch.HeartbeatEmitter.Process(SourceId, std::move(heartbeat));
341336
}
342337

343338
TPartitionSourceManager::TSourceManager::operator bool() const {

ydb/core/persqueue/partition_sourcemanager.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class TPartitionSourceManager {
5050
std::optional<ui64> UpdatedSeqNo() const;
5151

5252
void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
53-
void Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat);
53+
void Update(THeartbeat&& heartbeat);
5454

5555
operator bool() const;
5656

@@ -77,7 +77,7 @@ class TPartitionSourceManager {
7777
TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format);
7878
~TModificationBatch();
7979

80-
TMaybe<THeartbeat> CanEmit() const;
80+
TMaybe<THeartbeat> CanEmitHeartbeat() const;
8181
TSourceManager GetSource(const TString& id);
8282

8383
void Cancel();

ydb/core/persqueue/partition_write.cpp

+36-35
Original file line numberDiff line numberDiff line change
@@ -910,13 +910,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
910910
<< " version " << *hbVersion
911911
);
912912

913-
auto heartbeat = THeartbeat{
914-
.Version = *hbVersion,
915-
.Data = p.Msg.Data,
916-
};
917-
918-
sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat));
919-
913+
sourceId.Update(THeartbeat{*hbVersion, p.Msg.Data});
920914
return ProcessResult::Continue;
921915
}
922916

@@ -1188,6 +1182,41 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
11881182
}
11891183
}
11901184

1185+
if (const auto heartbeat = sourceIdBatch.CanEmitHeartbeat()) {
1186+
if (heartbeat->Version > LastEmittedHeartbeat) {
1187+
LOG_INFO_S(
1188+
ctx, NKikimrServices::PERSQUEUE,
1189+
"Topic '" << TopicName() << "' partition " << Partition
1190+
<< " emit heartbeat " << heartbeat->Version
1191+
);
1192+
1193+
auto hbMsg = TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
1194+
.SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
1195+
.SeqNo = 0, // we don't use SeqNo because we disable deduplication
1196+
.PartNo = 0,
1197+
.TotalParts = 1,
1198+
.TotalSize = static_cast<ui32>(heartbeat->Data.size()),
1199+
.CreateTimestamp = CurrentTimestamp.MilliSeconds(),
1200+
.ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
1201+
.DisableDeduplication = true,
1202+
.WriteTimestamp = CurrentTimestamp.MilliSeconds(),
1203+
.Data = heartbeat->Data,
1204+
.UncompressedSize = 0,
1205+
.PartitionKey = {},
1206+
.ExplicitHashKey = {},
1207+
.External = false,
1208+
.IgnoreQuotaDeadline = true,
1209+
.HeartbeatVersion = std::nullopt,
1210+
}};
1211+
1212+
WriteInflightSize += heartbeat->Data.size();
1213+
auto result = ProcessRequest(hbMsg, parameters, request, ctx);
1214+
Y_ABORT_UNLESS(result == ProcessResult::Continue);
1215+
1216+
LastEmittedHeartbeat = heartbeat->Version;
1217+
}
1218+
}
1219+
11911220
UpdateWriteBufferIsFullState(ctx.Now());
11921221

11931222
if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) {
@@ -1385,34 +1414,6 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
13851414
}
13861415
}
13871416

1388-
if (const auto heartbeat = sourceIdBatch.CanEmit()) {
1389-
LOG_INFO_S(
1390-
ctx, NKikimrServices::PERSQUEUE,
1391-
"Topic '" << TopicName() << "' partition " << Partition
1392-
<< " emit heartbeat " << heartbeat->Version
1393-
);
1394-
1395-
EmplaceRequest(TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
1396-
.SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
1397-
.SeqNo = 0, // we don't use SeqNo because we disable deduplication
1398-
.PartNo = 0,
1399-
.TotalParts = 1,
1400-
.TotalSize = static_cast<ui32>(heartbeat->Data.size()),
1401-
.CreateTimestamp = CurrentTimestamp.MilliSeconds(),
1402-
.ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
1403-
.DisableDeduplication = true,
1404-
.WriteTimestamp = CurrentTimestamp.MilliSeconds(),
1405-
.Data = heartbeat->Data,
1406-
.UncompressedSize = 0,
1407-
.PartitionKey = {},
1408-
.ExplicitHashKey = {},
1409-
.External = false,
1410-
.IgnoreQuotaDeadline = true,
1411-
.HeartbeatVersion = std::nullopt,
1412-
}}, ctx);
1413-
WriteInflightSize += heartbeat->Data.size();
1414-
}
1415-
14161417
if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs written
14171418
if (!sourceIdBatch.HasModifications()) {
14181419
return request->Record.CmdWriteSize() > 0

ydb/core/persqueue/sourceid.cpp

+71-26
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,6 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark,
7979
void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
8080
FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd);
8181
}
82-
THeartbeatProcessor::THeartbeatProcessor(
83-
const THashSet<TString>& sourceIdsWithHeartbeat,
84-
const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat)
85-
: SourceIdsWithHeartbeat(sourceIdsWithHeartbeat)
86-
, SourceIdsByHeartbeat(sourceIdsByHeartbeat)
87-
{
88-
}
8982

9083
void THeartbeatProcessor::ApplyHeartbeat(const TString& sourceId, const TRowVersion& version) {
9184
SourceIdsWithHeartbeat.insert(sourceId);
@@ -501,49 +494,101 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti
501494

502495
/// THeartbeatEmitter
503496
THeartbeatEmitter::THeartbeatEmitter(const TSourceIdStorage& storage)
504-
: THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat)
505-
, Storage(storage)
497+
: Storage(storage)
506498
{
507499
}
508500

509-
void THeartbeatEmitter::Process(const TString& sourceId, const THeartbeat& heartbeat) {
510-
Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(sourceId));
511-
const auto& sourceIdInfo = Storage.InMemorySourceIds.at(sourceId);
501+
void THeartbeatEmitter::Process(const TString& sourceId, THeartbeat&& heartbeat) {
502+
auto it = Storage.InMemorySourceIds.find(sourceId);
503+
if (it != Storage.InMemorySourceIds.end() && it->second.LastHeartbeat) {
504+
if (heartbeat.Version <= it->second.LastHeartbeat->Version) {
505+
return;
506+
}
507+
}
512508

513-
if (const auto& lastHeartbeat = sourceIdInfo.LastHeartbeat) {
514-
ForgetHeartbeat(sourceId, lastHeartbeat->Version);
509+
if (!Storage.SourceIdsWithHeartbeat.contains(sourceId)) {
510+
NewSourceIdsWithHeartbeat.insert(sourceId);
515511
}
516512

517-
if (LastHeartbeats.contains(sourceId)) {
518-
ForgetHeartbeat(sourceId, LastHeartbeats.at(sourceId).Version);
513+
if (Heartbeats.contains(sourceId)) {
514+
ForgetHeartbeat(sourceId, Heartbeats.at(sourceId).Version);
519515
}
520516

521517
ApplyHeartbeat(sourceId, heartbeat.Version);
522-
LastHeartbeats[sourceId] = heartbeat;
518+
Heartbeats[sourceId] = std::move(heartbeat);
523519
}
524520

525521
TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const {
526-
if (SourceIdsWithHeartbeat.size() != Storage.ExplicitSourceIds.size()) {
522+
if (Storage.ExplicitSourceIds.size() != (Storage.SourceIdsWithHeartbeat.size() + NewSourceIdsWithHeartbeat.size())) {
527523
return Nothing();
528524
}
529525

530526
if (SourceIdsByHeartbeat.empty()) {
531527
return Nothing();
532528
}
533529

534-
auto it = SourceIdsByHeartbeat.begin();
535-
if (Storage.SourceIdsByHeartbeat.empty() || it->first > Storage.SourceIdsByHeartbeat.begin()->first) {
536-
Y_ABORT_UNLESS(!it->second.empty());
537-
const auto& someSourceId = *it->second.begin();
530+
if (!NewSourceIdsWithHeartbeat.empty()) { // just got quorum
531+
if (!Storage.SourceIdsByHeartbeat.empty() && Storage.SourceIdsByHeartbeat.begin()->first < SourceIdsByHeartbeat.begin()->first) {
532+
return GetFromStorage(Storage.SourceIdsByHeartbeat.begin());
533+
} else {
534+
return GetFromDiff(SourceIdsByHeartbeat.begin());
535+
}
536+
} else if (SourceIdsByHeartbeat.begin()->first > Storage.SourceIdsByHeartbeat.begin()->first) {
537+
auto storage = Storage.SourceIdsByHeartbeat.begin();
538+
auto diff = SourceIdsByHeartbeat.begin();
539+
540+
TMaybe<TRowVersion> newVersion;
541+
while (storage != Storage.SourceIdsByHeartbeat.end()) {
542+
const auto& [version, sourceIds] = *storage;
543+
544+
auto rest = sourceIds.size();
545+
for (const auto& sourceId : sourceIds) {
546+
auto it = Heartbeats.find(sourceId);
547+
if (it != Heartbeats.end() && it->second.Version > version && version <= diff->first) {
548+
--rest;
549+
} else {
550+
break;
551+
}
552+
}
538553

539-
if (LastHeartbeats.contains(someSourceId)) {
540-
return LastHeartbeats.at(someSourceId);
541-
} else if (Storage.InMemorySourceIds.contains(someSourceId)) {
542-
return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat;
554+
if (!rest) {
555+
if (++storage != Storage.SourceIdsByHeartbeat.end()) {
556+
newVersion = storage->first;
557+
} else {
558+
newVersion = diff->first;
559+
}
560+
} else {
561+
break;
562+
}
563+
}
564+
565+
if (newVersion) {
566+
storage = Storage.SourceIdsByHeartbeat.find(*newVersion);
567+
if (storage != Storage.SourceIdsByHeartbeat.end()) {
568+
return GetFromStorage(storage);
569+
} else {
570+
return GetFromDiff(diff);
571+
}
543572
}
544573
}
545574

546575
return Nothing();
547576
}
548577

578+
TMaybe<THeartbeat> THeartbeatEmitter::GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const {
579+
Y_ABORT_UNLESS(!it->second.empty());
580+
const auto& someSourceId = *it->second.begin();
581+
582+
Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(someSourceId));
583+
return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat;
584+
}
585+
586+
TMaybe<THeartbeat> THeartbeatEmitter::GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const {
587+
Y_ABORT_UNLESS(!it->second.empty());
588+
const auto& someSourceId = *it->second.begin();
589+
590+
Y_ABORT_UNLESS(Heartbeats.contains(someSourceId));
591+
return Heartbeats.at(someSourceId);
592+
}
593+
549594
}

ydb/core/persqueue/sourceid.h

+11-8
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,17 @@ struct TSourceIdInfo {
5959
}; // TSourceIdInfo
6060

6161
class THeartbeatProcessor {
62-
public:
63-
THeartbeatProcessor() = default;
64-
explicit THeartbeatProcessor(
65-
const THashSet<TString>& sourceIdsWithHeartbeat,
66-
const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat);
62+
protected:
63+
using TSourceIdsByHeartbeat = TMap<TRowVersion, THashSet<TString>>;
6764

65+
public:
6866
void ApplyHeartbeat(const TString& sourceId, const TRowVersion& version);
6967
void ForgetHeartbeat(const TString& sourceId, const TRowVersion& version);
7068
void ForgetSourceId(const TString& sourceId);
7169

7270
protected:
7371
THashSet<TString> SourceIdsWithHeartbeat;
74-
TMap<TRowVersion, THashSet<TString>> SourceIdsByHeartbeat;
72+
TSourceIdsByHeartbeat SourceIdsByHeartbeat;
7573

7674
}; // THeartbeatProcessor
7775

@@ -151,12 +149,17 @@ class THeartbeatEmitter: private THeartbeatProcessor {
151149
public:
152150
explicit THeartbeatEmitter(const TSourceIdStorage& storage);
153151

154-
void Process(const TString& sourceId, const THeartbeat& heartbeat);
152+
void Process(const TString& sourceId, THeartbeat&& heartbeat);
155153
TMaybe<THeartbeat> CanEmit() const;
156154

155+
private:
156+
TMaybe<THeartbeat> GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const;
157+
TMaybe<THeartbeat> GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const;
158+
157159
private:
158160
const TSourceIdStorage& Storage;
159-
THashMap<TString, THeartbeat> LastHeartbeats;
161+
THashSet<TString> NewSourceIdsWithHeartbeat;
162+
THashMap<TString, THeartbeat> Heartbeats;
160163

161164
}; // THeartbeatEmitter
162165

0 commit comments

Comments
 (0)