Skip to content

Commit d72630c

Browse files
authored
Merge pull request #53 from Blobscan/support-dual-indexing
feat: add reorgs support + indexer restructuring
2 parents ed88527 + 192e08d commit d72630c

25 files changed

+1217
-613
lines changed

.vscode/settings.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
{
2-
"editor.formatOnSave": true
2+
"editor.formatOnSave": true,
3+
"rust-analyzer.linkedProjects": ["./Cargo.toml"],
4+
"rust-analyzer.showUnlinkedFileNotification": false
35
}

Cargo.lock

+45
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5-4
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,15 @@ ethers = "1.0.2"
1313
futures = "0.3.25"
1414
hex = "0.4.3"
1515
reqwest = { version = "0.11.13", features = ["json"] }
16+
reqwest-eventsource = "0.5.0"
1617
url = { version = "2.3.1", features = ["serde"] }
1718
serde = { version = "1.0.150", features = ["derive"] }
1819
tokio = { version = "1.23.0", features = ["full"] }
1920
jsonwebtoken = "8.3.0"
21+
backoff = { version = "0.4.0", features = ["tokio"] }
22+
chrono = "0.4.24"
23+
serde_json = "1.0.96"
24+
clap = { version = "4.3.0", features = ["derive"] }
2025

2126

2227
# logging
@@ -29,9 +34,5 @@ tracing-log = "0.1.1"
2934
# error handling
3035
anyhow = { version = "1.0.70", features = ["backtrace"] }
3136
thiserror = "1.0.40"
32-
backoff = { version = "0.4.0", features = ["tokio"] }
33-
chrono = "0.4.24"
34-
serde_json = "1.0.96"
35-
clap = { version = "4.3.0", features = ["derive"] }
3637
sentry = { version = "0.31.2", features = ["debug-images"] }
3738
sentry-tracing = "0.31.2"

src/args.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
use clap::Parser;
22

3+
use crate::clients::beacon::types::BlockId;
4+
35
/// Blobscan's indexer for the EIP-4844 upgrade.
46
#[derive(Parser, Debug)]
57
#[command(author, version, about, long_about = None)]
68
pub struct Args {
79
/// Slot to start indexing from
810
#[arg(short, long)]
9-
pub from_slot: Option<u32>,
11+
pub from_slot: Option<BlockId>,
1012

1113
/// Number of threads used for parallel indexing
1214
#[arg(short, long)]
1315
pub num_threads: Option<u32>,
1416

1517
/// Amount of slots to be processed before saving latest slot in the database
16-
#[arg(short, long, default_value_t = 1000)]
17-
pub slots_per_save: u32,
18+
#[arg(short, long)]
19+
pub slots_per_save: Option<u32>,
1820
}

src/clients/beacon/mod.rs

+50-16
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,87 @@
11
use anyhow::Context as AnyhowContext;
2+
use backoff::ExponentialBackoff;
23
use reqwest::{Client, Url};
3-
use std::time::Duration;
4+
use reqwest_eventsource::EventSource;
45

5-
use crate::{clients::common::ClientResult, json_get};
6+
use crate::{
7+
clients::{beacon::types::BlockHeaderResponse, common::ClientResult},
8+
json_get,
9+
};
610

7-
use self::types::{Blob, BlobsResponse, BlockMessage as Block, BlockResponse};
11+
use self::types::{Blob, BlobsResponse, Block, BlockHeader, BlockId, BlockResponse, Topic};
812

913
pub mod types;
1014

1115
#[derive(Debug, Clone)]
1216
pub struct BeaconClient {
1317
base_url: Url,
1418
client: Client,
19+
exp_backoff: Option<ExponentialBackoff>,
1520
}
1621

1722
pub struct Config {
1823
pub base_url: String,
19-
pub timeout: Option<Duration>,
24+
pub exp_backoff: Option<ExponentialBackoff>,
2025
}
2126

2227
impl BeaconClient {
2328
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
2429
let base_url = Url::parse(&format!("{}/eth/", config.base_url))
2530
.with_context(|| "Failed to parse base URL")?;
31+
let exp_backoff = config.exp_backoff;
2632

27-
Ok(Self { base_url, client })
33+
Ok(Self {
34+
base_url,
35+
client,
36+
exp_backoff,
37+
})
38+
}
39+
40+
pub async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
41+
let path = format!("v2/beacon/blocks/{block_id}");
42+
let url = self.base_url.join(path.as_str())?;
43+
44+
json_get!(&self.client, url, BlockResponse, self.exp_backoff.clone()).map(|res| match res {
45+
Some(r) => Some(r.data),
46+
None => None,
47+
})
2848
}
2949

