Skip to content

Commit 0b3d29b

Browse files
PQ tablet removes blobs asynchronously (#13937)
1 parent 65d1e86 commit 0b3d29b

File tree

14 files changed

+542
-80
lines changed

14 files changed

+542
-80
lines changed

ydb/core/persqueue/blob_refcounter.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#pragma once
2+
3+
#include <util/generic/string.h>
4+
5+
#include <memory>
6+
#include <vector>
7+
#include <cstddef>
8+
9+
namespace NKikimr::NPQ {
10+
11+
// Stores the block key. `std::shared_ptr` is used for reference counting. To create a token, the constructor
12+
// method in `TPartition' is used. It creates a special destructor that places the key in the deletion queue
13+
// before deleting the token.
14+
struct TBlobKeyToken {
15+
TString Key;
16+
bool NeedDelete = true;
17+
};
18+
19+
using TBlobKeyTokenPtr = std::shared_ptr<TBlobKeyToken>;
20+
21+
// It is used for synchronization between deleting and reading blocks. You cannot delete the blocks that the client
22+
// reads from. The keys are placed in the collection for the duration of the reading.
23+
struct TBlobKeyTokens {
24+
void Append(TBlobKeyTokenPtr token) { Tokens.push_back(std::move(token)); }
25+
size_t Size() const { return Tokens.size(); }
26+
27+
std::vector<TBlobKeyTokenPtr> Tokens;
28+
};
29+
30+
}

ydb/core/persqueue/events/internal.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/base/row_version.h>
66
#include <ydb/core/protos/pqconfig.pb.h>
77
#include <ydb/core/persqueue/blob.h>
8+
#include <ydb/core/persqueue/blob_refcounter.h>
89
#include <ydb/core/persqueue/key.h>
910
#include <ydb/core/persqueue/metering_sink.h>
1011
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
@@ -73,9 +74,10 @@ namespace NPQ {
7374

7475
struct TDataKey {
7576
TKey Key;
76-
ui32 Size;
77+
ui32 Size = 0;
7778
TInstant Timestamp;
78-
ui64 CumulativeSize;
79+
ui64 CumulativeSize = 0;
80+
TBlobKeyTokenPtr BlobKeyToken = nullptr;
7981
};
8082

8183
struct TErrorInfo {

ydb/core/persqueue/partition.cpp

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
433433
const ui64 importantConsumerMinOffset = ImportantClientsMinOffset();
434434

435435
bool hasDrop = false;
436-
while(DataKeysBody.size() > 1) {
436+
while (DataKeysBody.size() > 1) {
437437
auto& nextKey = DataKeysBody[1].Key;
438438
if (importantConsumerMinOffset < nextKey.GetOffset()) {
439439
// The first message in the next blob was not read by an important consumer.
@@ -481,14 +481,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
481481
++StartOffset;
482482
}
483483

484-
TKey firstKey(TKeyPrefix::TypeData, Partition, 0, 0, 0, 0); //will drop all that could not be dropped before of case of full disks
485-
486-
auto del = request->Record.AddCmdDeleteRange();
487-
auto range = del->MutableRange();
488-
range->SetFrom(firstKey.Data(), firstKey.Size());
489-
range->SetIncludeFrom(true);
490-
range->SetTo(lastKey.Data(), lastKey.Size());
491-
range->SetIncludeTo(StartOffset == EndOffset);
484+
Y_UNUSED(request);
492485

493486
return true;
494487
}
@@ -1999,6 +1992,9 @@ void TPartition::RunPersist() {
19991992
EndHandleRequests(PersistRequest.Get(), ctx);
20001993
//haveChanges = true;
20011994
}
1995+
1996+
TryAddDeleteHeadKeysToPersistRequest();
1997+
20021998
if (haveChanges || TxIdHasChanged || !AffectedUsers.empty() || ChangeConfig) {
20031999
WriteCycleStartTime = now;
20042000
WriteStartTime = now;
@@ -2059,6 +2055,62 @@ void TPartition::RunPersist() {
20592055
PersistRequest = nullptr;
20602056
}
20612057

2058+
void TPartition::TryAddDeleteHeadKeysToPersistRequest()
2059+
{
2060+
while (!DeletedKeys.empty()) {
2061+
auto& k = DeletedKeys.back();
2062+
2063+
auto* cmd = PersistRequest->Record.AddCmdDeleteRange();
2064+
auto* range = cmd->MutableRange();
2065+
2066+
range->SetFrom(k.data(), k.size());
2067+
range->SetIncludeFrom(true);
2068+
range->SetTo(k.data(), k.size());
2069+
range->SetIncludeTo(true);
2070+
2071+
DeletedKeys.pop_back();
2072+
}
2073+
}
2074+
2075+
void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request)
2076+
{
2077+
PQ_LOG_D("=== DumpKeyValueRequest ===");
2078+
PQ_LOG_D("--- delete ----------------");
2079+
for (size_t i = 0; i < request.CmdDeleteRangeSize(); ++i) {
2080+
const auto& cmd = request.GetCmdDeleteRange(i);
2081+
const auto& range = cmd.GetRange();
2082+
PQ_LOG_D((range.GetIncludeFrom() ? '[' : '(') << range.GetFrom() <<
2083+
", " <<
2084+
range.GetTo() << (range.GetIncludeTo() ? ']' : ')'));
2085+
}
2086+
PQ_LOG_D("--- write -----------------");
2087+
for (size_t i = 0; i < request.CmdWriteSize(); ++i) {
2088+
const auto& cmd = request.GetCmdWrite(i);
2089+
PQ_LOG_D(cmd.GetKey());
2090+
}
2091+
PQ_LOG_D("--- rename ----------------");
2092+
for (size_t i = 0; i < request.CmdRenameSize(); ++i) {
2093+
const auto& cmd = request.GetCmdRename(i);
2094+
PQ_LOG_D(cmd.GetOldKey() << ", " << cmd.GetNewKey());
2095+
}
2096+
PQ_LOG_D("===========================");
2097+
}
2098+
2099+
TBlobKeyTokenPtr TPartition::MakeBlobKeyToken(const TString& key)
2100+
{
2101+
// The number of links counts is `std::shared_ptr'. It is possible to set its own destructor,
2102+
// which adds the key to the queue for deletion before freeing the memory.
2103+
auto ptr = std::make_unique<TBlobKeyToken>(key);
2104+
2105+
auto deleter = [this](TBlobKeyToken* token) {
2106+
if (token->NeedDelete) {
2107+
DeletedKeys.emplace_back(std::move(token->Key));
2108+
}
2109+
delete token;
2110+
};
2111+
2112+
return {ptr.release(), std::move(deleter)};
2113+
}
20622114

20632115
void TPartition::AnswerCurrentReplies(const TActorContext& ctx)
20642116
{
@@ -2289,9 +2341,10 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22892341
if (write && !write->Value.empty()) {
22902342
AddCmdWrite(write, PersistRequest.Get(), ctx);
22912343
CompactedKeys.emplace_back(write->Key, write->Value.size());
2292-
ClearOldHead(write->Key.GetOffset(), write->Key.GetPartNo(), PersistRequest.Get());
22932344
}
22942345
Parameters->CurOffset += k.Key.GetCount();
2346+
// The key does not need to be deleted, as it will be renamed
2347+
k.BlobKeyToken->NeedDelete = false;
22952348
}
22962349

22972350
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
@@ -3353,12 +3406,10 @@ void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) {
33533406
void TPartition::ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request) {
33543407
for (auto it = HeadKeys.rbegin(); it != HeadKeys.rend(); ++it) {
33553408
if (it->Key.GetOffset() > offset || it->Key.GetOffset() == offset && it->Key.GetPartNo() >= partNo) {
3356-
auto del = request->Record.AddCmdDeleteRange();
3357-
auto range = del->MutableRange();
3358-
range->SetFrom(it->Key.Data(), it->Key.Size());
3359-
range->SetIncludeFrom(true);
3360-
range->SetTo(it->Key.Data(), it->Key.Size());
3361-
range->SetIncludeTo(true);
3409+
// The repackaged blocks will be deleted after writing.
3410+
DefferedKeysForDeletion.push_back(std::move(it->BlobKeyToken));
3411+
3412+
Y_UNUSED(request);
33623413
} else {
33633414
break;
33643415
}

ydb/core/persqueue/partition.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
295295
const TActorContext& ctx);
296296
// will return rcount and rsize also
297297
TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount,
298-
const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset);
298+
const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset,
299+
TBlobKeyTokens* blobKeyTokens);
299300
TVector<TClientBlob> GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount,
300301
const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount,
301302
ui32* rsize, ui64* insideHeadOffset, ui64 lastOffset);
@@ -687,6 +688,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
687688
TMessageQueue PendingRequests;
688689
TMessageQueue QuotaWaitingRequests;
689690

691+
std::deque<TString> DeletedKeys;
692+
std::deque<TBlobKeyTokenPtr> DefferedKeysForDeletion;
693+
690694
THead Head;
691695
THead NewHead;
692696
TPartitionedBlob PartitionedBlob;
@@ -977,6 +981,11 @@ class TPartition : public TActorBootstrapped<TPartition> {
977981
void UpdateAvgWriteBytes(ui64 size, const TInstant& now);
978982

979983
size_t WriteNewSizeFromSupportivePartitions = 0;
984+
985+
void TryAddDeleteHeadKeysToPersistRequest();
986+
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request);
987+
988+
TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key);
980989
};
981990

