Skip to content

Split search in two when made against read-only and write indices #42510

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits on Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -472,10 +473,89 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
String[] concreteIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

if (shouldSplitIndices(searchRequest)) {
//Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
//Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
//indices (possibly slower) being searched at the same time.
List<String> writeIndicesList = new ArrayList<>();
List<String> readOnlyIndicesList = new ArrayList<>();
splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
String[] writeIndices = writeIndicesList.toArray(new String[0]);
String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);

if (readOnlyIndices.length == 0) {
executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
} else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
} else {
//Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
//that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
CountDown countDown = new CountDown(2);
AtomicReference<Exception> exceptions = new AtomicReference<>();
SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
searchService::createReduceContext);
CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener =
new CountDownActionListener<>(countDown, exceptions, listener) {
@Override
void innerOnResponse(SearchResponse searchResponse) {
searchResponseMerger.add(searchResponse);
}

@Override
SearchResponse createFinalResponse() {
return searchResponseMerger.getMergedResponse(clusters);
}
};

//Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
//will be provided separately to executeSearch.
SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
SearchResponse.Clusters.EMPTY);

//Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
//will be provided separately to executeSearch.
SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
SearchResponse.Clusters.EMPTY);
}
} else {
String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
}
}

/**
 * Determines whether a search request is eligible to be split into separate executions
 * against write indices and read-only indices. Splitting is not performed for scroll
 * searches, for DFS_QUERY_THEN_FETCH, or when size is explicitly 0 (presumably because
 * such requests gain nothing from the split — TODO confirm rationale against the caller).
 */
static boolean shouldSplitIndices(SearchRequest searchRequest) {
    final boolean sizeZero = searchRequest.source() != null && searchRequest.source().size() == 0;
    return searchRequest.scroll() == null
        && searchRequest.searchType() != DFS_QUERY_THEN_FETCH
        && sizeZero == false;
}

/**
 * Partitions the given concrete indices into two output lists based on the cluster state:
 * indices without a write block go into {@code writeIndices}, indices that are blocked for
 * writes (e.g. marked read-only) go into {@code readOnlyIndices}.
 */
static void splitIndices(Index[] indices, ClusterState clusterState, List<String> writeIndices, List<String> readOnlyIndices) {
    for (Index index : indices) {
        final String indexName = index.getName();
        // a non-null exception means a WRITE-level block applies to this index
        final boolean writeBlocked =
            clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, indexName) != null;
        (writeBlocked ? readOnlyIndices : writeIndices).add(indexName);
    }
}

private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {

Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
Expand All @@ -484,8 +564,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea

failIfOverShardCountLimit(clusterService, shardIterators.size());

Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

// optimize search type for cases where there is only one shard group to search on
if (shardIterators.size() == 1) {
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
Expand All @@ -498,11 +576,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
if (searchRequest.isSuggestOnly()) {
// disable request cache if we have only suggest
searchRequest.requestCache(false);
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
// convert to Q_T_F if we have only suggest
searchRequest.searchType(QUERY_THEN_FETCH);
break;
if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
// convert to Q_T_F if we have only suggest
searchRequest.searchType(QUERY_THEN_FETCH);
}
}

Expand Down Expand Up @@ -611,22 +687,16 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int
}
}

