Skip to content

Reindex search resiliency #45497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ActionListener<BulkByScrollResponse> listener,
@Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig) {
@Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig,
@Nullable String restartFromField) {
this.task = task;
this.scriptService = scriptService;
this.sslConfig = sslConfig;
Expand All @@ -135,7 +136,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.listener = listener;
BackoffPolicy backoffPolicy = buildBackoffPolicy();
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
scrollSource = buildScrollableResultSource(backoffPolicy);
// todo: this is trappy, since if a subclass override relies on subclass fields, they are not initialized. We should fix
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. This was causing me problems in a recent PR.

// to simply pass in the hit-source.
scrollSource = buildScrollableResultSource(backoffPolicy, restartFromField);
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
/*
* Default to sorting by doc. We can't do this in the request itself because it is normal to *add* to the sorts rather than replace
Expand Down Expand Up @@ -213,10 +216,11 @@ private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs)
return bulkRequest;
}

protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy,
String restartFromField) {
return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
this::onScrollResponse, this::finishHim, client,
mainRequest.getSearchRequest());
mainRequest.getSearchRequest(), restartFromField);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<De
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
ActionListener<BulkByScrollResponse> listener) {
super(task, false, true, logger, client, threadPool, request, listener, scriptService, null);
super(task, false, true, logger, client, threadPool, request, listener,
scriptService, null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.client.RestClient;
Expand All @@ -47,10 +48,14 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -92,17 +97,42 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen
}

public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
request.getSearchRequest().allowPartialSearchResults(false);
// Notice that this is called both on leader and workers when slicing.
String resumableSortingField = request.getRemoteInfo() == null ? getOrAddRestartFromField(request.getSearchRequest()) : null;

BulkByScrollParallelizationHelper.executeSlicedAction(task, request, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool,
scriptService, reindexSslConfig, request, listener);
scriptService, reindexSslConfig, request, resumableSortingField, listener);
searchAction.start();
});

}

/**
 * Determines which field, if any, a search can be restarted from, adding a default sort when the request has none.
 * <ul>
 *   <li>No sorts on the request: an ascending sort on {@link SeqNoFieldMapper#NAME} is appended to the request's
 *       source and that field name is returned.</li>
 *   <li>First sort is an ascending field sort on {@link SeqNoFieldMapper#NAME}: that field name is returned and the
 *       request is left untouched.</li>
 *   <li>Any other first sort: {@code null} is returned and the request is left untouched.</li>
 * </ul>
 * Note that, in keeping with the tradition of modifying the input request, this may mutate {@code searchRequest};
 * this can lead to strange results (in transport clients).
 *
 * @param searchRequest the search request whose sorts are inspected (and possibly extended)
 * @return the name of the field to restart from, or {@code null} if the existing sort does not allow restarting
 */
private static String getOrAddRestartFromField(SearchRequest searchRequest) {
    final List<SortBuilder<?>> existingSorts = searchRequest.source().sorts();

    if (existingSorts == null || existingSorts.isEmpty()) {
        // use unmapped_type to ensure that sorting works when index is newly created without mappings
        FieldSortBuilder seqNoSort = new FieldSortBuilder(SeqNoFieldMapper.NAME).unmappedType("long");
        searchRequest.source().sort(seqNoSort);
        return SeqNoFieldMapper.NAME;
    }

    // Only the primary (first) sort decides whether the search is restartable.
    final SortBuilder<?> primarySort = existingSorts.get(0);
    // todo: support non seq_no fields and descending, but need to check field is numeric and handle missing values too then.
    final boolean ascendingSeqNoSort = primarySort instanceof FieldSortBuilder
        && SeqNoFieldMapper.NAME.equals(((FieldSortBuilder) primarySort).getFieldName())
        && ((FieldSortBuilder) primarySort).order() == SortOrder.ASC;

    return ascendingSeqNoSort ? SeqNoFieldMapper.NAME : null;
}

/**
* Build the {@link RestClient} used for reindexing from remote clusters.
* @param remoteInfo connection information for the remote cluster
Expand Down Expand Up @@ -170,18 +200,20 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re

AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
ActionListener<BulkByScrollResponse> listener) {
String restartFromField, ActionListener<BulkByScrollResponse> listener) {
super(task,
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
request.getDestination().versionType() != VersionType.INTERNAL,
false, logger, client, threadPool, request, listener, scriptService, sslConfig);
SeqNoFieldMapper.NAME.equals(restartFromField), logger, client, threadPool, request, listener,
scriptService, sslConfig, restartFromField);
}

@Override
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy,
String restartFromField) {
if (mainRequest.getRemoteInfo() != null) {
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
createdThreads = synchronizedList(new ArrayList<>());
Expand All @@ -191,7 +223,7 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP
this::onScrollResponse, this::finishHim,
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
}
return super.buildScrollableResultSource(backoffPolicy);
return super.buildScrollableResultSource(backoffPolicy, restartFromField);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Up
super(task,
// use sequence number powered optimistic concurrency control
false, true,
logger, client, threadPool, request, listener, scriptService, null);
logger, client, threadPool, request, listener, scriptService, null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry,
Consumer<AsyncResponse> onResponse, Consumer<Exception> fail,
RestClient client, BytesReference query, SearchRequest searchRequest) {
super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail);
super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail, null);// todo: handle resume or grace
this.query = query;
this.searchRequest = searchRequest;
this.client = client;
}

@Override
protected void doStart(RejectAwareActionListener<Response> searchListener) {
protected void doStart(TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
remoteVersion = version;
execute(RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
Expand All @@ -97,12 +97,28 @@ private void onStartResponse(RejectAwareActionListener<Response> searchListener,
}
}

/**
 * Restarting is not supported for this remote hit source yet.
 * <p>
 * With assertions enabled the {@code assert false} trips first; otherwise the call fails with an
 * {@link UnsupportedOperationException}. None of the arguments are read.
 *
 * @param extraKeepAlive   unused
 * @param restartFromValue unused
 * @param searchListener   unused; it is never notified
 * @throws UnsupportedOperationException always (when assertions are disabled)
 */
@Override
protected void doRestart(TimeValue extraKeepAlive, long restartFromValue, RejectAwareActionListener<Response> searchListener) {
// Reaching this method is treated as a programming error, hence the assertion ahead of the throw.
assert false;
throw new UnsupportedOperationException("restart during remote reindex not supported yet");
}

/**
 * Requests the next batch of hits for an existing scroll on the remote cluster.
 * <p>
 * The keep-alive sent with the scroll request is the search request's configured scroll keep-alive plus
 * {@code extraKeepAlive}, summed in nanoseconds.
 *
 * @param scrollId       id of the scroll to continue
 * @param extraKeepAlive additional time added on top of the configured scroll keep-alive
 * @param searchListener listener handed to {@code execute} together with the response parser
 */
@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
    final long configuredNanos = searchRequest.scroll().keepAlive().nanos();
    final long totalNanos = configuredNanos + extraKeepAlive.nanos();
    final TimeValue scrollKeepAlive = timeValueNanos(totalNanos);
    execute(RemoteRequestBuilders.scroll(scrollId, scrollKeepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
}

/**
 * Reports whether this hit source can restart a search.
 *
 * @return always {@code false} for this remote hit source
 */
// NOTE(review): presumably the base class consults this before calling doRestart — confirm against ScrollableHitSource.
@Override
protected boolean canRestart() {
return false;
}

/**
 * Exposes the indices targeted by the search request this hit source was constructed with.
 *
 * @return the result of {@code searchRequest.indices()}, returned as-is (no copy is made here)
 */
@Override
protected String[] indices() {
return searchRequest.indices();
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public void setup() {
@SuppressWarnings("unused")
public void testReindex() {
Client client = client();
// todo: this is only necessary to ensure the seqno mapping is created.
client().prepareIndex(INDEX_NAME, "_doc", "1").setSource("data", "x").get();
// tag::reindex1
BulkByScrollResponse response =
new ReindexRequestBuilder(client, ReindexAction.INSTANCE)
Expand All @@ -94,6 +96,8 @@ public void testReindex() {
.filter(QueryBuilders.matchQuery("category", "xzy")) // <1>
.get();
// end::reindex1

client().prepareDelete(INDEX_NAME, "_doc", "1").get();
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception
ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, buildTestBackoffPolicy(),
threadPool,
testTask.getWorkerState()::countSearchRetry, r -> fail(), ExceptionsHelper::reThrowIfNotNull,
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest());
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest(), null);
hitSource.setScroll(scrollId());
hitSource.startNextScroll(TimeValue.timeValueSeconds(0));
assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
Expand All @@ -240,7 +240,7 @@ public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() t
ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, buildTestBackoffPolicy(),
threadPool,
testTask.getWorkerState()::countSearchRetry, r -> fail(), validingOnFail,
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest());
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest(), null);
hitSource.setScroll(scrollId());
hitSource.startNextScroll(TimeValue.timeValueSeconds(0));
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
Expand Down Expand Up @@ -733,7 +733,8 @@ private class DummyAsyncBulkByScrollAction
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, DummyTransportAsyncBulkByScrollAction> {
DummyAsyncBulkByScrollAction() {
super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger,
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, listener, null, null);
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, listener,
null, null, null);
}

@Override
Expand Down
Loading