Commit ff39225
Add parallel Lucene source operator (#189)
This PR allows the Lucene source operator (which runs the search) to be parallelized, either by slicing on the document-id space or by slicing on the segment space. It also comes with a set of benchmarks to show the effects of running in various configurations.

The experiment computes the average of a long field. To allow for parallelization, the avg operator comes in two flavors, enabling a map/reduce pattern: the first one (map) takes raw input (the numbers) and emits a sum plus a count at the end, and the second one (reduce) takes sum/count pairs, sums them up, and emits the avg at the end.

Various configurations are tested:

- testLongAvgSingleThreadedAvg: runs everything single-threaded with a single driver (baseline performance)
- testLongAvgMultiThreadedAvgWithSingleThreadedSearch: runs the search part single-threaded, but parallelizes the numeric doc-value extraction and avg computation
- testLongAvgMultiThreadedAvgWithMultiThreadedSegmentSearch: runs the search part as well as the avg computation in parallel, using segment-level parallelism
- testLongAvgMultiThreadedAvgWithMultiThreadedSearch: runs the search part as well as the avg computation in parallel, using document-id-space-level parallelism (see also https://issues.apache.org/jira/browse/LUCENE-8675)

To understand the effect of the number of segments, the benchmark runs in two configurations (data force-merged to 1 segment, and data force-merged to 10 segments). Here are the results (from my MacBook Pro with 8 cores, albeit imprecise due to the warm temperatures in my office today with the extreme heat):

```
Benchmark                                                                    (maxNumSegments)  (numDocs)  Mode  Cnt    Score     Error  Units
OperatorBenchmark.testLongAvgSingleThreadedAvg                                              1  100000000  avgt    3  664.127 ±  63.200  ms/op
OperatorBenchmark.testLongAvgSingleThreadedAvg                                             10  100000000  avgt    3  654.669 ±  88.197  ms/op
OperatorBenchmark.testLongAvgMultiThreadedAvgWithSingleThreadedSearch                       1  100000000  avgt    3  153.785 ±  69.273  ms/op
OperatorBenchmark.testLongAvgMultiThreadedAvgWithSingleThreadedSearch                      10  100000000  avgt    3  161.570 ± 172.318  ms/op
OperatorBenchmark.testLongAvgMultiThreadedAvgWithMultiThreadedSegmentSearch                 1  100000000  avgt    3  687.172 ±  41.166  ms/op
OperatorBenchmark.testLongAvgMultiThreadedAvgWithMultiThreadedSegmentSearch                10  100000000  avgt    3  168.887 ±  81.306  ms/op
OperatorBenchmark.testLongAvgMultiThreadedAvgWithMultiThreadedSearch                        1  100000000  avgt    3  111.377 ±  60.332  ms/op
OperatorBenchmark.testLongAvgMultiThreadedAvgWithMultiThreadedSearch                       10  100000000  avgt    3  111.535 ±  87.793  ms/op
```

Some explanations for the results observed:

- Even when keeping the search part single-threaded, it's useful to parallelize the aggregations running on top.
- The aggregations are very lightweight in this benchmark, so even if you have enough cores, the single-threaded search can still be the bottleneck (as it's a match-all query, the bottleneck in this case is the creation of the arrays that store the doc ids).
- Fully parallelizing things (i.e. the search part as well) can make things even faster. For segment-level parallelism, this obviously only works when you have multiple segments. If you only have a single segment, you can still parallelize just the aggregation bits, or you can partition by doc-id space (which will interfere with optimizations that leverage segment-level information).
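To make the map/reduce split concrete, here is a minimal, self-contained sketch of the two-phase average. It is illustrative only: the PR's actual LongAvgOperator consumes and produces Blocks/Pages inside a Driver, and the class and method names below are invented for the example.

```java
import java.util.List;
import java.util.stream.LongStream;

// Hypothetical stand-in for the two LongAvgOperator flavors; not the PR's API.
class TwoPhaseAvgSketch {

    // What the PARTIAL ("map") phase emits at the end: a sum plus a count.
    record SumCount(long sum, long count) {}

    // PARTIAL phase: consumes the raw numbers of one slice.
    static SumCount partial(long[] values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return new SumCount(sum, values.length);
    }

    // FINAL ("reduce") phase: sums up the per-slice pairs and emits the avg.
    static double finalAvg(List<SumCount> partials) {
        long sum = 0;
        long count = 0;
        for (SumCount p : partials) {
            sum += p.sum();
            count += p.count();
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        // Two slices reduced independently, then combined: avg(1..10) = 5.5.
        SumCount p1 = partial(LongStream.rangeClosed(1, 5).toArray());
        SumCount p2 = partial(LongStream.rangeClosed(6, 10).toArray());
        System.out.println(finalAvg(List.of(p1, p2)));
    }
}
```

The key property is that sum and count combine associatively, so the partial results of any number of slices can be reduced in any order.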
1 parent 9b821c3 commit ff39225

File tree: 4 files changed, +274 −36 lines
x-pack/plugin/sql/src/benchmarks/java/org/elasticsearch/xpack/sql/action/OperatorBenchmark.java

Lines changed: 92 additions & 5 deletions

```diff
@@ -29,6 +29,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.sql.action.compute.data.Block;
@@ -88,6 +89,9 @@ public class OperatorBenchmark {
     @Param({ "100000000" }) // 100 million
     int numDocs;

+    @Param({ "1", "10" })
+    int maxNumSegments;
+
     ThreadPool threadPool;

     @Setup
@@ -105,7 +109,7 @@ public void setup() throws IOException {
                 indexWriter.addDocument(doc);
             }
             indexWriter.commit();
-            indexWriter.forceMerge(1);
+            indexWriter.forceMerge(maxNumSegments);
             indexWriter.flush();
         }
         indexReader = DirectoryReader.open(dir);
@@ -340,7 +344,7 @@ public long testOperatorsWithLucene() {
     }

     @Benchmark
-    public long testSingleThreadedAvg() {
+    public long testLongAvgSingleThreadedAvg() {
         return runWithDriver(
             ByteSizeValue.ofKb(16).bytesAsInt(),
             new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
@@ -350,9 +354,9 @@ public long testSingleThreadedAvg() {
     }

     @Benchmark
-    public long testMultiThreadedAvg() {
+    public long testLongAvgMultiThreadedAvgWithSingleThreadedSearch() {
         AtomicInteger rowCount = new AtomicInteger();
-        int parallelCount = 8;
+        int parallelCount = ThreadPool.searchThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY));
         List<Driver> drivers = new ArrayList<>(parallelCount);
         List<ExchangeSource> forkExchangeSources = new ArrayList<>(parallelCount);
         List<ExchangeSource> joinExchangeSources = new ArrayList<>(parallelCount);
@@ -404,7 +408,90 @@ public long testMultiThreadedAvg() {
         );
         drivers.add(reduceDriver);

-        Driver.runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers).actionGet();
+        Driver.runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers);
+        return rowCount.get();
+    }
+
+    @Benchmark
+    public long testLongAvgMultiThreadedAvgWithMultiThreadedSearch() {
+        AtomicInteger rowCount = new AtomicInteger();
+        int parallelCount = ThreadPool.searchThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY));
+        List<Driver> drivers = new ArrayList<>(parallelCount);
+        List<ExchangeSource> joinExchangeSources = new ArrayList<>(parallelCount);
+
+        for (LuceneSourceOperator luceneSourceOperator : new LuceneSourceOperator(
+            indexReader,
+            new MatchAllDocsQuery(),
+            ByteSizeValue.ofKb(16).bytesAsInt()
+        ).slice(parallelCount)) {
+            ExchangeSource joinExchangeSource = new ExchangeSource();
+            joinExchangeSources.add(joinExchangeSource);
+            Driver driver = new Driver(
+                List.of(
+                    luceneSourceOperator,
+                    new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
+                    new LongAvgOperator(2), // PARTIAL
+                    new ExchangeSinkOperator(
+                        new ExchangeSink(new PassthroughExchanger(joinExchangeSource, Integer.MAX_VALUE), s -> joinExchangeSource.finish())
+                    )
+                ),
+                () -> {}
+            );
+            drivers.add(driver);
+        }
+
+        Driver reduceDriver = new Driver(
+            List.of(
+                new RandomUnionSourceOperator(joinExchangeSources),
+                new LongAvgOperator(0, 1), // FINAL
+                new PageConsumerOperator(page -> rowCount.addAndGet(page.getPositionCount()))
+            ),
+            () -> {}
+        );
+        drivers.add(reduceDriver);
+
+        Driver.runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers);
+        return rowCount.get();
+    }
+
+    @Benchmark
+    public long testLongAvgMultiThreadedAvgWithMultiThreadedSegmentSearch() {
+        AtomicInteger rowCount = new AtomicInteger();
+        List<Driver> drivers = new ArrayList<>();
+        List<ExchangeSource> joinExchangeSources = new ArrayList<>();
+
+        for (LuceneSourceOperator luceneSourceOperator : new LuceneSourceOperator(
+            indexReader,
+            new MatchAllDocsQuery(),
+            ByteSizeValue.ofKb(16).bytesAsInt()
+        ).segmentSlice()) {
+            ExchangeSource joinExchangeSource = new ExchangeSource();
+            joinExchangeSources.add(joinExchangeSource);
+            Driver driver = new Driver(
+                List.of(
+                    luceneSourceOperator,
+                    new NumericDocValuesExtractor(indexReader, 0, 1, "value"),
+                    new LongAvgOperator(2), // PARTIAL
+                    new ExchangeSinkOperator(
+                        new ExchangeSink(new PassthroughExchanger(joinExchangeSource, Integer.MAX_VALUE), s -> joinExchangeSource.finish())
+                    )
+                ),
+                () -> {}
+            );
+            drivers.add(driver);
+        }
+
+        Driver reduceDriver = new Driver(
+            List.of(
+                new RandomUnionSourceOperator(joinExchangeSources),
+                new LongAvgOperator(0, 1), // FINAL
+                new PageConsumerOperator(page -> rowCount.addAndGet(page.getPositionCount()))
+            ),
+            () -> {}
+        );
+        drivers.add(reduceDriver);
+
+        Driver.runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers);
         return rowCount.get();
     }
 }
```
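For orientation, the fan-out/fan-in topology that the two fully parallel benchmarks above build (N drivers, each running source → doc-value extraction → partial avg → exchange sink, plus one reduce driver that unions all exchange sources) can be sketched with plain threads and a queue. This is an analogy only: the real Driver/ExchangeSource/ExchangeSink machinery is page-based and non-blocking, and the queue and slicing below are simplifications.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.LongStream;

class FanOutFanInSketch {
    public static void main(String[] args) throws Exception {
        int parallelCount = Runtime.getRuntime().availableProcessors();
        long[] data = LongStream.rangeClosed(1, 1_000_000).toArray();

        BlockingQueue<long[]> exchange = new LinkedBlockingQueue<>(); // {sum, count} pairs
        ExecutorService executor = Executors.newFixedThreadPool(parallelCount);

        // Fan out: one "driver" per slice of the doc-id space.
        int sliceSize = data.length / parallelCount + 1;
        for (int i = 0; i < parallelCount; i++) {
            int from = Math.min(i * sliceSize, data.length);
            int to = Math.min(from + sliceSize, data.length);
            executor.submit(() -> {
                long sum = 0;
                for (int d = from; d < to; d++) {
                    sum += data[d];
                }
                exchange.add(new long[] { sum, to - from }); // PARTIAL result
            });
        }

        // Fan in: a single reduce "driver" combines the partial results.
        long sum = 0, count = 0;
        for (int i = 0; i < parallelCount; i++) {
            long[] partial = exchange.take();
            sum += partial[0];
            count += partial[1];
        }
        System.out.println((double) sum / count); // FINAL result: 500000.5
        executor.shutdown();
    }
}
```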

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/action/compute/lucene/LuceneSourceOperator.java

Lines changed: 137 additions & 27 deletions

```diff
@@ -11,20 +11,24 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.BulkScorer;
 import org.apache.lucene.search.ConstantScoreQuery;
-import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.sql.action.compute.data.ConstantIntBlock;
 import org.elasticsearch.xpack.sql.action.compute.data.IntBlock;
 import org.elasticsearch.xpack.sql.action.compute.data.Page;
 import org.elasticsearch.xpack.sql.action.compute.operator.Operator;

 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;

 /**
  * Source operator that incrementally runs Lucene searches
@@ -33,19 +37,22 @@ public class LuceneSourceOperator implements Operator {

     private static final int PAGE_SIZE = 4096;

-    private final IndexReader reader;
+    @Nullable
+    private final IndexReader indexReader;
+    @Nullable
     private final Query query;
+    private final List<PartialLeafReaderContext> leaves;
     private final int maxPageSize;
     private final int minPageSize;

     private Weight weight;

     private int currentLeaf = 0;
-    private LeafReaderContext currentLeafReaderContext = null;
+    private PartialLeafReaderContext currentLeafReaderContext = null;
     private BulkScorer currentScorer = null;

     private int currentPagePos;
-    private int[] currentPage;
+    private final int[] currentPage;

     private int currentScorerPos;

@@ -54,10 +61,22 @@ public LuceneSourceOperator(IndexReader reader, Query query) {
     }

     public LuceneSourceOperator(IndexReader reader, Query query, int maxPageSize) {
-        this.reader = reader;
+        this.indexReader = reader;
+        this.leaves = reader.leaves().stream().map(PartialLeafReaderContext::new).collect(Collectors.toList());
         this.query = query;
         this.maxPageSize = maxPageSize;
         this.minPageSize = maxPageSize / 2;
+        currentPage = new int[maxPageSize];
+    }
+
+    private LuceneSourceOperator(Weight weight, List<PartialLeafReaderContext> leaves, int maxPageSize) {
+        this.indexReader = null;
+        this.leaves = leaves;
+        this.query = null;
+        this.weight = weight;
+        this.maxPageSize = maxPageSize;
+        this.minPageSize = maxPageSize / 2;
+        currentPage = new int[maxPageSize];
     }

     @Override
@@ -77,36 +96,99 @@ public void finish() {

     @Override
     public boolean isFinished() {
-        return currentLeaf >= reader.leaves().size();
+        return currentLeaf >= leaves.size();
     }

+    /**
+     * Split this source operator into a given number of slices
+     */
+    public List<LuceneSourceOperator> slice(int numSlices) {
+        if (weight != null) {
+            throw new IllegalStateException("can only call slice method once");
+        }
+        initializeWeightIfNecessary();
+        final int totalDocCount = indexReader.maxDoc();
+        final int maxDocsPerSlice = (totalDocCount / numSlices) + 1;
+
+        final List<List<PartialLeafReaderContext>> slices = new ArrayList<>();
+        int docsAllocatedInCurrentSlice = 0;
+        List<PartialLeafReaderContext> currentSlice = null;
+        for (LeafReaderContext ctx : indexReader.leaves()) {
+            int minDoc = 0;
+            int numDocsInLeaf = ctx.reader().maxDoc();
+            while (minDoc < numDocsInLeaf) {
+                int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf);
+                if (numDocsToUse <= 0) {
+                    break;
+                }
+                if (currentSlice == null) {
+                    currentSlice = new ArrayList<>();
+                }
+                currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse));
+                minDoc += numDocsToUse;
+                docsAllocatedInCurrentSlice += numDocsToUse;
+                if (docsAllocatedInCurrentSlice >= maxDocsPerSlice) {
+                    slices.add(currentSlice);
+                    currentSlice = null;
+                    docsAllocatedInCurrentSlice = 0;
+                }
+            }
+        }
+        if (currentSlice != null) {
+            slices.add(currentSlice);
+        }
+
+        List<LuceneSourceOperator> operators = new ArrayList<>();
+        for (List<PartialLeafReaderContext> slice : slices) {
+            operators.add(new LuceneSourceOperator(weight, slice, maxPageSize));
+        }
+        return operators;
+    }
+
+    /**
+     * Uses Lucene's own slicing method, which creates per-segment level slices
+     */
+    public List<LuceneSourceOperator> segmentSlice() {
+        if (weight != null) {
+            throw new IllegalStateException("can only call slice method once");
+        }
+        initializeWeightIfNecessary();
+        List<LuceneSourceOperator> operators = new ArrayList<>();
+        for (IndexSearcher.LeafSlice leafSlice : IndexSearcher.slices(indexReader.leaves(), MAX_DOCS_PER_SLICE, MAX_SEGMENTS_PER_SLICE)) {
+            operators.add(
+                new LuceneSourceOperator(
+                    weight,
+                    Arrays.asList(leafSlice.leaves).stream().map(PartialLeafReaderContext::new).collect(Collectors.toList()),
+                    maxPageSize
+                )
+            );
+        }
+        return operators;
+    }
+
+    private static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
+    private static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
+
     @Override
     public Page getOutput() {
         if (isFinished()) {
             return null;
         }

         // initialize weight if not done yet
-        if (weight == null) {
-            IndexSearcher indexSearcher = new IndexSearcher(reader);
-            try {
-                weight = indexSearcher.createWeight(indexSearcher.rewrite(new ConstantScoreQuery(query)), ScoreMode.COMPLETE_NO_SCORES, 1);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        }
+        initializeWeightIfNecessary();

         Page page = null;

         // initializes currentLeafReaderContext, currentScorer, and currentScorerPos when we switch to a new leaf reader
         if (currentLeafReaderContext == null) {
-            currentLeafReaderContext = reader.leaves().get(currentLeaf);
+            currentLeafReaderContext = leaves.get(currentLeaf);
             try {
-                currentScorer = weight.bulkScorer(currentLeafReaderContext);
+                currentScorer = weight.bulkScorer(currentLeafReaderContext.leafReaderContext);
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
-            currentScorerPos = 0;
+            currentScorerPos = currentLeafReaderContext.minDoc;
         }

         try {
@@ -118,26 +200,25 @@ public void setScorer(Scorable scorer) {

                 @Override
                 public void collect(int doc) {
-                    if (currentPage == null) {
-                        currentPage = new int[maxPageSize];
-                        currentPagePos = 0;
-                    }
                     currentPage[currentPagePos] = doc;
                     currentPagePos++;
                 }
-            }, currentLeafReaderContext.reader().getLiveDocs(), currentScorerPos, currentScorerPos + maxPageSize - currentPagePos);
+            },
+                currentLeafReaderContext.leafReaderContext.reader().getLiveDocs(),
+                currentScorerPos,
+                Math.min(currentLeafReaderContext.maxDoc, currentScorerPos + maxPageSize - currentPagePos)
+            );

-            if (currentPagePos >= minPageSize || currentScorerPos == DocIdSetIterator.NO_MORE_DOCS) {
+            if (currentPagePos >= minPageSize || currentScorerPos >= currentLeafReaderContext.maxDoc) {
                 page = new Page(
                     currentPagePos,
-                    new IntBlock(currentPage, currentPagePos),
-                    new ConstantIntBlock(currentPagePos, currentLeafReaderContext.ord)
+                    new IntBlock(Arrays.copyOf(currentPage, currentPagePos), currentPagePos),
+                    new ConstantIntBlock(currentPagePos, currentLeafReaderContext.leafReaderContext.ord)
                 );
-                currentPage = null;
                 currentPagePos = 0;
             }

-            if (currentScorerPos == DocIdSetIterator.NO_MORE_DOCS) {
+            if (currentScorerPos >= currentLeafReaderContext.maxDoc) {
                 currentLeaf++;
                 currentLeafReaderContext = null;
                 currentScorer = null;
@@ -150,6 +231,35 @@ public void collect(int doc) {
         return page;
     }

+    private void initializeWeightIfNecessary() {
+        if (weight == null) {
+            try {
+                IndexSearcher indexSearcher = new IndexSearcher(indexReader);
+                weight = indexSearcher.createWeight(indexSearcher.rewrite(new ConstantScoreQuery(query)), ScoreMode.COMPLETE_NO_SCORES, 1);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    static class PartialLeafReaderContext {
+
+        final LeafReaderContext leafReaderContext;
+        final int minDoc; // incl
+        final int maxDoc; // excl
+
+        PartialLeafReaderContext(LeafReaderContext leafReaderContext, int minDoc, int maxDoc) {
+            this.leafReaderContext = leafReaderContext;
+            this.minDoc = minDoc;
+            this.maxDoc = maxDoc;
+        }
+
+        PartialLeafReaderContext(LeafReaderContext leafReaderContext) {
+            this(leafReaderContext, 0, leafReaderContext.reader().maxDoc());
+        }
+
+    }
+
     @Override
     public void close() {
```
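To see what the doc-id-space slicing produces, here is a condensed, standalone sketch of the partitioning idea behind slice(). The leaf sizes and slice count are hypothetical, PartialLeafReaderContext is reduced to a printable (leaf, minDoc, maxDoc) triple, and the per-leaf remainder is clamped so a range never extends past its leaf — a simplification of the committed code above.

```java
import java.util.ArrayList;
import java.util.List;

class SliceSketch {

    record Partial(int leaf, int minDoc, int maxDoc) {} // maxDoc is exclusive

    // Greedily packs ranges of roughly totalDocCount / numSlices + 1 docs,
    // letting a slice span leaf boundaries and a leaf span slice boundaries.
    static List<List<Partial>> slice(int[] leafSizes, int numSlices) {
        int totalDocCount = 0;
        for (int size : leafSizes) {
            totalDocCount += size;
        }
        int maxDocsPerSlice = (totalDocCount / numSlices) + 1;

        List<List<Partial>> slices = new ArrayList<>();
        int allocated = 0;
        List<Partial> current = null;
        for (int leaf = 0; leaf < leafSizes.length; leaf++) {
            int minDoc = 0;
            while (minDoc < leafSizes[leaf]) {
                int numDocsToUse = Math.min(maxDocsPerSlice - allocated, leafSizes[leaf] - minDoc);
                if (numDocsToUse <= 0) {
                    break;
                }
                if (current == null) {
                    current = new ArrayList<>();
                }
                current.add(new Partial(leaf, minDoc, minDoc + numDocsToUse));
                minDoc += numDocsToUse;
                allocated += numDocsToUse;
                if (allocated >= maxDocsPerSlice) {
                    slices.add(current);
                    current = null;
                    allocated = 0;
                }
            }
        }
        if (current != null) {
            slices.add(current);
        }
        return slices;
    }

    public static void main(String[] args) {
        // A single 100-doc segment split into 4 slices: [0,26), [26,52),
        // [52,78), [78,100). Unlike segment slicing, one segment can feed
        // several source operators in parallel.
        System.out.println(slice(new int[] { 100 }, 4));
    }
}
```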