6
6
keeper:: keeper_metrics:: KeeperMetrics ,
7
7
keeper:: process_event:: process_event_with_backoff,
8
8
} ,
9
- anyhow:: { anyhow, Result } ,
10
- ethers:: {
11
- providers:: { Middleware , Provider , Ws } ,
12
- types:: U256 ,
13
- } ,
14
- futures:: StreamExt ,
9
+ anyhow:: Result ,
10
+ ethers:: types:: U256 ,
15
11
std:: { collections:: HashSet , sync:: Arc } ,
16
12
tokio:: {
17
13
spawn,
@@ -176,15 +172,13 @@ pub async fn watch_blocks_wrapper(
176
172
chain_state : BlockchainState ,
177
173
latest_safe_block : BlockNumber ,
178
174
tx : mpsc:: Sender < BlockRange > ,
179
- geth_rpc_wss : Option < String > ,
180
175
) {
181
176
let mut last_safe_block_processed = latest_safe_block;
182
177
loop {
183
178
if let Err ( e) = watch_blocks (
184
179
chain_state. clone ( ) ,
185
180
& mut last_safe_block_processed,
186
181
tx. clone ( ) ,
187
- geth_rpc_wss. clone ( ) ,
188
182
)
189
183
. in_current_span ( )
190
184
. await
@@ -203,47 +197,11 @@ pub async fn watch_blocks(
203
197
chain_state : BlockchainState ,
204
198
last_safe_block_processed : & mut BlockNumber ,
205
199
tx : mpsc:: Sender < BlockRange > ,
206
- geth_rpc_wss : Option < String > ,
207
200
) -> Result < ( ) > {
208
201
tracing:: info!( "Watching blocks to handle new events" ) ;
209
202
210
- let provider_option = match geth_rpc_wss {
211
- Some ( wss) => Some ( match Provider :: < Ws > :: connect ( wss. clone ( ) ) . await {
212
- Ok ( provider) => provider,
213
- Err ( e) => {
214
- tracing:: error!( "Error while connecting to wss: {}. error: {:?}" , wss, e) ;
215
- return Err ( e. into ( ) ) ;
216
- }
217
- } ) ,
218
- None => {
219
- tracing:: info!( "No wss provided" ) ;
220
- None
221
- }
222
- } ;
223
-
224
- let mut stream_option = match provider_option {
225
- Some ( ref provider) => Some ( match provider. subscribe_blocks ( ) . await {
226
- Ok ( client) => client,
227
- Err ( e) => {
228
- tracing:: error!( "Error while subscribing to blocks. error {:?}" , e) ;
229
- return Err ( e. into ( ) ) ;
230
- }
231
- } ) ,
232
- None => None ,
233
- } ;
234
-
235
203
loop {
236
- match stream_option {
237
- Some ( ref mut stream) => {
238
- if stream. next ( ) . await . is_none ( ) {
239
- tracing:: error!( "Error blocks subscription stream ended" ) ;
240
- return Err ( anyhow ! ( "Error blocks subscription stream ended" ) ) ;
241
- }
242
- }
243
- None => {
244
- time:: sleep ( POLL_INTERVAL ) . await ;
245
- }
246
- }
204
+ time:: sleep ( POLL_INTERVAL ) . await ;
247
205
248
206
let latest_safe_block = get_latest_safe_block ( & chain_state) . in_current_span ( ) . await ;
249
207
if latest_safe_block > * last_safe_block_processed {
0 commit comments