Skip to content

Commit f3ea424

Browse files
authored
Merge pull request #81 from Blobscan/fix/reorg-handling
fix: improve reorg handling logic
2 parents 38f8a7b + 9d26452 commit f3ea424

File tree

12 files changed

+551
-263
lines changed

12 files changed

+551
-263
lines changed

src/clients/beacon/mod.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ use reqwest_eventsource::EventSource;
99

1010
#[cfg(test)]
1111
use mockall::automock;
12+
use types::BlockHeader;
1213

1314
use crate::{
1415
clients::{beacon::types::BlockHeaderResponse, common::ClientResult},
1516
json_get,
1617
};
1718

18-
use self::types::{Blob, BlobsResponse, Block, BlockHeader, BlockId, BlockResponse, Topic};
19+
use self::types::{Blob, BlobsResponse, Block, BlockId, BlockResponse, Topic};
1920

2021
pub mod types;
2122

@@ -34,9 +35,9 @@ pub struct Config {
3435
#[async_trait]
3536
#[cfg_attr(test, automock)]
3637
pub trait CommonBeaconClient: Send + Sync + Debug {
37-
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>>;
38-
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>>;
39-
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>>;
38+
async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>>;
39+
async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>>;
40+
async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>>;
4041
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource>;
4142
}
4243

@@ -56,7 +57,7 @@ impl BeaconClient {
5657

5758
#[async_trait]
5859
impl CommonBeaconClient for BeaconClient {
59-
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
60+
async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>> {
6061
let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
6162
let url = self.base_url.join(path.as_str())?;
6263

@@ -66,7 +67,7 @@ impl CommonBeaconClient for BeaconClient {
6667
})
6768
}
6869

69-
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
70+
async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>> {
7071
let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
7172
let url = self.base_url.join(path.as_str())?;
7273

@@ -77,12 +78,12 @@ impl CommonBeaconClient for BeaconClient {
7778
self.exp_backoff.clone()
7879
)
7980
.map(|res| match res {
80-
Some(r) => Some(r.data),
81+
Some(r) => Some(r.into()),
8182
None => None,
8283
})
8384
}
8485

85-
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
86+
async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>> {
8687
let path = format!("v1/beacon/blob_sidecars/{}", {
8788
block_id.to_detailed_string()
8889
});

src/clients/beacon/types.rs

+79-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ use std::{fmt, str::FromStr};
33
use alloy::primitives::{Bytes, B256};
44
use serde::{Deserialize, Serialize};
55

6+
use crate::clients::common::ClientError;
7+
8+
use super::CommonBeaconClient;
9+
610
#[derive(Serialize, Debug, Clone, PartialEq)]
711
pub enum BlockId {
812
Head,
@@ -33,6 +37,9 @@ pub struct BlockBody {
3337
#[derive(Deserialize, Debug)]
3438
pub struct BlockMessage {
3539
pub body: BlockBody,
40+
pub parent_root: B256,
41+
#[serde(deserialize_with = "deserialize_number")]
42+
pub slot: u32,
3643
}
3744

3845
#[derive(Deserialize, Debug)]
@@ -59,11 +66,18 @@ pub struct BlobsResponse {
5966

6067
#[derive(Deserialize, Debug)]
6168
pub struct BlockHeaderResponse {
62-
pub data: BlockHeader,
69+
pub data: BlockHeaderData,
6370
}
6471

65-
#[derive(Deserialize, Debug)]
72+
#[derive(Deserialize, Debug, Clone)]
6673
pub struct BlockHeader {
74+
pub root: B256,
75+
pub parent_root: B256,
76+
pub slot: u32,
77+
}
78+
79+
#[derive(Deserialize, Debug)]
80+
pub struct BlockHeaderData {
6781
pub root: B256,
6882
pub header: InnerBlockHeader,
6983
}
@@ -83,6 +97,7 @@ pub struct BlockHeaderMessage {
8397
pub struct HeadEventData {
8498
#[serde(deserialize_with = "deserialize_number")]
8599
pub slot: u32,
100+
#[allow(dead_code)]
86101
pub block: B256,
87102
}
88103

@@ -156,3 +171,65 @@ impl From<&Topic> for String {
156171
}
157172
}
158173
}
174+
175+
impl From<B256> for BlockId {
176+
fn from(value: B256) -> Self {
177+
BlockId::Hash(value)
178+
}
179+
}
180+
181+
impl From<u32> for BlockId {
182+
fn from(value: u32) -> Self {
183+
BlockId::Slot(value)
184+
}
185+
}
186+
187+
impl From<BlockHeaderResponse> for BlockHeader {
188+
fn from(response: BlockHeaderResponse) -> Self {
189+
BlockHeader {
190+
root: response.data.root,
191+
parent_root: response.data.header.message.parent_root,
192+
slot: response.data.header.message.slot,
193+
}
194+
}
195+
}
196+
197+
#[derive(Debug, thiserror::Error)]
198+
pub enum BlockIdResolutionError {
199+
#[error("Block with id '{0}' not found")]
200+
BlockNotFound(BlockId),
201+
#[error("Failed to resolve block id '{block_id}'")]
202+
FailedBlockIdResolution {
203+
block_id: BlockId,
204+
#[source]
205+
error: ClientError,
206+
},
207+
}
208+
209+
pub trait BlockIdResolution {
210+
async fn resolve_to_slot(
211+
&self,
212+
beacon_client: &dyn CommonBeaconClient,
213+
) -> Result<u32, BlockIdResolutionError>;
214+
}
215+
216+
impl BlockIdResolution for BlockId {
217+
async fn resolve_to_slot(
218+
&self,
219+
beacon_client: &dyn CommonBeaconClient,
220+
) -> Result<u32, BlockIdResolutionError> {
221+
match self {
222+
BlockId::Slot(slot) => Ok(*slot),
223+
_ => match beacon_client
224+
.get_block_header(self.clone().into())
225+
.await
226+
.map_err(|err| BlockIdResolutionError::FailedBlockIdResolution {
227+
block_id: self.clone(),
228+
error: err,
229+
})? {
230+
Some(header) => Ok(header.slot),
231+
None => Err(BlockIdResolutionError::BlockNotFound(self.clone())),
232+
},
233+
}
234+
}
235+
}

src/clients/blobscan/mod.rs

+27-12
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
use std::fmt::Debug;
22

3+
use alloy::primitives::B256;
34
use async_trait::async_trait;
45
use backoff::ExponentialBackoff;
56
use chrono::TimeDelta;
67
use reqwest::{Client, Url};
78

89
#[cfg(test)]
910
use mockall::automock;
11+
use types::{BlobscanBlock, ReorgedBlocksRequestBody};
1012

11-
use crate::{
12-
clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult},
13-
json_get, json_put,
14-
};
13+
use crate::{clients::common::ClientResult, json_get, json_put};
1514

1615
use self::{
1716
jwt_manager::{Config as JWTManagerConfig, JWTManager},
1817
types::{
1918
Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse,
20-
IndexRequest, ReorgedSlotsRequest, Transaction,
19+
IndexRequest, Transaction,
2120
},
2221
};
2322

@@ -37,7 +36,12 @@ pub trait CommonBlobscanClient: Send + Sync + Debug {
3736
transactions: Vec<Transaction>,
3837
blobs: Vec<Blob>,
3938
) -> ClientResult<()>;
40-
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32>;
39+
async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>>;
40+
async fn handle_reorg(
41+
&self,
42+
rewinded_blocks: Vec<B256>,
43+
forwarded_blocks: Vec<B256>,
44+
) -> ClientResult<()>;
4145
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
4246
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
4347
}
@@ -93,15 +97,26 @@ impl CommonBlobscanClient for BlobscanClient {
9397
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
9498
}
9599

96-
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
97-
let url = self.base_url.join("indexer/reorged-slots")?;
100+
async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>> {
101+
let url = self.base_url.join(&format!("slots/{}", slot))?;
102+
103+
json_get!(&self.client, url, BlobscanBlock, self.exp_backoff.clone())
104+
}
105+
106+
async fn handle_reorg(
107+
&self,
108+
rewinded_blocks: Vec<B256>,
109+
forwarded_blocks: Vec<B256>,
110+
) -> ClientResult<()> {
111+
let url = self.base_url.join("indexer/reorged-blocks")?;
98112
let token = self.jwt_manager.get_token()?;
99-
let req = ReorgedSlotsRequest {
100-
reorged_slots: slots.to_owned(),
113+
114+
let req = ReorgedBlocksRequestBody {
115+
forwarded_blocks,
116+
rewinded_blocks,
101117
};
102118

103-
json_put!(&self.client, url, ReorgedSlotsResponse, token, &req)
104-
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
119+
json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req).map(|_| ())
105120
}
106121

107122
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {

src/clients/blobscan/types.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ use serde::{Deserialize, Serialize};
88

99
use crate::{clients::beacon::types::Blob as BeaconBlob, utils::web3::calculate_versioned_hash};
1010

11+
#[derive(Serialize, Deserialize, Debug)]
12+
pub struct BlobscanBlock {
13+
pub hash: B256,
14+
pub number: u32,
15+
pub slot: u32,
16+
}
17+
1118
#[derive(Serialize, Deserialize, Debug)]
1219
#[serde(rename_all = "camelCase")]
1320
pub struct Block {
@@ -86,16 +93,11 @@ pub struct IndexRequest {
8693
pub blobs: Vec<Blob>,
8794
}
8895

89-
#[derive(Serialize, Debug)]
90-
#[serde(rename_all = "camelCase")]
91-
pub struct ReorgedSlotsRequest {
92-
pub reorged_slots: Vec<u32>,
93-
}
94-
95-
#[derive(Deserialize, Debug)]
96+
#[derive(Deserialize, Serialize, Debug)]
9697
#[serde(rename_all = "camelCase")]
97-
pub struct ReorgedSlotsResponse {
98-
pub total_updated_slots: u32,
98+
pub struct ReorgedBlocksRequestBody {
99+
pub forwarded_blocks: Vec<B256>,
100+
pub rewinded_blocks: Vec<B256>,
99101
}
100102

101103
impl fmt::Debug for Blob {

src/indexer/error.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub enum IndexerError {
1616
#[error(transparent)]
1717
SyncingTaskError(#[from] IndexingError),
1818
#[error("failed to retrieve blobscan's sync state")]
19-
BlobscanSyncStateRetrievalError(#[source] ClientError),
19+
BlobscanSyncStateRetrievalError(#[from] ClientError),
2020
#[error("failed to send syncing task message")]
2121
SyncingTaskMessageSendFailure(#[from] SendError<IndexerTaskMessage>),
2222
}

src/indexer/event_handlers/finalized_checkpoint.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ use tracing::info;
33

44
use crate::{
55
clients::{
6-
beacon::types::{BlockId, FinalizedCheckpointEventData},
7-
blobscan::types::BlockchainSyncState,
6+
beacon::types::FinalizedCheckpointEventData, blobscan::types::BlockchainSyncState,
87
common::ClientError,
98
},
109
context::CommonContext,
@@ -46,7 +45,7 @@ where
4645
let last_finalized_block_number = match self
4746
.context
4847
.beacon_client()
49-
.get_block(&BlockId::Hash(block_hash))
48+
.get_block(block_hash.into())
5049
.await
5150
.map_err(|err| {
5251
FinalizedCheckpointEventHandlerError::BlockRetrievalError(

0 commit comments

Comments
 (0)