|
1 |
| -use anyhow::Result; |
| 1 | +use anyhow::{anyhow, Context as AnyhowContext, Result}; |
| 2 | +use backoff::{future::retry_notify, Error as BackoffError}; |
2 | 3 | use context::{Config as ContextConfig, Context};
|
3 | 4 | use env::Environment;
|
4 | 5 | use slots_processor::{Config as SlotsProcessorConfig, SlotsProcessor};
|
5 |
| -use tracing::{info, Instrument}; |
| 6 | +use tracing::{error, info, warn, Instrument}; |
| 7 | +use utils::exp_backoff::get_exp_backoff_config; |
6 | 8 |
|
7 | 9 | use crate::utils::telemetry::{get_subscriber, init_subscriber};
|
8 | 10 |
|
@@ -81,10 +83,27 @@ async fn main() -> Result<()> {
|
81 | 83 | .instrument(slot_manager_span)
|
82 | 84 | .await?;
|
83 | 85 |
|
84 |
| - blobscan_client.update_slot(chunk_final_slot - 1).await?; |
| 86 | + match retry_notify( |
| 87 | + get_exp_backoff_config(), |
| 88 | + || async move { |
| 89 | + blobscan_client |
| 90 | + .update_slot(chunk_final_slot - 1) |
| 91 | + .await.map_err(|err| err.into()) |
| 92 | + }, |
| 93 | + |e, duration: Duration| { |
| 94 | + let duration = duration.as_secs(); |
| 95 | + warn!("Failed to update latest slot to {}. Retrying in {duration} seconds… (Reason: {e})", chunk_final_slot - 1); |
| 96 | + }, |
| 97 | + ).await { |
| 98 | + Ok(_) => (), |
| 99 | + Err(err) => { |
| 100 | + error!("Failed to update latest slot to {}", chunk_final_slot - 1); |
| 101 | + return Err(err.into()); |
| 102 | + } |
| 103 | + }; |
85 | 104 |
|
86 | 105 | info!(
|
87 |
| - "Chunk {} of {} ({} slots) processed successfully!. Updating latest slot to {}.", |
| 106 | + "Chunk {} of {} ({} slots) processed successfully!. Latest slot updated to {}.", |
88 | 107 | i+1,
|
89 | 108 | num_chunks,
|
90 | 109 | chunk_final_slot - chunk_initial_slot,
|
|
0 commit comments