Skip to content

Commit b6947ef

Browse files
committed
fix: improve reorg handling logic
1 parent bff7882 commit b6947ef

File tree

9 files changed

+378
-200
lines changed

9 files changed

+378
-200
lines changed

src/clients/beacon/types.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub struct BlockBody {
3333
#[derive(Deserialize, Debug)]
3434
pub struct BlockMessage {
3535
pub body: BlockBody,
36+
pub parent_root: B256,
37+
#[serde(deserialize_with = "deserialize_number")]
38+
pub slot: u32,
3639
}
3740

3841
#[derive(Deserialize, Debug)]
@@ -62,7 +65,7 @@ pub struct BlockHeaderResponse {
6265
pub data: BlockHeaderData,
6366
}
6467

65-
#[derive(Deserialize, Debug)]
68+
#[derive(Deserialize, Debug, Clone)]
6669
pub struct BlockHeader {
6770
pub root: B256,
6871
pub parent_root: B256,
@@ -90,6 +93,7 @@ pub struct BlockHeaderMessage {
9093
pub struct HeadEventData {
9194
#[serde(deserialize_with = "deserialize_number")]
9295
pub slot: u32,
96+
#[allow(dead_code)]
9397
pub block: B256,
9498
}
9599

src/clients/blobscan/mod.rs

+30
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use std::fmt::Debug;
22

3+
use alloy::primitives::B256;
34
use async_trait::async_trait;
45
use backoff::ExponentialBackoff;
56
use chrono::TimeDelta;
67
use reqwest::{Client, Url};
78

89
#[cfg(test)]
910
use mockall::automock;
11+
use types::{BlobscanBlock, ReorgedBlocksRequestBody};
1012

1113
use crate::{
1214
clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult},
@@ -37,7 +39,13 @@ pub trait CommonBlobscanClient: Send + Sync + Debug {
3739
transactions: Vec<Transaction>,
3840
blobs: Vec<Blob>,
3941
) -> ClientResult<()>;
42+
async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>>;
4043
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32>;
44+
async fn handle_reorg(
45+
&self,
46+
rewinded_blocks: Vec<B256>,
47+
forwarded_blocks: Vec<B256>,
48+
) -> ClientResult<()>;
4149
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
4250
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
4351
}
@@ -93,6 +101,12 @@ impl CommonBlobscanClient for BlobscanClient {
93101
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
94102
}
95103

104+
async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>> {
105+
let url = self.base_url.join(&format!("block/{}?slot=true", slot))?;
106+
107+
json_get!(&self.client, url, BlobscanBlock, self.exp_backoff.clone())
108+
}
109+
96110
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
97111
let url = self.base_url.join("indexer/reorged-slots")?;
98112
let token = self.jwt_manager.get_token()?;
@@ -104,6 +118,22 @@ impl CommonBlobscanClient for BlobscanClient {
104118
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
105119
}
106120

121+
async fn handle_reorg(
122+
&self,
123+
rewinded_blocks: Vec<B256>,
124+
forwarded_blocks: Vec<B256>,
125+
) -> ClientResult<()> {
126+
let url = self.base_url.join("indexer/reorged-blocks")?;
127+
let token = self.jwt_manager.get_token()?;
128+
129+
let req = ReorgedBlocksRequestBody {
130+
forwarded_blocks,
131+
rewinded_blocks,
132+
};
133+
134+
json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req).map(|_| ())
135+
}
136+
107137
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
108138
let url = self.base_url.join("blockchain-sync-state")?;
109139
let token = self.jwt_manager.get_token()?;

src/clients/blobscan/types.rs

+14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ use serde::{Deserialize, Serialize};
88

99
use crate::{clients::beacon::types::Blob as BeaconBlob, utils::web3::calculate_versioned_hash};
1010

11+
#[derive(Serialize, Deserialize, Debug)]
12+
pub struct BlobscanBlock {
13+
pub hash: B256,
14+
pub number: u32,
15+
pub slot: u32,
16+
}
17+
1118
#[derive(Serialize, Deserialize, Debug)]
1219
#[serde(rename_all = "camelCase")]
1320
pub struct Block {
@@ -92,6 +99,13 @@ pub struct ReorgedSlotsRequest {
9299
pub reorged_slots: Vec<u32>,
93100
}
94101

102+
#[derive(Deserialize, Serialize, Debug)]
103+
#[serde(rename_all = "camelCase")]
104+
pub struct ReorgedBlocksRequestBody {
105+
pub forwarded_blocks: Vec<B256>,
106+
pub rewinded_blocks: Vec<B256>,
107+
}
108+
95109
#[derive(Deserialize, Debug)]
96110
#[serde(rename_all = "camelCase")]
97111
pub struct ReorgedSlotsResponse {

src/indexer/error.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub enum IndexerError {
1616
#[error(transparent)]
1717
SyncingTaskError(#[from] IndexingError),
1818
#[error("failed to retrieve blobscan's sync state")]
19-
BlobscanSyncStateRetrievalError(#[source] ClientError),
19+
BlobscanSyncStateRetrievalError(#[from] ClientError),
2020
#[error("failed to send syncing task message")]
2121
SyncingTaskMessageSendFailure(#[from] SendError<IndexerTaskMessage>),
2222
}

src/indexer/event_handlers/head.rs

+23-108
Original file line numberDiff line numberDiff line change
@@ -1,147 +1,62 @@
1-
use std::cmp;
2-
3-
use alloy::{primitives::B256, transports::Transport};
4-
use tracing::info;
5-
61
use crate::{
7-
clients::{
8-
beacon::types::{BlockHeader, BlockId, HeadEventData},
9-
blobscan::types::BlockchainSyncState,
10-
common::ClientError,
11-
},
12-
context::CommonContext,
2+
clients::beacon::types::{BlockId, HeadEventData},
133
synchronizer::{error::SynchronizerError, CommonSynchronizer},
144
};
155

166
#[derive(Debug, thiserror::Error)]
177
pub enum HeadEventHandlerError {
188
#[error(transparent)]
199
EventDeserializationFailure(#[from] serde_json::Error),
20-
#[error("failed to retrieve header for block \"{0}\"")]
21-
BlockHeaderRetrievalError(BlockId, #[source] ClientError),
22-
#[error("header for block \"{0}\" not found")]
23-
BlockHeaderNotFound(BlockId),
2410
#[error("failed to index head block")]
2511
BlockSyncedError(#[from] SynchronizerError),
26-
#[error("failed to handle reorged slots")]
27-
BlobscanReorgedSlotsFailure(#[source] ClientError),
28-
#[error("failed to update blobscan's sync state")]
29-
BlobscanSyncStateUpdateError(#[source] ClientError),
3012
}
3113

32-
pub struct HeadEventHandler<T> {
33-
context: Box<dyn CommonContext<T>>,
14+
pub struct HeadEventHandler {
3415
synchronizer: Box<dyn CommonSynchronizer>,
35-
start_block_id: BlockId,
36-
last_block_hash: Option<B256>,
16+
is_first_event: bool,
17+
custom_start_block_id: Option<BlockId>,
3718
}
3819

39-
impl<T> HeadEventHandler<T>
40-
where
41-
T: Transport + Send + Sync + 'static,
42-
{
20+
impl HeadEventHandler {
4321
pub fn new(
44-
context: Box<dyn CommonContext<T>>,
4522
synchronizer: Box<dyn CommonSynchronizer>,
46-
start_block_id: BlockId,
23+
custom_start_block_id: Option<BlockId>,
4724
) -> Self {
4825
HeadEventHandler {
49-
context,
5026
synchronizer,
51-
start_block_id,
52-
last_block_hash: None,
27+
is_first_event: true,
28+
custom_start_block_id,
5329
}
5430
}
5531

5632
pub async fn handle(&mut self, event_data: String) -> Result<(), HeadEventHandlerError> {
5733
let head_block_data = serde_json::from_str::<HeadEventData>(&event_data)?;
34+
let head_slot = head_block_data.slot;
5835

59-
let head_block_slot = head_block_data.slot;
60-
let head_block_hash = head_block_data.block;
36+
// If this is the first event being processed, ensure the synchronizer is fully up to date
37+
if self.is_first_event {
38+
self.is_first_event = false;
6139

62-
let head_block_id = BlockId::Slot(head_block_data.slot);
63-
let initial_block_id = if self.last_block_hash.is_none() {
64-
self.start_block_id.clone()
65-
} else {
66-
head_block_id.clone()
67-
};
40+
let start_block_id = self.custom_start_block_id.clone().or(self
41+
.synchronizer
42+
.get_last_synced_block()
43+
.map(|block| (block.slot + 1).into()));
6844

69-
let head_block_header = self.get_block_header(head_block_id).await?;
45+
if let Some(start_block_id) = start_block_id {
46+
if self.custom_start_block_id.is_some() {
47+
self.synchronizer.clear_last_synced_block();
48+
}
7049

71-
if let Some(last_block_hash) = self.last_block_hash {
72-
if last_block_hash != head_block_header.parent_root {
73-
let parent_block_header = self
74-
.get_block_header(head_block_header.parent_root.into())
50+
self.synchronizer
51+
.sync_blocks(start_block_id, head_slot.into())
7552
.await?;
76-
let parent_block_slot = parent_block_header.slot;
77-
let reorg_start_slot = parent_block_slot + 1;
78-
let reorg_final_slot = head_block_slot;
79-
let reorged_slots = (reorg_start_slot..reorg_final_slot).collect::<Vec<u32>>();
80-
81-
let result: Result<(), HeadEventHandlerError> = async {
82-
let total_updated_slots = self.context
83-
.blobscan_client()
84-
.handle_reorged_slots(reorged_slots.as_slice())
85-
.await
86-
.map_err(HeadEventHandlerError::BlobscanReorgedSlotsFailure)?;
87-
88-
89-
info!(slot=head_block_slot, "Reorganization detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots);
90-
91-
// Re-index parent block as it may be mark as reorged and not indexed
92-
self.synchronizer
93-
.run(
94-
parent_block_slot.into(),
95-
(parent_block_slot + 1).into(),
96-
)
97-
.await?;
98-
99-
Ok(())
100-
}
101-
.await;
102-
103-
if let Err(err) = result {
104-
// If an error occurred while handling the reorg try to update the latest synced slot to the last known slot before the reorg
105-
self.context
106-
.blobscan_client()
107-
.update_sync_state(BlockchainSyncState {
108-
last_finalized_block: None,
109-
last_lower_synced_slot: None,
110-
last_upper_synced_slot: Some(cmp::max(parent_block_slot - 1, 0)),
111-
})
112-
.await
113-
.map_err(HeadEventHandlerError::BlobscanSyncStateUpdateError)?;
114-
115-
return Err(err);
116-
}
11753
}
11854
}
11955

120-
self.synchronizer
121-
.run(initial_block_id, (head_block_slot + 1).into())
122-
.await?;
123-
124-
self.last_block_hash = Some(head_block_hash);
56+
self.synchronizer.sync_block(head_slot.into()).await?;
12557

12658
Ok(())
12759
}
128-
129-
async fn get_block_header(
130-
&self,
131-
block_id: BlockId,
132-
) -> Result<BlockHeader, HeadEventHandlerError> {
133-
match self
134-
.context
135-
.beacon_client()
136-
.get_block_header(block_id.clone())
137-
.await
138-
.map_err(|err| {
139-
HeadEventHandlerError::BlockHeaderRetrievalError(block_id.clone(), err)
140-
})? {
141-
Some(block) => Ok(block.into()),
142-
None => Err(HeadEventHandlerError::BlockHeaderNotFound(block_id.clone())),
143-
}
144-
}
14560
}
14661

14762
// #[cfg(test)]

0 commit comments

Comments
 (0)