Skip to content

Commit d091c12

Browse files
[7.x] Generalize AsyncTwoPhaseIndexer first phase (#61739) (#62482)
Current implementations of the indexer are using aggregations. Thus each search step executes a search action. However, we can generalize that to allow for any action that returns a `SearchResponse`. This commit abstracts the search phase from the search action. Backport of #61739
1 parent e0a4a94 commit d091c12

File tree

9 files changed

+40
-77
lines changed

9 files changed

+40
-77
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.bulk.BulkRequest;
1313
import org.elasticsearch.action.bulk.BulkResponse;
1414
import org.elasticsearch.action.index.IndexRequest;
15-
import org.elasticsearch.action.search.SearchRequest;
1615
import org.elasticsearch.action.search.SearchResponse;
1716
import org.elasticsearch.common.unit.TimeValue;
1817
import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -287,17 +286,6 @@ protected float getMaxDocsPerSecond() {
287286
*/
288287
protected abstract IterationResult<JobPosition> doProcess(SearchResponse searchResponse);
289288

290-
/**
291-
* Called to build the next search request.
292-
*
293-
* In case the indexer is throttled waitTimeInNanos can be used as hint for building a less resource hungry
294-
* search request.
295-
*
296-
* @param waitTimeInNanos duration in nanoseconds the indexer has waited due to throttling.
297-
* @return SearchRequest to be passed to the search phase.
298-
*/
299-
protected abstract SearchRequest buildSearchRequest(long waitTimeInNanos);
300-
301289
/**
302290
* Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
303291
* internal state is {@link IndexerState#STARTED}.
@@ -310,15 +298,18 @@ protected float getMaxDocsPerSecond() {
310298
protected abstract void onStart(long now, ActionListener<Boolean> listener);
311299

312300
/**
313-
* Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
301+
* Executes the next search and calls <code>nextPhase</code> with the
314302
* response or the exception if an error occurs.
315303
*
316-
* @param request
317-
* The search request to execute
304+
* In case the indexer is throttled waitTimeInNanos can be used as hint for doing a less resource hungry
305+
* search.
306+
*
307+
* @param waitTimeInNanos
308+
* Duration in nanoseconds the indexer has waited due to throttling
318309
* @param nextPhase
319310
* Listener for the next phase
320311
*/
321-
protected abstract void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase);
312+
protected abstract void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase);
322313

