Skip to content

Commit 026de84

Browse files
authored
Merge 5c64578 into 4c31460
2 parents 4c31460 + 5c64578 commit 026de84

23 files changed

+378
-27
lines changed

ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

+9-2
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,15 @@ void TNodeWarden::Bootstrap() {
169169
DsProxyPerPoolCounters = new TDsProxyPerPoolCounters(AppData()->Counters);
170170

171171
if (actorSystem && actorSystem->AppData<TAppData>() && actorSystem->AppData<TAppData>()->Icb) {
172-
actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
173-
actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
172+
const TIntrusivePtr<NKikimr::TControlBoard>& icb = actorSystem->AppData<TAppData>()->Icb;
173+
174+
icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
175+
icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
176+
icb->RegisterSharedControl(EnableLocalSyncLogDataCutting, "VDiskControls.EnableLocalSyncLogDataCutting");
177+
icb->RegisterSharedControl(EnableSyncLogChunkCompressionHDD, "VDiskControls.EnableSyncLogChunkCompressionHDD");
178+
icb->RegisterSharedControl(EnableSyncLogChunkCompressionSSD, "VDiskControls.EnableSyncLogChunkCompressionSSD");
179+
icb->RegisterSharedControl(MaxSyncLogChunksInFlightHDD, "VDiskControls.MaxSyncLogChunksInFlightHDD");
180+
icb->RegisterSharedControl(MaxSyncLogChunksInFlightSSD, "VDiskControls.MaxSyncLogChunksInFlightSSD");
174181
}
175182

176183
// start replication broker

ydb/core/blobstorage/nodewarden/node_warden_impl.h

+11
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ namespace NKikimr::NStorage {
133133
TControlWrapper EnablePutBatching;
134134
TControlWrapper EnableVPatch;
135135

136+
TControlWrapper EnableLocalSyncLogDataCutting;
137+
TControlWrapper EnableSyncLogChunkCompressionHDD;
138+
TControlWrapper EnableSyncLogChunkCompressionSSD;
139+
TControlWrapper MaxSyncLogChunksInFlightHDD;
140+
TControlWrapper MaxSyncLogChunksInFlightSSD;
141+
136142
TReplQuoter::TPtr ReplNodeRequestQuoter;
137143
TReplQuoter::TPtr ReplNodeResponseQuoter;
138144

@@ -148,6 +154,11 @@ namespace NKikimr::NStorage {
148154
: Cfg(cfg)
149155
, EnablePutBatching(Cfg->FeatureFlags.GetEnablePutBatchingForBlobStorage(), false, true)
150156
, EnableVPatch(Cfg->FeatureFlags.GetEnableVPatch(), false, true)
157+
, EnableLocalSyncLogDataCutting(0, 0, 1)
158+
, EnableSyncLogChunkCompressionHDD(1, 0, 1)
159+
, EnableSyncLogChunkCompressionSSD(0, 0, 1)
160+
, MaxSyncLogChunksInFlightHDD(10, 1, 1024)
161+
, MaxSyncLogChunksInFlightSSD(10, 1, 1024)
151162
{
152163
Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1);
153164
AvailDomainId = 1;

ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,16 @@ namespace NKikimr::NStorage {
172172
vdiskConfig->EnableVDiskCooldownTimeout = Cfg->EnableVDiskCooldownTimeout;
173173
vdiskConfig->ReplPausedAtStart = Cfg->VDiskReplPausedAtStart;
174174
vdiskConfig->EnableVPatch = EnableVPatch;
175+
176+
vdiskConfig->EnableLocalSyncLogDataCutting = EnableLocalSyncLogDataCutting;
177+
if (deviceType == NPDisk::EDeviceType::DEVICE_TYPE_ROT) {
178+
vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionHDD;
179+
vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightHDD;
180+
} else {
181+
vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionSSD;
182+
vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightSSD;
183+
}
184+
175185
vdiskConfig->FeatureFlags = Cfg->FeatureFlags;
176186

177187
if (Cfg->BlobStorageConfig.HasCostMetricsSettings()) {

ydb/core/blobstorage/ut_blobstorage/sync.cpp

+101
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,108 @@
1+
#include <ydb/core/blobstorage/ut_blobstorage/ut_helpers.h>
12
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
23
#include <ydb/core/blobstorage/vdisk/common/vdisk_private_events.h>
4+
#include <util/random/random.h>
35

46
Y_UNIT_TEST_SUITE(BlobStorageSync) {
7+
8+
void TestCutting(TBlobStorageGroupType groupType) {
9+
const ui32 groupSize = groupType.BlobSubgroupSize();
10+
11+
// for (ui32 mask = 0; mask < (1 << groupSize); ++mask) { // TIMEOUT
12+
{
13+
ui32 mask = RandomNumber(1ull << groupSize);
14+
for (bool compressChunks : { true, false }) {
15+
TEnvironmentSetup env{{
16+
.NodeCount = groupSize,
17+
.Erasure = groupType,
18+
}};
19+
20+
env.CreateBoxAndPool(1, 1);
21+
std::vector<ui32> groups = env.GetGroups();
22+
UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
23+
ui32 groupId = groups[0];
24+
25+
const ui64 tabletId = 5000;
26+
const ui32 channel = 10;
27+
ui32 gen = 1;
28+
ui32 step = 1;
29+
ui64 cookie = 1;
30+
31+
ui64 totalSize = 0;
32+
33+
std::vector<TControlWrapper> cutLocalSyncLogControls;
34+
std::vector<TControlWrapper> compressChunksControls;
35+
std::vector<TActorId> edges;
36+
37+
for (ui32 nodeId = 1; nodeId <= groupSize; ++nodeId) {
38+
cutLocalSyncLogControls.emplace_back(0, 0, 1);
39+
compressChunksControls.emplace_back(1, 0, 1);
40+
TAppData* appData = env.Runtime->GetNode(nodeId)->AppData.get();
41+
appData->Icb->RegisterSharedControl(cutLocalSyncLogControls.back(), "VDiskControls.EnableLocalSyncLogDataCutting");
42+
appData->Icb->RegisterSharedControl(compressChunksControls.back(), "VDiskControls.EnableSyncLogChunkCompressionHDD");
43+
edges.push_back(env.Runtime->AllocateEdgeActor(nodeId));
44+
}
45+
46+
for (ui32 i = 0; i < groupSize; ++i) {
47+
env.Runtime->WrapInActorContext(edges[i], [&] {
48+
SendToBSProxy(edges[i], groupId, new TEvBlobStorage::TEvStatus(TInstant::Max()));
49+
});
50+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(edges[i], false);
51+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
52+
}
53+
54+
auto writeBlob = [&](ui32 nodeId, ui32 blobSize) {
55+
TLogoBlobID blobId(tabletId, gen, step, channel, blobSize, ++cookie);
56+
totalSize += blobSize;
57+
TString data = MakeData(blobSize);
58+
59+
const TActorId& sender = edges[nodeId - 1];
60+
env.Runtime->WrapInActorContext(sender, [&] () {
61+
SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvPut(blobId, std::move(data), TInstant::Max()));
62+
});
63+
};
64+
65+
env.Runtime->FilterFunction = [&](ui32/* nodeId*/, std::unique_ptr<IEventHandle>& ev) {
66+
switch(ev->Type) {
67+
case TEvBlobStorage::TEvPutResult::EventType:
68+
UNIT_ASSERT_VALUES_EQUAL(ev->Get<TEvBlobStorage::TEvPutResult>()->Status, NKikimrProto::OK);
69+
return false;
70+
default:
71+
return true;
72+
}
73+
};
74+
75+
while (totalSize < 16_MB) {
76+
writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB));
77+
}
78+
env.Sim(TDuration::Minutes(5));
79+
80+
for (ui32 i = 0; i < groupSize; ++i) {
81+
cutLocalSyncLogControls[i] = !!(mask & (1 << i));
82+
compressChunksControls[i] = compressChunks;
83+
}
84+
85+
while (totalSize < 32_MB) {
86+
writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB));
87+
}
88+
89+
env.Sim(TDuration::Minutes(5));
90+
}
91+
}
92+
}
93+
94+
Y_UNIT_TEST(TestSyncLogCuttingMirror3dc) {
95+
TestCutting(TBlobStorageGroupType::ErasureMirror3dc);
96+
}
97+
98+
Y_UNIT_TEST(TestSyncLogCuttingMirror3of4) {
99+
TestCutting(TBlobStorageGroupType::ErasureMirror3of4);
100+
}
101+
102+
Y_UNIT_TEST(TestSyncLogCuttingBlock4Plus2) {
103+
TestCutting(TBlobStorageGroupType::Erasure4Plus2Block);
104+
}
105+
5106
Y_UNIT_TEST(SyncWhenDiskGetsDown) {
6107
return; // re-enable when protocol issue is resolved
7108

ydb/core/blobstorage/ut_blobstorage/ut_helpers.h

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ namespace NKikimr {
77

88
TString MakeData(ui32 dataSize);
99

10+
template<typename Int1 = ui32, typename Int2 = ui32>
11+
inline Int1 GenerateRandom(Int1 min, Int2 max) {
12+
return min + RandomNumber(max - min);
13+
}
14+
1015
class TInflightActor : public TActorBootstrapped<TInflightActor> {
1116
public:
1217
struct TSettings {

ydb/core/blobstorage/vdisk/common/vdisk_config.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ namespace NKikimr {
6363
SyncLogAdvisedIndexedBlockSize = ui32(1) << ui32(20); // 1 MB
6464
SyncLogMaxMemAmount = ui64(64) << ui64(20); // 64 MB
6565

66+
MaxSyncLogChunkSize = ui32(16) << ui32(10); // 32 Kb
67+
6668
ReplTimeInterval = TDuration::Seconds(60); // 60 seconds
6769
ReplRequestTimeout = TDuration::Seconds(10); // 10 seconds
6870
ReplPlanQuantum = TDuration::MilliSeconds(100); // 100 ms

ydb/core/blobstorage/vdisk/common/vdisk_config.h

+5
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,11 @@ namespace NKikimr {
153153
ui32 SyncLogAdvisedIndexedBlockSize;
154154
ui64 SyncLogMaxMemAmount;
155155

156+
TControlWrapper EnableLocalSyncLogDataCutting;
157+
TControlWrapper EnableSyncLogChunkCompression;
158+
TControlWrapper MaxSyncLogChunksInFlight;
159+
ui32 MaxSyncLogChunkSize;
160+
156161
///////////// REPL SETTINGS /////////////////////////
157162
TDuration ReplTimeInterval;
158163
TDuration ReplRequestTimeout;

ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -513,11 +513,13 @@ namespace NKikimr {
513513

514514
///////////////// SYNC //////////////////////////////////////////////////////
515515
TLsnSeg THull::AllocateLsnForSyncDataCmd(const TString &data) {
516+
NSyncLog::TFragmentReader fragment(data);
517+
516518
// count number of elements
517519
ui32 counter = 0;
518520
auto count = [&counter] (const void *) { counter++; };
519521
// do job - count all elements
520-
NSyncLog::TFragmentReader(data).ForEach(count, count, count, count);
522+
fragment.ForEach(count, count, count, count);
521523

522524
// allocate LsnSeg; we reserve a diapason of lsns since we put multiple records
523525
ui64 lsnAdvance = counter;
@@ -536,7 +538,7 @@ namespace NKikimr {
536538
curLsn++;
537539
};
538540
// do job - update blocks cache
539-
NSyncLog::TFragmentReader(data).ForEach(otherHandler, blockHandler, otherHandler, blockHandlerV2);
541+
fragment.ForEach(otherHandler, blockHandler, otherHandler, blockHandlerV2);
540542
// check that all records are applied
541543
Y_DEBUG_ABORT_UNLESS(curLsn == seg.Last + 1);
542544

ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp

+117
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "blobstorage_syncer_localwriter.h"
22
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h>
3+
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h>
34

45
namespace NKikimr {
56

@@ -186,5 +187,121 @@ namespace NKikimr {
186187
return new TLocalSyncDataExtractorActor(vctx, skeletonId, parentId, std::move(ev));
187188
}
188189

190+
///////////////////////////////////////////////////////////////////////////////////////////////
191+
// TLocalSyncDataCutterActor -- actor extracts data from TEvLocalSyncData, cuts it into
192+
// smaller chunks and sends in multiple messages to Skeleton
193+
///////////////////////////////////////////////////////////////////////////////////////////////
194+
class TLocalSyncDataCutterActor : public TActorBootstrapped<TLocalSyncDataCutterActor> {
195+
TIntrusivePtr<TVDiskConfig> VConfig;
196+
TIntrusivePtr<TVDiskContext> VCtx;
197+
TActorId SkeletonId;
198+
TActorId ParentId;
199+
std::unique_ptr<TEvLocalSyncData> Ev;
200+
std::vector<TString> Chunks;
201+
202+
ui32 ChunksInFlight = 0;
203+
bool CompressChunks;
204+
ui32 MaxChunksInFlight;
205+
ui32 MaxChunksSize;
206+
207+
public:
208+
void Bootstrap(const TActorContext &ctx) {
209+
TInstant startTime = TAppData::TimeProvider->Now();
210+
std::unique_ptr<NSyncLog::TNaiveFragmentWriter> fragmentWriter;
211+
212+
if (CompressChunks) {
213+
fragmentWriter.reset(new NSyncLog::TLz4FragmentWriter);
214+
} else {
215+
fragmentWriter.reset(new NSyncLog::TNaiveFragmentWriter);
216+
}
217+
218+
auto addChunk = [&]() {
219+
if (fragmentWriter->GetSize()) {
220+
TString chunk;
221+
fragmentWriter->Finish(&chunk);
222+
Chunks.emplace_back(std::move(chunk));
223+
fragmentWriter->Clear();
224+
}
225+
};
226+
227+
auto copy = [&] (const void* ptr) {
228+
const NSyncLog::TRecordHdr* rec = (const NSyncLog::TRecordHdr*)((char*)ptr - sizeof(NSyncLog::TRecordHdr));
229+
if (fragmentWriter->GetSize() + rec->GetSize() > MaxChunksSize) {
230+
addChunk();
231+
}
232+
fragmentWriter->Push(rec, rec->GetSize());
233+
};
234+
235+
NSyncLog::TFragmentReader fragmentReader(Ev->Data);
236+
fragmentReader.ForEach(copy, copy, copy, copy);
237+
238+
addChunk();
239+
240+
TInstant finishTime = TAppData::TimeProvider->Now();
241+
LOG_DEBUG_S(ctx, NKikimrServices::BS_SYNCER, VCtx->VDiskLogPrefix
242+
<< "TLocalSyncDataCutterActor: VDiskId# " << Ev->VDiskID.ToString()
243+
<< " dataSize# " << Ev->Data.size()
244+
<< " duration# %s" << (finishTime - startTime));
245+
246+
Become(&TThis::StateFunc);
247+
}
248+
249+
void Finish(const TActorContext& ctx, const NKikimrProto::EReplyStatus& status) {
250+
ctx.Send(ParentId, new TEvLocalSyncDataResult(status, TAppData::TimeProvider->Now(), nullptr, nullptr));
251+
PassAway();
252+
}
253+
254+
void Handle(const TEvLocalSyncDataResult::TPtr& ev, const TActorContext& ctx) {
255+
if (ev->Get()->Status == NKikimrProto::OK) {
256+
--ChunksInFlight;
257+
if (Chunks.empty() && ChunksInFlight == 0) {
258+
Finish(ctx, NKikimrProto::OK);
259+
} else {
260+
SendChunks(ctx);
261+
}
262+
} else {
263+
Finish(ctx, ev->Get()->Status);
264+
}
265+
}
266+
267+
void SendChunks(const TActorContext& ctx) {
268+
while (ChunksInFlight < MaxChunksInFlight && !Chunks.empty()) {
269+
ctx.Send(SkeletonId, new TEvLocalSyncData(Ev->VDiskID, Ev->SyncState, std::move(Chunks.back())));
270+
Chunks.pop_back();
271+
++ChunksInFlight;
272+
}
273+
}
274+
275+
public:
276+
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
277+
return NKikimrServices::TActivity::VDISK_LOCALSYNCDATA_CUTTER;
278+
}
279+
280+
TLocalSyncDataCutterActor(
281+
const TIntrusivePtr<TVDiskConfig>& vconfig,
282+
const TIntrusivePtr<TVDiskContext>& vctx,
283+
const TActorId& skeletonId,
284+
const TActorId& parentId,
285+
std::unique_ptr<TEvLocalSyncData> ev)
286+
: VCtx(vctx)
287+
, SkeletonId(skeletonId)
288+
, ParentId(parentId)
289+
, Ev(std::move(ev))
290+
, CompressChunks(vconfig->MaxSyncLogChunksInFlight)
291+
, MaxChunksInFlight(vconfig->MaxSyncLogChunksInFlight)
292+
, MaxChunksSize(vconfig->MaxSyncLogChunkSize)
293+
{}
294+
295+
STRICT_STFUNC(StateFunc, {
296+
HFunc(TEvLocalSyncDataResult, Handle);
297+
})
298+
299+
};
300+
301+
IActor* CreateLocalSyncDataCutter(const TIntrusivePtr<TVDiskConfig>& vconfig, const TIntrusivePtr<TVDiskContext>& vctx,
302+
const TActorId& skeletonId, const TActorId& parentId, std::unique_ptr<TEvLocalSyncData> ev) {
303+
return new TLocalSyncDataCutterActor(vconfig, vctx, skeletonId, parentId, std::move(ev));
304+
}
305+
189306

190307
} // NKikimr

ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h

+6
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,10 @@ namespace NKikimr {
6969
IActor *CreateLocalSyncDataExtractor(const TIntrusivePtr<TVDiskContext> &vctx, const TActorId &skeletonId,
7070
const TActorId &parentId, std::unique_ptr<TEvLocalSyncData> ev);
7171

72+
///////////////////////////////////////////////////////////////////////////////////////////////
73+
// CreateLocalSyncDataCutter
74+
///////////////////////////////////////////////////////////////////////////////////////////////
75+
IActor* CreateLocalSyncDataCutter(const TIntrusivePtr<TVDiskConfig>& vconfig, const TIntrusivePtr<TVDiskContext>& vctx,
76+
const TActorId& skeletonId, const TActorId& parentId, std::unique_ptr<TEvLocalSyncData> ev);
77+
7278
} // NKikimr

0 commit comments

Comments
 (0)