@@ -126,62 +126,66 @@ namespace NBalancing {
126
126
return ;
127
127
}
128
128
129
+ const auto & top = GInfo->GetTopology ();
130
+ TPartsCollectorMerger merger (top.GType );
129
131
THPTimer timer;
130
132
131
133
for (ui32 cnt = 0 ; It.Valid (); It.Next (), ++cnt) {
132
- if (cnt % 128 == 127 && TDuration::Seconds (timer.Passed ()) > JOB_GRANULARITY ) {
134
+ if (cnt % 128 == 127 && TDuration::Seconds (timer.Passed ()) > Ctx-> Cfg . JobGranularity ) {
133
135
// actor should not block the thread for a long time, so we should yield
134
136
STLOG (PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " Collect keys" ), (collected, cnt), (passed, timer.Passed ()));
135
137
Send (SelfId (), new NActors::TEvents::TEvWakeup ());
136
138
return ;
137
139
}
138
140
139
- const auto & top = GInfo->GetTopology ();
140
141
const auto & key = It.GetCurKey ().LogoBlobID ();
141
142
142
- if (BALANCE_ONLY_HUGE_BLOBS && !Ctx->HugeBlobCtx ->IsHugeBlob (GInfo->Type , key, Ctx->MinREALHugeBlobInBytes )) {
143
+ if (Ctx-> Cfg . BalanceOnlyHugeBlobs && !Ctx->HugeBlobCtx ->IsHugeBlob (GInfo->Type , key, Ctx->MinREALHugeBlobInBytes )) {
143
144
// skip non huge blobs
144
145
continue ;
145
146
}
146
147
147
- TPartsCollectorMerger merger (top.GType );
148
148
It.PutToMerger (&merger);
149
149
150
150
auto [moveMask, delMask] = merger.Ingress .HandoffParts (&top, Ctx->VCtx ->ShortSelfVDisk , key);
151
151
152
- if (auto partsToSend = merger.Ingress .LocalParts (top.GType ) & moveMask; !partsToSend.Empty () && SendOnMainParts.Size () < MAX_TO_SEND_PER_EPOCH) {
153
- // collect parts to send on main
154
- for (const auto & [parts, data]: merger.Parts ) {
155
- if (!(partsToSend & parts).Empty ()) {
156
- SendOnMainParts.Data .emplace_back (TPartInfo{
157
- .Key =It.GetCurKey ().LogoBlobID (),
158
- .PartsMask =parts,
159
- .PartData =data
160
- });
152
+ // collect parts to send on main
153
+ if (Ctx->Cfg .EnableSend && SendOnMainParts.Size () < Ctx->Cfg .MaxToSendPerEpoch ) {
154
+ if (auto partsToSend = merger.Ingress .LocalParts (top.GType ) & moveMask; !partsToSend.Empty ()) {
155
+ for (const auto & [parts, data]: merger.Parts ) {
156
+ if (!(partsToSend & parts).Empty ()) {
157
+ SendOnMainParts.Data .emplace_back (TPartInfo{
158
+ .Key =It.GetCurKey ().LogoBlobID (),
159
+ .PartsMask =parts,
160
+ .PartData =data
161
+ });
162
+ }
161
163
}
162
164
}
163
165
}
164
166
165
- if (auto partsToDelete = merger.Ingress .LocalParts (top.GType ) & delMask; !partsToDelete.Empty () && TryDeleteParts.Size () < MAX_TO_DELETE_PER_EPOCH) {
166
- // collect parts to delete
167
- auto key = It.GetCurKey ().LogoBlobID ();
168
- for (ui8 partIdx = partsToDelete.FirstPosition (); partIdx < partsToDelete.GetSize (); partIdx = partsToDelete.NextPosition (partIdx)) {
169
- TryDeleteParts.Data .emplace_back (TLogoBlobID (key, partIdx + 1 ));
170
- STLOG (PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP (Ctx->VCtx , " Delete" ), (LogoBlobId, TryDeleteParts.Data .back ().ToString ()));
171
- }
167
+ // collect parts to delete
168
+ if (Ctx->Cfg .EnableDelete && TryDeleteParts.Size () < Ctx->Cfg .MaxToDeletePerEpoch ) {
169
+ if (auto partsToDelete = merger.Ingress .LocalParts (top.GType ) & delMask; !partsToDelete.Empty ()) {
170
+ auto key = It.GetCurKey ().LogoBlobID ();
171
+ for (ui8 partIdx = partsToDelete.FirstPosition (); partIdx < partsToDelete.GetSize (); partIdx = partsToDelete.NextPosition (partIdx)) {
172
+ TryDeleteParts.Data .emplace_back (TLogoBlobID (key, partIdx + 1 ));
173
+ STLOG (PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP (Ctx->VCtx , " Delete" ), (LogoBlobId, TryDeleteParts.Data .back ().ToString ()));
174
+ }
172
175
173
- for (const auto & [parts, data]: merger.Parts ) {
174
- if (!(partsToDelete & parts).Empty ()) {
175
- TryDeletePartsFullData[key].emplace_back (TPartInfo{
176
- .Key =key, .PartsMask =parts, .PartData =data
177
- });
176
+ for (const auto & [parts, data]: merger.Parts ) {
177
+ if (!(partsToDelete & parts).Empty ()) {
178
+ TryDeletePartsFullData[key].emplace_back (TPartInfo{
179
+ .Key =key, .PartsMask =parts, .PartData =data
180
+ });
181
+ }
178
182
}
179
183
}
180
184
}
181
185
182
186
merger.Clear ();
183
187
184
- if (SendOnMainParts.Size () >= MAX_TO_SEND_PER_EPOCH && TryDeleteParts.Size () >= MAX_TO_DELETE_PER_EPOCH ) {
188
+ if (SendOnMainParts.Size () >= Ctx-> Cfg . MaxToSendPerEpoch && TryDeleteParts.Size () >= Ctx-> Cfg . MaxToDeletePerEpoch ) {
185
189
// reached the limit of parts to send and delete
186
190
break ;
187
191
}
@@ -198,7 +202,7 @@ namespace NBalancing {
198
202
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " TEvCompleted" ), (Type, ev->Type ));
199
203
BatchManager.Handle (ev);
200
204
201
- if (StartTime + EPOCH_TIMEOUT < TlsActivationContext->Now ()) {
205
+ if (StartTime + Ctx-> Cfg . EpochTimeout < TlsActivationContext->Now ()) {
202
206
Ctx->MonGroup .EpochTimeouts ()++;
203
207
Send (MakeBlobStorageReplBrokerID (), new TEvReleaseReplToken);
204
208
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " Epoch timeout" ));
@@ -309,8 +313,8 @@ namespace NBalancing {
309
313
, Ctx(ctx)
310
314
, GInfo(ctx->GInfo)
311
315
, It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap)
312
- , SendOnMainParts(BATCH_SIZE )
313
- , TryDeleteParts(BATCH_SIZE )
316
+ , SendOnMainParts(Ctx->Cfg.BatchSize )
317
+ , TryDeleteParts(Ctx->Cfg.BatchSize )
314
318
, StartTime(TlsActivationContext->Now ())
315
319
{
316
320
}
0 commit comments