abstract static class CCSActionListener<Response, FinalResponse> implements ActionListener<Response> {
private final String clusterAlias;
private final boolean skipUnavailable;
abstract static class CountDownActionListener<Response, FinalResponse> implements ActionListener<Response> {
private final CountDown countDown;
private final AtomicInteger skippedClusters;
private final AtomicReference<Exception> exceptions;
private final ActionListener<FinalResponse> originalListener;
private final ActionListener<FinalResponse> delegateListener;

CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
this.clusterAlias = clusterAlias;
this.skipUnavailable = skipUnavailable;
CountDownActionListener(CountDown countDown, AtomicReference<Exception> exceptions,
ActionListener<FinalResponse> delegateListener) {
this.countDown = countDown;
this.skippedClusters = skippedClusters;
this.exceptions = exceptions;
this.originalListener = originalListener;
this.delegateListener = delegateListener;
}

@Override
Expand All @@ -637,44 +707,64 @@ public final void onResponse(Response response) {

abstract void innerOnResponse(Response response);

@Override
public final void onFailure(Exception e) {
if (skipUnavailable) {
skippedClusters.incrementAndGet();
} else {
Exception exception = e;
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
exception = wrapRemoteClusterFailure(clusterAlias, e);
}
if (exceptions.compareAndSet(null, exception) == false) {
exceptions.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
}
maybeFinish();
}

private void maybeFinish() {
final void maybeFinish() {
if (countDown.countDown()) {
Exception exception = exceptions.get();
if (exception == null) {
FinalResponse response;
try {
response = createFinalResponse();
} catch(Exception e) {
originalListener.onFailure(e);
delegateListener.onFailure(e);
return;
}
originalListener.onResponse(response);
delegateListener.onResponse(response);
} else {
originalListener.onFailure(exceptions.get());
delegateListener.onFailure(exceptions.get());
}
}
}

abstract FinalResponse createFinalResponse();

@Override
public void onFailure(Exception e) {
if (exceptions.compareAndSet(null, e) == false) {
exceptions.accumulateAndGet(e, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
maybeFinish();
}
}

/**
 * Listener used for cross-cluster search: extends {@link CountDownActionListener} with
 * per-cluster failure handling. When the originating cluster is marked skip-unavailable,
 * a failure is recorded as a skipped cluster rather than as an error; otherwise failures
 * from remote clusters are wrapped so the cluster alias is visible in the reported exception.
 */
abstract static class CCSActionListener<Response, FinalResponse> extends CountDownActionListener<Response, FinalResponse> {
    private final String clusterAlias;
    private final boolean skipUnavailable;
    private final AtomicInteger skippedClusters;

    CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
                      AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
        super(countDown, exceptions, originalListener);
        this.clusterAlias = clusterAlias;
        this.skipUnavailable = skipUnavailable;
        this.skippedClusters = skippedClusters;
    }

    @Override
    public final void onFailure(Exception e) {
        if (skipUnavailable) {
            // count the cluster as skipped and still count down, so the overall search completes
            skippedClusters.incrementAndGet();
            maybeFinish();
            return;
        }
        // wrap remote failures with the cluster alias; local failures are reported as-is
        final boolean local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias);
        super.onFailure(local ? e : wrapRemoteClusterFailure(clusterAlias, e));
    }
}

private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package org.elasticsearch.action.search;

import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
Expand Down Expand Up @@ -174,4 +177,62 @@ public void testFinalReduce() {
assertEquals(2, longTerms.getBuckets().size());
}
}

// Verifies that a search spanning both a writable and a read-only index is split in two,
// while searches that hit only one kind of index are executed as a single search.
// The number of reduce phases is used as the observable signal: 1 reduce phase means a
// single search executed; 3 reduce phases (presumably one partial reduce per sub-search
// plus the final merge — confirm against SearchResponseMerger) means the request was split.
public void testSplitIndices() {
{
// create an index that will remain writable
CreateIndexResponse response = client().admin().indices().prepareCreate("write").get();
assertTrue(response.isAcknowledged());
}
{
// create an index that will later be marked read-only
CreateIndexResponse response = client().admin().indices().prepareCreate("readonly").get();
assertTrue(response.isAcknowledged());
}
{
// before any block is set: single index, single search, one reduce phase
SearchResponse response = client().prepareSearch("readonly").get();
assertEquals(1, response.getTotalShards());
assertEquals(1, response.getSuccessfulShards());
assertEquals(1, response.getNumReducePhases());
}
{
SearchResponse response = client().prepareSearch("write").get();
assertEquals(1, response.getTotalShards());
assertEquals(1, response.getSuccessfulShards());
assertEquals(1, response.getNumReducePhases());
}
{
// both indices writable: still one search across both, so one reduce phase
SearchResponse response = client().prepareSearch("readonly", "write").get();
assertEquals(2, response.getTotalShards());
assertEquals(2, response.getSuccessfulShards());
assertEquals(1, response.getNumReducePhases());
}
{
// mark "readonly" as blocked for writes so the split path applies to it
Settings settings = Settings.builder().put("index.blocks.read_only", "true").build();
AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
assertTrue(response.isAcknowledged());
}
try {
{
// a search against only the read-only index is not split
SearchResponse response = client().prepareSearch("readonly").get();
assertEquals(1, response.getTotalShards());
assertEquals(1, response.getSuccessfulShards());
assertEquals(1, response.getNumReducePhases());
}
{
// a search against only the write index is not split
SearchResponse response = client().prepareSearch("write").get();
assertEquals(1, response.getTotalShards());
assertEquals(1, response.getSuccessfulShards());
assertEquals(1, response.getNumReducePhases());
}
{
// mixed read-only + write search: split in two, hence 3 reduce phases
SearchResponse response = client().prepareSearch("readonly", "write").get();
assertEquals(2, response.getTotalShards());
assertEquals(2, response.getSuccessfulShards());
assertEquals(3, response.getNumReducePhases());
}
} finally {
// always remove the read-only block so subsequent tests (and cleanup) can delete the index
Settings settings = Settings.builder().put("index.blocks.read_only", "false").build();
AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
assertTrue(response.isAcknowledged());
}
}
}
Loading