Skip to content

Commit a874417

Browse files
recording a large amount of data in a transaction (#5902)
1 parent c7123e3 commit a874417

File tree

14 files changed

+550
-159
lines changed

14 files changed

+550
-159
lines changed

ydb/core/grpc_services/local_rpc/local_rpc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
123123
, RequestType(requestType)
124124
, InternalCall(internalCall)
125125
{
126-
if (token) {
126+
if (token && !token->empty()) {
127127
InternalToken = new NACLib::TUserToken(*token);
128128
}
129129
}

ydb/core/persqueue/blob.cpp

Lines changed: 83 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ TBlobIterator::TBlobIterator(const TKey& key, const TString& blob)
2424
void TBlobIterator::ParseBatch() {
2525
Y_ABORT_UNLESS(Data < End);
2626
Header = ExtractHeader(Data, End - Data);
27-
Y_ABORT_UNLESS(Header.GetOffset() == Offset);
27+
//Y_ABORT_UNLESS(Header.GetOffset() == Offset);
2828
Count += Header.GetCount();
2929
Offset += Header.GetCount();
3030
InternalPartsCount += Header.GetInternalPartsCount();
@@ -686,6 +686,13 @@ ui32 THead::FindPos(const ui64 offset, const ui16 partNo) const {
686686
return i - 1;
687687
}
688688

689+
// Builds a rename record: stores the source key, the destination key and the
// blob size exactly as passed in. No validation is performed here.
TPartitionedBlob::TRenameFormedBlobInfo::TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size) :
690+
OldKey(oldKey),
691+
NewKey(newKey),
692+
Size(size)
693+
{
694+
}
695+
689696
TPartitionedBlob& TPartitionedBlob::operator=(const TPartitionedBlob& x)
690697
{
691698
Partition = x.Partition;
@@ -737,7 +744,8 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x)
737744
{}
738745

739746
TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
740-
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize)
747+
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize,
748+
const ui16 nextPartNo)
741749
: Partition(partition)
742750
, Offset(offset)
743751
, InternalPartsCount(0)
@@ -747,7 +755,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off
747755
, SeqNo(seqNo)
748756
, TotalParts(totalParts)
749757
, TotalSize(totalSize)
750-
, NextPartNo(0)
758+
, NextPartNo(nextPartNo)
751759
, HeadPartNo(0)
752760
, BlobsSize(0)
753761
, Head(head)
@@ -773,7 +781,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off
773781
if (HeadSize == 0) {
774782
StartOffset = offset;
775783
NewHead.Offset = offset;
776-
Y_ABORT_UNLESS(StartPartNo == 0);
784+
//Y_ABORT_UNLESS(StartPartNo == 0);
777785
}
778786
}
779787

@@ -804,58 +812,93 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe
804812
return valueD;
805813
}
806814

