Skip to content

feat(fortuna): Traced client for better observability #1651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "6.2.2"
version = "6.2.3"
edition = "2021"

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions apps/fortuna/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod eth_gas_oracle;
pub(crate) mod ethereum;
pub(crate) mod reader;
pub(crate) mod traced_client;
109 changes: 76 additions & 33 deletions apps/fortuna/src/chain/ethereum.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{
api::ChainId,
chain::{
eth_gas_oracle::EthProviderOracle,
reader::{
Expand All @@ -9,6 +10,10 @@ use {
EntropyReader,
RequestedWithCallbackEvent,
},
traced_client::{
RpcMetrics,
TracedClient,
},
},
config::EthereumConfig,
},
Expand All @@ -22,7 +27,6 @@ use {
abi::RawLog,
contract::{
abigen,
ContractError,
EthLogDecode,
},
core::types::Address,
Expand All @@ -34,6 +38,7 @@ use {
},
prelude::{
BlockId,
JsonRpcClient,
PendingTransaction,
TransactionRequest,
},
Expand Down Expand Up @@ -67,15 +72,19 @@ abigen!(
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json"
);

pub type SignablePythContract = PythRandom<
pub type SignablePythContractInner<T> = PythRandom<
LegacyTxMiddleware<
GasOracleMiddleware<
NonceManagerMiddleware<SignerMiddleware<Provider<Http>, LocalWallet>>,
EthProviderOracle<Provider<Http>>,
NonceManagerMiddleware<SignerMiddleware<Provider<T>, LocalWallet>>,
EthProviderOracle<Provider<T>>,
>,
>,
>;
pub type SignablePythContract = SignablePythContractInner<Http>;
pub type InstrumentedSignablePythContract = SignablePythContractInner<TracedClient>;

pub type PythContract = PythRandom<Provider<Http>>;
pub type InstrumentedPythContract = PythRandom<Provider<TracedClient>>;

/// Middleware that converts a transaction into a legacy transaction if use_legacy_tx is true.
/// We can not use TransformerMiddleware because keeper calls fill_transaction first which bypasses
Expand Down Expand Up @@ -157,32 +166,7 @@ impl<M: Middleware> Middleware for LegacyTxMiddleware<M> {
}
}

impl SignablePythContract {
pub async fn from_config(
chain_config: &EthereumConfig,
private_key: &str,
) -> Result<SignablePythContract> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
let chain_id = provider.get_chainid().await?;
let gas_oracle = EthProviderOracle::new(provider.clone());
let wallet__ = private_key
.parse::<LocalWallet>()?
.with_chain_id(chain_id.as_u64());

let address = wallet__.address();

Ok(PythRandom::new(
chain_config.contract_addr,
Arc::new(LegacyTxMiddleware::new(
chain_config.legacy_tx,
GasOracleMiddleware::new(
NonceManagerMiddleware::new(SignerMiddleware::new(provider, wallet__), address),
gas_oracle,
),
)),
))
}

impl<T: JsonRpcClient + 'static + Clone> SignablePythContractInner<T> {
/// Submit a request for a random number to the contract.
///
/// This method is a version of the autogenned `request` method that parses the emitted logs
Expand Down Expand Up @@ -249,10 +233,54 @@ impl SignablePythContract {
Err(anyhow!("Request failed").into())
}
}

pub async fn from_config_and_provider(
chain_config: &EthereumConfig,
private_key: &str,
provider: Provider<T>,
) -> Result<SignablePythContractInner<T>> {
let chain_id = provider.get_chainid().await?;
let gas_oracle = EthProviderOracle::new(provider.clone());
let wallet__ = private_key
.parse::<LocalWallet>()?
.with_chain_id(chain_id.as_u64());

let address = wallet__.address();

Ok(PythRandom::new(
chain_config.contract_addr,
Arc::new(LegacyTxMiddleware::new(
chain_config.legacy_tx,
GasOracleMiddleware::new(
NonceManagerMiddleware::new(SignerMiddleware::new(provider, wallet__), address),
gas_oracle,
),
)),
))
}
}

impl SignablePythContract {
pub async fn from_config(chain_config: &EthereumConfig, private_key: &str) -> Result<Self> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
Self::from_config_and_provider(chain_config, private_key, provider).await
}
}

