Skip to content

Commit 1203e5c

Browse files
committed
feat(indexer): index finalized checkpoint blocks
1 parent 9718d5d commit 1203e5c

File tree

5 files changed

+84
-28
lines changed

5 files changed

+84
-28
lines changed

src/clients/beacon/types.rs

+13-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub enum BlockId {
1010
Head,
1111
Finalized,
1212
Slot(u32),
13+
Hash(H256),
1314
}
1415

1516
#[derive(Serialize, Debug)]
@@ -21,6 +22,8 @@ pub enum Topic {
2122
#[derive(Deserialize, Debug)]
2223
pub struct ExecutionPayload {
2324
pub block_hash: H256,
25+
#[serde(deserialize_with = "deserialize_number")]
26+
pub block_number: u32,
2427
}
2528

2629
#[derive(Deserialize, Debug)]
@@ -30,7 +33,7 @@ pub struct BlockBody {
3033
}
3134
#[derive(Deserialize, Debug)]
3235
pub struct BlockMessage {
33-
#[serde(deserialize_with = "deserialize_slot")]
36+
#[serde(deserialize_with = "deserialize_number")]
3437
pub slot: u32,
3538
pub body: BlockBody,
3639
pub parent_root: H256,
@@ -76,18 +79,23 @@ pub struct InnerBlockHeader {
7679
#[derive(Deserialize, Debug)]
7780
pub struct BlockHeaderMessage {
7881
pub parent_root: H256,
79-
#[serde(deserialize_with = "deserialize_slot")]
82+
#[serde(deserialize_with = "deserialize_number")]
8083
pub slot: u32,
8184
}
8285

8386
#[derive(Deserialize, Debug)]
8487
pub struct HeadBlockEventData {
85-
#[serde(deserialize_with = "deserialize_slot")]
88+
#[serde(deserialize_with = "deserialize_number")]
8689
pub slot: u32,
8790
pub block: H256,
8891
}
8992

90-
fn deserialize_slot<'de, D>(deserializer: D) -> Result<u32, D::Error>
93+
#[derive(Deserialize, Debug)]
94+
pub struct FinalizedCheckpointEventData {
95+
pub block: H256,
96+
}
97+
98+
fn deserialize_number<'de, D>(deserializer: D) -> Result<u32, D::Error>
9199
where
92100
D: serde::Deserializer<'de>,
93101
{
@@ -102,6 +110,7 @@ impl fmt::Display for BlockId {
102110
BlockId::Head => write!(f, "head"),
103111
BlockId::Finalized => write!(f, "finalized"),
104112
BlockId::Slot(slot) => write!(f, "{}", slot),
113+
BlockId::Hash(hash) => write!(f, "{}", hash),
105114
}
106115
}
107116
}

src/clients/blobscan/types.rs

+5
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ pub struct BlockchainSyncStateRequest {
5757
pub last_lower_synced_slot: Option<u32>,
5858
#[serde(default, skip_serializing_if = "Option::is_none")]
5959
pub last_upper_synced_slot: Option<u32>,
60+
#[serde(default, skip_serializing_if = "Option::is_none")]
61+
pub last_finalized_block: Option<u32>,
6062
}
6163

6264
#[derive(Deserialize, Debug)]
@@ -70,6 +72,7 @@ pub struct BlockchainSyncStateResponse {
7072

7173
#[derive(Debug)]
7274
pub struct BlockchainSyncState {
75+
pub last_finalized_block: Option<u32>,
7376
pub last_lower_synced_slot: Option<u32>,
7477
pub last_upper_synced_slot: Option<u32>,
7578
}
@@ -232,6 +235,7 @@ impl<'a> From<(&'a BeaconBlob, &'a H256, usize, &'a H256)> for Blob {
232235
impl From<BlockchainSyncStateResponse> for BlockchainSyncState {
233236
fn from(response: BlockchainSyncStateResponse) -> Self {
234237
Self {
238+
last_finalized_block: None,
235239
last_lower_synced_slot: response.last_lower_synced_slot,
236240
last_upper_synced_slot: response.last_upper_synced_slot,
237241
}
@@ -243,6 +247,7 @@ impl From<BlockchainSyncState> for BlockchainSyncStateRequest {
243247
Self {
244248
last_lower_synced_slot: sync_state.last_lower_synced_slot,
245249
last_upper_synced_slot: sync_state.last_upper_synced_slot,
250+
last_finalized_block: sync_state.last_finalized_block,
246251
}
247252
}
248253
}

src/indexer/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ pub enum IndexerError {
1212
SynchronizerError(#[from] SynchronizerError),
1313
#[error("{0}")]
1414
SerdeError(#[from] serde_json::Error),
15+
#[error("Unexpected event \"{event}\" received")]
16+
UnexpectedEvent { event: String },
1517
}
1618

1719
#[derive(Debug, thiserror::Error)]

src/indexer/mod.rs

+63-24
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::thread;
22

3-
use anyhow::anyhow;
3+
use anyhow::{anyhow, Context as AnyhowContext};
44
use futures::StreamExt;
55
use reqwest_eventsource::Event;
66
use tokio::{sync::mpsc, task::JoinHandle};
@@ -9,7 +9,7 @@ use tracing::{debug, error, info};
99
use crate::{
1010
args::Args,
1111
clients::{
12-
beacon::types::{BlockId, HeadBlockEventData, Topic},
12+
beacon::types::{BlockId, FinalizedCheckpointEventData, HeadBlockEventData, Topic},
1313
blobscan::types::BlockchainSyncState,
1414
},
1515
context::{Config as ContextConfig, Context},
@@ -150,38 +150,77 @@ impl Indexer {
150150

151151
tokio::spawn(async move {
152152
let result: Result<(), IndexerError> = async {
153+
let beacon_client = task_context.beacon_client();
153154
let blobscan_client = task_context.blobscan_client();
154155
let mut event_source = task_context
155156
.beacon_client()
156-
.subscribe_to_events(vec![Topic::Head])?;
157+
.subscribe_to_events(vec![Topic::Head, Topic::FinalizedCheckpoint])?;
157158
let mut is_initial_sync_to_head = true;
158159

159160
while let Some(event) = event_source.next().await {
160161
match event {
161162
Ok(Event::Open) => {
162-
debug!(target = "indexer", "Listening for head block events…")
163+
debug!(target = "indexer", "Listening for head and finalized block events…")
163164
}
164165
Ok(Event::Message(event)) => {
165-
let head_block_data =
166-
serde_json::from_str::<HeadBlockEventData>(&event.data)?;
167-
168-
let head_block_id = &BlockId::Slot(head_block_data.slot);
169-
let initial_block_id = if is_initial_sync_to_head {
170-
is_initial_sync_to_head = false;
171-
&start_block_id
172-
} else {
173-
head_block_id
174-
};
175-
176-
synchronizer.run(initial_block_id, head_block_id).await?;
177-
178-
blobscan_client
179-
.update_sync_state(BlockchainSyncState {
180-
last_lower_synced_slot: None,
181-
last_upper_synced_slot: Some(head_block_data.slot),
182-
})
183-
.await?;
184-
}
166+
match event.event.as_str() {
167+
"head" => {
168+
let head_block_data =
169+
serde_json::from_str::<HeadBlockEventData>(&event.data)?;
170+
171+
let head_block_id = &BlockId::Slot(head_block_data.slot);
172+
let initial_block_id = if is_initial_sync_to_head {
173+
is_initial_sync_to_head = false;
174+
&start_block_id
175+
} else {
176+
head_block_id
177+
};
178+
179+
synchronizer.run(initial_block_id, head_block_id).await?;
180+
181+
blobscan_client
182+
.update_sync_state(BlockchainSyncState {
183+
last_finalized_block: None,
184+
last_lower_synced_slot: None,
185+
last_upper_synced_slot: Some(head_block_data.slot),
186+
})
187+
.await?;
188+
}
189+
"finalized_checkpoint" => {
190+
let finalized_checkpoint_data =
191+
serde_json::from_str::<FinalizedCheckpointEventData>(
192+
&event.data,
193+
)?;
194+
let block_hash = finalized_checkpoint_data.block;
195+
let full_block_hash = format!("0x{:x}", block_hash);
196+
let last_finalized_block_number = beacon_client
197+
.get_block(&BlockId::Hash(block_hash))
198+
.await?
199+
.with_context(|| {
200+
anyhow!("Finalized block with hash {full_block_hash} not found")
201+
})?
202+
.message.body.execution_payload
203+
.with_context(|| {
204+
anyhow!("Finalized block with hash {full_block_hash} has no execution payload")
205+
})?.block_number;
206+
207+
blobscan_client
208+
.update_sync_state(BlockchainSyncState {
209+
last_lower_synced_slot: None,
210+
last_upper_synced_slot: None,
211+
last_finalized_block: Some(
212+
last_finalized_block_number
213+
),
214+
})
215+
.await?;
216+
217+
info!(target = "indexer", "Finalized block {full_block_hash} detected and stored");
218+
},
219+
unexpected_event_id => {
220+
return Err(IndexerError::UnexpectedEvent { event: unexpected_event_id.to_string() })
221+
}
222+
}
223+
},
185224
Err(error) => {
186225
event_source.close();
187226

src/synchronizer/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ impl Synchronizer {
217217
.context
218218
.blobscan_client()
219219
.update_sync_state(BlockchainSyncState {
220+
last_finalized_block: None,
220221
last_lower_synced_slot,
221222
last_upper_synced_slot,
222223
})

0 commit comments

Comments
 (0)