@@ -49,7 +49,7 @@ use super::debouncing::{
 };
 use super::ingester::PERSIST_REQUEST_TIMEOUT;
 use super::metrics::IngestResultMetrics;
-use super::routing_table::RoutingTable;
+use super::routing_table::{NextOpenShardError, RoutingTable};
 use super::workbench::IngestWorkbench;
 use super::{pending_subrequests, IngesterPool};
 use crate::{get_ingest_router_buffer_size, LeaderId};
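
The two `NextOpenShardError` variants matched further down imply an error enum along the following lines in `routing_table.rs`. This is a sketch inferred from this diff, not the actual upstream definition, which may carry different derives or documentation:

```rust
// Sketch inferred from the match arms in this diff; the real type lives in
// super::routing_table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NextOpenShardError {
    /// Every candidate shard for the (index, source) pair is rate limited.
    RateLimited,
    /// No open shard is available for the (index, source) pair.
    NoShardsAvailable,
}
```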
@@ -287,6 +287,7 @@ impl IngestRouter {
         }
         for persist_failure in persist_response.failures {
             workbench.record_persist_failure(&persist_failure);
+
             match persist_failure.reason() {
                 PersistFailureReason::ShardClosed => {
                     let shard_id = persist_failure.shard_id().clone();
@@ -314,7 +315,7 @@ impl IngestRouter {
                     // That way we will avoid retrying the persist request on
                     // the very same node.
                     let shard_id = persist_failure.shard_id().clone();
-                    workbench.rate_limited_shard.insert(shard_id);
+                    workbench.rate_limited_shards.insert(shard_id);
                 }
                 _ => {}
             }
@@ -363,39 +364,44 @@ impl IngestRouter {
         self.populate_routing_table_debounced(workbench, debounced_request)
             .await;
 
-        // List of subrequest IDs for which no shards are available to route the subrequests to.
-        let mut no_shards_available_subrequest_ids = Vec::new();
+        // Subrequests for which no shards are available to route the subrequests to.
+        let mut no_shards_available_subrequest_ids: Vec<SubrequestId> = Vec::new();
+        // Subrequests for which the shards are rate limited.
+        let mut rate_limited_subrequest_ids: Vec<SubrequestId> = Vec::new();
 
         let mut per_leader_persist_subrequests: HashMap<&LeaderId, Vec<PersistSubrequest>> =
             HashMap::new();
 
+        let rate_limited_shards: &HashSet<ShardId> = &workbench.rate_limited_shards;
         let state_guard = self.state.lock().await;
 
-        // TODO: Here would be the most optimal place to split the body of the HTTP request into
-        // lines, validate, transform and then pack the docs into compressed batches routed
-        // to the right shards.
-
-        let rate_limited_shards: &HashSet<ShardId> = &workbench.rate_limited_shard;
         for subrequest in pending_subrequests(&workbench.subworkbenches) {
-            let Some(shard) = state_guard
+            let next_open_shard_res_opt = state_guard
                 .routing_table
                 .find_entry(&subrequest.index_id, &subrequest.source_id)
-                .and_then(|entry| {
+                .map(|entry| {
                     entry.next_open_shard_round_robin(&self.ingester_pool, rate_limited_shards)
-                })
-            else {
-                no_shards_available_subrequest_ids.push(subrequest.subrequest_id);
-                continue;
+                });
+            let next_open_shard = match next_open_shard_res_opt {
+                Some(Ok(next_open_shard)) => next_open_shard,
+                Some(Err(NextOpenShardError::RateLimited)) => {
+                    rate_limited_subrequest_ids.push(subrequest.subrequest_id);
+                    continue;
+                }
+                Some(Err(NextOpenShardError::NoShardsAvailable)) | None => {
+                    no_shards_available_subrequest_ids.push(subrequest.subrequest_id);
+                    continue;
+                }
             };
             let persist_subrequest = PersistSubrequest {
                 subrequest_id: subrequest.subrequest_id,
-                index_uid: shard.index_uid.clone().into(),
-                source_id: shard.source_id.clone(),
-                shard_id: Some(shard.shard_id.clone()),
+                index_uid: next_open_shard.index_uid.clone().into(),
+                source_id: next_open_shard.source_id.clone(),
+                shard_id: Some(next_open_shard.shard_id.clone()),
                 doc_batch: subrequest.doc_batch.clone(),
             };
             per_leader_persist_subrequests
-                .entry(&shard.leader_id)
+                .entry(&next_open_shard.leader_id)
                 .or_default()
                 .push(persist_subrequest);
         }
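
In the new loop, `find_entry(..).map(..)` yields an `Option<Result<_, NextOpenShardError>>`: `None` means no routing entry exists, while `Err` explains why an existing entry had nothing to offer. This implies `next_open_shard_round_robin` changed from returning `Option<&Shard>` to `Result<&Shard, NextOpenShardError>`. Below is a minimal sketch of that shape, with placeholder types and without the round-robin cursor or ingester-pool liveness checks the real method presumably performs:

```rust
use std::collections::HashSet;

// Placeholder types standing in for the real Quickwit shard/routing types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ShardId(u64);

struct Shard {
    shard_id: ShardId,
}

#[derive(Debug, PartialEq, Eq)]
enum NextOpenShardError {
    RateLimited,
    NoShardsAvailable,
}

struct RoutingEntry {
    shards: Vec<Shard>,
}

impl RoutingEntry {
    // Stand-in for next_open_shard_round_robin: distinguishes "all candidates
    // are rate limited" from "no shards at all", which the router maps to
    // ShardRateLimited vs. NoShardsAvailable ingest failures.
    fn next_open_shard(
        &self,
        rate_limited_shards: &HashSet<ShardId>,
    ) -> Result<&Shard, NextOpenShardError> {
        if self.shards.is_empty() {
            return Err(NextOpenShardError::NoShardsAvailable);
        }
        self.shards
            .iter()
            .find(|shard| !rate_limited_shards.contains(&shard.shard_id))
            .ok_or(NextOpenShardError::RateLimited)
    }
}
```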
@@ -421,6 +427,7 @@ impl IngestRouter {
                 commit_type: commit_type as i32,
             };
             workbench.record_persist_request(&persist_request);
+
             let persist_future = async move {
                 let persist_result = tokio::time::timeout(
                     PERSIST_REQUEST_TIMEOUT,
@@ -443,6 +450,9 @@ impl IngestRouter {
         for subrequest_id in no_shards_available_subrequest_ids {
             workbench.record_no_shards_available(subrequest_id);
         }
+        for subrequest_id in rate_limited_subrequest_ids {
+            workbench.record_rate_limited(subrequest_id);
+        }
         self.process_persist_results(workbench, persist_futures)
             .await;
     }
@@ -610,7 +620,6 @@ impl IngestRouterService for IngestRouter {
                 .retry_batch_persist(ingest_request, MAX_PERSIST_ATTEMPTS)
                 .await)
         };
-
         update_ingest_metrics(&ingest_res, num_subrequests);
 
         ingest_res
@@ -1916,7 +1925,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_do_not_retry_rate_limited_shards() {
+    async fn test_router_does_not_retry_rate_limited_shards() {
         // We avoid retrying a rate limited shard at the scale of a workbench.
         let self_node_id = "test-router".into();
         let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
@@ -2075,4 +2084,86 @@ mod tests {
         };
         router.ingest(ingest_request).await.unwrap();
     }
+
+    #[tokio::test]
+    async fn test_router_returns_rate_limited_failure() {
+        let self_node_id = "test-router".into();
+        let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
+        let ingester_pool = IngesterPool::default();
+        let replication_factor = 1;
+        let router = IngestRouter::new(
+            self_node_id,
+            control_plane,
+            ingester_pool.clone(),
+            replication_factor,
+            EventBroker::default(),
+        );
+        let mut state_guard = router.state.lock().await;
+        let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
+
+        state_guard.routing_table.replace_shards(
+            index_uid.clone(),
+            "test-source",
+            vec![Shard {
+                index_uid: Some(index_uid.clone()),
+                source_id: "test-source".to_string(),
+                shard_id: Some(ShardId::from(1)),
+                shard_state: ShardState::Open as i32,
+                leader_id: "test-ingester-0".to_string(),
+                ..Default::default()
+            }],
+        );
+        drop(state_guard);
+
+        let mut mock_ingester_0 = MockIngesterService::new();
+        mock_ingester_0
+            .expect_persist()
+            .times(1)
+            .returning(move |request| {
+                assert_eq!(request.leader_id, "test-ingester-0");
+                assert_eq!(request.commit_type(), CommitTypeV2::Auto);
+                assert_eq!(request.subrequests.len(), 1);
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.subrequest_id, 0);
+                let index_uid = subrequest.index_uid().clone();
+                assert_eq!(subrequest.source_id, "test-source");
+                assert_eq!(subrequest.shard_id(), ShardId::from(1));
+                assert_eq!(
+                    subrequest.doc_batch,
+                    Some(DocBatchV2::for_test(["test-doc-foo"]))
+                );
+
+                let response = PersistResponse {
+                    leader_id: request.leader_id,
+                    successes: Vec::new(),
+                    failures: vec![PersistFailure {
+                        subrequest_id: 0,
+                        index_uid: Some(index_uid),
+                        source_id: "test-source".to_string(),
+                        shard_id: Some(ShardId::from(1)),
+                        reason: PersistFailureReason::ShardRateLimited as i32,
+                    }],
+                };
+                Ok(response)
+            });
+        let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0);
+        ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());
+
+        let ingest_request = IngestRequestV2 {
+            subrequests: vec![IngestSubrequest {
+                subrequest_id: 0,
+                index_id: "test-index-0".to_string(),
+                source_id: "test-source".to_string(),
+                doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
+            }],
+            commit_type: CommitTypeV2::Auto as i32,
+        };
+        let ingest_response = router.ingest(ingest_request).await.unwrap();
+        assert_eq!(ingest_response.successes.len(), 0);
+        assert_eq!(ingest_response.failures.len(), 1);
+        assert_eq!(
+            ingest_response.failures[0].reason(),
+            IngestFailureReason::ShardRateLimited
+        );
+    }
 }