Skip to content

Commit fd8d5a6

Browse files
committed
showcase
1 parent 6e6e832 commit fd8d5a6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+127
-509
lines changed

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/EvaluatorImplementer.java

Lines changed: 41 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.ArrayList;
2222
import java.util.Arrays;
2323
import java.util.List;
24-
import java.util.function.BiConsumer;
2524
import java.util.function.Function;
2625
import java.util.stream.Collectors;
2726

@@ -41,6 +40,7 @@
4140
import static org.elasticsearch.compute.gen.Types.BOOLEAN_BLOCK;
4241
import static org.elasticsearch.compute.gen.Types.BYTES_REF;
4342
import static org.elasticsearch.compute.gen.Types.BYTES_REF_BLOCK;
43+
import static org.elasticsearch.compute.gen.Types.BYTES_REF_VECTOR;
4444
import static org.elasticsearch.compute.gen.Types.DOUBLE_BLOCK;
4545
import static org.elasticsearch.compute.gen.Types.DRIVER_CONTEXT;
4646
import static org.elasticsearch.compute.gen.Types.EXPRESSION_EVALUATOR;
@@ -257,13 +257,7 @@ private MethodSpec realEval(boolean blockStyle) {
257257
builder.beginControlFlow("try");
258258
}
259259
if (executionCost > 0) {
260-
builder.addStatement("accumulatedCost += " + executionCost);
261-
builder.beginControlFlow("if (accumulatedCost >= DriverContext.CHECK_FOR_EARLY_TERMINATION_COST_THRESHOLD)");
262-
{
263-
builder.addStatement("accumulatedCost = 0");
264-
builder.addStatement("driverContext.checkForEarlyTermination()");
265-
}
266-
builder.endControlFlow();
260+
addEarlyTerminationCheck(builder, executionCost);
267261
}
268262
builder.addStatement(builtPattern, args.toArray());
269263

@@ -287,49 +281,49 @@ private MethodSpec realEval(boolean blockStyle) {
287281
}
288282

