@@ -87,6 +87,9 @@ namespace NBalancing {
87
87
// /////////////////////////////////////////////////////////////////////////////////////////
88
88
89
89
void ContinueBalancing () {
90
+ Ctx->MonGroup .PlannedToSendOnMain () = SendOnMainParts.Data .size ();
91
+ Ctx->MonGroup .CandidatesToDelete () = TryDeleteParts.Data .size ();
92
+
90
93
if (SendOnMainParts.Empty () && TryDeleteParts.Empty ()) {
91
94
// no more parts to send or delete
92
95
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP (Ctx->VCtx , " Balancing completed" ));
@@ -101,8 +104,6 @@ namespace NBalancing {
101
104
102
105
void ScheduleJobQuant () {
103
106
Ctx->MonGroup .ReplTokenAquired ()++;
104
- Ctx->MonGroup .PlannedToSendOnMain () = SendOnMainParts.Data .size ();
105
- Ctx->MonGroup .CandidatesToDelete () = TryDeleteParts.Data .size ();
106
107
107
108
// once repl token received, start balancing - waking up sender and deleter
108
109
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP (Ctx->VCtx , " Schedule job quant" ),
@@ -125,57 +126,65 @@ namespace NBalancing {
125
126
return ;
126
127
}
127
128
129
+ const auto & top = GInfo->GetTopology ();
130
+ TPartsCollectorMerger merger (top.GType );
128
131
THPTimer timer;
129
132
130
133
for (ui32 cnt = 0 ; It.Valid (); It.Next (), ++cnt) {
131
- if (cnt % 100 == 99 && TDuration::Seconds (timer.Passed ()) > JOB_GRANULARITY ) {
134
+ if (cnt % 128 == 127 && TDuration::Seconds (timer.Passed ()) > Ctx-> Cfg . JobGranularity ) {
132
135
// actor should not block the thread for a long time, so we should yield
133
- // STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
136
+ STLOG (PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " Collect keys" ), (collected, cnt), (passed, timer.Passed ()));
134
137
Send (SelfId (), new NActors::TEvents::TEvWakeup ());
135
138
return ;
136
139
}
137
140
138
- const auto & top = GInfo->GetTopology ();
139
141
const auto & key = It.GetCurKey ().LogoBlobID ();
140
142
141
- TPartsCollectorMerger merger (top.GType );
143
+ if (Ctx->Cfg .BalanceOnlyHugeBlobs && !Ctx->HugeBlobCtx ->IsHugeBlob (GInfo->Type , key, Ctx->MinREALHugeBlobInBytes )) {
144
+ // skip non huge blobs
145
+ continue ;
146
+ }
147
+
148
+ merger.Clear ();
142
149
It.PutToMerger (&merger);
143
150
144
151
auto [moveMask, delMask] = merger.Ingress .HandoffParts (&top, Ctx->VCtx ->ShortSelfVDisk , key);
145
152
146
- if (auto partsToSend = merger.Ingress .LocalParts (top.GType ) & moveMask; !partsToSend.Empty () && SendOnMainParts.Size () < MAX_TO_SEND_PER_EPOCH) {
147
- // collect parts to send on main
148
- for (const auto & [parts, data]: merger.Parts ) {
149
- if (!(partsToSend & parts).Empty ()) {
150
- SendOnMainParts.Data .emplace_back (TPartInfo{
151
- .Key =It.GetCurKey ().LogoBlobID (),
152
- .PartsMask =parts,
153
- .PartData =data
154
- });
153
+ // collect parts to send on main
154
+ if (Ctx->Cfg .EnableSend && SendOnMainParts.Size () < Ctx->Cfg .MaxToSendPerEpoch ) {
155
+ if (auto partsToSend = merger.Ingress .LocalParts (top.GType ) & moveMask; !partsToSend.Empty ()) {
156
+ for (const auto & [parts, data]: merger.Parts ) {
157
+ if (!(partsToSend & parts).Empty ()) {
158
+ SendOnMainParts.Data .emplace_back (TPartInfo{
159
+ .Key =It.GetCurKey ().LogoBlobID (),
160
+ .PartsMask =parts,
161
+ .PartData =data
162
+ });
163
+ }
155
164
}
156
165
}
157
166
}
158
167
159
- if (auto partsToDelete = merger.Ingress .LocalParts (top.GType ) & delMask; !partsToDelete.Empty () && TryDeleteParts.Size () < MAX_TO_DELETE_PER_EPOCH) {
160
- // collect parts to delete
161
- auto key = It.GetCurKey ().LogoBlobID ();
162
- for (ui8 partIdx = partsToDelete.FirstPosition (); partIdx < partsToDelete.GetSize (); partIdx = partsToDelete.NextPosition (partIdx)) {
163
- TryDeleteParts.Data .emplace_back (TLogoBlobID (key, partIdx + 1 ));
164
- STLOG (PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP (Ctx->VCtx , " Delete" ), (LogoBlobId, TryDeleteParts.Data .back ().ToString ()));
165
- }
168
+ // collect parts to delete
169
+ if (Ctx->Cfg .EnableDelete && TryDeleteParts.Size () < Ctx->Cfg .MaxToDeletePerEpoch ) {
170
+ if (auto partsToDelete = merger.Ingress .LocalParts (top.GType ) & delMask; !partsToDelete.Empty ()) {
171
+ auto key = It.GetCurKey ().LogoBlobID ();
172
+ for (ui8 partIdx = partsToDelete.FirstPosition (); partIdx < partsToDelete.GetSize (); partIdx = partsToDelete.NextPosition (partIdx)) {
173
+ TryDeleteParts.Data .emplace_back (TLogoBlobID (key, partIdx + 1 ));
174
+ STLOG (PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP (Ctx->VCtx , " Delete" ), (LogoBlobId, TryDeleteParts.Data .back ().ToString ()));
175
+ }
166
176
167
- for (const auto & [parts, data]: merger.Parts ) {
168
- if (!(partsToDelete & parts).Empty ()) {
169
- TryDeletePartsFullData[key].emplace_back (TPartInfo{
170
- .Key =key, .PartsMask =parts, .PartData =data
171
- });
177
+ for (const auto & [parts, data]: merger.Parts ) {
178
+ if (!(partsToDelete & parts).Empty ()) {
179
+ TryDeletePartsFullData[key].emplace_back (TPartInfo{
180
+ .Key =key, .PartsMask =parts, .PartData =data
181
+ });
182
+ }
172
183
}
173
184
}
174
185
}
175
186
176
- merger.Clear ();
177
-
178
- if (SendOnMainParts.Size () >= MAX_TO_SEND_PER_EPOCH && TryDeleteParts.Size () >= MAX_TO_DELETE_PER_EPOCH) {
187
+ if (SendOnMainParts.Size () >= Ctx->Cfg .MaxToSendPerEpoch && TryDeleteParts.Size () >= Ctx->Cfg .MaxToDeletePerEpoch ) {
179
188
// reached the limit of parts to send and delete
180
189
break ;
181
190
}
@@ -192,11 +201,12 @@ namespace NBalancing {
192
201
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " TEvCompleted" ), (Type, ev->Type ));
193
202
BatchManager.Handle (ev);
194
203
195
- if (StartTime + EPOCH_TIMEOUT < TlsActivationContext->Now ()) {
204
+ if (StartTime + Ctx-> Cfg . EpochTimeout < TlsActivationContext->Now ()) {
196
205
Ctx->MonGroup .EpochTimeouts ()++;
197
206
Send (MakeBlobStorageReplBrokerID (), new TEvReleaseReplToken);
198
207
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " Epoch timeout" ));
199
208
PassAway ();
209
+ return ;
200
210
}
201
211
202
212
if (BatchManager.IsBatchCompleted ()) {
@@ -242,8 +252,8 @@ namespace NBalancing {
242
252
243
253
void Handle (NActors::TEvents::TEvUndelivered::TPtr ev) {
244
254
if (ev.Get ()->Type == TEvReplToken::EventType) {
245
- STLOG (PRI_WARN , BS_VDISK_BALANCING, BSVB06, VDISKP (Ctx->VCtx , " Ask repl token msg not delivered" ), (SelfId, SelfId ()), (PDiskId, Ctx->VDiskCfg ->BaseInfo .PDiskId ));
246
- ScheduleJobQuant ();
255
+ STLOG (PRI_ERROR , BS_VDISK_BALANCING, BSVB06, VDISKP (Ctx->VCtx , " Ask repl token msg not delivered" ), (SelfId, SelfId ()), (PDiskId, Ctx->VDiskCfg ->BaseInfo .PDiskId ));
256
+ ContinueBalancing ();
247
257
}
248
258
}
249
259
@@ -302,8 +312,8 @@ namespace NBalancing {
302
312
, Ctx(ctx)
303
313
, GInfo(ctx->GInfo)
304
314
, It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap)
305
- , SendOnMainParts(BATCH_SIZE )
306
- , TryDeleteParts(BATCH_SIZE )
315
+ , SendOnMainParts(Ctx->Cfg.BatchSize )
316
+ , TryDeleteParts(Ctx->Cfg.BatchSize )
307
317
, StartTime(TlsActivationContext->Now ())
308
318
{
309
319
}
0 commit comments