Commit 748a108

Reindex ScrollableHitSource pump data out (#43864)
Refactor ScrollableHitSource to pump data out and have a simplified interface: callers no longer call startNextScroll; instead they simply mark that they are done with the previous result, which triggers a new batch of data. This eases making reindex resilient, since we will sometimes need to rerun the search during retries. Relates #43187 and #42612
1 parent: 859709c · commit: 748a108
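In other words, the hit source now pushes batches to a consumer supplied at construction time: the caller calls start() once, processes each batch it is handed, and acknowledges it by calling done(...) on the delivered AsyncResponse, which triggers the fetch of the next batch. A minimal sketch of such a consumer (illustrative only, not code from this commit; the zero extra keep-alive is a placeholder):

// Assumes java.util.function.Consumer and org.elasticsearch.common.unit.TimeValue.
Consumer<ScrollableHitSource.AsyncResponse> onResponse = asyncResponse -> {
    ScrollableHitSource.Response batch = asyncResponse.response();
    // ... turn batch.getHits() into a bulk request and index it ...
    // Acknowledge the batch; the source then fetches and delivers the next one.
    asyncResponse.done(TimeValue.ZERO);
};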

8 files changed (+461 −86)

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java (+25 −23)

@@ -112,6 +112,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * {@link RequestWrapper} completely.
      */
     private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
+    private int lastBatchSize;
 
     public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
                                            boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
@@ -211,7 +212,8 @@ private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs)
     }
 
     protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
-        return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim, client,
+        return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
+            this::onScrollResponse, this::finishHim, client,
             mainRequest.getSearchRequest());
     }
 
@@ -235,19 +237,26 @@ public void start() {
         }
         try {
             startTime.set(System.nanoTime());
-            scrollSource.start(response -> onScrollResponse(timeValueNanos(System.nanoTime()), 0, response));
+            scrollSource.start();
         } catch (Exception e) {
             finishHim(e);
         }
     }
 
+    void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
+        // lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime. Leaving it for now, since it seems
+        // like a bug?
+        onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
+    }
+
     /**
      * Process a scroll response.
      * @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
      * @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
-     * @param response the scroll response to process
+     * @param asyncResponse the response to process from ScrollableHitSource
      */
-    void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.Response response) {
+    void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
+        ScrollableHitSource.Response response = asyncResponse.response();
         logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
         if (task.isCancelled()) {
             logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
@@ -274,7 +283,7 @@ protected void doRun() throws Exception {
                  * It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
                  * waiting on the scroll doesn't count against this batch in the throttle.
                  */
-                prepareBulkRequest(timeValueNanos(System.nanoTime()), response);
+                prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
             }
 
             @Override
@@ -291,7 +300,8 @@ public void onFailure(Exception e) {
      * delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
      * thread may be blocked by the user script.
      */
-    void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Response response) {
+    void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
+        ScrollableHitSource.Response response = asyncResponse.response();
         logger.debug("[{}]: preparing bulk request", task.getId());
         if (task.isCancelled()) {
             logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
@@ -316,18 +326,18 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon
             /*
              * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
             */
-            startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), 0);
+            notifyDone(thisBatchStartTime, asyncResponse, 0);
             return;
         }
         request.timeout(mainRequest.getTimeout());
         request.waitForActiveShards(mainRequest.getWaitForActiveShards());
-        sendBulkRequest(thisBatchStartTime, request);
+        sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
     }
 
     /**
      * Send a bulk request, handling retries.
     */
-    void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
+    void sendBulkRequest(BulkRequest request, Runnable onSuccess) {
         if (logger.isDebugEnabled()) {
             logger.debug("[{}]: sending [{}] entry, [{}] bulk request", task.getId(), request.requests().size(),
                 new ByteSizeValue(request.estimatedSizeInBytes()));
@@ -340,7 +350,7 @@ void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
         bulkRetry.withBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
             @Override
             public void onResponse(BulkResponse response) {
-                onBulkResponse(thisBatchStartTime, response);
+                onBulkResponse(response, onSuccess);
             }
 
             @Override
@@ -353,7 +363,7 @@ public void onFailure(Exception e) {
     /**
      * Processes bulk responses, accounting for failures.
     */
-    void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
+    void onBulkResponse(BulkResponse response, Runnable onSuccess) {
         try {
             List<Failure> failures = new ArrayList<>();
             Set<String> destinationIndicesThisBatch = new HashSet<>();
@@ -401,28 +411,20 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
                 return;
             }
 
-            startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), response.getItems().length);
+            onSuccess.run();
         } catch (Exception t) {
             finishHim(t);
         }
     }
 
-    /**
-     * Start the next scroll request.
-     *
-     * @param lastBatchSize the number of requests sent in the last batch. This is used to calculate the throttling values which are applied
-     *        when the scroll returns
-     */
-    void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
+    void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
         if (task.isCancelled()) {
             logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
-        TimeValue extraKeepAlive = worker.throttleWaitTime(lastBatchStartTime, now, lastBatchSize);
-        scrollSource.startNextScroll(extraKeepAlive, response -> {
-            onScrollResponse(lastBatchStartTime, lastBatchSize, response);
-        });
+        this.lastBatchSize = batchSize;
+        asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
     }
 
     private void recordFailure(Failure failure, List<Failure> failures) {
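The two AsyncResponse members exercised above, reconstructed as a sketch for readers who do not have ScrollableHitSource.java (one of the changed files not shown in this excerpt) at hand; only the signatures are taken from the calls in the diff, the Javadoc wording is an assumption:

// Sketch of the ScrollableHitSource.AsyncResponse contract as used above.
public interface AsyncResponse {
    /** The scroll response carrying the current batch of hits. */
    ScrollableHitSource.Response response();
    /** Mark the batch as consumed; extraKeepAlive is the throttle-derived delay to add before fetching the next batch. */
    void done(TimeValue extraKeepAlive);
}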

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java (+2 −1)

@@ -279,7 +279,8 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP
             RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
             createdThreads = synchronizedList(new ArrayList<>());
             RestClient restClient = buildRestClient(remoteInfo, mainAction.sslConfig, task.getId(), createdThreads);
-            return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim,
+            return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
+                this::onScrollResponse, this::finishHim,
                 restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
         }
         return super.buildScrollableResultSource(backoffPolicy);

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java (+5 −4)

@@ -30,7 +30,6 @@
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
@@ -44,9 +43,10 @@
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParseException;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -68,8 +68,9 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
     Version remoteVersion;
 
     public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry,
-                                     Consumer<Exception> fail, RestClient client, BytesReference query, SearchRequest searchRequest) {
-        super(logger, backoffPolicy, threadPool, countSearchRetry, fail);
+                                     Consumer<AsyncResponse> onResponse, Consumer<Exception> fail,
+                                     RestClient client, BytesReference query, SearchRequest searchRequest) {
+        super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail);
         this.query = query;
         this.searchRequest = searchRequest;
         this.client = client;