Skip to content

Error handling improvements #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 138 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ log = "0.4.17"
tracing = "0.1.19"
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
tracing-bunyan-formatter = "0.3"
tracing-log = "0.1.1"
tracing-log = "0.1.1"

# error handling
anyhow = { version = "1.0.70", features = ["backtrace"] }
thiserror = "1.0.40"
backoff = { version = "0.4.0", features = ["tokio"] }
36 changes: 16 additions & 20 deletions src/beacon_chain/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use reqwest::{Client, StatusCode};
use std::time::Duration;

use crate::types::StdError;

use self::types::{BlobsSidecar, BlobsSidecarResponse, BlockMessage as Block, BlockResponse};
use self::types::{
BeaconAPIError, BeaconAPIResult, BlobsSidecar, BlobsSidecarResponse, BlockMessage as Block,
BlockResponse,
};

mod types;

Expand All @@ -18,7 +19,7 @@ pub struct Options {
}

impl BeaconChainAPI {
pub fn try_from(base_url: String, options: Option<Options>) -> Result<Self, StdError> {
pub fn try_from(base_url: String, options: Option<Options>) -> BeaconAPIResult<Self> {
let mut client_builder = Client::builder();

if let Some(options) = options {
Expand All @@ -33,12 +34,13 @@ impl BeaconChainAPI {
})
}

pub async fn get_block(&self, slot: Option<u32>) -> Result<Option<Block>, StdError> {
pub async fn get_block(&self, slot: Option<u32>) -> BeaconAPIResult<Option<Block>> {
let slot = match slot {
Some(slot) => slot.to_string(),
None => String::from("head"),
};
let url = self.build_url(&format!("/eth/v2/beacon/blocks/{}", slot));

let url = self.build_url(&format!("eth/v2/beacon/blocks/{slot}"));

let block_response = self.client.get(url).send().await?;

Expand All @@ -47,17 +49,14 @@ impl BeaconChainAPI {
block_response.json::<BlockResponse>().await?.data.message,
)),
StatusCode::NOT_FOUND => Ok(None),
_ => Err(format!(
"Couldn't fetch beacon block at slot {}: {}",
slot,
block_response.text().await?
)
.into()),
_ => Err(BeaconAPIError::JsonRpcClientError(
block_response.text().await?,
)),
}
}

pub async fn get_blobs_sidecar(&self, slot: u32) -> Result<Option<BlobsSidecar>, StdError> {
let url = self.build_url(&format!("/eth/v1/beacon/blobs_sidecars/{}", slot));
pub async fn get_blobs_sidecar(&self, slot: u32) -> BeaconAPIResult<Option<BlobsSidecar>> {
let url = self.build_url(&format!("eth/v1/beacon/blobs_sidecars/{slot}"));

let sidecar_response = self.client.get(url).send().await?;

Expand All @@ -66,12 +65,9 @@ impl BeaconChainAPI {
sidecar_response.json::<BlobsSidecarResponse>().await?.data,
)),
StatusCode::NOT_FOUND => Ok(None),
_ => Err(format!(
"Couldn't fetch blobs sidecar at slot {}: {}",
slot,
sidecar_response.text().await?
)
.into()),
_ => Err(BeaconAPIError::JsonRpcClientError(
sidecar_response.text().await?,
)),
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/beacon_chain/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Result;
use ethers::types::{Bytes, H256};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -36,3 +37,14 @@ pub struct BlobsSidecar {
pub struct BlobsSidecarResponse {
pub data: BlobsSidecar,
}

#[derive(Debug, thiserror::Error)]
pub enum BeaconAPIError {
#[error(transparent)]
Reqwest(#[from] reqwest::Error),

#[error("JSON-RPC beacon client error: {0}")]
JsonRpcClientError(String),
}

pub type BeaconAPIResult<T> = Result<T, BeaconAPIError>;
28 changes: 10 additions & 18 deletions src/db/blob_db_manager.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,38 @@
use anyhow::Result;
use async_trait::async_trait;

use crate::types::{Blob, BlockData, IndexerMetadata, StdError, TransactionData};
use crate::types::{Blob, BlockData, IndexerMetadata, TransactionData};

#[async_trait]
pub trait DBManager {
type Options;

async fn new(connection_uri: &str, db_name: &str) -> Result<Self, StdError>
async fn new(connection_uri: &str, db_name: &str) -> Result<Self>
where
Self: Sized;

async fn commit_transaction(&self, options: Option<&mut Self::Options>)
-> Result<(), StdError>;
async fn commit_transaction(&self, options: Option<&mut Self::Options>) -> Result<()>;

async fn insert_block(
&self,
block: &BlockData,
options: Option<&mut Self::Options>,
) -> Result<(), StdError>;
) -> Result<()>;

async fn insert_blob(
&self,
blob: &Blob,
options: Option<&mut Self::Options>,
) -> Result<(), StdError>;
async fn insert_blob(&self, blob: &Blob, options: Option<&mut Self::Options>) -> Result<()>;

async fn insert_tx(
&self,
tx: &TransactionData,
options: Option<&mut Self::Options>,
) -> Result<(), StdError>;
) -> Result<()>;

async fn start_transaction(&self, options: Option<&mut Self::Options>) -> Result<(), StdError>;
async fn start_transaction(&self, options: Option<&mut Self::Options>) -> Result<()>;

async fn update_last_slot(
&self,
slot: u32,
options: Option<&mut Self::Options>,
) -> Result<(), StdError>;
async fn update_last_slot(&self, slot: u32, options: Option<&mut Self::Options>) -> Result<()>;

async fn read_metadata(
&self,
options: Option<&mut Self::Options>,
) -> Result<Option<IndexerMetadata>, StdError>;
) -> Result<Option<IndexerMetadata>>;
}
36 changes: 12 additions & 24 deletions src/db/mongodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use mongodb::{
bson::doc,
Expand All @@ -6,7 +7,7 @@ use mongodb::{
Client, ClientSession, Database,
};

use crate::types::{Blob, BlockData, IndexerMetadata, StdError, TransactionData};
use crate::types::{Blob, BlockData, IndexerMetadata, TransactionData};

use self::types::{BlobDocument, BlockDocument, IndexerMetadataDocument, TransactionDocument};

Expand All @@ -30,7 +31,7 @@ const INDEXER_METADATA_ID: &str = "indexer_metadata";
impl DBManager for MongoDBManager {
type Options = MongoDBManagerOptions;

async fn new(connection_uri: &str, db_name: &str) -> Result<Self, StdError>
async fn new(connection_uri: &str, db_name: &str) -> Result<Self>
where
Self: Sized,
{
Expand All @@ -44,14 +45,8 @@ impl DBManager for MongoDBManager {
Ok(MongoDBManager { client, db })
}

async fn commit_transaction(
&self,
options: Option<&mut Self::Options>,
) -> Result<(), StdError> {
let session = match options {
Some(options) => &mut options.session,
None => return Err("No session provided".into()),
};
async fn commit_transaction(&self, options: Option<&mut Self::Options>) -> Result<()> {
let session = &mut options.context("No session provided")?.session;

// An "UnknownTransactionCommitResult" label indicates that it is unknown whether the
// commit has satisfied the write concern associated with the transaction. If an error
Expand All @@ -76,7 +71,7 @@ impl DBManager for MongoDBManager {
&self,
block_data: &BlockData,
options: Option<&mut Self::Options>,
) -> Result<(), StdError> {
) -> Result<()> {
let block_document = BlockDocument::try_from(block_data)?;
let blocks_collection = self.db.collection::<BlockDocument>("blocks");

Expand All @@ -94,11 +89,7 @@ impl DBManager for MongoDBManager {
Ok(())
}

async fn insert_blob(
&self,
blob: &Blob,
options: Option<&mut Self::Options>,
) -> Result<(), StdError> {
async fn insert_blob(&self, blob: &Blob, options: Option<&mut Self::Options>) -> Result<()> {
let blob_document = BlobDocument::try_from(blob)?;
let blobs_collection = self.db.collection::<BlobDocument>("blobs");

Expand All @@ -120,7 +111,7 @@ impl DBManager for MongoDBManager {
&self,
tx: &TransactionData,
options: Option<&mut Self::Options>,
) -> Result<(), StdError> {
) -> Result<()> {
let tx_document = TransactionDocument::try_from(tx)?;
let txs_collection = self.db.collection::<TransactionDocument>("txs");

Expand All @@ -138,11 +129,8 @@ impl DBManager for MongoDBManager {
Ok(())
}

async fn start_transaction(&self, options: Option<&mut Self::Options>) -> Result<(), StdError> {
let session = match options {
Some(options) => &mut options.session,
None => return Err("No session provided".into()),
};
async fn start_transaction(&self, options: Option<&mut Self::Options>) -> Result<()> {
let session = &mut options.context("No session provided")?.session;

session.start_transaction(None).await?;

Expand All @@ -153,7 +141,7 @@ impl DBManager for MongoDBManager {
&self,
_slot: u32,
options: Option<&mut Self::Options>,
) -> Result<(), StdError> {
) -> Result<()> {
let indexer_metadata_collection = self
.db
.collection::<IndexerMetadataDocument>("indexer_metadata");
Expand Down Expand Up @@ -182,7 +170,7 @@ impl DBManager for MongoDBManager {
async fn read_metadata(
&self,
_options: Option<&mut Self::Options>,
) -> Result<Option<IndexerMetadata>, StdError> {
) -> Result<Option<IndexerMetadata>> {
let query = doc! { "_id": INDEXER_METADATA_ID};
let indexer_metadata_collection = self
.db
Expand Down
39 changes: 14 additions & 25 deletions src/db/mongodb/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use anyhow::{Context, Error, Result};
use ethers::types::{Address, Bytes, H256, U256};
use serde::{Deserialize, Serialize};

use crate::{
db::utils::{build_blob_id, build_block_id, build_tx_id},
types::{Blob, BlockData, IndexerMetadata, StdError, TransactionData},
types::{Blob, BlockData, IndexerMetadata, TransactionData},
};

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -37,19 +38,13 @@ pub struct BlobDocument {
}

impl TryFrom<&BlockData<'_>> for BlockDocument {
type Error = StdError;
type Error = Error;

fn try_from(block_data: &BlockData) -> Result<Self, Self::Error> {
let block = block_data.block;

let hash = match block.hash {
Some(hash) => hash,
None => return Err("Block hash not found".into()),
};
let number = match block.number {
Some(number) => number.as_u64(),
None => return Err("Block number not found".into()),
};
let hash = block.hash.context("Block hash not found")?;
let number = block.number.context("Block number not found")?.as_u64();

Ok(Self {
_id: build_block_id(&hash),
Expand All @@ -63,22 +58,16 @@ impl TryFrom<&BlockData<'_>> for BlockDocument {
}

impl TryFrom<&TransactionData<'_>> for TransactionDocument {
type Error = StdError;
type Error = Error;

fn try_from(tx_data: &TransactionData) -> Result<Self, Self::Error> {
let tx = tx_data.tx;
let to = match tx.to {
Some(to) => to,
None => return Err("Transaction recipient not found".into()),
};
let block_hash = match tx.block_hash {
Some(block_hash) => block_hash,
None => return Err("Transaction block hash not found".into()),
};
let block_number = match tx.block_number {
Some(block_number) => block_number.as_u64(),
None => return Err("Transaction block number not found".into()),
};
let to = tx.to.context("Transaction recipient not found")?;
let block_hash = tx.block_hash.context("Transaction block hash not found")?;
let block_number = tx
.block_number
.context("Transaction block number not found")?
.as_u64();

Ok(Self {
_id: build_tx_id(&tx.hash),
Expand All @@ -94,7 +83,7 @@ impl TryFrom<&TransactionData<'_>> for TransactionDocument {
}

impl TryFrom<&Blob<'_>> for BlobDocument {
type Error = StdError;
type Error = Error;

fn try_from(blob: &Blob) -> Result<Self, Self::Error> {
Ok(Self {
Expand All @@ -116,7 +105,7 @@ pub struct IndexerMetadataDocument {
}

impl TryFrom<IndexerMetadataDocument> for IndexerMetadata {
type Error = StdError;
type Error = Error;

fn try_from(doc: IndexerMetadataDocument) -> Result<Self, Self::Error> {
Ok(Self {
Expand Down
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use anyhow::Result;
use tracing::Instrument;

use crate::{
db::blob_db_manager::DBManager,
slot_processor::SlotProcessor,
types::StdError,
utils::{
context::create_context,
telemetry::{get_subscriber, init_subscriber},
Expand All @@ -18,14 +18,14 @@ mod types;
mod utils;

#[tokio::main]
async fn main() -> Result<(), StdError> {
async fn main() -> Result<()> {
dotenv::dotenv().ok();

let subscriber = get_subscriber("blobscan_indexer".into(), "info".into(), std::io::stdout);
init_subscriber(subscriber);

let context = create_context().await?;
let mut slot_processor = SlotProcessor::try_init(&context).await?;
let mut slot_processor = SlotProcessor::try_init(&context, None).await?;

let mut current_slot = match context.db_manager.read_metadata(None).await? {
Some(metadata) => metadata.last_slot + 1,
Expand All @@ -42,7 +42,7 @@ async fn main() -> Result<(), StdError> {
slot_processor
.process_slots(current_slot, latest_slot)
.instrument(slot_span)
.await;
.await?;

current_slot = latest_slot;
}
Expand Down
Loading