Skip to content

Commit dc5dfc5

Browse files
author
Hendrik Muhs
committed
[Rollup] improve handling of failures on first search (#35269)
Improve error handling in the Indexer if an exception occurs during the very 1st retrieval (query execution)
1 parent 5c8bd2f commit dc5dfc5

File tree

2 files changed

+98
-3
lines changed

2 files changed

+98
-3
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
151151

152152
if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
153153
// fire off the search. Note this is async, the method will return from here
154-
executor.execute(() -> doNextSearch(buildSearchRequest(),
155-
ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc))));
154+
executor.execute(() -> {
155+
try {
156+
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)));
157+
} catch (Exception e) {
158+
finishWithFailure(e);
159+
}
160+
});
156161
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
157162
return true;
158163
} else {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,78 @@ public int getStep() {
110110

111111
}
112112

113+
private class MockIndexerThrowsFirstSearch extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
114+
115+
// test the execution order
116+
private int step;
117+
118+
protected MockIndexerThrowsFirstSearch(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) {
119+
super(executor, initialState, initialPosition, new MockJobStats());
120+
}
121+
122+
@Override
123+
protected String getJobId() {
124+
return "mock";
125+
}
126+
127+
@Override
128+
protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
129+
fail("should not be called");
130+
return null;
131+
}
132+
133+
@Override
134+
protected SearchRequest buildSearchRequest() {
135+
assertThat(step, equalTo(1));
136+
++step;
137+
return null;
138+
}
139+
140+
@Override
141+
protected void onStartJob(long now) {
142+
assertThat(step, equalTo(0));
143+
++step;
144+
}
145+
146+
@Override
147+
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
148+
throw new RuntimeException("Failed to build search request");
149+
}
150+
151+
@Override
152+
protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
153+
fail("should not be called");
154+
}
155+
156+
@Override
157+
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
158+
assertThat(step, equalTo(2));
159+
++step;
160+
next.run();
161+
}
162+
163+
@Override
164+
protected void onFailure(Exception exc) {
165+
assertThat(step, equalTo(3));
166+
++step;
167+
isFinished.set(true);
168+
}
169+
170+
@Override
171+
protected void onFinish() {
172+
fail("should not be called");
173+
}
174+
175+
@Override
176+
protected void onAbort() {
177+
fail("should not be called");
178+
}
179+
180+
public int getStep() {
181+
return step;
182+
}
183+
}
184+
113185
private static class MockJobStats extends IndexerJobStats {
114186

115187
@Override
@@ -121,7 +193,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
121193
public void testStateMachine() throws InterruptedException {
122194
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
123195
final ExecutorService executor = Executors.newFixedThreadPool(1);
124-
196+
isFinished.set(false);
125197
try {
126198

127199
MockIndexer indexer = new MockIndexer(executor, state, 2);
@@ -140,4 +212,22 @@ public void testStateMachine() throws InterruptedException {
140212
executor.shutdownNow();
141213
}
142214
}
215+
216+
public void testStateMachineBrokenSearch() throws InterruptedException {
217+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
218+
final ExecutorService executor = Executors.newFixedThreadPool(1);
219+
isFinished.set(false);
220+
try {
221+
222+
MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
223+
indexer.start();
224+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
225+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
226+
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get()));
227+
assertThat(indexer.getStep(), equalTo(4));
228+
229+
} finally {
230+
executor.shutdownNow();
231+
}
232+
}
143233
}

0 commit comments

Comments
 (0)