323314
/**
324315
* Executes the {@link BulkRequest} and calls <code>nextPhase</code> with the
@@ -575,10 +566,7 @@ private void triggerNextSearch(long waitTimeInNanos) {
575566
stats.markStartSearch();
576567
lastSearchStartTimeNanos = getTimeNanos();
577568

578-
// ensure that partial results are not accepted and cause a search failure
579-
SearchRequest searchRequest = buildSearchRequest(waitTimeInNanos).allowPartialSearchResults(false);
580-
581-
doNextSearch(searchRequest, searchResponseListener);
569+
doNextSearch(waitTimeInNanos, searchResponseListener);
582570
}
583571

584572
/**

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.bulk.BulkRequest;
1313
import org.elasticsearch.action.bulk.BulkResponse;
1414
import org.elasticsearch.action.index.IndexRequest;
15-
import org.elasticsearch.action.search.SearchRequest;
1615
import org.elasticsearch.action.search.SearchResponse;
1716
import org.elasticsearch.action.search.SearchResponseSections;
1817
import org.elasticsearch.action.search.ShardSearchFailure;
@@ -72,7 +71,7 @@ protected String getJobId() {
7271
@Override
7372
protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
7473
assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished);
75-
assertThat(step, equalTo(3));
74+
assertThat(step, equalTo(2));
7675
++step;
7776
return new IterationResult<>(Collections.emptyList(), 3, true);
7877
}
@@ -85,13 +84,6 @@ private void awaitForLatch() {
8584
}
8685
}
8786

88-
@Override
89-
protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
90-
assertThat(step, equalTo(1));
91-
++step;
92-
return new SearchRequest();
93-
}
94-
9587
@Override
9688
protected void onStart(long now, ActionListener<Boolean> listener) {
9789
assertThat(step, equalTo(0));
@@ -100,8 +92,8 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
10092
}
10193

10294
@Override
103-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
104-
assertThat(step, equalTo(2));
95+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
96+
assertThat(step, equalTo(1));
10597
++step;
10698
final SearchResponseSections sections = new SearchResponseSections(
10799
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
@@ -121,7 +113,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
121113
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
122114
// for stop before finished we do not know if its stopped before are after the search
123115
if (stoppedBeforeFinished == false) {
124-
assertThat(step, equalTo(5));
116+
assertThat(step, equalTo(4));
125117
}
126118
++step;
127119
next.run();
@@ -134,7 +126,7 @@ protected void onFailure(Exception exc) {
134126

135127
@Override
136128
protected void onFinish(ActionListener<Void> listener) {
137-
assertThat(step, equalTo(4));
129+
assertThat(step, equalTo(3));
138130
++step;
139131
listener.onResponse(null);
140132
assertTrue(isFinished.compareAndSet(false, true));
@@ -164,7 +156,6 @@ private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer<Integer, MockJobS
164156
// counters
165157
private volatile boolean started = false;
166158
private volatile boolean waitingForLatch = false;
167-
private volatile int searchRequests = 0;
168159
private volatile int searchOps = 0;
169160
private volatile int processOps = 0;
170161
private volatile int bulkOps = 0;
@@ -208,12 +199,6 @@ else if (processOps % 2 == 0) {
208199
return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false);
209200
}
210201

211-
@Override
212-
protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
213-
++searchRequests;
214-
return new SearchRequest();
215-
}
216-
217202
@Override
218203
protected void onStart(long now, ActionListener<Boolean> listener) {
219204
started = true;
@@ -238,7 +223,7 @@ public boolean waitingForLatchCountDown() {
238223
}
239224

240225
@Override
241-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
226+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
242227
++searchOps;
243228
final SearchResponseSections sections = new SearchResponseSections(
244229
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
@@ -289,7 +274,6 @@ protected long getTimeNanos() {
289274

290275
public void assertCounters() {
291276
assertTrue(started);
292-
assertEquals(5L, searchRequests);
293277
assertEquals(5L, searchOps);
294278
assertEquals(5L, processOps);
295279
assertEquals(2L, bulkOps);
@@ -318,13 +302,6 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
318302
return null;
319303
}
320304

321-
@Override
322-
protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
323-
assertThat(step, equalTo(1));
324-
++step;
325-
return new SearchRequest();
326-
}
327-
328305
@Override
329306
protected void onStart(long now, ActionListener<Boolean> listener) {
330307
assertThat(step, equalTo(0));
@@ -333,7 +310,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
333310
}
334311

335312
@Override
336-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
313+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
337314
throw new RuntimeException("Failed to build search request");
338315
}
339316

@@ -349,7 +326,7 @@ protected void doSaveState(IndexerState state, Integer position, Runnable next)
349326

350327
@Override
351328
protected void onFailure(Exception exc) {
352-
assertThat(step, equalTo(2));
329+
assertThat(step, equalTo(1));
353330
++step;
354331
assertTrue(isFinished.compareAndSet(false, true));
355332
}
@@ -414,7 +391,7 @@ public void testStateMachine() throws Exception {
414391
assertThat(indexer.getPosition(), equalTo(3));
415392

416393
assertFalse(isStopped.get());
417-
assertThat(indexer.getStep(), equalTo(6));
394+
assertThat(indexer.getStep(), equalTo(5));
418395
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
419396
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
420397
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
@@ -434,7 +411,7 @@ public void testStateMachineBrokenSearch() throws Exception {
434411
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
435412
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
436413
assertBusy(() -> assertTrue(isFinished.get()), 10000, TimeUnit.SECONDS);
437-
assertThat(indexer.getStep(), equalTo(3));
414+
assertThat(indexer.getStep(), equalTo(2));
438415
} finally {
439416
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
440417
}

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,17 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
124124
}
125125
}
126126

127-
@Override
128-
protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
129-
// Indexer is single-threaded, and only place that the ID scheme can get upgraded is doSaveState(), so
130-
// we can pass down the boolean value rather than the atomic here
127+
protected SearchRequest buildSearchRequest() {
131128
final Map<String, Object> position = getPosition();
132129
SearchSourceBuilder searchSource = new SearchSourceBuilder()
133130
.size(0)
134131
.trackTotalHits(false)
135132
// make sure we always compute complete buckets that appears before the configured delay
136133
.query(createBoundaryQuery(position))
137134
.aggregation(compositeBuilder.aggregateAfter(position));
138-
return new SearchRequest(job.getConfig().getIndexPattern()).source(searchSource);
135+
return new SearchRequest(job.getConfig().getIndexPattern())
136+
.allowPartialSearchResults(false)
137+
.source(searchSource);
139138
}
140139

141140
@Override

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.bulk.BulkRequest;
1414
import org.elasticsearch.action.bulk.BulkResponse;
1515
import org.elasticsearch.action.search.SearchAction;
16-
import org.elasticsearch.action.search.SearchRequest;
1716
import org.elasticsearch.action.search.SearchResponse;
1817
import org.elasticsearch.client.Client;
1918
import org.elasticsearch.client.ParentTaskAssigningClient;
@@ -110,9 +109,9 @@ protected class ClientRollupPageManager extends RollupIndexer {
110109
}
111110

112111
@Override
113-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
114-
ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, SearchAction.INSTANCE, request,
115-
nextPhase);
112+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
113+
ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, SearchAction.INSTANCE,
114+
buildSearchRequest(), nextPhase);
116115
}
117116

118117
@Override

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,8 @@ protected void onFailure(Exception e) {
650650
}
651651

652652
@Override
653-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> listener) {
653+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> listener) {
654+
SearchRequest request = buildSearchRequest();
654655
assertNotNull(request.source());
655656

656657
// extract query

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ private static class EmptyRollupIndexer extends RollupIndexer {
6969

7070

7171
@Override
72-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
72+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
7373
// TODO Should use InternalComposite constructor but it is package protected in core.
7474
Aggregations aggs = new Aggregations(Collections.singletonList(new CompositeAggregation() {
7575
@Override
@@ -160,14 +160,14 @@ private CountDownLatch newLatch() {
160160
}
161161

162162
@Override
163-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
163+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
164164
assert latch != null;
165165
try {
166166
latch.await();
167167
} catch (InterruptedException e) {
168168
throw new IllegalStateException(e);
169169
}
170-
super.doNextSearch(request, nextPhase);
170+
super.doNextSearch(waitTimeInNanos, nextPhase);
171171
}
172172
}
173173

@@ -200,7 +200,7 @@ private CountDownLatch newLatch(int count) {
200200
}
201201

202202
@Override
203-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
203+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
204204
assert latch != null;
205205
try {
206206
latch.await();
@@ -209,7 +209,7 @@ protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse
209209
}
210210

211211
try {
212-
SearchResponse response = searchFunction.apply(request);
212+
SearchResponse response = searchFunction.apply(buildSearchRequest());
213213
nextPhase.onResponse(response);
214214
} catch (Exception e) {
215215
nextPhase.onFailure(e);
@@ -376,14 +376,14 @@ protected void onFinish(ActionListener<Void> listener) {
376376
}
377377

378378
@Override
379-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
379+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
380380
try {
381381
latch.await();
382382
} catch (InterruptedException e) {
383383
throw new IllegalStateException(e);
384384
}
385385
state.set(IndexerState.ABORTING); // <-- Set to aborting right before we return the (empty) search response
386-
super.doNextSearch(request, nextPhase);
386+
super.doNextSearch(waitTimeInNanos, nextPhase);
387387
}
388388

389389
@Override
@@ -424,7 +424,7 @@ protected void onAbort() {
424424
}
425425

426426
@Override
427-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
427+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
428428
try {
429429
doNextSearchLatch.await();
430430
} catch (InterruptedException e) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ void persistShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListene
126126
}
127127

128128
@Override
129-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
129+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
130130
if (context.getTaskState() == TransformTaskState.FAILED) {
131131
logger.debug("[{}] attempted to search while failed.", getJobId());
132132
nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId()));
@@ -137,7 +137,7 @@ protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse
137137
ClientHelper.TRANSFORM_ORIGIN,
138138
client,
139139
SearchAction.INSTANCE,
140-
request,
140+
buildSearchRequest(),
141141
nextPhase
142142
);
143143
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -700,8 +700,7 @@ protected QueryBuilder buildFilterQuery() {
700700
return queryBuilder;
701701
}
702702

703-
@Override
704-
protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
703+
protected SearchRequest buildSearchRequest() {
705704
assert nextCheckpoint != null;
706705

707706
SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex()).allowPartialSearchResults(false)

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ protected String getJobId() {
141141
}
142142

143143
@Override
144-
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
144+
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
145145
assert latch != null;
146146
try {
147147
latch.await();
@@ -150,7 +150,7 @@ protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse
150150
}
151151

152152
try {
153-
SearchResponse response = searchFunction.apply(request);
153+
SearchResponse response = searchFunction.apply(buildSearchRequest());
154154
nextPhase.onResponse(response);
155155
} catch (Exception e) {
156156
nextPhase.onFailure(e);

0 commit comments

Comments
 (0)