@@ -111,16 +111,16 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
111
111
} ;
112
112
let name = index_metadata. name ;
113
113
114
- let needs_compaction = match & config. on_disk_state {
114
+ let maybe_segments_to_compact = match & config. on_disk_state {
115
115
SearchOnDiskState :: Backfilling ( BackfillState {
116
116
segments,
117
117
backfill_snapshot_ts,
118
118
..
119
119
} ) => {
120
120
if backfill_snapshot_ts. is_none ( ) {
121
- false
121
+ continue ;
122
122
} else {
123
- Self :: segments_need_compaction (
123
+ Self :: find_segments_to_compact (
124
124
segments,
125
125
& config. developer_config ,
126
126
& self . config ,
@@ -134,14 +134,14 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
134
134
| SearchOnDiskState :: Backfilled ( SearchSnapshot {
135
135
data : SnapshotData :: MultiSegment ( segments) ,
136
136
..
137
- } ) => Self :: segments_need_compaction (
137
+ } ) => Self :: find_segments_to_compact (
138
138
segments,
139
139
& config. developer_config ,
140
140
& self . config ,
141
141
) ?,
142
- _ => false ,
142
+ _ => continue ,
143
143
} ;
144
- if needs_compaction {
144
+ if let Some ( ( segments_to_compact , compaction_reason ) ) = maybe_segments_to_compact {
145
145
tracing:: info!(
146
146
"Queueing {:?} index for compaction: {name:?}" ,
147
147
Self :: search_type( )
@@ -151,6 +151,8 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
151
151
index_name : name. clone ( ) ,
152
152
developer_config : config. developer_config . clone ( ) ,
153
153
on_disk_state : config. on_disk_state ,
154
+ segments_to_compact,
155
+ compaction_reason,
154
156
} ;
155
157
to_build. push ( job) ;
156
158
}
@@ -160,43 +162,21 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
160
162
161
163
async fn build_one ( & self , job : CompactionJob < T > ) -> anyhow:: Result < u64 > {
162
164
let timer = compaction_build_one_timer ( Self :: search_type ( ) ) ;
163
- let ( segments , snapshot_ts) = match job. on_disk_state {
165
+ let snapshot_ts = match job. on_disk_state {
164
166
SearchOnDiskState :: Backfilling ( BackfillState {
165
- segments,
166
167
backfill_snapshot_ts,
167
168
..
168
- } ) => {
169
- let ts = backfill_snapshot_ts. with_context ( || {
170
- format ! (
171
- "Trying to compact backfilling {:?} segments with no backfill timestamp" ,
172
- Self :: search_type( )
173
- )
174
- } ) ?;
175
- ( segments, ts)
176
- } ,
169
+ } ) => backfill_snapshot_ts. with_context ( || {
170
+ format ! (
171
+ "Trying to compact backfilling {:?} segments with no backfill timestamp" ,
172
+ Self :: search_type( )
173
+ )
174
+ } ) ?,
177
175
SearchOnDiskState :: Backfilled ( snapshot)
178
- | SearchOnDiskState :: SnapshottedAt ( snapshot) => {
179
- let segments = match snapshot. data {
180
- SnapshotData :: Unknown ( _) => {
181
- anyhow:: bail!(
182
- "Trying to compact unknown {:?} snapshot" ,
183
- Self :: search_type( )
184
- ) ;
185
- } ,
186
- SnapshotData :: MultiSegment ( segments) => segments,
187
- SnapshotData :: SingleSegment ( _) => {
188
- anyhow:: bail!(
189
- "Trying to compact a single segment {:?} index!" ,
190
- Self :: search_type( )
191
- ) ;
192
- } ,
193
- } ;
194
- ( segments, snapshot. ts )
195
- } ,
176
+ | SearchOnDiskState :: SnapshottedAt ( snapshot) => snapshot. ts ,
196
177
} ;
197
178
198
- let ( segments_to_compact, reason) =
199
- Self :: find_segments_to_compact ( & segments, & job. developer_config , & self . config ) ?;
179
+ let segments_to_compact = job. segments_to_compact ;
200
180
anyhow:: ensure!( segments_to_compact. len( ) > 0 ) ;
201
181
202
182
let total_compacted_segments = segments_to_compact. len ( ) ;
@@ -231,7 +211,7 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
231
211
Self :: format( & new_segment, & job. developer_config) ?,
232
212
) ;
233
213
234
- finish_compaction_timer ( timer, reason ) ;
214
+ finish_compaction_timer ( timer, job . compaction_reason ) ;
235
215
Ok ( total_compacted_segments)
236
216
}
237
217
@@ -290,7 +270,7 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
290
270
segments : & ' a Vec < T :: Segment > ,
291
271
developer_config : & ' a T :: DeveloperConfig ,
292
272
compaction_config : & CompactionConfig ,
293
- ) -> anyhow:: Result < ( Vec < T :: Segment > , CompactionReason ) > {
273
+ ) -> anyhow:: Result < Option < ( Vec < T :: Segment > , CompactionReason ) > > {
294
274
fn to_owned < R : Clone > ( borrowed : Vec < & R > ) -> Vec < R > {
295
275
borrowed. into_iter ( ) . cloned ( ) . collect_vec ( )
296
276
}
@@ -315,7 +295,10 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
315
295
let compact_small =
316
296
Self :: get_compactable_segments ( small_segments, developer_config, compaction_config) ?;
317
297
if let Some ( compact_small) = compact_small {
318
- return Ok ( ( to_owned ( compact_small) , CompactionReason :: SmallSegments ) ) ;
298
+ return Ok ( Some ( (
299
+ to_owned ( compact_small) ,
300
+ CompactionReason :: SmallSegments ,
301
+ ) ) ) ;
319
302
}
320
303
// Next check to see if we have too many larger segments and if so, compact
321
304
// them.
@@ -329,7 +312,10 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
329
312
compaction_config,
330
313
) ?;
331
314
if let Some ( compact_large) = compact_large {
332
- return Ok ( ( to_owned ( compact_large) , CompactionReason :: LargeSegments ) ) ;
315
+ return Ok ( Some ( (
316
+ to_owned ( compact_large) ,
317
+ CompactionReason :: LargeSegments ,
318
+ ) ) ) ;
333
319
}
334
320
335
321
// Finally check to see if any individual segment has a large number of deleted
@@ -349,7 +335,7 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
349
335
} ) ?
350
336
. map ( |( segment, _) | vec ! [ segment] ) ;
351
337
if let Some ( compact_deletes) = compact_deletes {
352
- return Ok ( ( to_owned ( compact_deletes) , CompactionReason :: Deletes ) ) ;
338
+ return Ok ( Some ( ( to_owned ( compact_deletes) , CompactionReason :: Deletes ) ) ) ;
353
339
}
354
340
tracing:: trace!(
355
341
"Found no segments to compact, segments: {:#?}" ,
@@ -366,19 +352,7 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
366
352
} )
367
353
. collect_vec( )
368
354
) ;
369
- Ok ( ( vec ! [ ] , CompactionReason :: Unknown ) )
370
- }
371
-
372
- fn segments_need_compaction (
373
- segments : & Vec < T :: Segment > ,
374
- developer_config : & T :: DeveloperConfig ,
375
- compaction_config : & CompactionConfig ,
376
- ) -> anyhow:: Result < bool > {
377
- Ok (
378
- !Self :: find_segments_to_compact ( segments, developer_config, compaction_config) ?
379
- . 0
380
- . is_empty ( ) ,
381
- )
355
+ Ok ( None )
382
356
}
383
357
384
358
async fn compact (
@@ -461,4 +435,6 @@ struct CompactionJob<T: SearchIndex> {
461
435
index_name : TabletIndexName ,
462
436
developer_config : T :: DeveloperConfig ,
463
437
on_disk_state : SearchOnDiskState < T > ,
438
+ segments_to_compact : Vec < T :: Segment > ,
439
+ compaction_reason : CompactionReason ,
464
440
}
0 commit comments