Skip to content

Commit 8d389f3

Browse files
committed
Separate LongAvgOperator into partial and final aggregator
1 parent cd5cf68 commit 8d389f3

File tree

4 files changed

+184
-74
lines changed

4 files changed

+184
-74
lines changed

x-pack/plugin/sql/src/benchmarks/java/org/elasticsearch/xpack/sql/action/OperatorBenchmark.java

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,31 @@
2525
import org.apache.lucene.search.ScoreMode;
2626
import org.apache.lucene.store.Directory;
2727
import org.apache.lucene.store.MMapDirectory;
28+
import org.elasticsearch.common.settings.Settings;
2829
import org.elasticsearch.common.unit.ByteSizeValue;
2930
import org.elasticsearch.common.util.BigArrays;
3031
import org.elasticsearch.common.util.LongHash;
32+
import org.elasticsearch.node.Node;
33+
import org.elasticsearch.threadpool.ThreadPool;
3134
import org.elasticsearch.xpack.sql.action.compute.data.Block;
3235
import org.elasticsearch.xpack.sql.action.compute.data.LongBlock;
3336
import org.elasticsearch.xpack.sql.action.compute.data.Page;
3437
import org.elasticsearch.xpack.sql.action.compute.lucene.LuceneSourceOperator;
3538
import org.elasticsearch.xpack.sql.action.compute.lucene.NumericDocValuesExtractor;
3639
import org.elasticsearch.xpack.sql.action.compute.operator.Driver;
40+
import org.elasticsearch.xpack.sql.action.compute.operator.LongAvgOperator;
3741
import org.elasticsearch.xpack.sql.action.compute.operator.LongGroupingOperator;
3842
import org.elasticsearch.xpack.sql.action.compute.operator.LongMaxOperator;
3943
import org.elasticsearch.xpack.sql.action.compute.operator.LongTransformerOperator;
4044
import org.elasticsearch.xpack.sql.action.compute.operator.Operator;
4145
import org.elasticsearch.xpack.sql.action.compute.operator.PageConsumerOperator;
46+
import org.elasticsearch.xpack.sql.action.compute.operator.exchange.ExchangeSink;
47+
import org.elasticsearch.xpack.sql.action.compute.operator.exchange.ExchangeSinkOperator;
48+
import org.elasticsearch.xpack.sql.action.compute.operator.exchange.ExchangeSource;
49+
import org.elasticsearch.xpack.sql.action.compute.operator.exchange.ExchangeSourceOperator;
50+
import org.elasticsearch.xpack.sql.action.compute.operator.exchange.PassthroughExchanger;
51+
import org.elasticsearch.xpack.sql.action.compute.operator.exchange.RandomExchanger;
52+
import org.elasticsearch.xpack.sql.action.compute.operator.exchange.RandomUnionSourceOperator;
4253
import org.openjdk.jmh.annotations.Benchmark;
4354
import org.openjdk.jmh.annotations.BenchmarkMode;
4455
import org.openjdk.jmh.annotations.Fork;
@@ -60,6 +71,8 @@
6071
import java.util.Random;
6172
import java.util.concurrent.TimeUnit;
6273
import java.util.concurrent.atomic.AtomicInteger;
74+
import java.util.function.Consumer;
75+
import java.util.stream.Collectors;
6376

6477
@Fork(value = 1)
6578
@Warmup(iterations = 1)
@@ -75,6 +88,8 @@ public class OperatorBenchmark {
7588
@Param({ "100000000" }) // 100 million
7689
int numDocs;
7790

91+
ThreadPool threadPool;
92+
7893
@Setup
7994
public void setup() throws IOException {
8095
Path path = Files.createTempDirectory("test");
@@ -94,12 +109,14 @@ public void setup() throws IOException {
94109
indexWriter.flush();
95110
}
96111
indexReader = DirectoryReader.open(dir);
112+
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "OperatorBenchmark").build());
97113
}
98114

99115
@TearDown
100116
public void tearDown() throws IOException {
101117
indexReader.close();
102118
dir.close();
119+
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
103120
}
104121

