Skip to content

Commit b399caf

Browse files
authored
Add patching to keyvalue (#1549)
* Add patching to keyvalue * Improve patching in kv * fix incorrect cookie choosing
1 parent 4ac856a commit b399caf

File tree

10 files changed

+522
-32
lines changed

10 files changed

+522
-32
lines changed

ydb/core/base/blobstorage.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,10 @@ struct TEvBlobStorage {
14681468
}
14691469
}
14701470

1471+
static ui8 BlobPlacementKind(const TLogoBlobID &blob) {
1472+
return blob.Hash() % BaseDomainsCount;
1473+
}
1474+
14711475
static bool GetBlobIdWithSamePlacement(const TLogoBlobID &originalId, TLogoBlobID *patchedId,
14721476
ui32 bitsForBruteForce, ui32 originalGroupId, ui32 currentGroupId)
14731477
{
@@ -2407,4 +2411,19 @@ inline bool SendPutToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorag
24072411
// TODO(alexvru): check if return status is actually needed?
24082412
}
24092413

2414+
inline bool SendPatchToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorageInfo *storage,
2415+
THolder<TEvBlobStorage::TEvPatch> event, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
2416+
auto checkGroupId = [&] {
2417+
const TLogoBlobID &id = event->PatchedId;
2418+
const ui32 expectedGroupId = storage->GroupFor(id.Channel(), id.Generation());
2419+
const TLogoBlobID &originalId = event->OriginalId;
2420+
const ui32 expectedOriginalGroupId = storage->GroupFor(originalId.Channel(), originalId.Generation());
2421+
return id.TabletID() == storage->TabletID && expectedGroupId != Max<ui32>() && groupId == expectedGroupId && event->OriginalGroupId == expectedOriginalGroupId;
2422+
};
2423+
Y_VERIFY_S(checkGroupId(), "groupIds# (" << event->OriginalGroupId << ',' << groupId << ") does not match actual ones LogoBlobIds# (" <<
2424+
event->OriginalId.ToString() << ',' << event->PatchedId.ToString() << ')');
2425+
return SendToBSProxy(ctx, groupId, event.Release(), cookie, std::move(traceId));
2426+
// TODO(alexvru): check if return status is actually needed?
2427+
}
2428+
24102429
} // NKikimr

ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ namespace NKikimr {
5555
"not implemented")), 0, ev->Cookie);
5656
}
5757