982991
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_init.cpp

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,70 @@ void TInitDataRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
502502
};
503503
}
504504

505+
THashSet<TString> FilterBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range,
506+
const TPartitionId& partitionId)
507+
{
508+
TVector<TKey> source;
509+
510+
for (ui32 i = 0; i < range.PairSize(); ++i) {
511+
const auto& pair = range.GetPair(i);
512+
Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here
513+
source.push_back(MakeKeyFromString(pair.GetKey(), partitionId));
514+
}
515+
516+
auto isKeyLess = [](const TKey& lhs, const TKey& rhs) {
517+
auto makeOffset = [](const TKey& k) {
518+
return std::make_tuple(k.GetOffset(), k.GetPartNo());
519+
};
520+
521+
auto makeCount = [](const TKey& k) {
522+
return k.GetCount() + k.GetInternalPartsCount();
523+
};
524+
525+
const auto leftOffset = makeOffset(lhs);
526+
const auto rightOffset = makeOffset(rhs);
527+
528+
if (leftOffset < rightOffset) {
529+
return true;
530+
}
531+
532+
if (rightOffset < leftOffset) {
533+
return false;
534+
}
535+
536+
return makeCount(lhs) > makeCount(rhs);
537+
};
538+
539+
std::sort(source.begin(), source.end(), isKeyLess);
540+
541+
THashSet<TString> filtered;
542+
543+
size_t partsCount = 0;
544+
ui64 nextOffset = 0;
545+
546+
for (const auto& k : source) {
547+
if (filtered.empty() || k.GetOffset() >= nextOffset) {
548+
filtered.insert(k.ToString());
549+
partsCount = k.GetCount() + k.GetInternalPartsCount();
550+
nextOffset = k.GetOffset() + k.GetCount();
551+
} else {
552+
//Y_ABORT_UNLESS(partsCount >= k.GetCount() + k.GetInternalPartsCount(),
553+
// "Key: %s, "
554+
// "partsCount: %" PRISZT ", Count: %" PRIu32 ", InternalPartsCount: %" PRIu32
555+
// ", nextOffset: %" PRIu64,
556+
// k.ToString().data(),
557+
// partsCount, k.GetCount(), k.GetInternalPartsCount(),
558+
// nextOffset);
559+
560+
partsCount -= k.GetCount() + k.GetInternalPartsCount();
561+
562+
Y_UNUSED(partsCount);
563+
}
564+
}
565+
566+
return filtered;
567+
}
568+
505569
void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext&) {
506570
auto& endOffset = Partition()->EndOffset;
507571
auto& startOffset = Partition()->StartOffset;
@@ -511,13 +575,25 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
511575
auto& gapSize = Partition()->GapSize;
512576
auto& bodySize = Partition()->BodySize;
513577

578+
// If there are multiple keys for a message, then only the key that contains more messages remains.
579+
//
580+
// Extra keys will be added to the queue for deletion.
581+
const auto actualKeys = FilterBlobsMetaData(range,
582+
PartitionId());
583+
514584
for (ui32 i = 0; i < range.PairSize(); ++i) {
515-
auto pair = range.GetPair(i);
585+
const auto& pair = range.GetPair(i);
516586
Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here
517587
TKey k = MakeKeyFromString(pair.GetKey(), PartitionId());
588+
if (!actualKeys.contains(pair.GetKey())) {
589+
Partition()->DeletedKeys.emplace_back(k.ToString());
590+
continue;
591+
}
518592
if (dataKeysBody.empty()) { //no data - this is first pair of first range
519593
head.Offset = endOffset = startOffset = k.GetOffset();
520-
if (k.GetPartNo() > 0) ++startOffset;
594+
if (k.GetPartNo() > 0) {
595+
++startOffset;
596+
}
521597
head.PartNo = 0;
522598
} else {
523599
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%" PRIu64 " <= %" PRIu64 " %s", endOffset, k.GetOffset(), pair.GetKey().c_str());
@@ -536,9 +612,11 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
536612
PQ_LOG_D("Got data offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize()
537613
<< " so " << startOffset << " eo " << endOffset << " " << pair.GetKey()
538614
);
539-
dataKeysBody.push_back({k, pair.GetValueSize(),
540-
TInstant::Seconds(pair.GetCreationUnixTime()),
541-
dataKeysBody.empty() ? 0 : dataKeysBody.back().CumulativeSize + dataKeysBody.back().Size});
615+
dataKeysBody.emplace_back(k,
616+
pair.GetValueSize(),
617+
TInstant::Seconds(pair.GetCreationUnixTime()),
618+
dataKeysBody.empty() ? 0 : dataKeysBody.back().CumulativeSize + dataKeysBody.back().Size,
619+
Partition()->MakeBlobKeyToken(k.ToString()));
542620
}
543621

544622
Y_ABORT_UNLESS(endOffset >= startOffset);

ydb/core/persqueue/partition_read.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,8 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
591591

592592

593593
TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(
594-
const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset
594+
const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset,
595+
TBlobKeyTokens* blobKeyTokens
595596
) {
596597
Y_ABORT_UNLESS(rcount && rsize);
597598
ui32& count = *rcount;
@@ -630,6 +631,8 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(
630631
it->Key.GetInternalPartsCount(), it->Size, TString(), it->Key);
631632
blobs.push_back(reqBlob);
632633

634+
blobKeyTokens->Append(it->BlobKeyToken);
635+
633636
++it;
634637
if (it == DataKeysBody.end())
635638
break;
@@ -998,7 +1001,8 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
9981001
}
9991002

10001003
TVector<TRequestedBlob> blobs = GetReadRequestFromBody(
1001-
info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset
1004+
info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset,
1005+
&info.BlobKeyTokens
10021006
);
10031007
info.Blobs = blobs;
10041008
ui64 lastOffset = info.Offset + Min(count, info.Count);
@@ -1014,6 +1018,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10141018
);
10151019
info.CachedOffset = insideHeadOffset;
10161020
}
1021+
Y_ABORT_UNLESS(info.BlobKeyTokens.Size() == info.Blobs.size());
10171022
if (info.Destination != 0) {
10181023
++userInfo.ActiveReads;
10191024
userInfo.UpdateReadingTimeAndState(EndOffset, ctx.Now());
@@ -1044,7 +1049,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10441049
}
10451050

10461051
const TString user = info.User;
1047-
bool res = ReadInfo.insert({cookie, std::move(info)}).second;
1052+
bool res = ReadInfo.emplace(cookie, std::move(info)).second;
10481053
PQ_LOG_D("Reading cookie " << cookie << ". Send blob request.");
10491054
Y_ABORT_UNLESS(res);
10501055

0 commit comments

Comments
 (0)