807-
std::optional<std::pair<TKey, TString>> TPartitionedBlob::Add(TClientBlob&& blob)
815+
// Packs the accumulated head data (Head and/or NewHead, as selected by
// GlueHead/GlueNewHead) together with the pending Blobs into one serialized
// value, then resets the accumulation state (HeadSize, BlobsSize, Blobs,
// GlueHead/GlueNewHead, InternalPartsCount).
//   size      - size of the part the caller is about to append (the key-based
//               Add overload passes 0); used only in the size sanity check.
//   useRename - true: return the TypeTmpData key and record a tmp -> data
//               rename in FormedBlobs; false: return the TypeData key and
//               record nothing.
// In the code shown here the returned optional is always engaged.
auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optional<TFormedBlobInfo>
816+
{
817+
HeadPartNo = NextPartNo;
818+
// Number of messages contributed by whichever heads are being glued in.
ui32 count = (GlueHead ? Head.GetCount() : 0) + (GlueNewHead ? NewHead.GetCount() : 0);
819+
820+
Y_ABORT_UNLESS(Offset >= (GlueHead ? Head.Offset : NewHead.Offset));
821+
822+
Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));
823+
824+
// Both keys are built from StartOffset/StartPartNo/InternalPartsCount *before*
// those fields are advanced below, so they describe the blob being closed.
// They differ only in the key type (tmp vs. data).
TKey tmpKey(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);
825+
TKey dataKey(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);
826+
827+
// The next formed blob starts at the current position.
StartOffset = Offset;
828+
StartPartNo = NextPartNo;
829+
InternalPartsCount = 0;
830+
831+
// The last argument is the size estimate handed to CompactHead; a header
// allowance is added only when there are pending blobs to append after it.
TString valueD = CompactHead(GlueHead, Head, GlueNewHead, NewHead, HeadSize + BlobsSize + (BlobsSize > 0 ? GetMaxHeaderSize() : 0));
832+
833+
GlueHead = GlueNewHead = false;
834+
if (!Blobs.empty()) {
835+
// Pending parts are moved into a single batch, packed and appended to the value.
TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)};
836+
// Blobs was moved from; clear() puts it back into a known empty state.
Blobs.clear();
837+
batch.Pack();
838+
Y_ABORT_UNLESS(batch.Packed);
839+
batch.SerializeTo(valueD);
840+
}
841+
842+
// Sanity check: the formed value fits into MaxBlobSize, and either it is within
// 1_MB of the limit once `size` is added, or the flush was not forced by size.
Y_ABORT_UNLESS(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize));
843+
HeadSize = 0;
844+
BlobsSize = 0;
845+
// NOTE(review): validation always uses tmpKey, even when dataKey is the one
// returned (useRename == false) - confirm CheckBlob does not depend on key type.
TClientBlob::CheckBlob(tmpKey, valueD);
846+
if (useRename) {
847+
FormedBlobs.emplace_back(tmpKey, dataKey, valueD.size());
848+
}
849+
Blobs.clear();
850+
851+
return {{useRename ? tmpKey : dataKey, valueD}};
852+
}
853+
854+
// Appends one client blob part to the pending batch and advances NextPartNo.
// If head + pending blobs + this part (plus a header allowance) would exceed
// MaxBlobSize, or NeedCompactHead was already set, the accumulated data is
// first flushed via CreateFormedBlob(size, true) - unless nothing is
// accumulated (HeadSize + BlobsSize == 0), in which case the request is dropped.
// Returns the formed blob (key + serialized value) when a flush happened,
// an empty optional otherwise.
// (Diff view: lines following an `NNN-` gutter marker are the pre-commit inline
// body, which matches what CreateFormedBlob now contains.)
auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>
808855
{
809856
Y_ABORT_UNLESS(NewHead.Offset >= Head.Offset);
810857
ui32 size = blob.GetBlobSize();
811858
Y_ABORT_UNLESS(InternalPartsCount < 1000); //just check for future packing
812-
if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize)
859+
if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize) {
813860
NeedCompactHead = true;
861+
}
814862
if (HeadSize + BlobsSize == 0) { //if nothing to compact at all
815863
NeedCompactHead = false;
816864
}
817865

