1
- use std:: { thread, time :: Duration } ;
1
+ use std:: thread;
2
2
3
3
use alloy:: transports:: http:: ReqwestTransport ;
4
4
use anyhow:: anyhow;
5
5
use event_handlers:: { finalized_checkpoint:: FinalizedCheckpointHandler , head:: HeadEventHandler } ;
6
6
use futures:: StreamExt ;
7
7
use reqwest_eventsource:: Event ;
8
- use tokio:: { sync:: mpsc, task:: JoinHandle , time :: sleep } ;
8
+ use tokio:: { sync:: mpsc, task:: JoinHandle } ;
9
9
use tracing:: { debug, error, info, Instrument } ;
10
10
11
11
use crate :: {
@@ -237,7 +237,7 @@ impl Indexer<ReqwestTransport> {
237
237
. subscribe_to_events ( & topics)
238
238
. map_err ( LiveIndexingError :: BeaconEventsSubscriptionError ) ?;
239
239
240
- info ! ( "Subscribed to beacon events : {}" , events) ;
240
+ info ! ( "Subscribed to beacon SSE stream : {}" , events) ;
241
241
242
242
while let Some ( event) = event_source. next ( ) . await {
243
243
match event {
@@ -271,11 +271,7 @@ impl Indexer<ReqwestTransport> {
271
271
event_source. close ( ) ;
272
272
273
273
if let reqwest_eventsource:: Error :: StreamEnded = error {
274
- info ! (
275
- "Beacon node events stream ended. Retrying subscription connection…"
276
- ) ;
277
-
278
- sleep ( Duration :: from_secs ( 1 ) ) . await ;
274
+ info ! ( "Beacon node SSE stream ended. Resubscribing to stream…" ) ;
279
275
280
276
break ;
281
277
} else {
@@ -289,7 +285,6 @@ impl Indexer<ReqwestTransport> {
289
285
. instrument ( realtime_sync_task_span)
290
286
. await ;
291
287
292
- // Send final status message
293
288
if let Err ( error) = result {
294
289
tx. send ( IndexerTaskMessage :: Error ( error. into ( ) ) ) . await ?;
295
290
} else {
0 commit comments