29
29
import org .elasticsearch .action .support .IndicesOptions ;
30
30
import org .elasticsearch .client .Client ;
31
31
import org .elasticsearch .cluster .ClusterState ;
32
+ import org .elasticsearch .cluster .block .ClusterBlockException ;
32
33
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
33
34
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
34
35
import org .elasticsearch .cluster .node .DiscoveryNode ;
@@ -472,10 +473,89 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
472
473
Map <String , Set <String >> routingMap = indexNameExpressionResolver .resolveSearchRouting (clusterState , searchRequest .routing (),
473
474
searchRequest .indices ());
474
475
routingMap = routingMap == null ? Collections .emptyMap () : Collections .unmodifiableMap (routingMap );
475
- String [] concreteIndices = new String [indices .length ];
476
- for (int i = 0 ; i < indices .length ; i ++) {
477
- concreteIndices [i ] = indices [i ].getName ();
476
+ Map <String , Float > concreteIndexBoosts = resolveIndexBoosts (searchRequest , clusterState );
477
+
478
+ if (shouldSplitIndices (searchRequest )) {
479
+ //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
480
+ //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
481
+ //indices (possibly slower) being searched at the same time.
482
+ List <String > writeIndicesList = new ArrayList <>();
483
+ List <String > readOnlyIndicesList = new ArrayList <>();
484
+ splitIndices (indices , clusterState , writeIndicesList , readOnlyIndicesList );
485
+ String [] writeIndices = writeIndicesList .toArray (new String [0 ]);
486
+ String [] readOnlyIndices = readOnlyIndicesList .toArray (new String [0 ]);
487
+
488
+ if (readOnlyIndices .length == 0 ) {
489
+ executeSearch (task , timeProvider , searchRequest , localIndices , writeIndices , routingMap ,
490
+ aliasFilter , concreteIndexBoosts , remoteShardIterators , remoteConnections , clusterState , listener , clusters );
491
+ } else if (writeIndices .length == 0 && remoteShardIterators .isEmpty ()) {
492
+ executeSearch (task , timeProvider , searchRequest , localIndices , readOnlyIndices , routingMap ,
493
+ aliasFilter , concreteIndexBoosts , remoteShardIterators , remoteConnections , clusterState , listener , clusters );
494
+ } else {
495
+ //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
496
+ //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
497
+ CountDown countDown = new CountDown (2 );
498
+ AtomicReference <Exception > exceptions = new AtomicReference <>();
499
+ SearchResponseMerger searchResponseMerger = createSearchResponseMerger (searchRequest .source (), timeProvider ,
500
+ searchService ::createReduceContext );
501
+ CountDownActionListener <SearchResponse , SearchResponse > countDownActionListener =
502
+ new CountDownActionListener <>(countDown , exceptions , listener ) {
503
+ @ Override
504
+ void innerOnResponse (SearchResponse searchResponse ) {
505
+ searchResponseMerger .add (searchResponse );
506
+ }
507
+
508
+ @ Override
509
+ SearchResponse createFinalResponse () {
510
+ return searchResponseMerger .getMergedResponse (clusters );
511
+ }
512
+ };
513
+
514
+ //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
515
+ //will be provided separately to executeSearch.
516
+ SearchRequest writeIndicesRequest = SearchRequest .subSearchRequest (searchRequest , writeIndices ,
517
+ RemoteClusterService .LOCAL_CLUSTER_GROUP_KEY , timeProvider .getAbsoluteStartMillis (), false );
518
+ executeSearch (task , timeProvider , writeIndicesRequest , localIndices , writeIndices , routingMap ,
519
+ aliasFilter , concreteIndexBoosts , remoteShardIterators , remoteConnections , clusterState , countDownActionListener ,
520
+ SearchResponse .Clusters .EMPTY );
521
+
522
+ //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
523
+ //will be provided separately to executeSearch.
524
+ SearchRequest readOnlyIndicesRequest = SearchRequest .subSearchRequest (searchRequest , readOnlyIndices ,
525
+ RemoteClusterService .LOCAL_CLUSTER_GROUP_KEY , timeProvider .getAbsoluteStartMillis (), false );
526
+ executeSearch (task , timeProvider , readOnlyIndicesRequest , localIndices , readOnlyIndices , routingMap ,
527
+ aliasFilter , concreteIndexBoosts , Collections .emptyList (), (alias , id ) -> null , clusterState , countDownActionListener ,
528
+ SearchResponse .Clusters .EMPTY );
529
+ }
530
+ } else {
531
+ String [] concreteIndices = Arrays .stream (indices ).map (Index ::getName ).toArray (String []::new );
532
+ executeSearch (task , timeProvider , searchRequest , localIndices , concreteIndices , routingMap ,
533
+ aliasFilter , concreteIndexBoosts , remoteShardIterators , remoteConnections , clusterState , listener , clusters );
534
+ }
535
+ }
536
+
537
+ static boolean shouldSplitIndices (SearchRequest searchRequest ) {
538
+ return searchRequest .scroll () == null && searchRequest .searchType () != DFS_QUERY_THEN_FETCH
539
+ && (searchRequest .source () == null || searchRequest .source ().size () != 0 );
540
+ }
541
+
542
+ static void splitIndices (Index [] indices , ClusterState clusterState , List <String > writeIndices , List <String > readOnlyIndices ) {
543
+ for (Index index : indices ) {
544
+ ClusterBlockException writeBlock = clusterState .blocks ().indexBlockedException (ClusterBlockLevel .WRITE , index .getName ());
545
+ if (writeBlock == null ) {
546
+ writeIndices .add (index .getName ());
547
+ } else {
548
+ readOnlyIndices .add (index .getName ());
549
+ }
478
550
}
551
+ }
552
+
553
+ private void executeSearch (SearchTask task , SearchTimeProvider timeProvider , SearchRequest searchRequest ,
554
+ OriginalIndices localIndices , String [] concreteIndices , Map <String , Set <String >> routingMap ,
555
+ Map <String , AliasFilter > aliasFilter , Map <String , Float > concreteIndexBoosts ,
556
+ List <SearchShardIterator > remoteShardIterators , BiFunction <String , String , DiscoveryNode > remoteConnections ,
557
+ ClusterState clusterState , ActionListener <SearchResponse > listener , SearchResponse .Clusters clusters ) {
558
+
479
559
Map <String , Long > nodeSearchCounts = searchTransportService .getPendingSearchRequests ();
480
560
GroupShardsIterator <ShardIterator > localShardsIterator = clusterService .operationRouting ().searchShards (clusterState ,
481
561
concreteIndices , routingMap , searchRequest .preference (), searchService .getResponseCollectorService (), nodeSearchCounts );
@@ -484,8 +564,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
484
564
485
565
failIfOverShardCountLimit (clusterService , shardIterators .size ());
486
566
487
- Map <String , Float > concreteIndexBoosts = resolveIndexBoosts (searchRequest , clusterState );
488
-
489
567
// optimize search type for cases where there is only one shard group to search on
490
568
if (shardIterators .size () == 1 ) {
491
569
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
@@ -498,11 +576,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
498
576
if (searchRequest .isSuggestOnly ()) {
499
577
// disable request cache if we have only suggest
500
578
searchRequest .requestCache (false );
501
- switch (searchRequest .searchType ()) {
502
- case DFS_QUERY_THEN_FETCH :
503
- // convert to Q_T_F if we have only suggest
504
- searchRequest .searchType (QUERY_THEN_FETCH );
505
- break ;
579
+ if (searchRequest .searchType () == DFS_QUERY_THEN_FETCH ) {
580
+ // convert to Q_T_F if we have only suggest
581
+ searchRequest .searchType (QUERY_THEN_FETCH );
506
582
}
507
583
}
508
584
@@ -611,22 +687,16 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int
611
687
}
612
688
}
613
689
614
- abstract static class CCSActionListener <Response , FinalResponse > implements ActionListener <Response > {
615
- private final String clusterAlias ;
616
- private final boolean skipUnavailable ;
690
+ abstract static class CountDownActionListener <Response , FinalResponse > implements ActionListener <Response > {
617
691
private final CountDown countDown ;
618
- private final AtomicInteger skippedClusters ;
619
692
private final AtomicReference <Exception > exceptions ;
620
- private final ActionListener <FinalResponse > originalListener ;
693
+ private final ActionListener <FinalResponse > delegateListener ;
621
694
622
- CCSActionListener (String clusterAlias , boolean skipUnavailable , CountDown countDown , AtomicInteger skippedClusters ,
623
- AtomicReference <Exception > exceptions , ActionListener <FinalResponse > originalListener ) {
624
- this .clusterAlias = clusterAlias ;
625
- this .skipUnavailable = skipUnavailable ;
695
+ CountDownActionListener (CountDown countDown , AtomicReference <Exception > exceptions ,
696
+ ActionListener <FinalResponse > delegateListener ) {
626
697
this .countDown = countDown ;
627
- this .skippedClusters = skippedClusters ;
628
698
this .exceptions = exceptions ;
629
- this .originalListener = originalListener ;
699
+ this .delegateListener = delegateListener ;
630
700
}
631
701
632
702
@ Override
@@ -637,44 +707,64 @@ public final void onResponse(Response response) {
637
707
638
708
abstract void innerOnResponse (Response response );
639
709
640
- @ Override
641
- public final void onFailure (Exception e ) {
642
- if (skipUnavailable ) {
643
- skippedClusters .incrementAndGet ();
644
- } else {
645
- Exception exception = e ;
646
- if (RemoteClusterAware .LOCAL_CLUSTER_GROUP_KEY .equals (clusterAlias ) == false ) {
647
- exception = wrapRemoteClusterFailure (clusterAlias , e );
648
- }
649
- if (exceptions .compareAndSet (null , exception ) == false ) {
650
- exceptions .accumulateAndGet (exception , (previous , current ) -> {
651
- current .addSuppressed (previous );
652
- return current ;
653
- });
654
- }
655
- }
656
- maybeFinish ();
657
- }
658
-
659
- private void maybeFinish () {
710
+ final void maybeFinish () {
660
711
if (countDown .countDown ()) {
661
712
Exception exception = exceptions .get ();
662
713
if (exception == null ) {
663
714
FinalResponse response ;
664
715
try {
665
716
response = createFinalResponse ();
666
717
} catch (Exception e ) {
667
- originalListener .onFailure (e );
718
+ delegateListener .onFailure (e );
668
719
return ;
669
720
}
670
- originalListener .onResponse (response );
721
+ delegateListener .onResponse (response );
671
722
} else {
672
- originalListener .onFailure (exceptions .get ());
723
+ delegateListener .onFailure (exceptions .get ());
673
724
}
674
725
}
675
726
}
676
727
677
728
abstract FinalResponse createFinalResponse ();
729
+
730
+ @ Override
731
+ public void onFailure (Exception e ) {
732
+ if (exceptions .compareAndSet (null , e ) == false ) {
733
+ exceptions .accumulateAndGet (e , (previous , current ) -> {
734
+ current .addSuppressed (previous );
735
+ return current ;
736
+ });
737
+ }
738
+ maybeFinish ();
739
+ }
740
+ }
741
+
742
+ abstract static class CCSActionListener <Response , FinalResponse > extends CountDownActionListener <Response , FinalResponse > {
743
+ private final String clusterAlias ;
744
+ private final boolean skipUnavailable ;
745
+ private final AtomicInteger skippedClusters ;
746
+
747
+ CCSActionListener (String clusterAlias , boolean skipUnavailable , CountDown countDown , AtomicInteger skippedClusters ,
748
+ AtomicReference <Exception > exceptions , ActionListener <FinalResponse > originalListener ) {
749
+ super (countDown , exceptions , originalListener );
750
+ this .clusterAlias = clusterAlias ;
751
+ this .skipUnavailable = skipUnavailable ;
752
+ this .skippedClusters = skippedClusters ;
753
+ }
754
+
755
+ @ Override
756
+ public final void onFailure (Exception e ) {
757
+ if (skipUnavailable ) {
758
+ skippedClusters .incrementAndGet ();
759
+ maybeFinish ();
760
+ } else {
761
+ Exception exception = e ;
762
+ if (RemoteClusterAware .LOCAL_CLUSTER_GROUP_KEY .equals (clusterAlias ) == false ) {
763
+ exception = wrapRemoteClusterFailure (clusterAlias , e );
764
+ }
765
+ super .onFailure (exception );
766
+ }
767
+ }
678
768
}
679
769
680
770
private static RemoteTransportException wrapRemoteClusterFailure (String clusterAlias , Exception e ) {
0 commit comments