@@ -27,9 +27,13 @@ impl index::File {
27
27
& mut <<P as Progress >:: SubProgress as Progress >:: SubProgress ,
28
28
) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > ,
29
29
{
30
- let ( chunk_size, thread_limit, _) =
31
- parallel:: optimize_chunk_size_and_thread_limit ( 1000 , Some ( self . num_objects as usize ) , thread_limit, None ) ;
32
- let there_are_enough_entries_to_process = || self . num_objects > 10_000 ;
30
+ let index_entries =
31
+ util:: index_entries_sorted_by_offset_ascending ( self , root. add_child ( "collecting sorted index" ) ) ;
32
+
33
+ let ( chunk_size, thread_limit, available_cores) =
34
+ parallel:: optimize_chunk_size_and_thread_limit ( 1000 , Some ( index_entries. len ( ) ) , thread_limit, None ) ;
35
+ let there_are_enough_entries_to_process = || index_entries. len ( ) > chunk_size * available_cores;
36
+ let input_chunks = index_entries. chunks ( chunk_size. max ( chunk_size) ) ;
33
37
let reduce_progress = parking_lot:: Mutex :: new ( {
34
38
let mut p = root. add_child ( "Traversing" ) ;
35
39
p. init ( Some ( self . num_objects ( ) ) , Some ( "objects" ) ) ;
@@ -46,13 +50,10 @@ impl index::File {
46
50
47
51
in_parallel_if (
48
52
there_are_enough_entries_to_process,
49
- util:: Chunks {
50
- iter : self . iter ( ) ,
51
- size : chunk_size,
52
- } ,
53
+ input_chunks,
53
54
thread_limit,
54
55
state_per_thread,
55
- |entries : Vec < index:: Entry > ,
56
+ |entries : & [ index:: Entry ] ,
56
57
( cache, ref mut processor, buf, progress) |
57
58
-> Result < Vec < decode:: Outcome > , Error > {
58
59
progress. init ( Some ( entries. len ( ) as u32 ) , Some ( "entries" ) ) ;
0 commit comments