38
38
import org .elasticsearch .transport .TransportException ;
39
39
import org .elasticsearch .transport .TransportRequest ;
40
40
import org .elasticsearch .transport .TransportRequestOptions ;
41
+ import org .hamcrest .Matchers ;
41
42
42
43
import java .io .IOException ;
43
44
import java .util .ArrayList ;
56
57
57
58
import static org .elasticsearch .common .util .concurrent .ConcurrentCollections .newConcurrentMap ;
58
59
import static org .elasticsearch .common .util .concurrent .ConcurrentCollections .newConcurrentSet ;
60
+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
59
61
60
62
public class SearchAsyncActionTests extends ESTestCase {
61
63
@@ -371,6 +373,102 @@ protected void executeNext(Runnable runnable, Thread originalThread) {
371
373
executor .shutdown ();
372
374
}
373
375
376
+ public void testAllowPartialResults () throws InterruptedException {
377
+ SearchRequest request = new SearchRequest ();
378
+ request .allowPartialSearchResults (false );
379
+ int numConcurrent = randomIntBetween (1 , 5 );
380
+ request .setMaxConcurrentShardRequests (numConcurrent );
381
+ int numShards = randomIntBetween (5 , 10 );
382
+ AtomicBoolean searchPhaseDidRun = new AtomicBoolean (false );
383
+ ActionListener <SearchResponse > responseListener = ActionListener .wrap (response -> {},
384
+ (e ) -> { throw new AssertionError ("unexpected" , e );} );
385
+ DiscoveryNode primaryNode = new DiscoveryNode ("node_1" , buildNewFakeTransportAddress (), Version .CURRENT );
386
+ // for the sake of this test we place the replica on the same node. ie. this is not a mistake since we limit per node now
387
+ DiscoveryNode replicaNode = new DiscoveryNode ("node_1" , buildNewFakeTransportAddress (), Version .CURRENT );
388
+
389
+ AtomicInteger contextIdGenerator = new AtomicInteger (0 );
390
+ GroupShardsIterator <SearchShardIterator > shardsIter = getShardsIter ("idx" ,
391
+ new OriginalIndices (new String []{"idx" }, SearchRequest .DEFAULT_INDICES_OPTIONS ),
392
+ numShards , true , primaryNode , replicaNode );
393
+ int numShardAttempts = 0 ;
394
+ for (SearchShardIterator it : shardsIter ) {
395
+ numShardAttempts += it .remaining ();
396
+ }
397
+ CountDownLatch latch = new CountDownLatch (numShardAttempts );
398
+
399
+ SearchTransportService transportService = new SearchTransportService (null , null );
400
+ Map <String , Transport .Connection > lookup = new HashMap <>();
401
+ Map <ShardId , Boolean > seenShard = new ConcurrentHashMap <>();
402
+ lookup .put (primaryNode .getId (), new MockConnection (primaryNode ));
403
+ lookup .put (replicaNode .getId (), new MockConnection (replicaNode ));
404
+ Map <String , AliasFilter > aliasFilters = Collections .singletonMap ("_na_" , new AliasFilter (null , Strings .EMPTY_ARRAY ));
405
+ AtomicInteger numRequests = new AtomicInteger (0 );
406
+ AtomicInteger numFailReplicas = new AtomicInteger (0 );
407
+ AbstractSearchAsyncAction <TestSearchPhaseResult > asyncAction =
408
+ new AbstractSearchAsyncAction <>(
409
+ "test" ,
410
+ logger ,
411
+ transportService ,
412
+ (cluster , node ) -> {
413
+ assert cluster == null : "cluster was not null: " + cluster ;
414
+ return lookup .get (node ); },
415
+ aliasFilters ,
416
+ Collections .emptyMap (),
417
+ Collections .emptyMap (),
418
+ null ,
419
+ request ,
420
+ responseListener ,
421
+ shardsIter ,
422
+ new TransportSearchAction .SearchTimeProvider (0 , 0 , () -> 0 ),
423
+ 0 ,
424
+ null ,
425
+ new InitialSearchPhase .ArraySearchPhaseResults <>(shardsIter .size ()),
426
+ request .getMaxConcurrentShardRequests (),
427
+ SearchResponse .Clusters .EMPTY ) {
428
+
429
+ @ Override
430
+ protected void executePhaseOnShard (SearchShardIterator shardIt , ShardRouting shard ,
431
+ SearchActionListener <TestSearchPhaseResult > listener ) {
432
+ seenShard .computeIfAbsent (shard .shardId (), (i ) -> {
433
+ numRequests .incrementAndGet (); // only count this once per shard copy
434
+ return Boolean .TRUE ;
435
+ });
436
+ new Thread (() -> {
437
+ Transport .Connection connection = getConnection (null , shard .currentNodeId ());
438
+ TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult (contextIdGenerator .incrementAndGet (),
439
+ connection .getNode ());
440
+ if (shardIt .remaining () > 0 ) {
441
+ numFailReplicas .incrementAndGet ();
442
+ listener .onFailure (new RuntimeException ());
443
+ } else {
444
+ listener .onResponse (testSearchPhaseResult );
445
+ }
446
+ }).start ();
447
+ }
448
+
449
+ @ Override
450
+ protected SearchPhase getNextPhase (SearchPhaseResults <TestSearchPhaseResult > results , SearchPhaseContext context ) {
451
+ return new SearchPhase ("test" ) {
452
+ @ Override
453
+ public void run () {
454
+ assertTrue (searchPhaseDidRun .compareAndSet (false , true ));
455
+ }
456
+ };
457
+ }
458
+
459
+ @ Override
460
+ protected void executeNext (Runnable runnable , Thread originalThread ) {
461
+ super .executeNext (runnable , originalThread );
462
+ latch .countDown ();
463
+ }
464
+ };
465
+ asyncAction .start ();
466
+ latch .await ();
467
+ assertTrue (searchPhaseDidRun .get ());
468
+ assertEquals (numShards , numRequests .get ());
469
+ assertThat (numFailReplicas .get (), greaterThanOrEqualTo (1 ));
470
+ }
471
+
374
472
static GroupShardsIterator <SearchShardIterator > getShardsIter (String index , OriginalIndices originalIndices , int numShards ,
375
473
boolean doReplicas , DiscoveryNode primaryNode , DiscoveryNode replicaNode ) {
376
474
ArrayList <SearchShardIterator > list = new ArrayList <>();
@@ -389,12 +487,12 @@ static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, Orig
389
487
RecoverySource .PeerRecoverySource .INSTANCE , new UnassignedInfo (UnassignedInfo .Reason .INDEX_CREATED , "foobar" ));
390
488
if (replicaNode != null ) {
391
489
routing = routing .initialize (replicaNode .getId (), i + "r" , 0 );
392
- if (randomBoolean ()) {
490
+ // if (randomBoolean()) {
393
491
routing .started ();
394
492
started .add (routing );
395
- } else {
396
- initializing .add (routing );
397
- }
493
+ // } else {
494
+ // initializing.add(routing);
495
+ // }
398
496
} else {
399
497
unassigned .add (routing ); // unused yet
400
498
}
0 commit comments