Skip to content

Commit 20ab248

Browse files
authored
Check for early termination in Driver (#118188) (#120238)
This change introduces support for periodically checking for early termination. This enables early exits in the following scenarios: 1. The query has accumulated sufficient data (e.g., reaching the LIMIT). 2. The query is stopped (either by users or due to failures). Other changes will be addressed in follow-up PRs.
1 parent 404a67e commit 20ab248

File tree

7 files changed

+195
-152
lines changed

7 files changed

+195
-152
lines changed

docs/changelog/118188.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118188
2+
summary: Check for early termination in Driver
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 76 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,13 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
186186
long nextStatus = startTime + statusNanos;
187187
int iter = 0;
188188
while (true) {
189-
IsBlockedResult isBlocked = runSingleLoopIteration();
189+
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
190+
try {
191+
isBlocked = runSingleLoopIteration();
192+
} catch (DriverEarlyTerminationException unused) {
193+
closeEarlyFinishedOperators();
194+
assert isFinished() : "not finished after early termination";
195+
}
190196
iter++;
191197
if (isBlocked.listener().isDone() == false) {
192198
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
@@ -242,39 +248,59 @@ public void abort(Exception reason, ActionListener<Void> listener) {
242248
}
243249

244250
private IsBlockedResult runSingleLoopIteration() {
245-
ensureNotCancelled();
251+
driverContext.checkForEarlyTermination();
246252
boolean movedPage = false;
247253

248-
if (activeOperators.isEmpty() == false && activeOperators.get(activeOperators.size() - 1).isFinished() == false) {
249-
for (int i = 0; i < activeOperators.size() - 1; i++) {
250-
Operator op = activeOperators.get(i);
251-
Operator nextOp = activeOperators.get(i + 1);
254+
for (int i = 0; i < activeOperators.size() - 1; i++) {
255+
Operator op = activeOperators.get(i);
256+
Operator nextOp = activeOperators.get(i + 1);
252257

253-
// skip blocked operator
254-
if (op.isBlocked().listener().isDone() == false) {
255-
continue;
256-
}
258+
// skip blocked operator
259+
if (op.isBlocked().listener().isDone() == false) {
260+
continue;
261+
}
257262

258-
if (op.isFinished() == false && nextOp.needsInput()) {
259-
Page page = op.getOutput();
260-
if (page == null) {
261-
// No result, just move to the next iteration
262-
} else if (page.getPositionCount() == 0) {
263-
// Empty result, release any memory it holds immediately and move to the next iteration
263+
if (op.isFinished() == false && nextOp.needsInput()) {
264+
driverContext.checkForEarlyTermination();
265+
Page page = op.getOutput();
266+
if (page == null) {
267+
// No result, just move to the next iteration
268+
} else if (page.getPositionCount() == 0) {
269+
// Empty result, release any memory it holds immediately and move to the next iteration
270+
page.releaseBlocks();
271+
} else {
272+
// Non-empty result from the previous operation, move it to the next operation
273+
try {
274+
driverContext.checkForEarlyTermination();
275+
} catch (DriverEarlyTerminationException | TaskCancelledException e) {
264276
page.releaseBlocks();
265-
} else {
266-
// Non-empty result from the previous operation, move it to the next operation
267-
nextOp.addInput(page);
268-
movedPage = true;
277+
throw e;
269278
}
279+
nextOp.addInput(page);
280+
movedPage = true;
270281
}
282+
}
271283

272-
if (op.isFinished()) {
273-
nextOp.finish();
274-
}
284+
if (op.isFinished()) {
285+
driverContext.checkForEarlyTermination();
286+
nextOp.finish();
275287
}
276288
}
277289

290+
closeEarlyFinishedOperators();
291+
292+
if (movedPage == false) {
293+
return oneOf(
294+
activeOperators.stream()
295+
.map(Operator::isBlocked)
296+
.filter(laf -> laf.listener().isDone() == false)
297+
.collect(Collectors.toList())
298+
);
299+
}
300+
return Operator.NOT_BLOCKED;
301+
}
302+
303+
private void closeEarlyFinishedOperators() {
278304
for (int index = activeOperators.size() - 1; index >= 0; index--) {
279305
if (activeOperators.get(index).isFinished()) {
280306
/*
@@ -300,16 +326,6 @@ private IsBlockedResult runSingleLoopIteration() {
300326
break;
301327
}
302328
}
303-
304-
if (movedPage == false) {
305-
return oneOf(
306-
activeOperators.stream()
307-
.map(Operator::isBlocked)
308-
.filter(laf -> laf.listener().isDone() == false)
309-
.collect(Collectors.toList())
310-
);
311-
}
312-
return Operator.NOT_BLOCKED;
313329
}
314330

315331
public void cancel(String reason) {
@@ -318,13 +334,6 @@ public void cancel(String reason) {
318334
}
319335
}
320336

321-
private void ensureNotCancelled() {
322-
String reason = cancelReason.get();
323-
if (reason != null) {
324-
throw new TaskCancelledException(reason);
325-
}
326-
}
327-
328337
public static void start(
329338
ThreadContext threadContext,
330339
Executor executor,
@@ -335,20 +344,37 @@ public static void start(
335344
driver.completionListener.addListener(listener);
336345
if (driver.started.compareAndSet(false, true)) {
337346
driver.updateStatus(0, 0, DriverStatus.Status.STARTING, "driver starting");
338-
// Register a listener to an exchange sink to handle early completion scenarios:
339-
// 1. When the query accumulates sufficient data (e.g., reaching the LIMIT).
340-
// 2. When users abort the query but want to retain the current result.
341-
// This allows the Driver to finish early without waiting for the scheduled task.
342-
final List<Operator> operators = driver.activeOperators;
343-
if (operators.isEmpty() == false) {
344-
if (operators.get(operators.size() - 1) instanceof ExchangeSinkOperator sinkOperator) {
345-
sinkOperator.addCompletionListener(ActionListener.running(driver.scheduler::runPendingTasks));
346-
}
347-
}
347+
initializeEarlyTerminationChecker(driver);
348348
schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, driver.completionListener);
349349
}
350350
}
351351

352+
private static void initializeEarlyTerminationChecker(Driver driver) {
353+
// Register a listener to an exchange sink to handle early completion scenarios:
354+
// 1. When the query accumulates sufficient data (e.g., reaching the LIMIT).
355+
// 2. When users abort the query but want to retain the current result.
356+
// This allows the Driver to finish early without waiting for the scheduled task.
357+
final AtomicBoolean earlyFinished = new AtomicBoolean();
358+
driver.driverContext.initializeEarlyTerminationChecker(() -> {
359+
final String reason = driver.cancelReason.get();
360+
if (reason != null) {
361+
throw new TaskCancelledException(reason);
362+
}
363+
if (earlyFinished.get()) {
364+
throw new DriverEarlyTerminationException("Exchange sink is closed");
365+
}
366+
});
367+
final List<Operator> operators = driver.activeOperators;
368+
if (operators.isEmpty() == false) {
369+
if (operators.get(operators.size() - 1) instanceof ExchangeSinkOperator sinkOperator) {
370+
sinkOperator.addCompletionListener(ActionListener.running(() -> {
371+
earlyFinished.set(true);
372+
driver.scheduler.runPendingTasks();
373+
}));
374+
}
375+
}
376+
}
377+
352378
// Drains all active operators and closes them.
353379
private void drainAndCloseOperators(@Nullable Exception e) {
354380
Iterator<Operator> itr = activeOperators.iterator();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public class DriverContext {
6060

6161
private final WarningsMode warningsMode;
6262

63+
private Runnable earlyTerminationChecker = () -> {};
64+
6365
public DriverContext(BigArrays bigArrays, BlockFactory blockFactory) {
6466
this(bigArrays, blockFactory, WarningsMode.COLLECT);
6567
}
@@ -175,6 +177,21 @@ public void removeAsyncAction() {
175177
asyncActions.removeInstance();
176178
}
177179

180+
/**
181+
* Checks if the Driver associated with this DriverContext has been cancelled or early terminated.
182+
*/
183+
public void checkForEarlyTermination() {
184+
earlyTerminationChecker.run();
185+
}
186+
187+
/**
188+
* Initializes the early termination or cancellation checker for this DriverContext.
189+
* This method should be called when associating this DriverContext with a driver.
190+
*/
191+
public void initializeEarlyTerminationChecker(Runnable checker) {
192+
this.earlyTerminationChecker = checker;
193+
}
194+
178195
/**
179196
* Evaluators should use this function to decide their warning behavior.
180197
* @return an appropriate {@link WarningsMode}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
12+
/**
13+
* An exception indicates that a compute should be terminated early as the downstream pipeline has enough or no long requires more data.
14+
*/
15+
public final class DriverEarlyTerminationException extends ElasticsearchException {
16+
public DriverEarlyTerminationException(String message) {
17+
super(message);
18+
}
19+
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.util.concurrent.ThreadContext;
2020
import org.elasticsearch.common.util.set.Sets;
2121
import org.elasticsearch.compute.data.BasicBlockTests;
22+
import org.elasticsearch.compute.data.Block;
2223
import org.elasticsearch.compute.data.BlockFactory;
2324
import org.elasticsearch.compute.data.ElementType;
2425
import org.elasticsearch.compute.data.Page;
@@ -40,6 +41,7 @@
4041
import java.util.concurrent.CountDownLatch;
4142
import java.util.concurrent.CyclicBarrier;
4243
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.atomic.AtomicInteger;
4345
import java.util.function.Function;
4446
import java.util.function.LongSupplier;
4547

@@ -280,6 +282,49 @@ public Page getOutput() {
280282
}
281283
}
282284

285+
public void testEarlyTermination() {
286+
DriverContext driverContext = driverContext();
287+
ThreadPool threadPool = threadPool();
288+
try {
289+
int positions = between(1000, 5000);
290+
List<Page> inPages = randomList(1, 100, () -> {
291+
var block = driverContext.blockFactory().newConstantIntBlockWith(randomInt(), positions);
292+
return new Page(block);
293+
});
294+
final var sourceOperator = new CannedSourceOperator(inPages.iterator());
295+
final int maxAllowedRows = between(1, 100);
296+
final AtomicInteger processedRows = new AtomicInteger(0);
297+
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
298+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
299+
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
300+
@Override
301+
public Block eval(Page page) {
302+
for (int i = 0; i < page.getPositionCount(); i++) {
303+
driverContext.checkForEarlyTermination();
304+
if (processedRows.incrementAndGet() >= maxAllowedRows) {
305+
sinkHandler.fetchPageAsync(true, ActionListener.noop());
306+
}
307+
}
308+
return driverContext.blockFactory().newConstantBooleanBlockWith(true, page.getPositionCount());
309+
}
310+
311+
@Override
312+
public void close() {
313+
314+
}
315+
});
316+
Driver driver = new Driver(driverContext, sourceOperator, List.of(delayOperator), sinkOperator, () -> {});
317+
ThreadContext threadContext = threadPool.getThreadContext();
318+
PlainActionFuture<Void> future = new PlainActionFuture<>();
319+
320+
Driver.start(threadContext, threadPool.executor("esql"), driver, between(1, 1000), future);
321+
future.actionGet(30, TimeUnit.SECONDS);
322+
assertThat(processedRows.get(), equalTo(maxAllowedRows));
323+
} finally {
324+
terminate(threadPool);
325+
}
326+
}
327+
283328
public void testResumeOnEarlyFinish() throws Exception {
284329
DriverContext driverContext = driverContext();
285330
ThreadPool threadPool = threadPool();

x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/util/DelayEvaluator.java

Lines changed: 0 additions & 91 deletions
This file was deleted.

0 commit comments

Comments
 (0)