Skip to content

Add Parallel Indexing #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b258d2c
refactor(SlotProcessor): undo unnecessary mutability
PJColombo May 7, 2023
f884509
feat: add `SlotProcessorManager`
PJColombo May 9, 2023
7d78535
refactor: separate backoff config
PJColombo May 10, 2023
3dfa7c5
feat(SlotProcessorManager): store failed slots chunks for deferred re…
PJColombo May 12, 2023
be38d02
feat(blobscan_client): allow `to` transaction field to be optional
PJColombo May 13, 2023
7e07d73
refactor(slot_processor_manager): separate slot update logic
PJColombo May 13, 2023
b8a86ca
chore: solve clippy errors
PJColombo May 13, 2023
d21a5f9
refactor: ease context sharing by using an inner arc pointer
PJColombo May 13, 2023
0e65b45
refactor: use same reqwest client for both blobscan and beacon clients
PJColombo May 13, 2023
475fe41
refactor: decouple env vars logic from context instantiation
PJColombo May 13, 2023
bc85e47
feat: add slot retryer
PJColombo May 14, 2023
9768539
chore: add tracing spans
PJColombo May 15, 2023
cc9f030
chore: remove unused env variable
PJColombo May 15, 2023
40d9cbd
refactor: add small tweaks to clients' types
PJColombo May 15, 2023
008ac6e
fix: rollback slot retryer + improve
PJColombo May 17, 2023
cd741b0
fix: add the following:
PJColombo May 17, 2023
2f0be84
chore: update beacon api default url
PJColombo May 17, 2023
2151fa1
chore: allow to parameterize number of slots processing threads
PJColombo May 17, 2023
0aa7e29
feat: add slot save interval for error resilience
PJColombo May 17, 2023
87f24c1
fix(jwt_manager): increase refresh interval
PJColombo May 18, 2023
b0cf5c3
fix: rollback beacon api env var default value
PJColombo May 18, 2023
6013b40
fix: wrap update slot request in a retryer + log any error
PJColombo May 18, 2023
a284f2a
fix(tracing): demote spans to `trace` level + add `slot` field to slo…
PJColombo May 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ SECRET_KEY=
BLOBSCAN_API_ENDPOINT=
EXECUTION_NODE_URL=
BEACON_NODE_RPC=
NUM_PROCESSING_THREADS=
21 changes: 7 additions & 14 deletions src/beacon_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use reqwest::{Client, StatusCode};
use std::time::Duration;

use self::types::{
BeaconClientError, BeaconClientResult, BlobData, BlobsResponse, BlockMessage as Block,
BeaconClientError, BeaconClientResult, Blob, BlobsResponse, BlockMessage as Block,
BlockResponse,
};

Expand All @@ -11,7 +11,7 @@ pub mod types;
#[derive(Debug, Clone)]
pub struct BeaconClient {
base_url: String,
client: reqwest::Client,
client: Client,
}

