
Commit e0ac15a

Parallelize knn query rewrite across slices rather than segments (#12325)
The concurrent query rewrite for knn vector queries introduced in #12160 requests one thread per segment from the executor. To align this with the IndexSearcher parallel behaviour, we should instead parallelize across slices. We can also reuse the slice executor instance that the index searcher already holds; that way, a QueueSizeBasedExecutor is used whenever a thread pool executor is provided.
1 parent 0b53670 commit e0ac15a
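For context, the slice-parallel rewrite only engages when the IndexSearcher is constructed with an executor; otherwise the sequential per-leaf path is taken. A minimal usage sketch, assuming an existing index with a float vector field named "vector" (the path, field name, query vector, and pool size are illustrative, not part of this commit):

import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.KnnFloatVectorQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class KnnSliceRewriteExample {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/knn-index"));
        DirectoryReader reader = DirectoryReader.open(dir)) {
      // Constructing the searcher with an executor is what makes it compute
      // leaf slices and hold a slice executor for the knn rewrite to reuse.
      IndexSearcher searcher = new IndexSearcher(reader, executor);
      // Query vector dimensions must match the indexed field (assumed here).
      KnnFloatVectorQuery query =
          new KnnFloatVectorQuery("vector", new float[] {0.1f, 0.2f, 0.3f}, 10);
      TopDocs topDocs = searcher.search(query, 10);
      System.out.println("total hits: " + topDocs.totalHits);
    } finally {
      executor.shutdown();
    }
  }
}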

3 files changed: 42 additions & 23 deletions


lucene/CHANGES.txt

Lines changed: 2 additions & 0 deletions
@@ -36,6 +36,8 @@ Improvements
 
 * GITHUB#12305: Minor cleanup and improvements to DaciukMihovAutomatonBuilder. (Greg Miller)
 
+* GITHUB#12325: Parallelize AbstractKnnVectorQuery rewrite across slices rather than segments. (Luca Cavanna)
+
 Optimizations
 ---------------------
 

lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java

Lines changed: 36 additions & 23 deletions
@@ -19,14 +19,13 @@
 import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
-import java.util.stream.Collectors;
 import org.apache.lucene.codecs.KnnVectorsReader;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexReader;
@@ -82,11 +81,12 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
       filterWeight = null;
     }
 
-    Executor executor = indexSearcher.getExecutor();
+    SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
+    // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
     TopDocs[] perLeafResults =
-        (executor == null)
+        (sliceExecutor == null)
             ? sequentialSearch(reader.leaves(), filterWeight)
-            : parallelSearch(reader.leaves(), filterWeight, executor);
+            : parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor);
 
     // Merge sort the results
     TopDocs topK = TopDocs.merge(k, perLeafResults);
@@ -110,27 +110,40 @@ private TopDocs[] sequentialSearch(
   }
 
   private TopDocs[] parallelSearch(
-      List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
-    List<FutureTask<TopDocs>> tasks =
-        leafReaderContexts.stream()
-            .map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
-            .collect(Collectors.toList());
+      IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {
+
+    List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
+    int segmentsCount = 0;
+    for (IndexSearcher.LeafSlice slice : slices) {
+      segmentsCount += slice.leaves.length;
+      tasks.add(
+          new FutureTask<>(
+              () -> {
+                TopDocs[] results = new TopDocs[slice.leaves.length];
+                int i = 0;
+                for (LeafReaderContext context : slice.leaves) {
+                  results[i++] = searchLeaf(context, filterWeight);
+                }
+                return results;
+              }));
+    }
 
-    SliceExecutor sliceExecutor = new SliceExecutor(executor);
     sliceExecutor.invokeAll(tasks);
 
-    return tasks.stream()
-        .map(
-            task -> {
-              try {
-                return task.get();
-              } catch (ExecutionException e) {
-                throw new RuntimeException(e.getCause());
-              } catch (InterruptedException e) {
-                throw new ThreadInterruptedException(e);
-              }
-            })
-        .toArray(TopDocs[]::new);
+    TopDocs[] topDocs = new TopDocs[segmentsCount];
+    int i = 0;
+    for (FutureTask<TopDocs[]> task : tasks) {
+      try {
+        for (TopDocs docs : task.get()) {
+          topDocs[i++] = docs;
+        }
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e.getCause());
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      }
+    }
+    return topDocs;
   }
 
   private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {
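The parallel path now groups work by the searcher's LeafSlice boundaries, so the number of tasks submitted to the executor tracks slices rather than segments. To see how leaves map to slices, a short sketch using the public static IndexSearcher.slices overload (the doc and segment thresholds below are illustrative, not necessarily Lucene's defaults):

import java.util.List;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;

public class SliceSketch {
  // Group a reader's leaves into slices capped at ~250k docs and
  // 5 segments each; each resulting slice then becomes a single
  // FutureTask in parallelSearch above.
  static IndexSearcher.LeafSlice[] sliceLeaves(DirectoryReader reader) {
    List<LeafReaderContext> leaves = reader.leaves();
    return IndexSearcher.slices(leaves, 250_000, 5);
  }
}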

lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java

Lines changed: 4 additions & 0 deletions
@@ -998,6 +998,10 @@ public Executor getExecutor() {
     return executor;
   }
 
+  SliceExecutor getSliceExecutor() {
+    return sliceExecutor;
+  }
+
   /**
    * Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
    * typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to
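As the commit message notes, reusing the searcher's slice executor means a QueueSizeBasedExecutor is used when a thread pool executor is provided. That choice happens at searcher construction time; a simplified sketch of the selection logic (SliceExecutor and QueueSizeBasedExecutor are package-private helpers in org.apache.lucene.search, so treat this as illustrative of the internals rather than public API):

  // Simplified: thread pools get a queue-size-aware wrapper that can run
  // tasks on the caller thread when the pool's queue backs up; any other
  // executor gets the plain SliceExecutor.
  private static SliceExecutor getSliceExecutionControlPlane(Executor executor) {
    if (executor == null) {
      return null;
    }
    if (executor instanceof ThreadPoolExecutor) {
      return new QueueSizeBasedExecutor((ThreadPoolExecutor) executor);
    }
    return new SliceExecutor(executor);
  }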
