Skip to content

Commit b9b4e57

Browse files
committed
fix: fix: process skipped blocks in the new canonical chain after a reorg, if any exist
1 parent b6947ef commit b9b4e57

File tree

5 files changed

+189
-81
lines changed

5 files changed

+189
-81
lines changed

src/clients/beacon/types.rs

+44
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ use std::{fmt, str::FromStr};
33
use alloy::primitives::{Bytes, B256};
44
use serde::{Deserialize, Serialize};
55

6+
use crate::clients::common::ClientError;
7+
8+
use super::CommonBeaconClient;
9+
610
#[derive(Serialize, Debug, Clone, PartialEq)]
711
pub enum BlockId {
812
Head,
@@ -189,3 +193,43 @@ impl From<BlockHeaderResponse> for BlockHeader {
189193
}
190194
}
191195
}
196+
197+
#[derive(Debug, thiserror::Error)]
198+
pub enum BlockIdResolutionError {
199+
#[error("Block with id '{0}' not found")]
200+
BlockNotFound(BlockId),
201+
#[error("Failed to resolve block id '{block_id}'")]
202+
FailedBlockIdResolution {
203+
block_id: BlockId,
204+
#[source]
205+
error: ClientError,
206+
},
207+
}
208+
209+
pub trait BlockIdResolution {
210+
async fn resolve_to_slot(
211+
&self,
212+
beacon_client: &dyn CommonBeaconClient,
213+
) -> Result<u32, BlockIdResolutionError>;
214+
}
215+
216+
impl BlockIdResolution for BlockId {
217+
async fn resolve_to_slot(
218+
&self,
219+
beacon_client: &dyn CommonBeaconClient,
220+
) -> Result<u32, BlockIdResolutionError> {
221+
match self {
222+
BlockId::Slot(slot) => Ok(*slot),
223+
_ => match beacon_client
224+
.get_block_header(self.clone().into())
225+
.await
226+
.map_err(|err| BlockIdResolutionError::FailedBlockIdResolution {
227+
block_id: self.clone(),
228+
error: err,
229+
})? {
230+
Some(header) => Ok(header.slot),
231+
None => Err(BlockIdResolutionError::BlockNotFound(self.clone())),
232+
},
233+
}
234+
}
235+
}

src/slots_processor/error.rs

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
use crate::clients::common::ClientError;
1+
use alloy::primitives::B256;
2+
3+
use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError};
24