pub struct Config {
Expand All @@ -20,25 +20,18 @@ pub struct Config {
}

impl BeaconClient {
pub fn try_from(config: Config) -> BeaconClientResult<Self> {
let mut client_builder = Client::builder();

if let Some(timeout) = config.timeout {
client_builder = client_builder.timeout(timeout);
}

Ok(Self {
pub fn with_client(client: Client, config: Config) -> Self {
Self {
base_url: config.base_url,
client: client_builder.build()?,
})
client,
}
}

pub async fn get_block(&self, slot: Option<u32>) -> BeaconClientResult<Option<Block>> {
let slot = match slot {
Some(slot) => slot.to_string(),
None => String::from("head"),
};

let url = self.build_url(&format!("eth/v2/beacon/blocks/{slot}"));

let block_response = self.client.get(url).send().await?;
Expand All @@ -54,7 +47,7 @@ impl BeaconClient {
}
}

pub async fn get_blobs(&self, slot: u32) -> BeaconClientResult<Option<Vec<BlobData>>> {
pub async fn get_blobs(&self, slot: u32) -> BeaconClientResult<Option<Vec<Blob>>> {
let url = self.build_url(&format!("eth/v1/beacon/blobs/{slot}"));

let blobs_response = self.client.get(url).send().await?;
Expand Down
20 changes: 10 additions & 10 deletions src/beacon_client/types.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,43 @@
use anyhow::Result;
use ethers::types::{Bytes, H256};
use serde::{Deserialize, Serialize};
use serde::Deserialize;

#[derive(Serialize, Deserialize, Debug)]
#[derive(Deserialize, Debug)]
pub struct ExecutionPayload {
pub block_hash: H256,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Deserialize, Debug)]
pub struct BlockBody {
pub execution_payload: Option<ExecutionPayload>,
pub blob_kzg_commitments: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Deserialize, Debug)]
pub struct BlockMessage {
pub slot: String,
pub body: BlockBody,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Deserialize, Debug)]
pub struct Block {
pub message: BlockMessage,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Deserialize, Debug)]
pub struct BlockResponse {
pub data: Block,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct BlobData {
#[derive(Deserialize, Debug)]
pub struct Blob {
pub index: String,
pub kzg_commitment: String,
pub blob: Bytes,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Deserialize, Debug)]
pub struct BlobsResponse {
pub data: Vec<BlobData>,
pub data: Vec<Blob>,
}

#[derive(Debug, thiserror::Error)]
Expand Down
2 changes: 1 addition & 1 deletion src/blobscan_client/jwt_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl JWTManager {
refresh_interval: config.refresh_interval,
safety_margin: match config.safety_magin {
Some(safety_margin) => safety_margin,
None => Duration::seconds(10),
None => Duration::minutes(1),
},
}
}
Expand Down
54 changes: 23 additions & 31 deletions src/blobscan_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use reqwest::{Client, StatusCode};
use self::{
jwt_manager::{Config as JWTManagerConfig, JWTManager},
types::{
BlobEntity, BlobscanClientError, BlobscanClientResult, BlockEntity, IndexRequest,
SlotRequest, SlotResponse, TransactionEntity,
Blob, BlobscanClientError, BlobscanClientResult, Block, IndexRequest, SlotRequest,
SlotResponse, Transaction,
},
};

mod jwt_manager;

pub mod types;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BlobscanClient {
base_url: String,
client: reqwest::Client,
Expand All @@ -27,30 +27,28 @@ pub struct Config {
pub timeout: Option<Duration>,
}

impl BlobscanClient {
pub fn try_from(config: Config) -> BlobscanClientResult<Self> {
let mut client_builder = Client::builder();

if let Some(timeout) = config.timeout {
client_builder = client_builder.timeout(timeout);
}
pub fn build_jwt_manager(secret_key: String) -> JWTManager {
JWTManager::new(JWTManagerConfig {
secret_key,
refresh_interval: chrono::Duration::hours(1),
safety_magin: None,
})
}

Ok(Self {
impl BlobscanClient {
pub fn with_client(client: Client, config: Config) -> Self {
Self {
base_url: config.base_url,
client: client_builder.build()?,
jwt_manager: JWTManager::new(JWTManagerConfig {
secret_key: config.secret_key,
refresh_interval: chrono::Duration::minutes(30),
safety_magin: None,
}),
})
client,
jwt_manager: build_jwt_manager(config.secret_key),
}
}

pub async fn index(
&self,
block: BlockEntity,
transactions: Vec<TransactionEntity>,
blobs: Vec<BlobEntity>,
block: Block,
transactions: Vec<Transaction>,
blobs: Vec<Blob>,
) -> BlobscanClientResult<()> {
let path = String::from("index");
let url = self.build_url(&path);
Expand All @@ -63,17 +61,15 @@ impl BlobscanClient {

let index_response = self
.client
.post(url)
.put(url)
.bearer_auth(token)
.json(&index_request)
.send()
.await?;

match index_response.status() {
StatusCode::OK => Ok(()),
_ => Err(BlobscanClientError::BlobscanClientError(
index_response.text().await?,
)),
_ => Err(BlobscanClientError::ApiError(index_response.text().await?)),
}
}

Expand All @@ -92,9 +88,7 @@ impl BlobscanClient {

match slot_response.status() {
StatusCode::OK => Ok(()),
_ => Err(BlobscanClientError::BlobscanClientError(
slot_response.text().await?,
)),
_ => Err(BlobscanClientError::ApiError(slot_response.text().await?)),
}
}

Expand All @@ -107,9 +101,7 @@ impl BlobscanClient {
match slot_response.status() {
StatusCode::OK => Ok(Some(slot_response.json::<SlotResponse>().await?.slot)),
StatusCode::NOT_FOUND => Ok(None),
_ => Err(BlobscanClientError::BlobscanClientError(
slot_response.text().await?,
)),
_ => Err(BlobscanClientError::ApiError(slot_response.text().await?)),
}
}

Expand Down
72 changes: 43 additions & 29 deletions src/blobscan_client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,86 @@ use ethers::types::{
};
use serde::{Deserialize, Serialize};

use crate::{beacon_client::types::BlobData, utils::web3::calculate_versioned_hash};
use crate::{beacon_client::types::Blob as BeaconBlob, utils::web3::calculate_versioned_hash};

#[derive(Serialize, Deserialize, Debug)]
pub struct BlockEntity {
pub struct Block {
pub number: U64,
pub hash: H256,
pub timestamp: U256,
pub slot: u32,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TransactionEntity {
#[serde(rename_all = "camelCase")]
pub struct Transaction {
pub hash: H256,
pub from: Address,
pub to: Address,
#[serde(rename = "blockNumber")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub to: Option<Address>,
pub block_number: U64,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct BlobEntity {
#[serde(rename = "versionedHash")]
#[serde(rename_all = "camelCase")]
pub struct Blob {
pub versioned_hash: H256,
pub commitment: String,
pub data: Bytes,
#[serde(rename = "txHash")]
pub tx_hash: H256,
pub index: u32,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SlotResponse {
pub slot: u32,
#[serde(rename_all = "camelCase")]
pub struct FailedSlotsChunk {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<u32>,
pub initial_slot: u32,
pub final_slot: u32,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Debug)]
pub struct SlotRequest {
pub slot: u32,
}
#[derive(Deserialize, Debug)]
pub struct SlotResponse {
pub slot: u32,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Debug)]
pub struct IndexRequest {
pub block: BlockEntity,
pub transactions: Vec<TransactionEntity>,
pub blobs: Vec<BlobEntity>,
pub block: Block,
pub transactions: Vec<Transaction>,
pub blobs: Vec<Blob>,
}

#[derive(Debug, thiserror::Error)]
pub enum BlobscanClientError {
#[error(transparent)]
Reqwest(#[from] reqwest::Error),

#[error("Blobscan client error: {0}")]
BlobscanClientError(String),
#[error("API usage error: {0}")]
ApiError(String),

#[error(transparent)]
JWTError(#[from] anyhow::Error),
Other(#[from] anyhow::Error),
}

pub type BlobscanClientResult<T> = Result<T, BlobscanClientError>;

impl<'a> TryFrom<(&'a EthersBlock<EthersTransaction>, u32)> for BlockEntity {
impl From<(u32, u32)> for FailedSlotsChunk {
fn from((initial_slot, final_slot): (u32, u32)) -> Self {
Self {
id: None,
initial_slot,
final_slot,
}
}
}

impl<'a> TryFrom<(&'a EthersBlock<EthersTransaction>, u32)> for Block {
type Error = anyhow::Error;

fn try_from(
Expand All @@ -86,9 +104,7 @@ impl<'a> TryFrom<(&'a EthersBlock<EthersTransaction>, u32)> for BlockEntity {
}
}

impl<'a> TryFrom<(&'a EthersTransaction, &'a EthersBlock<EthersTransaction>)>
for TransactionEntity
{
impl<'a> TryFrom<(&'a EthersTransaction, &'a EthersBlock<EthersTransaction>)> for Transaction {
type Error = anyhow::Error;

fn try_from(
Expand All @@ -102,18 +118,16 @@ impl<'a> TryFrom<(&'a EthersTransaction, &'a EthersBlock<EthersTransaction>)>
.with_context(|| "Missing block number field in execution block".to_string())?,
hash,
from: ethers_tx.from,
to: ethers_tx
.to
.with_context(|| format!("Missing to field in transaction {hash}"))?,
to: ethers_tx.to,
})
}
}

impl<'a> TryFrom<(&'a BlobData, u32, H256)> for BlobEntity {
impl<'a> TryFrom<(&'a BeaconBlob, u32, H256)> for Blob {
type Error = anyhow::Error;

fn try_from(
(blob_data, index, tx_hash): (&'a BlobData, u32, H256),
(blob_data, index, tx_hash): (&'a BeaconBlob, u32, H256),
) -> Result<Self, Self::Error> {
Ok(Self {
tx_hash,
Expand All @@ -125,9 +139,9 @@ impl<'a> TryFrom<(&'a BlobData, u32, H256)> for BlobEntity {
}
}

impl<'a> From<(&'a BlobData, &'a H256, usize, &'a H256)> for BlobEntity {
impl<'a> From<(&'a BeaconBlob, &'a H256, usize, &'a H256)> for Blob {
fn from(
(blob_data, versioned_hash, index, tx_hash): (&'a BlobData, &'a H256, usize, &'a H256),
(blob_data, versioned_hash, index, tx_hash): (&'a BeaconBlob, &'a H256, usize, &'a H256),
) -> Self {
Self {
tx_hash: *tx_hash,
Expand Down
Loading