impl InstrumentedSignablePythContract {
pub async fn from_config(
chain_config: &EthereumConfig,
private_key: &str,
chain_id: ChainId,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
Self::from_config_and_provider(chain_config, private_key, provider).await
}
}

impl PythContract {
pub fn from_config(chain_config: &EthereumConfig) -> Result<PythContract> {
pub fn from_config(chain_config: &EthereumConfig) -> Result<Self> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;

Ok(PythRandom::new(
Expand All @@ -262,8 +290,23 @@ impl PythContract {
}
}

impl InstrumentedPythContract {
pub fn from_config(
chain_config: &EthereumConfig,
chain_id: ChainId,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;

Ok(PythRandom::new(
chain_config.contract_addr,
Arc::new(provider),
))
}
}

#[async_trait]
impl EntropyReader for PythContract {
impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
async fn get_request(
&self,
provider_address: Address,
Expand Down Expand Up @@ -330,7 +373,7 @@ impl EntropyReader for PythContract {
user_random_number: [u8; 32],
provider_revelation: [u8; 32],
) -> Result<U256> {
let result: Result<U256, ContractError<Provider<Http>>> = self
let result = self
.reveal_with_callback(
provider,
sequence_number,
Expand Down
135 changes: 135 additions & 0 deletions apps/fortuna/src/chain/traced_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use {
crate::api::ChainId,
anyhow::Result,
axum::async_trait,
ethers::{
prelude::Http,
providers::{
HttpClientError,
JsonRpcClient,
Provider,
},
},
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
histogram::Histogram,
},
registry::Registry,
},
std::sync::Arc,
tokio::{
sync::RwLock,
time::Instant,
},
};

#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
pub struct RpcLabel {
chain_id: ChainId,
method: String,
}

#[derive(Debug)]
pub struct RpcMetrics {
count: Family<RpcLabel, Counter>,
latency: Family<RpcLabel, Histogram>,
errors_count: Family<RpcLabel, Counter>,
}

impl RpcMetrics {
pub async fn new(metrics_registry: Arc<RwLock<Registry>>) -> Self {
let count = Family::default();
let mut guard = metrics_registry.write().await;
let sub_registry = guard.sub_registry_with_prefix("rpc_requests");
sub_registry.register(
"count",
"The number of RPC requests made to the chain with the specified method.",
count.clone(),
);

let latency = Family::<RpcLabel, Histogram>::new_with_constructor(|| {
Histogram::new(
[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]
.into_iter(),
)
});
sub_registry.register(
"latency",
"The latency of RPC requests to the chain with the specified method.",
latency.clone(),
);

let errors_count = Family::default();
sub_registry.register(
"errors_count",
"The number of RPC requests made to the chain that failed.",
errors_count.clone(),
);

Self {
count,
latency,
errors_count,
}
}
}

#[derive(Debug, Clone)]
pub struct TracedClient {
inner: Http,

chain_id: ChainId,
metrics: Arc<RpcMetrics>,
}

#[async_trait]
impl JsonRpcClient for TracedClient {
type Error = HttpClientError;

async fn request<
T: serde::Serialize + Send + Sync + std::fmt::Debug,
R: serde::de::DeserializeOwned + Send,
>(
&self,
method: &str,
params: T,
) -> Result<R, HttpClientError> {
let start = Instant::now();
let label = &RpcLabel {
chain_id: self.chain_id.clone(),
method: method.to_string(),
};
self.metrics.count.get_or_create(label).inc();
let res = match self.inner.request(method, params).await {
Ok(result) => Ok(result),
Err(e) => {
self.metrics.errors_count.get_or_create(label).inc();
Err(e)
}
};

let latency = start.elapsed().as_secs_f64();
self.metrics.latency.get_or_create(label).observe(latency);
res
}
}

impl TracedClient {
pub fn new(
chain_id: ChainId,
url: &str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: suggest passing the Url type here

metrics: Arc<RpcMetrics>,
) -> Result<Provider<TracedClient>> {
let url = url::Url::parse(url)?;
Ok(Provider::new(TracedClient {
inner: Http::new(url),
chain_id,
metrics,
}))
}
}
Loading
Loading