Skip to content

Commit d08fc1c

Browse files
authored
Merge pull request #30 from Blobscan/feature/parallel-indexing
Add Parallel Indexing
2 parents f2e8835 + a284f2a commit d08fc1c

File tree

17 files changed

+649
-408
lines changed

17 files changed

+649
-408
lines changed

.env.example

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ SECRET_KEY=supersecret
44
BLOBSCAN_API_ENDPOINT=
55
EXECUTION_NODE_URL=
66
BEACON_NODE_RPC=
7+
NUM_PROCESSING_THREADS=

src/beacon_client/mod.rs

+7-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use reqwest::{Client, StatusCode};
22
use std::time::Duration;
33

44
use self::types::{
5-
BeaconClientError, BeaconClientResult, BlobData, BlobsResponse, BlockMessage as Block,
5+
BeaconClientError, BeaconClientResult, Blob, BlobsResponse, BlockMessage as Block,
66
BlockResponse,
77
};
88

@@ -11,7 +11,7 @@ pub mod types;
1111
#[derive(Debug, Clone)]
1212
pub struct BeaconClient {
1313
base_url: String,
14-
client: reqwest::Client,
14+
client: Client,
1515
}
1616

1717
pub struct Config {
@@ -20,25 +20,18 @@ pub struct Config {
2020
}
2121

2222
impl BeaconClient {
23-
pub fn try_from(config: Config) -> BeaconClientResult<Self> {
24-
let mut client_builder = Client::builder();
25-
26-
if let Some(timeout) = config.timeout {
27-
client_builder = client_builder.timeout(timeout);
28-
}
29-
30-
Ok(Self {
23+
pub fn with_client(client: Client, config: Config) -> Self {
24+
Self {
3125
base_url: config.base_url,
32-
client: client_builder.build()?,
33-
})
26+
client,
27+
}
3428
}
3529

3630
pub async fn get_block(&self, slot: Option<u32>) -> BeaconClientResult<Option<Block>> {
3731
let slot = match slot {
3832
Some(slot) => slot.to_string(),
3933
None => String::from("head"),
4034
};
41-
4235
let url = self.build_url(&format!("eth/v2/beacon/blocks/{slot}"));
4336

4437
let block_response = self.client.get(url).send().await?;
@@ -54,7 +47,7 @@ impl BeaconClient {
5447
}
5548
}
5649

57-
pub async fn get_blobs(&self, slot: u32) -> BeaconClientResult<Option<Vec<BlobData>>> {
50+
pub async fn get_blobs(&self, slot: u32) -> BeaconClientResult<Option<Vec<Blob>>> {
5851
let url = self.build_url(&format!("eth/v1/beacon/blobs/{slot}"));
5952

6053
let blobs_response = self.client.get(url).send().await?;

src/beacon_client/types.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,43 @@
11
use anyhow::Result;
22
use ethers::types::{Bytes, H256};
3-
use serde::{Deserialize, Serialize};
3+
use serde::Deserialize;
44

5-
#[derive(Serialize, Deserialize, Debug)]
5+
#[derive(Deserialize, Debug)]
66
pub struct ExecutionPayload {
77
pub block_hash: H256,
88
}
99

10-
#[derive(Serialize, Deserialize, Debug)]
10+
#[derive(Deserialize, Debug)]
1111
pub struct BlockBody {
1212
pub execution_payload: Option<ExecutionPayload>,
1313
pub blob_kzg_commitments: Option<Vec<String>>,
1414
}
15-
#[derive(Serialize, Deserialize, Debug)]
15+
#[derive(Deserialize, Debug)]
1616
pub struct BlockMessage {
1717
pub slot: String,
1818
pub body: BlockBody,
1919
}
2020

21-
#[derive(Serialize, Deserialize, Debug)]
21+
#[derive(Deserialize, Debug)]
2222
pub struct Block {
2323
pub message: BlockMessage,
2424
}
2525

26-
#[derive(Serialize, Deserialize, Debug)]
26+
#[derive(Deserialize, Debug)]
2727
pub struct BlockResponse {
2828
pub data: Block,
2929
}
3030

31-
#[derive(Serialize, Deserialize, Debug)]
32-
pub struct BlobData {
31+
#[derive(Deserialize, Debug)]
32+
pub struct Blob {
3333
pub index: String,
3434
pub kzg_commitment: String,
3535
pub blob: Bytes,
3636
}
3737

38-
#[derive(Serialize, Deserialize, Debug)]
38+
#[derive(Deserialize, Debug)]
3939
pub struct BlobsResponse {
40-
pub data: Vec<BlobData>,
40+
pub data: Vec<Blob>,
4141
}
4242

4343
#[derive(Debug, thiserror::Error)]

src/blobscan_client/jwt_manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl JWTManager {
3535
refresh_interval: config.refresh_interval,
3636
safety_margin: match config.safety_magin {
3737
Some(safety_margin) => safety_margin,
38-
None => Duration::seconds(10),
38+
None => Duration::minutes(1),
3939
},
4040
}
4141
}

src/blobscan_client/mod.rs

+23-31
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ use reqwest::{Client, StatusCode};
55
use self::{
66
jwt_manager::{Config as JWTManagerConfig, JWTManager},
77
types::{
8-
BlobEntity, BlobscanClientError, BlobscanClientResult, BlockEntity, IndexRequest,
9-
SlotRequest, SlotResponse, TransactionEntity,
8+
Blob, BlobscanClientError, BlobscanClientResult, Block, IndexRequest, SlotRequest,
9+
SlotResponse, Transaction,
1010
},
1111
};
1212

1313
mod jwt_manager;
1414

1515
pub mod types;
1616

17-
#[derive(Debug)]
17+
#[derive(Debug, Clone)]
1818
pub struct BlobscanClient {
1919
base_url: String,
2020
client: reqwest::Client,
@@ -27,30 +27,28 @@ pub struct Config {
2727
pub timeout: Option<Duration>,
2828
}
2929

30-
impl BlobscanClient {
31-
pub fn try_from(config: Config) -> BlobscanClientResult<Self> {
32-
let mut client_builder = Client::builder();
33-
34-
if let Some(timeout) = config.timeout {
35-
client_builder = client_builder.timeout(timeout);
36-
}
30+
pub fn build_jwt_manager(secret_key: String) -> JWTManager {
31+
JWTManager::new(JWTManagerConfig {
32+
secret_key,
33+
refresh_interval: chrono::Duration::hours(1),
34+
safety_magin: None,
35+
})
36+
}
3737

38-
Ok(Self {
38+
impl BlobscanClient {
39+
pub fn with_client(client: Client, config: Config) -> Self {
40+
Self {
3941
base_url: config.base_url,
40-
client: client_builder.build()?,
41-
jwt_manager: JWTManager::new(JWTManagerConfig {
42-
secret_key: config.secret_key,
43-
refresh_interval: chrono::Duration::minutes(30),
44-
safety_magin: None,
45-
}),
46-
})
42+
client,
43+
jwt_manager: build_jwt_manager(config.secret_key),
44+
}
4745
}
4846

4947
pub async fn index(
5048
&self,
51-
block: BlockEntity,
52-
transactions: Vec<TransactionEntity>,
53-
blobs: Vec<BlobEntity>,
49+
block: Block,
50+
transactions: Vec<Transaction>,
51+
blobs: Vec<Blob>,
5452
) -> BlobscanClientResult<()> {
5553
let path = String::from("index");
5654
let url = self.build_url(&path);
@@ -63,17 +61,15 @@ impl BlobscanClient {
6361

6462
let index_response = self
6563
.client
66-
.post(url)
64+
.put(url)
6765
.bearer_auth(token)
6866
.json(&index_request)
6967
.send()
7068
.await?;
7169

7270
match index_response.status() {
7371
StatusCode::OK => Ok(()),
74-
_ => Err(BlobscanClientError::BlobscanClientError(
75-
index_response.text().await?,
76-
)),
72+
_ => Err(BlobscanClientError::ApiError(index_response.text().await?)),
7773
}
7874
}
7975

@@ -92,9 +88,7 @@ impl BlobscanClient {
9288

9389
match slot_response.status() {
9490
StatusCode::OK => Ok(()),
95-
_ => Err(BlobscanClientError::BlobscanClientError(
96-
slot_response.text().await?,
97-
)),
91+
_ => Err(BlobscanClientError::ApiError(slot_response.text().await?)),
9892
}
9993
}
10094

@@ -107,9 +101,7 @@ impl BlobscanClient {
107101
match slot_response.status() {
108102
StatusCode::OK => Ok(Some(slot_response.json::<SlotResponse>().await?.slot)),
109103
StatusCode::NOT_FOUND => Ok(None),
110-
_ => Err(BlobscanClientError::BlobscanClientError(
111-
slot_response.text().await?,
112-
)),
104+
_ => Err(BlobscanClientError::ApiError(slot_response.text().await?)),
113105
}
114106
}
115107

src/blobscan_client/types.rs

+43-29
Original file line numberDiff line numberDiff line change
@@ -4,68 +4,86 @@ use ethers::types::{
44
};
55
use serde::{Deserialize, Serialize};
66

7-
use crate::{beacon_client::types::BlobData, utils::web3::calculate_versioned_hash};
7+
use crate::{beacon_client::types::Blob as BeaconBlob, utils::web3::calculate_versioned_hash};
88

99
#[derive(Serialize, Deserialize, Debug)]
10-
pub struct BlockEntity {
10+
pub struct Block {
1111
pub number: U64,
1212
pub hash: H256,
1313
pub timestamp: U256,
1414
pub slot: u32,
1515
}
1616

1717
#[derive(Serialize, Deserialize, Debug)]
18-
pub struct TransactionEntity {
18+
#[serde(rename_all = "camelCase")]
19+
pub struct Transaction {
1920
pub hash: H256,
2021
pub from: Address,
21-
pub to: Address,
22-
#[serde(rename = "blockNumber")]
22+
#[serde(default, skip_serializing_if = "Option::is_none")]
23+
pub to: Option<Address>,
2324
pub block_number: U64,
2425
}
2526

2627
#[derive(Serialize, Deserialize, Debug)]
27-
pub struct BlobEntity {
28-
#[serde(rename = "versionedHash")]
28+
#[serde(rename_all = "camelCase")]
29+
pub struct Blob {
2930
pub versioned_hash: H256,
3031
pub commitment: String,
3132
pub data: Bytes,
32-
#[serde(rename = "txHash")]
3333
pub tx_hash: H256,
3434
pub index: u32,
3535
}
3636

3737
#[derive(Serialize, Deserialize, Debug)]
38-
pub struct SlotResponse {
39-
pub slot: u32,
38+
#[serde(rename_all = "camelCase")]
39+
pub struct FailedSlotsChunk {
40+
#[serde(default, skip_serializing_if = "Option::is_none")]
41+
pub id: Option<u32>,
42+
pub initial_slot: u32,
43+
pub final_slot: u32,
4044
}
4145

42-
#[derive(Serialize, Deserialize, Debug)]
46+
#[derive(Serialize, Debug)]
4347
pub struct SlotRequest {
4448
pub slot: u32,
4549
}
50+
#[derive(Deserialize, Debug)]
51+
pub struct SlotResponse {
52+
pub slot: u32,
53+
}
4654

47-
#[derive(Serialize, Deserialize, Debug)]
55+
#[derive(Serialize, Debug)]
4856
pub struct IndexRequest {
49-
pub block: BlockEntity,
50-
pub transactions: Vec<TransactionEntity>,
51-
pub blobs: Vec<BlobEntity>,
57+
pub block: Block,
58+
pub transactions: Vec<Transaction>,
59+
pub blobs: Vec<Blob>,
5260
}
5361

5462
#[derive(Debug, thiserror::Error)]
5563
pub enum BlobscanClientError {
5664
#[error(transparent)]
5765
Reqwest(#[from] reqwest::Error),
5866

59-
#[error("Blobscan client error: {0}")]
60-
BlobscanClientError(String),
67+
#[error("API usage error: {0}")]
68+
ApiError(String),
6169

6270
#[error(transparent)]
63-
JWTError(#[from] anyhow::Error),
71+
Other(#[from] anyhow::Error),
6472
}
6573

6674
pub type BlobscanClientResult<T> = Result<T, BlobscanClientError>;
6775

68-
impl<'a> TryFrom<(&'a EthersBlock<EthersTransaction>, u32)> for BlockEntity {
76+
impl From<(u32, u32)> for FailedSlotsChunk {
77+
fn from((initial_slot, final_slot): (u32, u32)) -> Self {
78+
Self {
79+
id: None,
80+
initial_slot,
81+
final_slot,
82+
}
83+
}
84+
}
85+
86+
impl<'a> TryFrom<(&'a EthersBlock<EthersTransaction>, u32)> for Block {
6987
type Error = anyhow::Error;
7088

7189
fn try_from(
@@ -86,9 +104,7 @@ impl<'a> TryFrom<(&'a EthersBlock<EthersTransaction>, u32)> for BlockEntity {
86104
}
87105
}
88106

89-
impl<'a> TryFrom<(&'a EthersTransaction, &'a EthersBlock<EthersTransaction>)>
90-
for TransactionEntity
91-
{
107+
impl<'a> TryFrom<(&'a EthersTransaction, &'a EthersBlock<EthersTransaction>)> for Transaction {
92108
type Error = anyhow::Error;
93109

94110
fn try_from(
@@ -102,18 +118,16 @@ impl<'a> TryFrom<(&'a EthersTransaction, &'a EthersBlock<EthersTransaction>)>
102118
.with_context(|| "Missing block number field in execution block".to_string())?,
103119
hash,
104120
from: ethers_tx.from,
105-
to: ethers_tx
106-
.to
107-
.with_context(|| format!("Missing to field in transaction {hash}"))?,
121+
to: ethers_tx.to,
108122
})
109123
}
110124
}
111125

112-
impl<'a> TryFrom<(&'a BlobData, u32, H256)> for BlobEntity {
126+
impl<'a> TryFrom<(&'a BeaconBlob, u32, H256)> for Blob {
113127
type Error = anyhow::Error;
114128

115129
fn try_from(
116-
(blob_data, index, tx_hash): (&'a BlobData, u32, H256),
130+
(blob_data, index, tx_hash): (&'a BeaconBlob, u32, H256),
117131
) -> Result<Self, Self::Error> {
118132
Ok(Self {
119133
tx_hash,
@@ -125,9 +139,9 @@ impl<'a> TryFrom<(&'a BlobData, u32, H256)> for BlobEntity {
125139
}
126140
}
127141

128-
impl<'a> From<(&'a BlobData, &'a H256, usize, &'a H256)> for BlobEntity {
142+
impl<'a> From<(&'a BeaconBlob, &'a H256, usize, &'a H256)> for Blob {
129143
fn from(
130-
(blob_data, versioned_hash, index, tx_hash): (&'a BlobData, &'a H256, usize, &'a H256),
144+
(blob_data, versioned_hash, index, tx_hash): (&'a BeaconBlob, &'a H256, usize, &'a H256),
131145
) -> Self {
132146
Self {
133147
tx_hash: *tx_hash,

0 commit comments

Comments
 (0)