Commit cfc003d

Hendrik Muhs authored and polyfractal committed
[Rollup] Re-factor Rollup Indexer into a generic indexer for re-usability (#32743)
This extracts a superclass out of the Rollup indexer, called AsyncTwoPhaseIndexer. The implementor defines the query, the transformation of the response, the indexing, and the object used to persist the position/state of the indexer. The stats object used by the indexer to record progress is now also abstract, allowing the implementation to provide custom stats beyond what the indexer records and to decide how the stats are presented (toXContent() is left to the implementation). This should allow new projects to reuse the search-then-index persistent task that Rollup uses, but without the restrictions/baggage of how Rollup has to work internally to satisfy time-based rollups.
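For orientation before the diff, here is a minimal sketch of how a consumer might extend the new class. The abstract hooks and their signatures are taken from AsyncTwoPhaseIndexer as added below; the ExampleIndexer and MyJobStats names, the Client wiring, and the IterationResult(toIndex, position, isDone) constructor are assumptions made for illustration and are not part of this commit.

import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;

// Illustrative sketch only. MyJobStats is assumed to be a concrete subclass of the
// (now abstract) IndexerJobStats and is not shown here.
public class ExampleIndexer extends AsyncTwoPhaseIndexer<Long, MyJobStats> {

    private final Client client;

    public ExampleIndexer(Executor executor, Client client) {
        super(executor, new AtomicReference<>(IndexerState.STOPPED), 0L, new MyJobStats());
        this.client = client;
    }

    @Override
    protected String getJobId() {
        return "example-job";
    }

    @Override
    protected SearchRequest buildSearchRequest() {
        // Query the source index from the current position onwards.
        return new SearchRequest("source-index");
    }

    @Override
    protected IterationResult<Long> doProcess(SearchResponse searchResponse) {
        // Transform hits into IndexRequests and advance the position here; returning
        // isDone == true ends the run. The IterationResult constructor is assumed.
        return new IterationResult<>(Collections.emptyList(), getPosition(), true);
    }

    @Override
    protected void onStartJob(long now) {
        // Per-run setup, e.g. capture an upper time bound derived from `now`.
    }

    @Override
    protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
        client.search(request, nextPhase);
    }

    @Override
    protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
        client.bulk(request, nextPhase);
    }

    @Override
    protected void doSaveState(IndexerState state, Long position, Runnable next) {
        // Persist state and position (e.g. as persistent task state), then continue.
        next.run();
    }

    @Override
    protected void onFailure(Exception exc) {
        // Handle or log the failure.
    }

    @Override
    protected void onFinish() {
        // A full query-then-index cycle completed.
    }

    @Override
    protected void onAbort() {
        // The owning task is shutting down.
    }
}

The indexer itself owns the state machine, paging and bulk loop; the subclass only supplies the query, the response-to-documents transformation, and the persistence of its position.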
1 parent b52818e commit cfc003d

23 files changed: +900 −548 lines
New file: AsyncTwoPhaseIndexer.java (package org.elasticsearch.xpack.core.indexing)

@@ -0,0 +1,385 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.core.indexing;

import org.apache.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/**
 * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)};
 * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on the job position).
 * Only one background job can run simultaneously and {@link #onFinish()} is called when the job
 * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
 * aborted while a job is running. The indexer must be started ({@link #start()}) to allow a background job to run when
 * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
 *
 * In a nutshell this is a two-cycle engine: first it sends a query, then it indexes documents based on the response, sends the next query,
 * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
 *
 * @param <JobPosition> Type that defines a job position to be defined by the implementation.
 * @param <JobStats>    Type of the stats object provided by the implementation (must extend {@link IndexerJobStats}).
 */
public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats> {
    private static final Logger logger = Logger.getLogger(AsyncTwoPhaseIndexer.class.getName());

    private final JobStats stats;

    private final AtomicReference<IndexerState> state;
    private final AtomicReference<JobPosition> position;
    private final Executor executor;

    protected AsyncTwoPhaseIndexer(Executor executor, AtomicReference<IndexerState> initialState,
                                   JobPosition initialPosition, JobStats jobStats) {
        this.executor = executor;
        this.state = initialState;
        this.position = new AtomicReference<>(initialPosition);
        this.stats = jobStats;
    }

    /**
     * Get the current state of the indexer.
     */
    public IndexerState getState() {
        return state.get();
    }

    /**
     * Get the current position of the indexer.
     */
    public JobPosition getPosition() {
        return position.get();
    }

    /**
     * Get the stats of this indexer.
     */
    public JobStats getStats() {
        return stats;
    }

    /**
     * Sets the internal state to {@link IndexerState#STARTED} if the previous state
     * was {@link IndexerState#STOPPED}. Setting the state to STARTED allows a job
     * to run in the background when {@link #maybeTriggerAsyncJob(long)} is called.
     *
     * @return The new state for the indexer (STARTED, INDEXING or ABORTING if the
     *         job was already aborted).
     */
    public synchronized IndexerState start() {
        state.compareAndSet(IndexerState.STOPPED, IndexerState.STARTED);
        return state.get();
    }

    /**
     * Sets the internal state to {@link IndexerState#STOPPING} if an async job is
     * running in the background, and in that case {@link #onFinish()} will be called
     * as soon as the background job detects that the indexer is stopped. If there
     * is no job running when this function is called, the state is directly set to
     * {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called.
     *
     * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the
     *         job was already aborted).
     */
    public synchronized IndexerState stop() {
        IndexerState currentState = state.updateAndGet(previousState -> {
            if (previousState == IndexerState.INDEXING) {
                return IndexerState.STOPPING;
            } else if (previousState == IndexerState.STARTED) {
                return IndexerState.STOPPED;
            } else {
                return previousState;
            }
        });
        return currentState;
    }

    /**
     * Sets the internal state to {@link IndexerState#ABORTING}. It returns false if
     * an async job is running in the background, and in that case {@link #onAbort}
     * will be called as soon as the background job detects that the indexer is
     * aborted. If there is no job running when this function is called, it returns
     * true and {@link #onAbort()} will never be called.
     *
     * @return true if the indexer is aborted, false if a background job is running
     *         and abort is delayed.
     */
    public synchronized boolean abort() {
        IndexerState prevState = state.getAndUpdate((prev) -> IndexerState.ABORTING);
        return prevState == IndexerState.STOPPED || prevState == IndexerState.STARTED;
    }

    /**
     * Triggers a background job that builds the index asynchronously iff
     * there is no other job running and the indexer is started
     * ({@link IndexerState#STARTED}).
     *
     * @param now
     *            The current time in milliseconds (used to limit the job to
     *            complete buckets)
     * @return true if a job has been triggered, false otherwise
     */
    public synchronized boolean maybeTriggerAsyncJob(long now) {
        final IndexerState currentState = state.get();
        switch (currentState) {
            case INDEXING:
            case STOPPING:
            case ABORTING:
                logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running.");
                return false;

            case STOPPED:
                logger.debug("Schedule was triggered for job [" + getJobId() + "] but job is stopped. Ignoring trigger.");
                return false;

            case STARTED:
                logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
                stats.incrementNumInvocations(1);
                onStartJob(now);

                if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
                    // fire off the search. Note this is async, the method will return from here
                    executor.execute(() -> doNextSearch(buildSearchRequest(),
                            ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc))));
                    logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
                    return true;
                } else {
                    logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]");
                    return false;
                }

            default:
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Job encountered an illegal state [" + currentState + "]");
        }
    }

    /**
     * Called to get the id of the job, used for logging.
     *
     * @return a string with the id of the job
     */
    protected abstract String getJobId();

    /**
     * Called to process a response from the search request in order to turn it into an {@link IterationResult}.
     *
     * @param searchResponse response from the search phase.
     * @return Iteration object to be passed to the indexing phase.
     */
    protected abstract IterationResult<JobPosition> doProcess(SearchResponse searchResponse);

    /**
     * Called to build the next search request.
     *
     * @return SearchRequest to be passed to the search phase.
     */
    protected abstract SearchRequest buildSearchRequest();

    /**
     * Called at startup after the job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
     * internal state is {@link IndexerState#STARTED}.
     *
     * @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
     */
    protected abstract void onStartJob(long now);

    /**
     * Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
     * response or the exception if an error occurs.
     *
     * @param request
     *            The search request to execute
     * @param nextPhase
     *            Listener for the next phase
     */
    protected abstract void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase);

    /**
     * Executes the {@link BulkRequest} and calls <code>nextPhase</code> with the
     * response or the exception if an error occurs.
     *
     * @param request
     *            The bulk request to execute
     * @param nextPhase
     *            Listener for the next phase
     */
    protected abstract void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase);

    /**
     * Called periodically during the execution of a background job. The implementation
     * should persist the state somewhere and continue the execution asynchronously
     * using <code>next</code>.
     *
     * @param state
     *            The current state of the indexer
     * @param position
     *            The current position of the indexer
     * @param next
     *            Runnable for the next phase
     */
    protected abstract void doSaveState(IndexerState state, JobPosition position, Runnable next);

    /**
     * Called when a failure occurs in an async job causing the execution to stop.
     *
     * @param exc
     *            The exception
     */
    protected abstract void onFailure(Exception exc);

    /**
     * Called when a background job finishes.
     */
    protected abstract void onFinish();

    /**
     * Called when a background job detects that the indexer is aborted, causing the
     * async execution to stop.
     */
    protected abstract void onAbort();

    private void finishWithFailure(Exception exc) {
        doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
    }

    private IndexerState finishAndSetState() {
        return state.updateAndGet(prev -> {
            switch (prev) {
                case INDEXING:
                    // ready for another job
                    return IndexerState.STARTED;

                case STOPPING:
                    // must be started again
                    return IndexerState.STOPPED;

                case ABORTING:
                    // abort and exit
                    onAbort();
                    return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first

                case STOPPED:
                    // No-op. Shouldn't really be possible to get here (we should have to go through
                    // STOPPING first, which will be handled), but it is harmless to no-op and we
                    // don't want to throw an exception here
                    return IndexerState.STOPPED;

                default:
                    // any other state is unanticipated at this point
                    throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
            }
        });
    }

    private void onSearchResponse(SearchResponse searchResponse) {
        try {
            if (checkState(getState()) == false) {
                return;
            }
            if (searchResponse.getShardFailures().length != 0) {
                throw new RuntimeException("Shard failures encountered while running indexer for job [" + getJobId() + "]: "
                        + Arrays.toString(searchResponse.getShardFailures()));
            }

            stats.incrementNumPages(1);
            IterationResult<JobPosition> iterationResult = doProcess(searchResponse);

            if (iterationResult.isDone()) {
                logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");

                // Change state first, then try to persist. This prevents an in-progress
                // STOPPING/ABORTING from being persisted as STARTED but then stopping the job
                doSaveState(finishAndSetState(), position.get(), this::onFinish);
                return;
            }

            final List<IndexRequest> docs = iterationResult.getToIndex();
            final BulkRequest bulkRequest = new BulkRequest();
            docs.forEach(bulkRequest::add);

            // TODO this might be a valid case, e.g. if the implementation filters
            assert bulkRequest.requests().size() > 0;

            doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
                // TODO we should check items in the response and move the position accordingly to
                // resume the failing buckets?
                if (bulkResponse.hasFailures()) {
                    logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
                }
                stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
                if (checkState(getState()) == false) {
                    return;
                }

                JobPosition newPosition = iterationResult.getPosition();
                position.set(newPosition);

                onBulkResponse(bulkResponse, newPosition);
            }, exc -> finishWithFailure(exc)));
        } catch (Exception e) {
            finishWithFailure(e);
        }
    }

    private void onBulkResponse(BulkResponse response, JobPosition position) {
        try {

            ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure);
            // TODO probably something more intelligent than every-50 is needed
            if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
                doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener));
            } else {
                doNextSearch(buildSearchRequest(), listener);
            }
        } catch (Exception e) {
            finishWithFailure(e);
        }
    }

    /**
     * Checks the {@link IndexerState} and returns false if the execution should be
     * stopped.
     */
    private boolean checkState(IndexerState currentState) {
        switch (currentState) {
            case INDEXING:
                // normal state;
                return true;

            case STOPPING:
                logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer.");
                doSaveState(finishAndSetState(), getPosition(), () -> {
                });
                return false;

            case STOPPED:
                return false;

            case ABORTING:
                logger.info("Requested shutdown of indexer for job [" + getJobId() + "]");
                onAbort();
                return false;

            default:
                // Anything other than indexing, aborting or stopping is unanticipated
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Indexer job encountered an illegal state [" + currentState + "]");
        }
    }

}
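For completeness, a hypothetical driver illustrating the start/trigger contract described in the class Javadoc above. ExampleIndexer is the illustrative subclass sketched under the commit message; this method is not part of the commit.

// Hypothetical driver, e.g. invoked by a persistent task on each schedule tick.
static void runOnce(ExampleIndexer indexer) {
    indexer.start();                                        // STOPPED -> STARTED, nothing runs yet
    if (indexer.maybeTriggerAsyncJob(System.currentTimeMillis())) {
        // A background query-then-index cycle is now running asynchronously;
        // onFinish() fires when it completes, and getPosition()/getStats()
        // reflect its progress. Calling stop() while it runs moves the state
        // to STOPPING and the job halts at its next state check.
    }
}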
