Skip to content

Commit d21a5f9

Browse files
committed
refactor: ease context sharing by using an inner arc pointer
1 parent b8a86ca commit d21a5f9

File tree

6 files changed

+68
-51
lines changed

6 files changed

+68
-51
lines changed

src/beacon_client/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ impl BeaconClient {
3838
Some(slot) => slot.to_string(),
3939
None => String::from("head"),
4040
};
41-
4241
let url = self.build_url(&format!("eth/v2/beacon/blocks/{slot}"));
4342

4443
let block_response = self.client.get(url).send().await?;

src/blobscan_client/types.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ pub struct BlobEntity {
3434
pub index: u32,
3535
}
3636

37-
#[derive(Serialize, Debug)]
37+
#[derive(Serialize, Deserialize, Debug)]
3838
#[serde(rename_all = "camelCase")]
3939
pub struct FailedSlotsChunkEntity {
4040
pub initial_slot: u32,
4141
pub final_slot: u32,
4242
}
43+
4344
#[derive(Serialize, Debug)]
4445
pub struct FailedSlotsChunksRequest {
4546
pub chunks: Vec<FailedSlotsChunkEntity>,

src/context.rs

+44-23
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use std::{sync::Arc, time::Duration};
22

33
use anyhow::Result;
44
use ethers::prelude::*;
@@ -11,32 +11,53 @@ use crate::{
1111
use super::env::{get_env_vars, Environment};
1212

1313
#[derive(Debug, Clone)]
14-
pub struct Context {
14+
struct ContextRef {
1515
pub beacon_client: BeaconClient,
1616
pub blobscan_client: BlobscanClient,
1717
pub provider: Provider<Http>,
1818
}
1919

20-
pub fn create_context() -> Result<Context> {
21-
let Environment {
22-
blobscan_api_endpoint,
23-
beacon_node_rpc,
24-
execution_node_rpc,
25-
secret_key,
26-
..
27-
} = get_env_vars();
28-
let request_timeout = Some(Duration::from_secs(8));
29-
30-
Ok(Context {
31-
blobscan_client: BlobscanClient::try_from(BlobscanClientConfig {
32-
base_url: blobscan_api_endpoint,
20+
#[derive(Debug, Clone)]
21+
pub struct Context {
22+
inner: Arc<ContextRef>,
23+
}
24+
25+
impl Context {
26+
pub fn try_new() -> Result<Self> {
27+
let Environment {
28+
blobscan_api_endpoint,
29+
beacon_node_rpc,
30+
execution_node_rpc,
3331
secret_key,
34-
timeout: request_timeout,
35-
})?,
36-
beacon_client: BeaconClient::try_from(BeaconClientConfig {
37-
base_url: beacon_node_rpc,
38-
timeout: request_timeout,
39-
})?,
40-
provider: Provider::<Http>::try_from(execution_node_rpc)?,
41-
})
32+
..
33+
} = get_env_vars();
34+
let request_timeout = Some(Duration::from_secs(8));
35+
36+
Ok(Self {
37+
inner: Arc::new(ContextRef {
38+
blobscan_client: BlobscanClient::try_from(BlobscanClientConfig {
39+
base_url: blobscan_api_endpoint,
40+
secret_key,
41+
timeout: request_timeout,
42+
})?,
43+
beacon_client: BeaconClient::try_from(BeaconClientConfig {
44+
base_url: beacon_node_rpc,
45+
timeout: request_timeout,
46+
})?,
47+
provider: Provider::<Http>::try_from(execution_node_rpc)?,
48+
}),
49+
})
50+
}
51+
52+
pub fn beacon_client(&self) -> &BeaconClient {
53+
&self.inner.beacon_client
54+
}
55+
56+
pub fn blobscan_client(&self) -> &BlobscanClient {
57+
&self.inner.blobscan_client
58+
}
59+
60+
pub fn provider(&self) -> &Provider<Http> {
61+
&self.inner.provider
62+
}
4263
}

src/main.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use anyhow::Result;
2+
use context::Context;
23
use slot_processor_manager::SlotProcessorManager;
34

4-
use crate::{
5-
context::create_context,
6-
utils::telemetry::{get_subscriber, init_subscriber},
7-
};
5+
use crate::utils::telemetry::{get_subscriber, init_subscriber};
86

97
use std::{thread, time::Duration};
108

@@ -22,23 +20,25 @@ async fn main() -> Result<()> {
2220
let subscriber = get_subscriber("blobscan_indexer".into(), "info".into(), std::io::stdout);
2321
init_subscriber(subscriber);
2422

25-
let context = create_context()?;
26-
let mut current_slot = match context.blobscan_client.get_slot().await? {
23+
let context = Context::try_new()?;
24+
let beacon_client = context.beacon_client();
25+
let blobscan_client = context.blobscan_client();
26+
let mut current_slot = match blobscan_client.get_slot().await? {
2727
Some(last_slot) => last_slot + 1,
2828
None => 0,
2929
};
3030
let slot_processor_manager = SlotProcessorManager::try_new(context.clone())?;
3131

3232
loop {
33-
if let Some(latest_beacon_block) = context.beacon_client.get_block(None).await? {
33+
if let Some(latest_beacon_block) = beacon_client.get_block(None).await? {
3434
let latest_slot: u32 = latest_beacon_block.slot.parse()?;
3535

3636
if current_slot < latest_slot {
3737
slot_processor_manager
3838
.process_slots(current_slot, latest_slot)
3939
.await?;
4040

41-
context.blobscan_client.update_slot(latest_slot - 1).await?;
41+
blobscan_client.update_slot(latest_slot - 1).await?;
4242

4343
current_slot = latest_slot;
4444
}

src/slot_processor_manager/mod.rs

+6-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use core::panic;
2-
use std::sync::Arc;
32

43
use futures::future::join_all;
54
use tokio::task::{JoinError, JoinHandle};
@@ -11,17 +10,16 @@ use crate::{blobscan_client::types::FailedSlotsChunkEntity, context::Context};
1110
mod slot_processor;
1211

1312
pub struct SlotProcessorManager {
14-
shared_context: Arc<Context>,
13+
context: Context,
1514
max_threads_length: u32,
1615
}
1716

1817
impl SlotProcessorManager {
1918
pub fn try_new(context: Context) -> Result<Self, anyhow::Error> {
2019
let max_threads_length = std::thread::available_parallelism()?.get() as u32;
21-
let shared_context = Arc::new(context);
2220

2321
Ok(Self {
24-
shared_context,
22+
context,
2523
max_threads_length,
2624
})
2725
}
@@ -48,13 +46,13 @@ impl SlotProcessorManager {
4846
slots_per_thread
4947
};
5048

51-
let thread_context = Arc::clone(&self.shared_context);
49+
let thread_context = self.context.clone();
5250
let thread_initial_slot = current_slot;
5351
let thread_final_slot = current_slot + thread_slots_chunk;
5452

5553
let thread = tokio::spawn(async move {
5654
let slot_span = tracing::trace_span!("slot_processor", slot = end_slot);
57-
let slot_processor = SlotProcessor::new(&thread_context);
55+
let slot_processor = SlotProcessor::new(thread_context);
5856

5957
slot_processor
6058
.process_slots(thread_initial_slot, thread_final_slot)
@@ -97,8 +95,8 @@ impl SlotProcessorManager {
9795
.collect::<Vec<FailedSlotsChunkEntity>>();
9896

9997
if !failed_slots_chunks.is_empty() {
100-
self.shared_context
101-
.blobscan_client
98+
self.context
99+
.blobscan_client()
102100
.add_failed_slots_chunks(failed_slots_chunks)
103101
.await?;
104102
}

src/slot_processor_manager/slot_processor/mod.rs

+8-10
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::time::{Duration, Instant};
22

33
use anyhow::{Context as AnyhowContext, Result};
44
use backoff::{future::retry_notify, Error as BackoffError};
5-
use ethers::prelude::*;
65

6+
use ethers::prelude::*;
77
use tracing::{error, info, warn};
88

99
use crate::{
@@ -18,12 +18,12 @@ use self::helpers::{create_tx_hash_versioned_hashes_mapping, create_versioned_ha
1818
pub mod errors;
1919
mod helpers;
2020

21-
pub struct SlotProcessor<'a> {
22-
context: &'a Context,
21+
pub struct SlotProcessor {
22+
context: Context,
2323
}
2424

25-
impl<'a> SlotProcessor<'a> {
26-
pub fn new(context: &'a Context) -> SlotProcessor {
25+
impl SlotProcessor {
26+
pub fn new(context: Context) -> SlotProcessor {
2727
Self { context }
2828
}
2929

@@ -71,11 +71,9 @@ impl<'a> SlotProcessor<'a> {
7171
&self,
7272
slot: u32,
7373
) -> Result<(), backoff::Error<SingleSlotProcessingError>> {
74-
let Context {
75-
beacon_client,
76-
blobscan_client,
77-
provider,
78-
} = self.context;
74+
let beacon_client = self.context.beacon_client();
75+
let blobscan_client = self.context.blobscan_client();
76+
let provider = self.context.provider();
7977

8078
let start = Instant::now();
8179

0 commit comments

Comments
 (0)