35
#[derive(Debug, thiserror::Error)]
46
pub enum SlotProcessingError {
57
#[error(transparent)]
68
ClientError(#[from] crate::clients::common::ClientError),
79
#[error(transparent)]
810
Provider(#[from] alloy::transports::TransportError),
9-
1011
#[error(transparent)]
1112
Other(#[from] anyhow::Error),
1213
}
@@ -22,8 +23,19 @@ pub enum SlotsProcessorError {
2223
failed_slot: u32,
2324
error: SlotProcessingError,
2425
},
26+
#[error("Failed to process reorg. old slot {old_slot}, new slot {new_slot}, new head block root {new_head_block_root}, old head block root {old_head_block_root}")]
27+
FailedReorgProcessing {
28+
old_slot: u32,
29+
new_slot: u32,
30+
new_head_block_root: B256,
31+
old_head_block_root: B256,
32+
#[source]
33+
error: anyhow::Error,
34+
},
2535
#[error("Failed to handle reorged slots")]
2636
ReorgedFailure(#[from] ClientError),
37+
#[error("Failed to handle forwarded blocks")]
38+
ForwardedBlocksFailure(#[from] SynchronizerError),
2739
#[error(transparent)]
2840
Other(#[from] anyhow::Error),
2941
}

src/slots_processor/mod.rs

+113-40
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ use alloy::{
33
};
44
use anyhow::{anyhow, Context as AnyhowContext, Result};
55

6+
use crate::clients::beacon::types::BlockHeader;
67
use tracing::{debug, info};
78

89
use crate::{
910
clients::{
10-
beacon::types::BlockHeader,
1111
blobscan::types::{Blob, BlobscanBlock, Block, Transaction},
1212
common::ClientError,
1313
},
@@ -20,6 +20,23 @@ use self::helpers::{create_tx_hash_versioned_hashes_mapping, create_versioned_ha
2020
pub mod error;
2121
mod helpers;
2222

23+
pub struct BlockData {
24+
pub root: B256,
25+
pub parent_root: B256,
26+
pub slot: u32,
27+
pub execution_block_hash: B256,
28+
}
29+
30+
impl From<&BlockData> for BlockHeader {
31+
fn from(block: &BlockData) -> Self {
32+
BlockHeader {
33+
root: block.root,
34+
parent_root: block.parent_root,
35+
slot: block.slot,
36+
}
37+
}
38+
}
39+
2340
pub struct SlotsProcessor<T> {
2441
context: Box<dyn CommonContext<T>>,
2542
pub last_processed_block: Option<BlockHeader>,
@@ -68,7 +85,13 @@ impl SlotsProcessor<ReqwestTransport> {
6885
if prev_block_header.root != block_header.parent_root {
6986
self.process_reorg(&prev_block_header, &block_header)
7087
.await
71-
.map_err(|err| SlotsProcessorError::ReorgedFailure(err))?;
88+
.map_err(|error| SlotsProcessorError::FailedReorgProcessing {
89+
old_slot: prev_block_header.slot,
90+
new_slot: block_header.slot,
91+
new_head_block_root: block_header.root,
92+
old_head_block_root: prev_block_header.root,
93+
error,
94+
})?;
7295
}
7396
}
7497

@@ -85,25 +108,40 @@ impl SlotsProcessor<ReqwestTransport> {
85108
&mut self,
86109
slot: u32,
87110
) -> Result<Option<BlockHeader>, SlotProcessingError> {
88-
let beacon_client = self.context.beacon_client();
89-
let blobscan_client = self.context.blobscan_client();
90-
let provider = self.context.provider();
91-
92-
let beacon_block_header = Some(match beacon_client.get_block_header(slot.into()).await? {
111+
let beacon_block_header = match self
112+
.context
113+
.beacon_client()
114+
.get_block_header(slot.into())
115+
.await?
116+
{
93117
Some(header) => header,
94118
None => {
95119
debug!(slot, "Skipping as there is no beacon block header");
96120

97121
return Ok(None);
98122
}
99-
});
123+
};
124+
125+
self.process_block(&beacon_block_header).await?;
126+
127+
Ok(Some(beacon_block_header))
128+
}
129+
130+
async fn process_block(
131+
&self,
132+
beacon_block_header: &BlockHeader,
133+
) -> Result<(), SlotProcessingError> {
134+
let beacon_client = self.context.beacon_client();
135+
let blobscan_client = self.context.blobscan_client();
136+
let provider = self.context.provider();
137+
let slot = beacon_block_header.slot;
100138

101139
let beacon_block = match beacon_client.get_block(slot.into()).await? {
102140
Some(block) => block,
103141
None => {
104142
debug!(slot = slot, "Skipping as there is no beacon block");
105143

106-
return Ok(None);
144+
return Ok(());
107145
}
108146
};
109147

@@ -115,7 +153,7 @@ impl SlotsProcessor<ReqwestTransport> {
115153
"Skipping as beacon block doesn't contain execution payload"
116154
);
117155

118-
return Ok(beacon_block_header);
156+
return Ok(());
119157
}
120158
};
121159

@@ -130,7 +168,7 @@ impl SlotsProcessor<ReqwestTransport> {
130168
"Skipping as beacon block doesn't contain blob kzg commitments"
131169
);
132170

133-
return Ok(beacon_block_header);
171+
return Ok(());
134172
}
135173

136174
let execution_block_hash = execution_payload.block_hash;
@@ -160,15 +198,15 @@ impl SlotsProcessor<ReqwestTransport> {
160198
if blobs.is_empty() {
161199
debug!(slot, "Skipping as blobs sidecar is empty");
162200

163-
return Ok(beacon_block_header);
201+
return Ok(());
164202
} else {
165203
blobs
166204
}
167205
}
168206
None => {
169207
debug!(slot, "Skipping as there is no blobs sidecar");
170208

171-
return Ok(beacon_block_header);
209+
return Ok(());
172210
}
173211
};
174212

@@ -217,19 +255,22 @@ impl SlotsProcessor<ReqwestTransport> {
217255

218256
info!(slot, block_number, "Block indexed successfully");
219257

220-
Ok(beacon_block_header)
258+
Ok(())
221259
}
222260

261+
/// Handles reorgs by rewinding the blobscan blocks to the common ancestor and forwarding to the new head.
223262
async fn process_reorg(
224263
&mut self,
225264
old_head_header: &BlockHeader,
226265
new_head_header: &BlockHeader,
227-
) -> Result<(), ClientError> {
266+
) -> Result<(), anyhow::Error> {
228267
let mut current_old_slot = old_head_header.slot;
229268

230-
let mut rewinded_execution_blocks: Vec<B256> = vec![];
269+
let mut rewinded_blocks: Vec<B256> = vec![];
231270

232271
loop {
272+
// We iterate over blocks by slot and not block root as blobscan blocks don't
273+
// have parent root we can use to traverse the chain
233274
let old_blobscan_block = match self
234275
.context
235276
.blobscan_client()
@@ -240,56 +281,77 @@ impl SlotsProcessor<ReqwestTransport> {
240281
None => {
241282
current_old_slot -= 1;
242283

284+
// TODO: use fork slot instead of 0 as a stop condition to avoid long loops
243285
if current_old_slot == 0 {
244-
return Err(anyhow!(
245-
"No blobscan block found for old head slot {}",
246-
old_head_header.slot
247-
)
248-
.into());
286+
return Err(anyhow!("No common block found").into());
249287
}
250288

251289
continue;
252290
}
253291
};
254292

255-
let forwarded_execution_blocks = self
256-
.get_canonical_execution_blocks(new_head_header.root, &old_blobscan_block)
293+
let canonical_block_path = self
294+
.get_canonical_block_path(&old_blobscan_block, new_head_header.root)
257295
.await?;
258-
259-
rewinded_execution_blocks.push(old_blobscan_block.hash);
260-
261-
if !forwarded_execution_blocks.is_empty() {
262-
let rewinded_blocks_count = rewinded_execution_blocks.len();
263-
let forwarded_blocks_count = forwarded_execution_blocks.len();
296+
let canonical_block_path = canonical_block_path.into_iter().rev().collect::<Vec<_>>();
297+
298+
// If a path exists, we've found the common ancient block
299+
// and can proceed with handling the reorg.
300+
if !canonical_block_path.is_empty() {
301+
let rewinded_blocks_count = rewinded_blocks.len();
302+
let forwarded_blocks_count = canonical_block_path.len();
303+
304+
let canonical_block_headers: Vec<BlockHeader> = canonical_block_path
305+
.iter()
306+
.map(|block| block.into())
307+
.collect::<Vec<_>>();
308+
309+
// If the new canonical block path includes blocks beyond the new head block,
310+
// they were skipped and must be processed.
311+
for block in canonical_block_headers.iter() {
312+
if block.slot != new_head_header.slot {
313+
self.process_block(block)
314+
.await
315+
.with_context(|| format!("Failed to sync forwarded block"))?;
316+
}
317+
}
264318

265319
info!(
266320
new_slot = new_head_header.slot,
267321
old_slot = old_head_header.slot,
268322
"Reorg detected! rewinded blocks: {rewinded_blocks_count}, forwarded blocks: {forwarded_blocks_count}",
269323
);
324+
325+
let forwarded_blocks = canonical_block_path
326+
.iter()
327+
.map(|block| block.execution_block_hash)
328+
.collect::<Vec<_>>();
329+
270330
self.context
271331
.blobscan_client()
272-
.handle_reorg(rewinded_execution_blocks, forwarded_execution_blocks)
332+
.handle_reorg(rewinded_blocks, forwarded_blocks)
273333
.await?;
274334

275335
return Ok(());
276336
}
337+
338+
rewinded_blocks.push(old_blobscan_block.hash);
277339
}
278340
}
279341

280-
async fn get_canonical_execution_blocks(
342+
/// Returns the path of blocks with execution payload from the head block to the provided block.
343+
async fn get_canonical_block_path(
281344
&mut self,
282-
canonical_block_root: B256,
283345
blobscan_block: &BlobscanBlock,
284-
) -> Result<Vec<B256>, ClientError> {
346+
head_block_root: B256,
347+
) -> Result<Vec<BlockData>, ClientError> {
285348
let beacon_client = self.context.beacon_client();
286-
let mut canonical_execution_blocks: Vec<B256> = vec![];
349+
let mut canonical_execution_blocks: Vec<BlockData> = vec![];
287350

288-
let mut canonical_block = match beacon_client.get_block(canonical_block_root.into()).await?
289-
{
351+
let mut canonical_block = match beacon_client.get_block(head_block_root.into()).await? {
290352
Some(block) => block,
291353
None => {
292-
return Ok(canonical_execution_blocks);
354+
return Ok(vec![]);
293355
}
294356
};
295357

@@ -299,28 +361,39 @@ impl SlotsProcessor<ReqwestTransport> {
299361
}
300362
}
301363

364+
let mut current_canonical_block_root = head_block_root;
365+
302366
while canonical_block.message.parent_root != B256::ZERO {
367+
let canonical_block_parent_root = canonical_block.message.parent_root;
368+
303369
if canonical_block.message.slot < blobscan_block.slot {
304370
return Ok(vec![]);
305371
}
306372

307-
if let Some(execution_payload) = canonical_block.message.body.execution_payload {
373+
if let Some(execution_payload) = &canonical_block.message.body.execution_payload {
308374
if execution_payload.block_hash == blobscan_block.hash {
309375
return Ok(canonical_execution_blocks);
310376
}
311377

312-
canonical_execution_blocks.push(execution_payload.block_hash);
378+
canonical_execution_blocks.push(BlockData {
379+
root: current_canonical_block_root,
380+
parent_root: canonical_block_parent_root,
381+
slot: canonical_block.message.slot,
382+
execution_block_hash: execution_payload.block_hash,
383+
});
313384
}
314385

315386
canonical_block = match beacon_client
316-
.get_block(canonical_block.message.parent_root.into())
387+
.get_block(canonical_block_parent_root.into())
317388
.await?
318389
{
319390
Some(block) => block,
320391
None => {
321392
return Ok(canonical_execution_blocks);
322393
}
323394
};
395+
396+
current_canonical_block_root = canonical_block_parent_root;
324397
}
325398

326399
Ok(vec![])

0 commit comments

Comments
 (0)