@@ -19,7 +19,6 @@ namespace NKikimr {
19
19
std::shared_ptr<TDefragCtx> DCtx;
20
20
const TVDiskID SelfVDiskId;
21
21
std::optional<TChunksToDefrag> ChunksToDefrag;
22
- const ui32 MinHugeBlobInBytes;
23
22
24
23
enum {
25
24
EvResume = EventSpaceBegin (TEvents::ES_PRIVATE)
@@ -29,12 +28,11 @@ namespace NKikimr {
29
28
30
29
public:
31
30
TDefragQuantum (const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId,
32
- std::optional<TChunksToDefrag> chunksToDefrag, ui32 minHugeBlobInBytes )
31
+ std::optional<TChunksToDefrag> chunksToDefrag)
33
32
: TActorCoroImpl(64_KB, true )
34
33
, DCtx(dctx)
35
34
, SelfVDiskId(selfVDiskId)
36
35
, ChunksToDefrag(std::move(chunksToDefrag))
37
- , MinHugeBlobInBytes(minHugeBlobInBytes)
38
36
{}
39
37
40
38
void ProcessUnexpectedEvent (TAutoPtr<IEventHandle> ev) {
@@ -66,8 +64,7 @@ namespace NKikimr {
66
64
STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD07, DCtx->VCtx ->VDiskLogPrefix << " going to find chunks to defrag" ,
67
65
(ActorId, SelfActorId));
68
66
69
- TDefragQuantumFindChunks findChunks (GetSnapshot (), DCtx->HugeBlobCtx , ChunksToDefrag ?
70
- std::make_optional (std::move (ChunksToDefrag->ChunksToShred )) : std::nullopt);
67
+ TDefragQuantumFindChunks findChunks (GetSnapshot (), DCtx->HugeBlobCtx );
71
68
const ui64 endTime = GetCycleCountFast () + DurationToCycles (NDefrag::MaxSnapshotHoldDuration);
72
69
while (findChunks.Scan (NDefrag::WorkQuantum)) {
73
70
if (GetCycleCountFast () >= endTime) {
@@ -83,33 +80,67 @@ namespace NKikimr {
83
80
Y_ABORT_UNLESS (*ChunksToDefrag || ChunksToDefrag->IsShred ());
84
81
}
85
82
if (*ChunksToDefrag || ChunksToDefrag->IsShred ()) {
86
- STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD09, DCtx->VCtx ->VDiskLogPrefix
87
- << " commencing defragmentation" , (ActorId, SelfActorId), (ChunksToDefrag, *ChunksToDefrag));
83
+ const bool isShred = ChunksToDefrag->IsShred ();
88
84
89
- if (!ChunksToDefrag->IsShred ()) {
90
- stat.FoundChunksToDefrag = ChunksToDefrag->FoundChunksToDefrag ;
91
- stat.Eof = stat.FoundChunksToDefrag < DCtx->MaxChunksToDefrag ;
85
+ TDefragChunks lockedChunks;
92
86
93
- STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD10, DCtx->VCtx ->VDiskLogPrefix << " locking chunks" ,
94
- (ActorId, SelfActorId));
87
+ if (!isShred) {
88
+ STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD09, DCtx->VCtx ->VDiskLogPrefix
89
+ << " commencing defragmentation" , (ActorId, SelfActorId), (ChunksToDefrag, *ChunksToDefrag));
95
90
96
- auto lockedChunks = LockChunks (* ChunksToDefrag) ;
97
- ChunksToDefrag-> Chunks = std::move (lockedChunks) ;
91
+ stat. FoundChunksToDefrag = ChunksToDefrag-> FoundChunksToDefrag ;
92
+ stat. Eof = stat. FoundChunksToDefrag < DCtx-> MaxChunksToDefrag ;
98
93
stat.FreedChunks = ChunksToDefrag->Chunks ;
99
94
95
+ lockedChunks = LockChunks (*ChunksToDefrag);
96
+
100
97
STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD11, DCtx->VCtx ->VDiskLogPrefix << " locked chunks" ,
101
- (ActorId, SelfActorId), (LockedChunks, ChunksToDefrag->Chunks ));
98
+ (ActorId, SelfActorId), (LockedChunks, lockedChunks));
99
+ } else {
100
+ STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD14, DCtx->VCtx ->VDiskLogPrefix
101
+ << " commencing shredding" , (ActorId, SelfActorId), (ChunksToShred, ChunksToDefrag->ChunksToShred ));
102
102
}
103
103
104
- TDefragQuantumFindRecords findRecords (std::move (*ChunksToDefrag), DCtx->VCtx ->Top ->GType ,
105
- DCtx->AddHeader , DCtx->HugeBlobCtx , MinHugeBlobInBytes);
104
+ TDefragQuantumFindRecords findRecords (std::move (*ChunksToDefrag), lockedChunks);
106
105
while (findRecords.Scan (NDefrag::WorkQuantum, GetSnapshot ())) {
107
106
Yield ();
108
107
}
109
108
110
109
if (auto records = findRecords.GetRecordsToRewrite (); !records.empty ()) {
110
+ THashMap<TChunkIdx, ui32> heatmap;
111
+ for (const auto & record : records) {
112
+ ++heatmap[record.OldDiskPart .ChunkIdx ];
113
+ }
114
+
115
+ std::vector<std::tuple<TChunkIdx, ui32>> chunks (heatmap.begin (), heatmap.end ());
116
+ std::ranges::sort (chunks, std::less<ui32>(), [](const auto & x) { return std::get<1 >(x); });
117
+
118
+ const size_t numRecordsTotal = records.size ();
119
+
120
+ if (isShred && chunks.size () > DCtx->MaxChunksToDefrag ) {
121
+ chunks.resize (DCtx->MaxChunksToDefrag );
122
+ THashSet<TChunkIdx> set;
123
+ for (const auto & [chunkIdx, usage] : chunks) {
124
+ set.insert (chunkIdx);
125
+ }
126
+ auto pred = [&](const auto & record) { return !set.contains (record.OldDiskPart .ChunkIdx ); };
127
+ auto range = std::ranges::remove_if (records, pred);
128
+ records.erase (range.begin (), range.end ());
129
+ }
130
+
131
+ auto getSortedChunks = [&] {
132
+ std::vector<TChunkIdx> temp;
133
+ temp.reserve (chunks.size ());
134
+ for (const auto & [chunkIdx, usage] : chunks) {
135
+ temp.push_back (chunkIdx);
136
+ }
137
+ std::ranges::sort (temp);
138
+ return temp;
139
+ };
140
+
111
141
STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD12, DCtx->VCtx ->VDiskLogPrefix << " rewriting records" ,
112
- (ActorId, SelfActorId), (NumRecordsToRewrite, records.size ()));
142
+ (ActorId, SelfActorId), (NumRecordsToRewrite, records.size ()), (NumRecordsTotal, numRecordsTotal),
143
+ (Chunks, getSortedChunks ()));
113
144
114
145
const TActorId rewriterActorId = Register (CreateDefragRewriter (DCtx, SelfVDiskId, SelfActorId,
115
146
std::move (records)));
@@ -122,19 +153,32 @@ namespace NKikimr {
122
153
}
123
154
stat.RewrittenRecs = ev->Get ()->RewrittenRecs ;
124
155
stat.RewrittenBytes = ev->Get ()->RewrittenBytes ;
125
- } else if (ChunksToDefrag-> IsShred () ) {
126
- // no records to rewrite found
127
- stat. Eof = true ;
156
+ if (isShred ) {
157
+ stat. Eof = false ;
158
+ }
128
159
}
129
160
130
- auto tablesToCompact = std::move (findRecords.GetTablesToCompact ());
161
+ // scan index again to find tables we have to compact
162
+ for (findRecords.StartFindingTablesToCompact (); findRecords.Scan (NDefrag::WorkQuantum, GetSnapshot ()); Yield ()) {}
163
+ if (auto records = findRecords.GetRecordsToRewrite (); !records.empty ()) {
164
+ for (const auto & item : records) {
165
+ STLOG (PRI_WARN, BS_VDISK_DEFRAG, BSVDD16, DCtx->VCtx ->VDiskLogPrefix
166
+ << " blob found again after rewriting" , (ActorId, SelfActorId), (Id, item.LogoBlobId ),
167
+ (Location, item.OldDiskPart ));
168
+ }
169
+ }
131
170
171
+ auto tablesToCompact = findRecords.GetTablesToCompact ();
172
+ const bool needsFreshCompaction = findRecords.GetNeedsFreshCompaction ();
132
173
STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD13, DCtx->VCtx ->VDiskLogPrefix << " compacting" ,
133
- (ActorId, SelfActorId), (TablesToCompact, tablesToCompact));
134
-
135
- Compact (tablesToCompact);
174
+ (ActorId, SelfActorId), (TablesToCompact, tablesToCompact),
175
+ (NeedsFreshCompaction, needsFreshCompaction));
176
+ Compact (std::move ( tablesToCompact), needsFreshCompaction );
136
177
}
137
178
179
+ STLOG (PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD15, DCtx->VCtx ->VDiskLogPrefix << " quantum finished" ,
180
+ (ActorId, SelfActorId), (Stat, stat));
181
+
138
182
Send (ParentActorId, new TEvDefragQuantumResult (std::move (stat)));
139
183
}
140
184
@@ -154,15 +198,21 @@ namespace NKikimr {
154
198
return res->Get ()->LockedChunks ;
155
199
}
156
200
157
- void Compact (THashSet<ui64> tablesToCompact) {
158
- Send (DCtx->SkeletonId , TEvCompactVDisk::Create (EHullDbType::LogoBlobs, tablesToCompact));
201
+ void Compact (THashSet<ui64> tablesToCompact, bool needsFreshCompaction) {
202
+ if (tablesToCompact) {
203
+ Send (DCtx->SkeletonId , TEvCompactVDisk::Create (EHullDbType::LogoBlobs, std::move (tablesToCompact)));
204
+ } else if (needsFreshCompaction) {
205
+ Send (DCtx->SkeletonId , TEvCompactVDisk::Create (EHullDbType::LogoBlobs, TEvCompactVDisk::EMode::FRESH_ONLY));
206
+ } else {
207
+ return ; // nothing to do
208
+ }
159
209
WaitForSpecificEvent<TEvCompactVDiskResult>(&TDefragQuantum::ProcessUnexpectedEvent);
160
210
}
161
211
};
162
212
163
213
IActor *CreateDefragQuantumActor (const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId,
164
- std::optional<TChunksToDefrag> chunksToDefrag, ui32 minHugeBlobInBytes ) {
165
- return new TActorCoro (MakeHolder<TDefragQuantum>(dctx, selfVDiskId, std::move (chunksToDefrag), minHugeBlobInBytes ),
214
+ std::optional<TChunksToDefrag> chunksToDefrag) {
215
+ return new TActorCoro (MakeHolder<TDefragQuantum>(dctx, selfVDiskId, std::move (chunksToDefrag)),
166
216
NKikimrServices::TActivity::BS_DEFRAG_QUANTUM);
167
217
}
168
218
0 commit comments