Skip to content

Commit 0ef6729

Browse files
authored
do not check explicit sources every cleanup (#3674)
1 parent e2f14c4 commit 0ef6729

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

ydb/core/persqueue/sourceid.cpp

+11-9
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) {
253253
ExplicitSourceIds.erase(sourceId);
254254
}
255255

256-
SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId));
256+
SourceIdsByOffset[it->second.Explicit].erase(std::make_pair(it->second.Offset, sourceId));
257257
InMemorySourceIds.erase(it);
258258

259259
auto jt = SourceIdOwners.find(sourceId);
@@ -277,7 +277,7 @@ bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInsta
277277
const auto ttl = TDuration::Seconds(config.GetSourceIdLifetimeSeconds());
278278
ui32 size = request->Record.ByteSize();
279279

280-
for (const auto& [offset, sourceId] : SourceIdsByOffset) {
280+
for (const auto& [offset, sourceId] : SourceIdsByOffset[0]) {
281281
if (offset >= startOffset && toDelOffsets.size() >= maxDeleteSourceIds) {
282282
break;
283283
}
@@ -323,7 +323,7 @@ bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInsta
323323
size_t res = InMemorySourceIds.erase(t.second);
324324
Y_ABORT_UNLESS(res == 1);
325325
// delete sourceID from offsets
326-
res = SourceIdsByOffset.erase(t);
326+
res = SourceIdsByOffset[0].erase(t);
327327
Y_ABORT_UNLESS(res == 1);
328328
// save owners to drop and delete records from map
329329
auto it = SourceIdOwners.find(t.second);
@@ -372,14 +372,14 @@ void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdIn
372372
auto it = InMemorySourceIds.find(sourceId);
373373
if (it != InMemorySourceIds.end()) {
374374
if (!load || it->second.Offset < sourceIdInfo.Offset) {
375-
const auto res = SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId));
375+
const auto res = SourceIdsByOffset[sourceIdInfo.Explicit].erase(std::make_pair(it->second.Offset, sourceId));
376376
Y_ABORT_UNLESS(res == 1);
377377
} else {
378378
return;
379379
}
380380
}
381381

382-
const bool res = SourceIdsByOffset.emplace(sourceIdInfo.Offset, sourceId).second;
382+
const bool res = SourceIdsByOffset[sourceIdInfo.Explicit].emplace(sourceIdInfo.Offset, sourceId).second;
383383
Y_ABORT_UNLESS(res);
384384

385385
if (sourceIdInfo.Explicit) {
@@ -421,10 +421,12 @@ void TSourceIdStorage::MarkOwnersForDeletedSourceId(THashMap<TString, TOwnerInfo
421421

422422
TInstant TSourceIdStorage::MinAvailableTimestamp(TInstant now) const {
423423
TInstant ds = now;
424-
if (!SourceIdsByOffset.empty()) {
425-
auto it = InMemorySourceIds.find(SourceIdsByOffset.begin()->second);
426-
Y_ABORT_UNLESS(it != InMemorySourceIds.end());
427-
ds = Min(ds, it->second.WriteTimestamp);
424+
for (ui32 i = 0 ; i < 2; ++i) {
425+
if (!SourceIdsByOffset[i].empty()) {
426+
auto it = InMemorySourceIds.find(SourceIdsByOffset[i].begin()->second);
427+
Y_ABORT_UNLESS(it != InMemorySourceIds.end());
428+
ds = Min(ds, it->second.WriteTimestamp);
429+
}
428430
}
429431

430432
return ds;

ydb/core/persqueue/sourceid.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class TSourceIdStorage: private THeartbeatProcessor {
108108
TSourceIdMap InMemorySourceIds;
109109
THashMap<TString, TString> SourceIdOwners;
110110
TVector<TString> OwnersToDrop;
111-
TSet<std::pair<ui64, TString>> SourceIdsByOffset;
111+
TSet<std::pair<ui64, TString>> SourceIdsByOffset[2];
112112
// used to track heartbeats
113113
THashSet<TString> ExplicitSourceIds;
114114

ydb/core/persqueue/ut/sourceid_ut.cpp

+20
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,26 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
460460
}
461461
}
462462

463+
Y_UNIT_TEST(ExpensiveCleanup) {
464+
TSourceIdStorage storage;
465+
ui64 offset = 0;
466+
467+
// initial info w/o heartbeats
468+
for (ui32 i = 1; i <= 100000; ++i) {
469+
storage.RegisterSourceId(TestSourceId(i), MakeExplicitSourceIdInfo(++offset));
470+
}
471+
472+
NKikimrPQ::TPartitionConfig config;
473+
config.SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds());
474+
475+
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
476+
for (ui32 i = 0; i < 1000; ++i) {
477+
Cerr << "Iteration " << i << "\n";
478+
const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 1'000'000, TPartitionId(TestPartition), config);
479+
UNIT_ASSERT_EQUAL(dropped, false);
480+
}
481+
482+
}
463483
} // TSourceIdTests
464484

465485
} // namespace NKikimr::NPQ

0 commit comments

Comments
 (0)