Skip to content

Commit 432911e

Browse files
authored
fix: the right code (#3)
* fix: the right code * chore: fmt clippy
1 parent 1ee24f2 commit 432911e

File tree

2 files changed

+97
-56
lines changed

2 files changed

+97
-56
lines changed

src/tasks/block.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,13 @@ impl BlockBuilder {
107107

108108
let (sender, mut inbound) = mpsc::unbounded_channel();
109109

110+
let mut sleep = Box::pin(tokio::time::sleep(Duration::from_secs(
111+
self.incoming_transactions_buffer,
112+
)));
113+
110114
let handle = tokio::spawn(
111115
async move {
112116
loop {
113-
let sleep: tokio::time::Sleep = tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer));
114-
tokio::pin!(sleep);
115117

116118
select! {
117119
biased;
@@ -124,6 +126,10 @@ impl BlockBuilder {
124126
break
125127
}
126128
}
129+
130+
// Reset the sleep timer, as we want to do so when (and only when) our sleep future has elapsed,
131+
// irrespective of whether we have any blocks to build.
132+
sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.incoming_transactions_buffer));
127133
}
128134
item_res = inbound.recv() => {
129135
match item_res {

src/tasks/submit.rs

+89-54
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,37 @@ use crate::{
33
signer::LocalOrAws,
44
tasks::block::InProgressBlock,
55
};
6+
use alloy::consensus::{constants::GWEI_TO_WEI, SimpleCoder};
7+
use alloy::eips::BlockNumberOrTag;
68
use alloy::network::{TransactionBuilder, TransactionBuilder4844};
79
use alloy::providers::{Provider as _, WalletProvider};
810
use alloy::rpc::types::eth::TransactionRequest;
911
use alloy::signers::Signer;
1012
use alloy::sol_types::SolCall;
1113
use alloy::transports::TransportError;
12-
use alloy::{consensus::SimpleCoder, providers::SendableTx};
1314
use alloy_primitives::{FixedBytes, U256};
14-
use eyre::bail;
15+
use alloy_sol_types::SolError;
16+
use eyre::eyre;
1517
use oauth2::{
1618
basic::BasicClient, basic::BasicTokenType, reqwest::http_client, AuthUrl, ClientId,
1719
ClientSecret, EmptyExtraTokenFields, StandardTokenResponse, TokenResponse, TokenUrl,
1820
};
1921
use tokio::{sync::mpsc, task::JoinHandle};
20-
use tracing::{debug, error, instrument, trace, warn};
21-
use zenith_types::{SignRequest, SignResponse, Zenith};
22-
23-
macro_rules! spawn_provider_send {
24-
($provider:expr, $tx:expr) => {
25-
let p = $provider.clone();
26-
let t = $tx.clone();
27-
tokio::spawn(async move {
28-
if let Err(e) = p.send_tx_envelope(t).await {
29-
warn!(%e, "error in transaction broadcast");
30-
}
31-
});
32-
33-
};
34-
}
22+
use tracing::{debug, error, instrument, trace};
23+
use zenith_types::{
24+
SignRequest, SignResponse,
25+
Zenith::{self, IncorrectHostBlock},
26+
};
3527

3628
/// OAuth Audience Claim Name, required param by IdP for client credential grant
3729
const OAUTH_AUDIENCE_CLAIM: &str = "audience";
3830

31+
pub enum ControlFlow {
32+
Retry,
33+
Skip,
34+
Done,
35+
}
36+
3937
/// Submits sidecars in ethereum txns to mainnet ethereum
4038
pub struct SubmitTask {
4139
/// Ethereum Provider
@@ -102,10 +100,10 @@ impl SubmitTask {
102100
#[instrument(skip_all)]
103101
async fn construct_sig_request(&self, contents: &InProgressBlock) -> eyre::Result<SignRequest> {
104102
let ru_chain_id = U256::from(self.config.ru_chain_id);
105-
let block_height = self.host_block_height().await?;
103+
let next_block_height = self.next_host_block_height().await?;
106104

107105
Ok(SignRequest {
108-
host_block_number: U256::from(block_height),
106+
host_block_number: U256::from(next_block_height),
109107
host_chain_id: U256::from(self.config.host_chain_id),
110108
ru_chain_id,
111109
gas_limit: U256::from(self.config.rollup_block_gas_limit),
@@ -133,19 +131,23 @@ impl SubmitTask {
133131
let sidecar = in_progress.encode_blob::<SimpleCoder>().build()?;
134132
Ok(TransactionRequest::default()
135133
.with_blob_sidecar(sidecar)
136-
.with_input(data))
134+
.with_input(data)
135+
.with_max_priority_fee_per_gas((GWEI_TO_WEI * 16) as u128))
137136
}
138137

139-
async fn host_block_height(&self) -> eyre::Result<u64> {
138+
async fn next_host_block_height(&self) -> eyre::Result<u64> {
140139
let result = self.provider.get_block_number().await?;
141-
Ok(result)
140+
let next = result
141+
.checked_add(1)
142+
.ok_or_else(|| eyre!("next host block height overflow"))?;
143+
Ok(next)
142144
}
143145

144146
async fn submit_transaction(
145147
&self,
146148
resp: &SignResponse,
147149
in_progress: &InProgressBlock,
148-
) -> eyre::Result<()> {
150+
) -> eyre::Result<ControlFlow> {
149151
let v: u8 = resp.sig.v().y_parity_byte() + 27;
150152
let r: FixedBytes<32> = resp.sig.r().into();
151153
let s: FixedBytes<32> = resp.sig.s().into();
@@ -161,60 +163,62 @@ impl SubmitTask {
161163
let tx = self
162164
.build_blob_tx(header, v, r, s, in_progress)?
163165
.with_from(self.provider.default_signer_address())
164-
.with_to(self.config.zenith_address);
165-
166-
if let Err(TransportError::ErrorResp(e)) = self.provider.call(&tx).await {
166+
.with_to(self.config.zenith_address)
167+
.with_gas_limit(1_000_000);
168+
169+
if let Err(TransportError::ErrorResp(e)) = self
170+
.provider
171+
.call(&tx)
172+
.block(BlockNumberOrTag::Pending.into())
173+
.await
174+
{
167175
error!(
168176
code = e.code,
169177
message = %e.message,
170178
data = ?e.data,
171179
"error in transaction submission"
172180
);
173181

174-
bail!("bailing transaction submission")
175-
}
176-
177-
self.send_transaction(resp, tx).await?;
182+
if e.as_revert_data() == Some(IncorrectHostBlock::SELECTOR.into()) {
183+
return Ok(ControlFlow::Retry);
184+
}
178185

179-
Ok(())
180-
}
186+
return Ok(ControlFlow::Skip);
187+
}
181188

182-
async fn send_transaction(
183-
&self,
184-
resp: &SignResponse,
185-
tx: TransactionRequest,
186-
) -> Result<(), eyre::Error> {
187189
tracing::debug!(
188190
host_block_number = %resp.req.host_block_number,
189191
gas_limit = %resp.req.gas_limit,
190192
"sending transaction to network"
191193
);
192194

193-
let SendableTx::Envelope(tx) = self.provider.fill(tx).await? else {
194-
bail!("failed to fill transaction")
195+
let _ = match self.provider.send_transaction(tx).await {
196+
Ok(result) => result,
197+
Err(e) => {
198+
error!(error = %e, "error sending transaction");
199+
return Ok(ControlFlow::Skip);
200+
}
195201
};
196202

197-
// Send the tx via the primary provider
198-
spawn_provider_send!(&self.provider, &tx);
199-
200-
// Spawn send_tx futures for all additional broadcast providers
201-
for provider in self.config.connect_additional_broadcast().await? {
202-
spawn_provider_send!(&provider, &tx);
203-
}
204-
205203
tracing::info!(
206-
tx_hash = %tx.tx_hash(),
207204
ru_chain_id = %resp.req.ru_chain_id,
208205
gas_limit = %resp.req.gas_limit,
209206
"dispatched to network"
210207
);
211-
Ok(())
208+
209+
Ok(ControlFlow::Done)
212210
}
213211

214212
#[instrument(skip_all, err)]
215-
async fn handle_inbound(&self, in_progress: &InProgressBlock) -> eyre::Result<()> {
213+
async fn handle_inbound(&self, in_progress: &InProgressBlock) -> eyre::Result<ControlFlow> {
216214
tracing::info!(txns = in_progress.len(), "handling inbound block");
217-
let sig_request = self.construct_sig_request(in_progress).await?;
215+
let sig_request = match self.construct_sig_request(in_progress).await {
216+
Ok(sig_request) => sig_request,
217+
Err(e) => {
218+
tracing::error!(error = %e, "error constructing signature request");
219+
return Ok(ControlFlow::Skip);
220+
}
221+
};
218222

219223
tracing::debug!(
220224
host_block_number = %sig_request.host_block_number,
@@ -235,7 +239,13 @@ impl SubmitTask {
235239
sig,
236240
}
237241
} else {
238-
let resp: SignResponse = self.sup_quincey(&sig_request).await?;
242+
let resp: SignResponse = match self.sup_quincey(&sig_request).await {
243+
Ok(resp) => resp,
244+
Err(e) => {
245+
tracing::error!(error = %e, "error acquiring signature from quincey");
246+
return Ok(ControlFlow::Retry);
247+
}
248+
};
239249
tracing::debug!(
240250
sig = hex::encode(resp.sig.as_bytes()),
241251
"acquired signature from quincey"
@@ -252,8 +262,33 @@ impl SubmitTask {
252262
let handle = tokio::spawn(async move {
253263
loop {
254264
if let Some(in_progress) = inbound.recv().await {
255-
if let Err(e) = self.handle_inbound(&in_progress).await {
256-
error!(%e, "error in block submission. Dropping block.");
265+
let mut retries = 0;
266+
loop {
267+
match self.handle_inbound(&in_progress).await {
268+
Ok(ControlFlow::Retry) => {
269+
retries += 1;
270+
if retries > 3 {
271+
tracing::error!(
272+
"error handling inbound block: too many retries"
273+
);
274+
break;
275+
}
276+
tracing::error!("error handling inbound block: retrying");
277+
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
278+
}
279+
Ok(ControlFlow::Skip) => {
280+
tracing::info!("skipping block");
281+
break;
282+
}
283+
Ok(ControlFlow::Done) => {
284+
tracing::info!("block landed successfully");
285+
break;
286+
}
287+
Err(e) => {
288+
tracing::error!(error = %e, "error handling inbound block");
289+
break;
290+
}
291+
}
257292
}
258293
} else {
259294
tracing::debug!("upstream task gone");

0 commit comments

Comments
 (0)