diff --git a/src/clients/beacon/mod.rs b/src/clients/beacon/mod.rs index cb1830f..ea52ddc 100644 --- a/src/clients/beacon/mod.rs +++ b/src/clients/beacon/mod.rs @@ -38,6 +38,12 @@ impl BeaconClient { } pub async fn get_block(&self, block_id: &BlockId) -> ClientResult> { + let block_id = match block_id { + BlockId::Hash(hash) => format!("0x{:x}", hash), + BlockId::Slot(slot) => slot.to_string(), + block_id => block_id.to_string(), + }; + let path = format!("v2/beacon/blocks/{block_id}"); let url = self.base_url.join(path.as_str())?; @@ -78,7 +84,7 @@ impl BeaconClient { .iter() .map(|topic| topic.into()) .collect::>() - .join("&"); + .join(","); let path = format!("v1/events?topics={topics}"); let url = self.base_url.join(&path)?; diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index a328e53..8f2f54a 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -10,6 +10,7 @@ pub enum BlockId { Head, Finalized, Slot(u32), + Hash(H256), } #[derive(Serialize, Debug)] @@ -21,6 +22,8 @@ pub enum Topic { #[derive(Deserialize, Debug)] pub struct ExecutionPayload { pub block_hash: H256, + #[serde(deserialize_with = "deserialize_number")] + pub block_number: u32, } #[derive(Deserialize, Debug)] @@ -30,7 +33,7 @@ pub struct BlockBody { } #[derive(Deserialize, Debug)] pub struct BlockMessage { - #[serde(deserialize_with = "deserialize_slot")] + #[serde(deserialize_with = "deserialize_number")] pub slot: u32, pub body: BlockBody, pub parent_root: H256, @@ -76,24 +79,29 @@ pub struct InnerBlockHeader { #[derive(Deserialize, Debug)] pub struct BlockHeaderMessage { pub parent_root: H256, - #[serde(deserialize_with = "deserialize_slot")] + #[serde(deserialize_with = "deserialize_number")] pub slot: u32, } #[derive(Deserialize, Debug)] pub struct HeadBlockEventData { - #[serde(deserialize_with = "deserialize_slot")] + #[serde(deserialize_with = "deserialize_number")] pub slot: u32, pub block: H256, } -fn deserialize_slot<'de, D>(deserializer: D) -> Result +#[derive(Deserialize, Debug)] +pub struct FinalizedCheckpointEventData { + pub block: H256, +} + +fn deserialize_number<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>, { - let slot = String::deserialize(deserializer)?; + let value = String::deserialize(deserializer)?; - slot.parse::().map_err(serde::de::Error::custom) + value.parse::().map_err(serde::de::Error::custom) } impl fmt::Display for BlockId { @@ -102,6 +110,7 @@ impl fmt::Display for BlockId { BlockId::Head => write!(f, "head"), BlockId::Finalized => write!(f, "finalized"), BlockId::Slot(slot) => write!(f, "{}", slot), + BlockId::Hash(hash) => write!(f, "{}", hash), } } } diff --git a/src/clients/blobscan/types.rs b/src/clients/blobscan/types.rs index 2c833aa..b4e16a0 100644 --- a/src/clients/blobscan/types.rs +++ b/src/clients/blobscan/types.rs @@ -57,6 +57,8 @@ pub struct BlockchainSyncStateRequest { pub last_lower_synced_slot: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub last_upper_synced_slot: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_finalized_block: Option, } #[derive(Deserialize, Debug)] @@ -70,6 +72,7 @@ pub struct BlockchainSyncStateResponse { #[derive(Debug)] pub struct BlockchainSyncState { + pub last_finalized_block: Option, pub last_lower_synced_slot: Option, pub last_upper_synced_slot: Option, } @@ -232,6 +235,7 @@ impl<'a> From<(&'a BeaconBlob, &'a H256, usize, &'a H256)> for Blob { impl From for BlockchainSyncState { fn from(response: BlockchainSyncStateResponse) -> Self { Self { + last_finalized_block: None, last_lower_synced_slot: response.last_lower_synced_slot, last_upper_synced_slot: response.last_upper_synced_slot, } @@ -243,6 +247,7 @@ impl From for BlockchainSyncStateRequest { Self { last_lower_synced_slot: sync_state.last_lower_synced_slot, last_upper_synced_slot: sync_state.last_upper_synced_slot, + last_finalized_block: sync_state.last_finalized_block, } } } diff --git a/src/indexer/error.rs b/src/indexer/error.rs index 605e9f6..8e52b15 100644 --- a/src/indexer/error.rs +++ b/src/indexer/error.rs @@ -12,6 +12,8 @@ pub enum IndexerError { SynchronizerError(#[from] SynchronizerError), #[error("{0}")] SerdeError(#[from] serde_json::Error), + #[error("Unexpected event \"{event}\" received")] + UnexpectedEvent { event: String }, } #[derive(Debug, thiserror::Error)] diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 77384cc..0b22bb2 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,6 +1,6 @@ use std::thread; -use anyhow::anyhow; +use anyhow::{anyhow, Context as AnyhowContext}; use futures::StreamExt; use reqwest_eventsource::Event; use tokio::{sync::mpsc, task::JoinHandle}; @@ -9,7 +9,7 @@ use tracing::{debug, error, info}; use crate::{ args::Args, clients::{ - beacon::types::{BlockId, HeadBlockEventData, Topic}, + beacon::types::{BlockId, FinalizedCheckpointEventData, HeadBlockEventData, Topic}, blobscan::types::BlockchainSyncState, }, context::{Config as ContextConfig, Context}, @@ -150,38 +150,77 @@ impl Indexer { tokio::spawn(async move { let result: Result<(), IndexerError> = async { + let beacon_client = task_context.beacon_client(); let blobscan_client = task_context.blobscan_client(); let mut event_source = task_context .beacon_client() - .subscribe_to_events(vec![Topic::Head])?; + .subscribe_to_events(vec![Topic::Head, Topic::FinalizedCheckpoint])?; let mut is_initial_sync_to_head = true; while let Some(event) = event_source.next().await { match event { Ok(Event::Open) => { - debug!(target = "indexer", "Listening for head block events…") + debug!(target = "indexer", "Listening for head and finalized block events…") } Ok(Event::Message(event)) => { - let head_block_data = - serde_json::from_str::(&event.data)?; - - let head_block_id = &BlockId::Slot(head_block_data.slot); - let initial_block_id = if is_initial_sync_to_head { - is_initial_sync_to_head = false; - &start_block_id - } else { - head_block_id - }; - - synchronizer.run(initial_block_id, head_block_id).await?; - - blobscan_client - .update_sync_state(BlockchainSyncState { - last_lower_synced_slot: None, - last_upper_synced_slot: Some(head_block_data.slot), - }) - .await?; - } + match event.event.as_str() { + "head" => { + let head_block_data = + serde_json::from_str::(&event.data)?; + + let head_block_id = &BlockId::Slot(head_block_data.slot); + let initial_block_id = if is_initial_sync_to_head { + is_initial_sync_to_head = false; + &start_block_id + } else { + head_block_id + }; + + synchronizer.run(initial_block_id, head_block_id).await?; + + blobscan_client + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot: None, + last_upper_synced_slot: Some(head_block_data.slot), + }) + .await?; + } + "finalized_checkpoint" => { + let finalized_checkpoint_data = + serde_json::from_str::( + &event.data, + )?; + let block_hash = finalized_checkpoint_data.block; + let full_block_hash = format!("0x{:x}", block_hash); + let last_finalized_block_number = beacon_client + .get_block(&BlockId::Hash(block_hash)) + .await? + .with_context(|| { + anyhow!("Finalized block with hash {full_block_hash} not found") + })? + .message.body.execution_payload + .with_context(|| { + anyhow!("Finalized block with hash {full_block_hash} has no execution payload") + })?.block_number; + + blobscan_client + .update_sync_state(BlockchainSyncState { + last_lower_synced_slot: None, + last_upper_synced_slot: None, + last_finalized_block: Some( + last_finalized_block_number + ), + }) + .await?; + + info!(target = "indexer", "Finalized block {full_block_hash} detected and stored"); + }, + unexpected_event_id => { + return Err(IndexerError::UnexpectedEvent { event: unexpected_event_id.to_string() }) + } + } + }, Err(error) => { event_source.close(); diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index f71617f..03bbe49 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -217,6 +217,7 @@ impl Synchronizer { .context .blobscan_client() .update_sync_state(BlockchainSyncState { + last_finalized_block: None, last_lower_synced_slot, last_upper_synced_slot, })