289283
private void realEvalWithVectorizedStyle(MethodSpec.Builder builder, ClassName resultDataType) {
290-
boolean checkForEarlyTerminationPerRow = processFunction.args.stream()
291-
.anyMatch(a -> a.dataType(false).getClass().getSimpleName().startsWith("BytesRef"));
292-
BiConsumer<String, Object[]> innerLoop = (label, bounds) -> {
293-
builder.beginControlFlow(label + "for (int p = " + bounds[0] + "; p < " + bounds[1] + "; p++)");
294-
{
295-
processFunction.args.forEach(a -> a.unpackValues(builder, false));
296-
StringBuilder pattern = new StringBuilder();
297-
List<Object> args = new ArrayList<>();
298-
pattern.append("$T.$N(");
299-
args.add(declarationType);
300-
args.add(processFunction.function.getSimpleName());
301-
processFunction.args.forEach(a -> {
302-
if (args.size() > 2) {
303-
pattern.append(", ");
304-
}
305-
a.buildInvocation(pattern, args, false);
306-
});
307-
pattern.append(")");
308-
String builtPattern;
309-
if (processFunction.builderArg == null) {
310-
builtPattern = "result.$L(p, " + pattern + ")";
311-
args.add(0, appendMethod(resultDataType));
312-
} else {
313-
builtPattern = pattern.toString();
284+
boolean checkEarlyTerminationPerRow = executionCost > 0
285+
&& processFunction.args.stream().anyMatch(a -> a.dataType(false).equals(BYTES_REF_VECTOR));
286+
if (checkEarlyTerminationPerRow) {
287+
builder.addStatement("int accumulatedCost = 0");
288+
}
289+
builder.beginControlFlow("position: for (int p = 0; p < positionCount; p++)");
290+
{
291+
processFunction.args.forEach(a -> a.unpackValues(builder, false));
292+
StringBuilder pattern = new StringBuilder();
293+
List<Object> args = new ArrayList<>();
294+
pattern.append("$T.$N(");
295+
args.add(declarationType);
296+
args.add(processFunction.function.getSimpleName());
297+
processFunction.args.forEach(a -> {
298+
if (args.size() > 2) {
299+
pattern.append(", ");
314300
}
315-
builder.addStatement(builtPattern, args.toArray());
301+
a.buildInvocation(pattern, args, false);
302+
});
303+
pattern.append(")");
304+
String builtPattern;
305+
if (processFunction.builderArg == null) {
306+
builtPattern = "result.$L(p, " + pattern + ")";
307+
args.add(0, appendMethod(resultDataType));
308+
} else {
309+
builtPattern = pattern.toString();
316310
}
317-
builder.endControlFlow();
318-
};
319-
if (executionCost == 0) {
320-
innerLoop.accept("position: ", new Object[] { 0, "positionCount" });
321-
} else {
322-
// generate tight loops to allow vectorization
323-
builder.addStatement("final int batchSize = DriverContext.batchSizeForEarlyTermination($L)", executionCost);
324-
builder.beginControlFlow("for (int start = 0; start < positionCount; )");
325-
{
326-
builder.addStatement("int end = start + Math.min(positionCount - start, batchSize)");
327-
builder.addStatement("driverContext.checkForEarlyTermination()");
328-
innerLoop.accept("", new Object[] { "start", "end" });
329-
builder.addStatement("start = end");
311+
if (checkEarlyTerminationPerRow) {
312+
addEarlyTerminationCheck(builder, executionCost);
330313
}
331-
builder.endControlFlow();
314+
builder.addStatement(builtPattern, args.toArray());
332315
}
316+
builder.endControlFlow();
317+
}
318+
319+
static void addEarlyTerminationCheck(MethodSpec.Builder builder, int executionCost) {
320+
builder.addStatement("accumulatedCost += $L", executionCost);
321+
builder.beginControlFlow("if (accumulatedCost >= DriverContext.CHECK_FOR_EARLY_TERMINATION_COST_THRESHOLD)");
322+
{
323+
builder.addStatement("accumulatedCost = 0");
324+
builder.addStatement("driverContext.checkForEarlyTermination()");
325+
}
326+
builder.endControlFlow();
333327
}
334328

335329
private static void skipNull(MethodSpec.Builder builder, String value) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,7 @@ public Driver(
139139
DriverSleeps.empty()
140140
)
141141
);
142-
driverContext.initializeEarlyTerminationChecker(() -> {
143-
ensureNotCancelled();
144-
checkForEarlyTermination();
145-
});
142+
driverContext.initializeEarlyTerminationChecker(this::checkForEarlyTerminationOrCancellation);
146143
}
147144

148145
/**
@@ -252,7 +249,6 @@ public void abort(Exception reason, ActionListener<Void> listener) {
252249
}
253250

254251
private IsBlockedResult runSingleLoopIteration() {
255-
ensureNotCancelled();
256252
boolean movedPage = false;
257253

258254
for (int i = 0; i < activeOperators.size() - 1; i++) {
@@ -265,6 +261,7 @@ private IsBlockedResult runSingleLoopIteration() {
265261
}
266262

267263
if (op.isFinished() == false && nextOp.needsInput()) {
264+
checkForEarlyTerminationOrCancellation();
268265
Page page = op.getOutput();
269266
if (page == null) {
270267
// No result, just move to the next iteration
@@ -273,12 +270,19 @@ private IsBlockedResult runSingleLoopIteration() {
273270
page.releaseBlocks();
274271
} else {
275272
// Non-empty result from the previous operation, move it to the next operation
273+
try {
274+
checkForEarlyTerminationOrCancellation();
275+
} catch (DriverEarlyTerminationException | TaskCancelledException e) {
276+
page.releaseBlocks();
277+
throw e;
278+
}
276279
nextOp.addInput(page);
277280
movedPage = true;
278281
}
279282
}
280283

281284
if (op.isFinished()) {
285+
checkForEarlyTerminationOrCancellation();
282286
nextOp.finish();
283287
}
284288
}
@@ -350,7 +354,8 @@ private static class DriverEarlyTerminationException extends RuntimeException {
350354

351355
}
352356

353-
private void checkForEarlyTermination() throws DriverEarlyTerminationException {
357+
private void checkForEarlyTerminationOrCancellation() throws DriverEarlyTerminationException, TaskCancelledException {
358+
ensureNotCancelled();
354359
// If the last operation is finished, then we can discard all operations in the driver
355360
if (activeOperators.size() >= 2 && activeOperators.getLast().isFinished()) {
356361
for (int i = 0; i < activeOperators.size() - 1; i++) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverContext.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,17 +194,6 @@ public void initializeEarlyTerminationChecker(Runnable checker) {
194194
this.earlyTerminationChecker = checker;
195195
}
196196

197-
public static int batchSizeForEarlyTermination(int executionCost) {
198-
if (executionCost <= 0) {
199-
throw new IllegalStateException("executionCost must be positive");
200-
}
201-
final int size = (DriverContext.CHECK_FOR_EARLY_TERMINATION_COST_THRESHOLD + executionCost - 1) / executionCost;
202-
if (size <= 32) {
203-
return 32;
204-
}
205-
return Integer.highestOneBit(size - 1) << 1;
206-
}
207-
208197
/**
209198
* Evaluators should use this function to decide their warning behavior.
210199
* @return an appropriate {@link WarningsMode}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverContextTests.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@
2424
import org.junit.Before;
2525

2626
import java.util.Collections;
27-
import java.util.HashMap;
2827
import java.util.HashSet;
2928
import java.util.IdentityHashMap;
3029
import java.util.List;
31-
import java.util.Map;
3230
import java.util.Set;
3331
import java.util.concurrent.Callable;
3432
import java.util.concurrent.ExecutorService;
@@ -158,23 +156,6 @@ public void testWaitForAsyncActions() {
158156
Releasables.closeExpectNoException(driverContext.getSnapshot());
159157
}
160158

161-
public void testBatchSizeForEarlyTermination() {
162-
Map<Integer, Integer> tests = new HashMap<>();
163-
tests.put(1, 2048);
164-
tests.put(between(2, 3), 1024);
165-
tests.put(between(4, 7), 512);
166-
tests.put(between(8, 15), 256);
167-
tests.put(between(16, 31), 128);
168-
tests.put(between(32, 63), 64);
169-
tests.put(between(64, 127), 32);
170-
tests.put(atLeast(128), 32);
171-
for (Map.Entry<Integer, Integer> e : tests.entrySet()) {
172-
Integer cost = e.getKey();
173-
Integer batchSize = e.getValue();
174-
assertThat("cost=" + cost, DriverContext.batchSizeForEarlyTermination(cost), equalTo(batchSize));
175-
}
176-
}
177-
178159
static TestDriver newTestDriver(int unused) {
179160
var driverContext = new AssertingDriverContext();
180161
return new TestDriver(driverContext, randomInt(128), driverContext.bigArrays());

x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/AutomataMatchEvaluator.java

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCaseEvaluator.java

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ConcatEvaluator.java

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/EndsWithEvaluator.java

Lines changed: 2 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)