818-
std::optional<std::pair<TKey, TString>> res;
866+
std::optional<TFormedBlobInfo> res;
819867
if (NeedCompactHead) { // need form blob without last chunk, on start or in case of big head
820868
NeedCompactHead = false;
821-
HeadPartNo = NextPartNo;
822-
ui32 count = (GlueHead ? Head.GetCount() : 0) + (GlueNewHead ? NewHead.GetCount() : 0);
823-
824-
Y_ABORT_UNLESS(Offset >= (GlueHead ? Head.Offset : NewHead.Offset));
825-
826-
Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));
827-
828-
TKey key(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);
829-
830-
StartOffset = Offset;
831-
StartPartNo = NextPartNo;
832-
InternalPartsCount = 0;
833-
834-
TString valueD = CompactHead(GlueHead, Head, GlueNewHead, NewHead, HeadSize + BlobsSize + (BlobsSize > 0 ? GetMaxHeaderSize() : 0));
835-
836-
GlueHead = GlueNewHead = false;
837-
if (!Blobs.empty()) {
838-
TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)};
839-
Blobs.clear();
840-
batch.Pack();
841-
Y_ABORT_UNLESS(batch.Packed);
842-
batch.SerializeTo(valueD);
843-
}
844-
845-
Y_ABORT_UNLESS(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize));
846-
HeadSize = 0;
847-
BlobsSize = 0;
848-
TClientBlob::CheckBlob(key, valueD);
849-
FormedBlobs.emplace_back(key, valueD.size());
850-
Blobs.clear();
851-
852-
res = {key, valueD};
869+
// Flush what has been accumulated so far; useRename == true means the tmp key
// is returned and a tmp -> data rename is recorded in FormedBlobs.
res = CreateFormedBlob(size, true);
853870
}
854871
// The new part is accounted for only after any flush, so it starts the next blob.
BlobsSize += size + GetMaxHeaderSize();
855872
++NextPartNo;
856873
// NOTE(review): `blob` is a named rvalue reference and therefore an lvalue here,
// so this push_back copies; std::move(blob) would move instead - confirm that
// is safe for TClientBlob before changing.
Blobs.push_back(blob);
857-
if (!IsComplete())
874+
if (!IsComplete()) {
858875
++InternalPartsCount;
876+
}
877+
return res;
878+
}
879+
880+
// Records a rename of an existing key rather than adding blob contents.
// Builds a TypeData key from oldKey's part number, count, internal parts count
// and head flag, with its offset shifted by NewHead.Offset, and appends
// {oldKey, newKey, size} to FormedBlobs. StartOffset advances by oldKey.GetCount().
// If NeedCompactHead is set, it is cleared together with GlueNewHead and the
// accumulated data is flushed first via CreateFormedBlob(0, false); that result
// is what gets returned. Otherwise the returned optional is empty.
// NOTE(review): presumably oldKey.GetOffset() is relative to the writer's own
// numbering and NewHead.Offset rebases it into the partition - confirm with callers.
auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TFormedBlobInfo>
881+
{
882+
std::optional<TFormedBlobInfo> res;
883+
if (NeedCompactHead) {
884+
NeedCompactHead = false;
885+
// With GlueNewHead cleared, CreateFormedBlob takes no messages from NewHead
// into its count.
GlueNewHead = false;
886+
// useRename == false: the blob is returned under its TypeData key and no
// rename entry is recorded for it.
res = CreateFormedBlob(0, false);
887+
}
888+
889+
TKey newKey(TKeyPrefix::TypeData,
890+
Partition,
891+
NewHead.Offset + oldKey.GetOffset(),
892+
oldKey.GetPartNo(),
893+
oldKey.GetCount(),
894+
oldKey.GetInternalPartsCount(),
895+
oldKey.IsHead());
896+
897+
FormedBlobs.emplace_back(oldKey, newKey, size);
898+
899+
StartOffset += oldKey.GetCount();
900+
//NewHead.Offset += oldKey.GetOffset() + oldKey.GetCount();
901+
859902
return res;
860903
}
861904

ydb/core/persqueue/blob.h

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,14 @@ struct TClientBlob {
8484
return PartData ? PartData->PartNo : 0;
8585
}
8686

87+
// Returns PartData->TotalParts when PartData is set, otherwise 1
// (a blob without part data is treated as a single part).
ui16 GetTotalParts() const {
88+
return PartData ? PartData->TotalParts : 1;
89+
}
90+
91+
// Returns PartData->TotalSize when PartData is set, otherwise UncompressedSize.
// NOTE(review): the return type is ui16, while TPartitionedBlob's constructor
// takes totalSize as ui32. If TotalSize/UncompressedSize are wider than 16 bits
// this silently truncates - confirm the field types; ui32 may be intended.
ui16 GetTotalSize() const {
92+
return PartData ? PartData->TotalSize : UncompressedSize;
93+
}
94+
8795
bool IsLastPart() const {
8896
return !PartData || PartData->PartNo + 1 == PartData->TotalParts;
8997
}
@@ -184,7 +192,8 @@ struct TBatch {
184192
: Packed(true)
185193
, Header(header)
186194
, PackedData(data, header.GetPayloadSize())
187-
{}
195+
{
196+
}
188197

189198
ui32 GetPackedSize() const { Y_ABORT_UNLESS(Packed); return sizeof(ui16) + PackedData.size() + Header.ByteSize(); }
190199
void Pack();
@@ -265,9 +274,16 @@ class TPartitionedBlob {
265274
TPartitionedBlob(const TPartitionedBlob& x);
266275

267276
TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
268-
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize);
277+
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize,
278+
ui16 nextPartNo = 0);
279+
280+
// Result of forming a blob: the key it is formed under and its serialized value.
// Returned (wrapped in std::optional) by both Add overloads.
struct TFormedBlobInfo {
281+
TKey Key;
282+
TString Value;
283+
};
269284

