Skip to content

Commit c943978

Browse files
committed
feat: add slot save interval for error resilience
1 parent 2151fa1 commit c943978

File tree

6 files changed

+56
-15
lines changed

6 files changed

+56
-15
lines changed

src/blobscan_client/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl BlobscanClient {
6161

6262
let index_response = self
6363
.client
64-
.post(url)
64+
.put(url)
6565
.bearer_auth(token)
6666
.json(&index_request)
6767
.send()

src/main.rs

+51-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ mod env;
1515
mod slots_processor;
1616
mod utils;
1717

18+
const MAX_SLOTS_PER_SAVE: u32 = 1000;
19+
1820
#[tokio::main]
1921
async fn main() -> Result<()> {
2022
dotenv::dotenv().ok();
@@ -42,21 +44,60 @@ async fn main() -> Result<()> {
4244
let latest_slot: u32 = latest_beacon_block.slot.parse()?;
4345

4446
if current_slot < latest_slot {
45-
let slot_manager_span = tracing::debug_span!(
46-
"slot_processor_manager",
47-
initial_slot = current_slot,
48-
final_slot = latest_slot
47+
let unprocessed_slots = latest_slot - current_slot;
48+
let current_max_slots_size = std::cmp::min(unprocessed_slots, MAX_SLOTS_PER_SAVE);
49+
let num_chunks = unprocessed_slots / current_max_slots_size;
50+
51+
let remaining_slots = unprocessed_slots % current_max_slots_size;
52+
let num_chunks = if remaining_slots > 0 {
53+
num_chunks + 1
54+
} else {
55+
num_chunks
56+
};
57+
58+
info!(
59+
"Processing slots from {} to {}, partitioned into {} chunks…",
60+
current_slot, latest_slot, num_chunks
4961
);
5062

51-
slots_processor
52-
.process_slots(current_slot, latest_slot)
53-
.instrument(slot_manager_span)
54-
.await?;
63+
for i in 0..num_chunks {
64+
let slots_in_current_chunk = if i == num_chunks - 1 {
65+
current_max_slots_size + remaining_slots
66+
} else {
67+
current_max_slots_size
68+
};
69+
70+
let chunk_initial_slot = current_slot + i * current_max_slots_size;
71+
let chunk_final_slot = chunk_initial_slot + slots_in_current_chunk;
72+
73+
let slot_manager_span = tracing::info_span!(
74+
"slots_processor",
75+
initial_slot = chunk_initial_slot,
76+
final_slot = chunk_final_slot
77+
);
5578

56-
blobscan_client.update_slot(latest_slot - 1).await?;
57-
info!("Latest slot updated to {}", latest_slot - 1);
79+
slots_processor
80+
.process_slots(chunk_initial_slot, chunk_final_slot)
81+
.instrument(slot_manager_span)
82+
.await?;
83+
84+
blobscan_client.update_slot(chunk_final_slot - 1).await?;
85+
86+
info!(
87+
"Chunk {} of {} ({} slots) processed successfully!. Updating latest slot to {}.",
88+
i+1,
89+
num_chunks,
90+
chunk_final_slot - chunk_initial_slot,
91+
chunk_final_slot - 1
92+
);
93+
}
5894

5995
current_slot = latest_slot;
96+
97+
info!(
98+
"All slots processed successfully! Total slots processed: {}",
99+
unprocessed_slots
100+
);
60101
}
61102
}
62103

src/slots_processor/error.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::slot_processor::errors::SlotProcessorError;
1+
use super::slot_processor::error::SlotProcessorError;
22

33
#[derive(Debug, thiserror::Error)]
44
pub enum SlotsChunkThreadError {

src/slots_processor/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl SlotsProcessor {
6666
let thread_initial_slot = current_slot;
6767
let thread_final_slot = current_slot + thread_slots_chunk;
6868

69-
let thread_slots_span = tracing::info_span!(
69+
let thread_slots_span = tracing::debug_span!(
7070
"slots_chunk_processor",
7171
chunk_initial_slot = thread_initial_slot,
7272
chunk_final_slot = thread_final_slot

src/slots_processor/slot_processor/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use crate::{
1212
utils::exp_backoff::get_exp_backoff_config,
1313
};
1414

15-
use self::errors::SlotProcessorError;
15+
use self::error::SlotProcessorError;
1616
use self::helpers::{create_tx_hash_versioned_hashes_mapping, create_versioned_hash_blob_mapping};
1717

18-
pub mod errors;
18+
pub mod error;
1919
mod helpers;
2020

2121
pub struct SlotProcessor {

0 commit comments

Comments
 (0)