Skip to content

refactor: refactor chain reorgs handling #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.66"
async-trait = "0.1.80"
dyn-clone = "1.0.17"
dotenv = "0.15.0"
envy = "0.4.2"
ethers = "1.0.2"
Expand Down Expand Up @@ -34,3 +35,6 @@ anyhow = { version = "1.0.70", features = ["backtrace"] }
thiserror = "1.0.40"
sentry = { version = "0.31.2", features = ["debug-images"] }
sentry-tracing = "0.31.2"

[dev-dependencies]
mockall = "0.12.1"
Empty file removed src/clients/beacon/errors.rs
Empty file.
27 changes: 23 additions & 4 deletions src/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use std::fmt::Debug;

use anyhow::Context as AnyhowContext;
use async_trait::async_trait;
use backoff::ExponentialBackoff;

use reqwest::{Client, Url};
use reqwest_eventsource::EventSource;

#[cfg(test)]
use mockall::automock;

use crate::{
clients::{beacon::types::BlockHeaderResponse, common::ClientResult},
json_get,
Expand All @@ -24,6 +31,15 @@ pub struct Config {
pub exp_backoff: Option<ExponentialBackoff>,
}

#[async_trait]
#[cfg_attr(test, automock)]
pub trait CommonBeaconClient: Send + Sync + Debug {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>>;
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>>;
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>>;
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource>;
}

impl BeaconClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/eth/", config.base_url))
Expand All @@ -36,8 +52,11 @@ impl BeaconClient {
exp_backoff,
})
}
}

pub async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
#[async_trait]
impl CommonBeaconClient for BeaconClient {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -47,7 +66,7 @@ impl BeaconClient {
})
}

pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -63,7 +82,7 @@ impl BeaconClient {
})
}

pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{}", {
block_id.to_detailed_string()
});
Expand All @@ -75,7 +94,7 @@ impl BeaconClient {
})
}

pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
let topics = topics
.iter()
.map(|topic| topic.into())
Expand Down
13 changes: 1 addition & 12 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt, str::FromStr};
use ethers::types::{Bytes, H256};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Debug, Clone)]
#[derive(Serialize, Debug, Clone, PartialEq)]
pub enum BlockId {
Head,
Finalized,
Expand All @@ -16,7 +16,6 @@ pub enum BlockId {
pub enum Topic {
Head,
FinalizedCheckpoint,
ChainReorg,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -80,15 +79,6 @@ pub struct BlockHeaderMessage {
pub slot: u32,
}

#[derive(Deserialize, Debug)]
pub struct ChainReorgEventData {
pub old_head_block: H256,
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
#[serde(deserialize_with = "deserialize_number")]
pub depth: u32,
}

#[derive(Deserialize, Debug)]
pub struct HeadEventData {
#[serde(deserialize_with = "deserialize_number")]
Expand Down Expand Up @@ -161,7 +151,6 @@ impl FromStr for BlockId {
impl From<&Topic> for String {
fn from(value: &Topic) -> Self {
match value {
Topic::ChainReorg => String::from("chain_reorg"),
Topic::Head => String::from("head"),
Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"),
}
Expand Down
38 changes: 32 additions & 6 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::fmt::Debug;

use async_trait::async_trait;
use backoff::ExponentialBackoff;
use chrono::TimeDelta;
use reqwest::{Client, Url};

#[cfg(test)]
use mockall::automock;

use crate::{
clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult},
json_get, json_put,
Expand All @@ -18,6 +24,24 @@ use self::{
mod jwt_manager;

pub mod types;

#[async_trait]
#[cfg_attr(test, automock)]
pub trait CommonBlobscanClient: Send + Sync + Debug {
fn try_with_client(client: Client, config: Config) -> ClientResult<Self>
where
Self: Sized;
async fn index(
&self,
block: Block,
transactions: Vec<Transaction>,
blobs: Vec<Blob>,
) -> ClientResult<()>;
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32>;
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
}

#[derive(Debug, Clone)]
pub struct BlobscanClient {
base_url: Url,
Expand All @@ -32,8 +56,10 @@ pub struct Config {
pub exp_backoff: Option<ExponentialBackoff>,
}

impl BlobscanClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
#[async_trait]

impl CommonBlobscanClient for BlobscanClient {
fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/", config.base_url))?;
let jwt_manager = JWTManager::new(JWTManagerConfig {
secret_key: config.secret_key,
Expand All @@ -50,7 +76,7 @@ impl BlobscanClient {
})
}

pub async fn index(
async fn index(
&self,
block: Block,
transactions: Vec<Transaction>,
Expand All @@ -67,7 +93,7 @@ impl BlobscanClient {
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
let url = self.base_url.join("indexer/reorged-slots")?;
let token = self.jwt_manager.get_token()?;
let req = ReorgedSlotsRequest {
Expand All @@ -78,15 +104,15 @@ impl BlobscanClient {
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
}

pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
let url = self.base_url.join("blockchain-sync-state")?;
let token = self.jwt_manager.get_token()?;
let req: BlockchainSyncStateRequest = sync_state.into();

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
let url = self.base_url.join("blockchain-sync-state")?;
json_get!(
&self.client,
Expand Down
2 changes: 1 addition & 1 deletion src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct BlockchainSyncStateResponse {
pub last_upper_synced_slot: Option<u32>,
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct BlockchainSyncState {
pub last_finalized_block: Option<u32>,
pub last_lower_synced_slot: Option<u32>,
Expand Down
Loading