105122
private static class SimpleXORValueCollector implements Collector {
@@ -284,7 +301,7 @@ private int runWithDriver(int pageSize, Operator... operators) {
284301
}
285302

286303
@Benchmark
287-
public long testVisitAllNumbersBatched4K() throws InterruptedException {
304+
public long testVisitAllNumbersBatched4K() {
288305
return runWithDriver(
289306
ByteSizeValue.ofKb(4).bytesAsInt(),
290307
new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
@@ -293,7 +310,7 @@ public long testVisitAllNumbersBatched4K() throws InterruptedException {
293310
}
294311

295312
@Benchmark
296-
public long testVisitAllNumbersBatched16K() throws InterruptedException {
313+
public long testVisitAllNumbersBatched16K() {
297314
return runWithDriver(
298315
ByteSizeValue.ofKb(16).bytesAsInt(),
299316
new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
@@ -302,17 +319,17 @@ public long testVisitAllNumbersBatched16K() throws InterruptedException {
302319
}
303320

304321
@Benchmark
305-
public long testVisitAllDocsBatched4K() throws InterruptedException {
322+
public long testVisitAllDocsBatched4K() {
306323
return runWithDriver(ByteSizeValue.ofKb(4).bytesAsInt());
307324
}
308325

309326
@Benchmark
310-
public long testVisitAllDocsBatched16K() throws InterruptedException {
327+
public long testVisitAllDocsBatched16K() {
311328
return runWithDriver(ByteSizeValue.ofKb(16).bytesAsInt());
312329
}
313330

314331
@Benchmark
315-
public long testOperatorsWithLucene() throws InterruptedException {
332+
public long testOperatorsWithLucene() {
316333
return runWithDriver(
317334
ByteSizeValue.ofKb(16).bytesAsInt(),
318335
new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
@@ -321,4 +338,73 @@ public long testOperatorsWithLucene() throws InterruptedException {
321338
new LongTransformerOperator(0, i -> i + 1) // adds +1 to group number (which start with 0) to get group count
322339
);
323340
}
341+
342+
@Benchmark
343+
public long testSingleThreadedAvg() {
344+
return runWithDriver(
345+
ByteSizeValue.ofKb(16).bytesAsInt(),
346+
new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
347+
new LongAvgOperator(2), // partial reduction
348+
new LongAvgOperator(0, 1) // final reduction
349+
);
350+
}
351+
352+
@Benchmark
353+
public long testMultiThreadedAvg() {
354+
AtomicInteger rowCount = new AtomicInteger();
355+
int parallelCount = 8;
356+
List<Driver> drivers = new ArrayList<>(parallelCount);
357+
List<ExchangeSource> forkExchangeSources = new ArrayList<>(parallelCount);
358+
List<ExchangeSource> joinExchangeSources = new ArrayList<>(parallelCount);
359+
for (int i = 0; i < parallelCount; i++) {
360+
ExchangeSource forkExchangeSource = new ExchangeSource();
361+
forkExchangeSources.add(forkExchangeSource);
362+
ExchangeSource joinExchangeSource = new ExchangeSource();
363+
joinExchangeSources.add(joinExchangeSource);
364+
List<Operator> operatorList = new ArrayList<>();
365+
operatorList.add(new ExchangeSourceOperator(forkExchangeSource));
366+
operatorList.addAll(
367+
List.of(
368+
new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
369+
new LongAvgOperator(2), // PARTIAL
370+
new ExchangeSinkOperator(
371+
new ExchangeSink(new PassthroughExchanger(joinExchangeSource, Integer.MAX_VALUE), s -> joinExchangeSource.finish())
372+
)
373+
)
374+
);
375+
Driver driver = new Driver(operatorList, () -> {});
376+
drivers.add(driver);
377+
}
378+
379+
Driver luceneDriver = new Driver(
380+
List.of(
381+
new LuceneSourceOperator(indexReader, new MatchAllDocsQuery(), ByteSizeValue.ofKb(16).bytesAsInt()),
382+
new ExchangeSinkOperator(
383+
new ExchangeSink(
384+
new RandomExchanger(
385+
forkExchangeSources.stream()
386+
.map(exchangeSource -> (Consumer<Page>) page -> exchangeSource.addPage(page, () -> {}))
387+
.collect(Collectors.toList())
388+
),
389+
sink -> forkExchangeSources.stream().forEach(ExchangeSource::finish)
390+
)
391+
)
392+
),
393+
() -> {}
394+
);
395+
drivers.add(luceneDriver);
396+
397+
Driver reduceDriver = new Driver(
398+
List.of(
399+
new RandomUnionSourceOperator(joinExchangeSources),
400+
new LongAvgOperator(0, 1), // FINAL
401+
new PageConsumerOperator(page -> rowCount.addAndGet(page.getPositionCount()))
402+
),
403+
() -> {}
404+
);
405+
drivers.add(reduceDriver);
406+
407+
Driver.runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers).actionGet();
408+
return rowCount.get();
409+
}
324410
}

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/action/compute/operator/Driver.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@
77

88
package org.elasticsearch.xpack.sql.action.compute.operator;
99

10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ActionRunnable;
1012
import org.elasticsearch.action.support.ListenableActionFuture;
13+
import org.elasticsearch.common.util.concurrent.BaseFuture;
1114
import org.elasticsearch.core.Releasable;
1215
import org.elasticsearch.core.TimeValue;
1316
import org.elasticsearch.xpack.sql.action.compute.data.Page;
1417

1518
import java.util.ArrayList;
1619
import java.util.List;
20+
import java.util.concurrent.Executor;
1721
import java.util.stream.Collectors;
1822

1923
/**
@@ -137,6 +141,41 @@ private ListenableActionFuture<Void> runSingleLoopIteration() {
137141
return Operator.NOT_BLOCKED;
138142
}
139143

144+
public static ListenableActionFuture<Void> runToCompletion(Executor executor, List<Driver> drivers) {
145+
TimeValue maxTime = TimeValue.timeValueMillis(200);
146+
int maxIterations = 10000;
147+
List<ListenableActionFuture<Void>> futures = new ArrayList<>();
148+
for (Driver driver : drivers) {
149+
futures.add(schedule(maxTime, maxIterations, executor, driver));
150+
}
151+
return Driver.allOf(futures);
152+
}
153+
154+
private static ListenableActionFuture<Void> schedule(TimeValue maxTime, int maxIterations, Executor executor, Driver driver) {
155+
ListenableActionFuture<Void> future = new ListenableActionFuture<>();
156+
executor.execute(new ActionRunnable<>(future) {
157+
@Override
158+
protected void doRun() {
159+
if (driver.isFinished()) {
160+
future.onResponse(null);
161+
return;
162+
}
163+
ListenableActionFuture<Void> fut = driver.run(maxTime, maxIterations);
164+
if (fut.isDone()) {
165+
schedule(maxTime, maxIterations, executor, driver).addListener(future);
166+
} else {
167+
fut.addListener(
168+
ActionListener.wrap(
169+
ignored -> schedule(maxTime, maxIterations, executor, driver).addListener(future),
170+
e -> future.onFailure(e)
171+
)
172+
);
173+
}
174+
}
175+
});
176+
return future;
177+
}
178+
140179
private static ListenableActionFuture<Void> oneOf(List<ListenableActionFuture<Void>> futures) {
141180
if (futures.isEmpty()) {
142181
return Operator.NOT_BLOCKED;
@@ -150,4 +189,22 @@ private static ListenableActionFuture<Void> oneOf(List<ListenableActionFuture<Vo
150189
}
151190
return oneOf;
152191
}
192+
193+
private static ListenableActionFuture<Void> allOf(List<ListenableActionFuture<Void>> futures) {
194+
if (futures.isEmpty()) {
195+
return Operator.NOT_BLOCKED;
196+
}
197+
if (futures.size() == 1) {
198+
return futures.get(0);
199+
}
200+
ListenableActionFuture<Void> allOf = new ListenableActionFuture<>();
201+
for (ListenableActionFuture<Void> fut : futures) {
202+
fut.addListener(ActionListener.wrap(ignored -> {
203+
if (futures.stream().allMatch(BaseFuture::isDone)) {
204+
allOf.onResponse(null);
205+
}
206+
}, e -> allOf.onFailure(e)));
207+
}
208+
return allOf;
209+
}
153210
}

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/action/compute/operator/LongAvgOperator.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,22 @@ public class LongAvgOperator implements Operator {
1616
boolean returnedResult;
1717
long count;
1818
long sum;
19-
private final int channel;
19+
private final int rawChannel;
20+
private final int sumChannel;
21+
private final int countChannel;
2022

21-
public LongAvgOperator(int channel) {
22-
this.channel = channel;
23+
// PARTIAL
24+
public LongAvgOperator(int rawChannel) {
25+
this.rawChannel = rawChannel;
26+
this.sumChannel = -1;
27+
this.countChannel = -1;
28+
}
2329

30+
// FINAL
31+
public LongAvgOperator(int sumChannel, int countChannel) {
32+
this.rawChannel = -1;
33+
this.sumChannel = sumChannel;
34+
this.countChannel = countChannel;
2435
}
2536

2637
@Override
@@ -30,7 +41,11 @@ public void close() { /* no-op */ }
3041
public Page getOutput() {
3142
if (finished && returnedResult == false) {
3243
returnedResult = true;
33-
return new Page(new LongBlock(new long[] { sum / count }, 1));
44+
if (rawChannel != -1) {
45+
return new Page(new LongBlock(new long[] { sum }, 1), new LongBlock(new long[] { count }, 1));
46+
} else {
47+
return new Page(new LongBlock(new long[] { sum / count }, 1));
48+
}
3449
}
3550
return null;
3651
}
@@ -52,10 +67,19 @@ public boolean needsInput() {
5267

5368
@Override
5469
public void addInput(Page page) {
55-
Block block = page.getBlock(channel);
56-
for (int i = 0; i < block.getPositionCount(); i++) {
57-
sum += block.getLong(i);
70+
if (rawChannel != -1) {
71+
Block block = page.getBlock(rawChannel);
72+
for (int i = 0; i < block.getPositionCount(); i++) {
73+
sum += block.getLong(i);
74+
}
75+
count += block.getPositionCount();
76+
} else {
77+
Block sumBlock = page.getBlock(sumChannel);
78+
Block countBlock = page.getBlock(countChannel);
79+
for (int i = 0; i < page.getPositionCount(); i++) {
80+
sum += sumBlock.getLong(i);
81+
count += countBlock.getLong(i);
82+
}
5883
}
59-
count += block.getPositionCount();
6084
}
6185
}

0 commit comments

Comments
 (0)