270-
std::optional<std::pair<TKey, TString>> Add(TClientBlob&& blob);
285+
std::optional<TFormedBlobInfo> Add(TClientBlob&& blob);
286+
std::optional<TFormedBlobInfo> Add(const TKey& key, ui32 size);
271287

272288
bool IsInited() const { return !SourceId.empty(); }
273289

@@ -280,11 +296,21 @@ class TPartitionedBlob {
280296

281297
bool IsNextPart(const TString& sourceId, const ui64 seqNo, const ui16 partNo, TString *reason) const;
282298

299+
// One entry of the formed-blobs list: a blob that exists under OldKey and is to
// be known under NewKey, together with its size in bytes.
struct TRenameFormedBlobInfo {
300+
TRenameFormedBlobInfo() = default;
301+
TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size);
302+
303+
TKey OldKey;
304+
TKey NewKey;
305+
// Zero-initialised so that a default-constructed record never carries an
// indeterminate size (the defaulted constructor would otherwise leave it unset).
ui32 Size = 0;
306+
};
307+
283308
const std::deque<TClientBlob>& GetClientBlobs() const { return Blobs; }
284-
const std::deque<std::pair<TKey, ui32>> GetFormedBlobs() const { return FormedBlobs; }
309+
const std::deque<TRenameFormedBlobInfo>& GetFormedBlobs() const { return FormedBlobs; }
285310

286311
private:
287312
TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize);
313+
std::optional<TFormedBlobInfo> CreateFormedBlob(ui32 size, bool useRename);
288314

289315
private:
290316
TPartitionId Partition;
@@ -300,7 +326,7 @@ class TPartitionedBlob {
300326
ui16 HeadPartNo;
301327
std::deque<TClientBlob> Blobs;
302328
ui32 BlobsSize;
303-
std::deque<std::pair<TKey, ui32>> FormedBlobs;
329+
std::deque<TRenameFormedBlobInfo> FormedBlobs;
304330
THead &Head;
305331
THead &NewHead;
306332
ui32 HeadSize;

ydb/core/persqueue/key.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,18 @@ TKey MakeKeyFromString(const TString& s, const TPartitionId& partition)
2222
t.IsHead());
2323
}
2424

25+
// Returns true when the type byte read through PtrType() is one of the five
// ServiceType* constants listed below, false for any other value.
bool TKeyPrefix::HasServiceType() const
26+
{
27+
switch (*PtrType()) {
28+
case ServiceTypeInfo:
29+
case ServiceTypeData:
30+
case ServiceTypeTmpData:
31+
case ServiceTypeMeta:
32+
case ServiceTypeTxMeta:
33+
return true;
34+
default:
35+
return false;
36+
}
37+
}
38+
2539
}

ydb/core/persqueue/key.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ class TKeyPrefix : public TBuffer
5353
virtual ~TKeyPrefix()
5454
{}
5555

56+
// Returns a TString constructed from this buffer's Data() pointer and Size()
// length, i.e. the raw key bytes as currently stored.
TString ToString() const {
57+
return TString(Data(), Size());
58+
}
59+
5660
bool Marked(EMark mark) {
5761
if (Size() >= MarkedSize())
5862
return *PtrMark() == mark;
@@ -64,7 +68,7 @@ class TKeyPrefix : public TBuffer
6468

6569

6670
void SetType(EType type) {
67-
if (!IsServicePartition()) {
71+
if (!IsServicePartition() && !HasServiceType()) {
6872
*PtrType() = type;
6973
return;
7074
}
@@ -92,7 +96,6 @@ class TKeyPrefix : public TBuffer
9296
}
9397
}
9498

95-
9699
EType GetType() const {
97100
switch (*PtrType()) {
98101
case TypeNone:
@@ -130,6 +133,7 @@ class TKeyPrefix : public TBuffer
130133
Partition.InternalPartitionId = Partition.OriginalPartitionId;
131134
}
132135

136+
bool HasServiceType() const;
133137

134138
private:
135139
enum EServiceType : char {

0 commit comments

Comments
 (0)