30-
pub async fn get_block(&self, slot: Option<u32>) -> ClientResult<Option<Block>> {
31-
let slot = match slot {
32-
Some(slot) => slot.to_string(),
33-
None => String::from("head"),
34-
};
35-
let path = format!("v2/beacon/blocks/{slot}");
50+
pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
51+
let path = format!("v1/beacon/headers/{block_id}");
3652
let url = self.base_url.join(path.as_str())?;
3753

38-
json_get!(&self.client, url, BlockResponse).map(|res| match res {
39-
Some(r) => Some(r.data.message),
54+
json_get!(
55+
&self.client,
56+
url,
57+
BlockHeaderResponse,
58+
self.exp_backoff.clone()
59+
)
60+
.map(|res| match res {
61+
Some(r) => Some(r.data),
4062
None => None,
4163
})
4264
}
4365

44-
pub async fn get_blobs(&self, slot: u32) -> ClientResult<Option<Vec<Blob>>> {
45-
let path = format!("v1/beacon/blob_sidecars/{slot}");
66+
pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
67+
let path = format!("v1/beacon/blob_sidecars/{block_id}");
4668
let url = self.base_url.join(path.as_str())?;
4769

48-
json_get!(&self.client, url, BlobsResponse).map(|res| match res {
70+
json_get!(&self.client, url, BlobsResponse, self.exp_backoff.clone()).map(|res| match res {
4971
Some(r) => Some(r.data),
5072
None => None,
5173
})
5274
}
75+
76+
pub fn subscribe_to_events(&self, topics: Vec<Topic>) -> ClientResult<EventSource> {
77+
let topics = topics
78+
.iter()
79+
.map(|topic| topic.into())
80+
.collect::<Vec<String>>()
81+
.join("&");
82+
let path = format!("v1/events?topics={topics}");
83+
let url = self.base_url.join(&path)?;
84+
85+
Ok(EventSource::get(url))
86+
}
5387
}

src/clients/beacon/types.rs

+104-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,22 @@
1+
use std::{fmt, str::FromStr};
2+
13
use ethers::types::{Bytes, H256};
2-
use serde::Deserialize;
4+
use serde::{Deserialize, Serialize};
5+
6+
use crate::slots_processor::BlockData;
7+
8+
#[derive(Serialize, Debug, Clone)]
9+
pub enum BlockId {
10+
Head,
11+
Finalized,
12+
Slot(u32),
13+
}
14+
15+
#[derive(Serialize, Debug)]
16+
pub enum Topic {
17+
Head,
18+
FinalizedCheckpoint,
19+
}
320

421
#[derive(Deserialize, Debug)]
522
pub struct ExecutionPayload {
@@ -13,8 +30,10 @@ pub struct BlockBody {
1330
}
1431
#[derive(Deserialize, Debug)]
1532
pub struct BlockMessage {
16-
pub slot: String,
33+
#[serde(deserialize_with = "deserialize_slot")]
34+
pub slot: u32,
1735
pub body: BlockBody,
36+
pub parent_root: H256,
1837
}
1938

2039
#[derive(Deserialize, Debug)]
@@ -38,3 +57,86 @@ pub struct Blob {
3857
pub struct BlobsResponse {
3958
pub data: Vec<Blob>,
4059
}
60+
61+
#[derive(Deserialize, Debug)]
62+
pub struct BlockHeaderResponse {
63+
pub data: BlockHeader,
64+
}
65+
66+
#[derive(Deserialize, Debug)]
67+
pub struct BlockHeader {
68+
pub root: H256,
69+
pub header: InnerBlockHeader,
70+
}
71+
#[derive(Deserialize, Debug)]
72+
pub struct InnerBlockHeader {
73+
pub message: BlockHeaderMessage,
74+
}
75+
76+
#[derive(Deserialize, Debug)]
77+
pub struct BlockHeaderMessage {
78+
pub parent_root: H256,
79+
#[serde(deserialize_with = "deserialize_slot")]
80+
pub slot: u32,
81+
}
82+
83+
#[derive(Deserialize, Debug)]
84+
pub struct HeadBlockEventData {
85+
#[serde(deserialize_with = "deserialize_slot")]
86+
pub slot: u32,
87+
pub block: H256,
88+
}
89+
90+
fn deserialize_slot<'de, D>(deserializer: D) -> Result<u32, D::Error>
91+
where
92+
D: serde::Deserializer<'de>,
93+
{
94+
let slot = String::deserialize(deserializer)?;
95+
96+
slot.parse::<u32>().map_err(serde::de::Error::custom)
97+
}
98+
99+
impl fmt::Display for BlockId {
100+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101+
match self {
102+
BlockId::Head => write!(f, "head"),
103+
BlockId::Finalized => write!(f, "finalized"),
104+
BlockId::Slot(slot) => write!(f, "{}", slot),
105+
}
106+
}
107+
}
108+
109+
impl FromStr for BlockId {
110+
type Err = String;
111+
112+
fn from_str(s: &str) -> Result<Self, Self::Err> {
113+
match s {
114+
"head" => Ok(BlockId::Head),
115+
"finalized" => Ok(BlockId::Finalized),
116+
_ => match s.parse::<u32>() {
117+
Ok(num) => Ok(BlockId::Slot(num)),
118+
Err(_) => {
119+
Err("Invalid block ID. Expected 'head', 'finalized' or a number.".to_string())
120+
}
121+
},
122+
}
123+
}
124+
}
125+
126+
impl From<&Topic> for String {
127+
fn from(value: &Topic) -> Self {
128+
match value {
129+
Topic::Head => String::from("head"),
130+
Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"),
131+
}
132+
}
133+
}
134+
135+
impl From<HeadBlockEventData> for BlockData {
136+
fn from(event_data: HeadBlockEventData) -> Self {
137+
Self {
138+
root: event_data.block,
139+
slot: event_data.slot,
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)