@@ -15,6 +15,8 @@ mod env;
15
15
mod slots_processor;
16
16
mod utils;
17
17
18
+ const MAX_SLOTS_PER_SAVE : u32 = 1000 ;
19
+
18
20
#[ tokio:: main]
19
21
async fn main ( ) -> Result < ( ) > {
20
22
dotenv:: dotenv ( ) . ok ( ) ;
@@ -42,21 +44,60 @@ async fn main() -> Result<()> {
42
44
let latest_slot: u32 = latest_beacon_block. slot . parse ( ) ?;
43
45
44
46
if current_slot < latest_slot {
45
- let slot_manager_span = tracing:: debug_span!(
46
- "slot_processor_manager" ,
47
- initial_slot = current_slot,
48
- final_slot = latest_slot
47
+ let unprocessed_slots = latest_slot - current_slot;
48
+ let current_max_slots_size = std:: cmp:: min ( unprocessed_slots, MAX_SLOTS_PER_SAVE ) ;
49
+ let num_chunks = unprocessed_slots / current_max_slots_size;
50
+
51
+ let remaining_slots = unprocessed_slots % current_max_slots_size;
52
+ let num_chunks = if remaining_slots > 0 {
53
+ num_chunks + 1
54
+ } else {
55
+ num_chunks
56
+ } ;
57
+
58
+ info ! (
59
+ "Processing slots from {} to {}, partitioned into {} chunks…" ,
60
+ current_slot, latest_slot, num_chunks
49
61
) ;
50
62
51
- slots_processor
52
- . process_slots ( current_slot, latest_slot)
53
- . instrument ( slot_manager_span)
54
- . await ?;
63
+ for i in 0 ..num_chunks {
64
+ let slots_in_current_chunk = if i == num_chunks - 1 {
65
+ current_max_slots_size + remaining_slots
66
+ } else {
67
+ current_max_slots_size
68
+ } ;
69
+
70
+ let chunk_initial_slot = current_slot + i * current_max_slots_size;
71
+ let chunk_final_slot = chunk_initial_slot + slots_in_current_chunk;
72
+
73
+ let slot_manager_span = tracing:: info_span!(
74
+ "slots_processor" ,
75
+ initial_slot = chunk_initial_slot,
76
+ final_slot = chunk_final_slot
77
+ ) ;
55
78
56
- blobscan_client. update_slot ( latest_slot - 1 ) . await ?;
57
- info ! ( "Latest slot updated to {}" , latest_slot - 1 ) ;
79
+ slots_processor
80
+ . process_slots ( chunk_initial_slot, chunk_final_slot)
81
+ . instrument ( slot_manager_span)
82
+ . await ?;
83
+
84
+ blobscan_client. update_slot ( chunk_final_slot - 1 ) . await ?;
85
+
86
+ info ! (
87
+ "Chunk {} of {} ({} slots) processed successfully!. Updating latest slot to {}." ,
88
+ i+1 ,
89
+ num_chunks,
90
+ chunk_final_slot - chunk_initial_slot,
91
+ chunk_final_slot - 1
92
+ ) ;
93
+ }
58
94
59
95
current_slot = latest_slot;
96
+
97
+ info ! (
98
+ "All slots processed successfully! Total slots processed: {}" ,
99
+ unprocessed_slots
100
+ ) ;
60
101
}
61
102
}
62
103
0 commit comments