58+
void Handle(TEvBlobStorage::TEvPatch::TPtr& ev) {
59+
STLOG(PRI_DEBUG, BS_PROXY, BSPM10, "TEvPatch", (Msg, ev->Get()->ToString()));
60+
Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
61+
}
62+
5863
template<typename TOut, typename TIn>
5964
TOut *CopyExecutionRelay(TIn *in, TOut *out) {
6065
out->ExecutionRelay = std::move(in->ExecutionRelay);
@@ -80,6 +85,7 @@ namespace NKikimr {
8085
hFunc(TEvBlobStorage::TEvRange, Handle);
8186
hFunc(TEvBlobStorage::TEvCollectGarbage, Handle);
8287
hFunc(TEvBlobStorage::TEvStatus, Handle);
88+
hFunc(TEvBlobStorage::TEvPatch, Handle);
8389

8490
hFunc(TEvents::TEvPoisonPill, HandlePoison);
8591
hFunc(TEvBlobStorage::TEvConfigureProxy, Handle);

ydb/core/blobstorage/dsproxy/mock/model.h

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <cstring>
34
#include <ydb/core/base/blobstorage.h>
45
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_space_color.h>
56

@@ -149,6 +150,64 @@ namespace NFake {
149150
return result.release();
150151
}
151152

153+
TEvBlobStorage::TEvPatchResult* Handle(TEvBlobStorage::TEvPatch *msg) {
154+
// ensure we have full blob id, with PartId set to zero
155+
const TLogoBlobID& id = msg->PatchedId;
156+
Y_ABORT_UNLESS(id == id.FullID());
157+
158+
// validate put against set blocks
159+
if (IsBlocked(id.TabletID(), id.Generation())) {
160+
return new TEvBlobStorage::TEvPatchResult(NKikimrProto::BLOCKED, id, GetStorageStatusFlags(), GroupId, 0.f);
161+
}
162+
163+
// check if this blob is not being collected -- writing such blob is a violation of BS contract
164+
Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data());
165+
166+
167+
const TLogoBlobID& originalId = msg->OriginalId;
168+
auto it = Blobs.find(originalId);
169+
if (it == Blobs.end()) {
170+
// ensure this blob is not under GC
171+
Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data());
172+
return new TEvBlobStorage::TEvPatchResult(NKikimrProto::ERROR, id, GetStorageStatusFlags(), GroupId, 0.f);
173+
}
174+
175+
auto& data = it->second;
176+
// TODO(kruall): check bad diffs
177+
TString buffer = TString::Uninitialized(data.Buffer.GetSize());
178+
auto originalBuffer = data.Buffer.GetContiguousSpan();
179+
memcpy(buffer.Detach(), originalBuffer.data(), buffer.size());
180+
for (ui32 diffIdx = 0; diffIdx < msg->DiffCount; ++diffIdx) {
181+
auto &diff = msg->Diffs[diffIdx];
182+
auto diffBuffer = diff.Buffer.GetContiguousSpan();
183+
memcpy(buffer.Detach() + diff.Offset, diffBuffer.data(), diffBuffer.size());
184+
}
185+
186+
187+
// validate that there are no blobs with the same gen/step, channel, cookie, but with different size
188+
const TLogoBlobID base(id.TabletID(), id.Generation(), id.Step(), id.Channel(), 0, id.Cookie());
189+
auto iter = Blobs.lower_bound(base);
190+
if (iter != Blobs.end()) {
191+
const TLogoBlobID& existing = iter->first;
192+
Y_ABORT_UNLESS(
193+
id.TabletID() != existing.TabletID() ||
194+
id.Generation() != existing.Generation() ||
195+
id.Step() != existing.Step() ||
196+
id.Cookie() != existing.Cookie() ||
197+
id.Channel() != existing.Channel() ||
198+
id == existing,
199+
"id# %s existing# %s", id.ToString().data(), existing.ToString().data());
200+
if (id == existing) {
201+
Y_ABORT_UNLESS(iter->second.Buffer == buffer);
202+
}
203+
}
204+
205+
// put an entry into logo blobs database and reply with success
206+
Blobs.emplace(id, TRope(buffer));
207+
208+
return new TEvBlobStorage::TEvPatchResult(NKikimrProto::OK, id, GetStorageStatusFlags(), GroupId, 0.f);
209+
}
210+
152211
TEvBlobStorage::TEvBlockResult* Handle(TEvBlobStorage::TEvBlock *msg) {
153212
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
154213

ydb/core/keyvalue/keyvalue_intermediate.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,30 @@ struct TIntermediate {
102102
struct TSetExecutorFastLogPolicy {
103103
bool IsAllowed;
104104
};
105+
struct TPatch {
106+
struct TDiff {
107+
ui32 Offset;
108+
TRope Buffer;
109+
};
110+
111+
TString OriginalKey;
112+
TLogoBlobID OriginalBlobId;
113+
TString PatchedKey;
114+
TLogoBlobID PatchedBlobId;
115+
116+
NKikimrProto::EReplyStatus Status;
117+
TStorageStatusFlags StatusFlags;
118+
119+
TVector<TDiff> Diffs;
120+
};
105121

106-
using TCmd = std::variant<TWrite, TDelete, TRename, TCopyRange, TConcat>;
122+
using TCmd = std::variant<TWrite, TDelete, TRename, TCopyRange, TConcat, TPatch>;
107123
using TReadCmd = std::variant<TRead, TRangeRead>;
108124

109125
TDeque<TRead> Reads;
110126
TDeque<TRangeRead> RangeReads;
111127
TDeque<TWrite> Writes;
128+
TDeque<TPatch> Patches;
112129
TDeque<TDelete> Deletes;
113130
TDeque<TRename> Renames;
114131
TDeque<TCopyRange> CopyRanges;
@@ -120,6 +137,7 @@ struct TIntermediate {
120137

121138
TStackVec<TCmd, 1> Commands;
122139
TStackVec<ui32, 1> WriteIndices;
140+
TStackVec<ui32, 1> PatchIndices;
123141
std::optional<TReadCmd> ReadCommand;
124142

125143
ui64 WriteCount = 0;

0 commit comments

Comments
 (0)