|
27 | 27 | import org.apache.lucene.search.CollectionStatistics;
|
28 | 28 | import org.apache.lucene.search.CollectionTerminatedException;
|
29 | 29 | import org.apache.lucene.search.Collector;
|
| 30 | +import org.apache.lucene.search.CollectorManager; |
30 | 31 | import org.apache.lucene.search.ConjunctionDISI;
|
31 | 32 | import org.apache.lucene.search.DocIdSetIterator;
|
32 | 33 | import org.apache.lucene.search.Explanation;
|
|
35 | 36 | import org.apache.lucene.search.Query;
|
36 | 37 | import org.apache.lucene.search.QueryCache;
|
37 | 38 | import org.apache.lucene.search.QueryCachingPolicy;
|
| 39 | +import org.apache.lucene.search.ScoreDoc; |
38 | 40 | import org.apache.lucene.search.ScoreMode;
|
39 | 41 | import org.apache.lucene.search.Scorer;
|
40 | 42 | import org.apache.lucene.search.TermStatistics;
|
| 43 | +import org.apache.lucene.search.TopFieldDocs; |
| 44 | +import org.apache.lucene.search.TotalHits; |
41 | 45 | import org.apache.lucene.search.Weight;
|
42 | 46 | import org.apache.lucene.search.similarities.Similarity;
|
43 | 47 | import org.apache.lucene.util.BitSet;
|
44 | 48 | import org.apache.lucene.util.BitSetIterator;
|
45 | 49 | import org.apache.lucene.util.Bits;
|
46 | 50 | import org.apache.lucene.util.CombinedBitSet;
|
47 | 51 | import org.apache.lucene.util.SparseFixedBitSet;
|
| 52 | +import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; |
| 53 | +import org.elasticsearch.search.DocValueFormat; |
48 | 54 | import org.elasticsearch.search.dfs.AggregatedDfs;
|
49 | 55 | import org.elasticsearch.search.profile.Timer;
|
50 | 56 | import org.elasticsearch.search.profile.query.ProfileWeight;
|
51 | 57 | import org.elasticsearch.search.profile.query.QueryProfileBreakdown;
|
52 | 58 | import org.elasticsearch.search.profile.query.QueryProfiler;
|
53 | 59 | import org.elasticsearch.search.profile.query.QueryTimingType;
|
| 60 | +import org.elasticsearch.search.query.QuerySearchResult; |
54 | 61 |
|
55 | 62 | import java.io.IOException;
|
| 63 | +import java.util.ArrayList; |
56 | 64 | import java.util.Arrays;
|
57 | 65 | import java.util.List;
|
58 | 66 | import java.util.Set;
|
@@ -131,12 +139,86 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
|
131 | 139 | }
|
132 | 140 | }
|
133 | 141 |
|
| 142 | + private void checkCancelled() { |
| 143 | + if (checkCancelled != null) { |
| 144 | + checkCancelled.run(); |
| 145 | + } |
| 146 | + } |
| 147 | + |
| 148 | + public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager, |
| 149 | + QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException { |
| 150 | + final List<Collector> collectors = new ArrayList<>(leaves.size()); |
| 151 | + for (LeafReaderContext ctx : leaves) { |
| 152 | + final Collector collector = manager.newCollector(); |
| 153 | + searchLeaf(ctx, weight, collector); |
| 154 | + collectors.add(collector); |
| 155 | + } |
| 156 | + TopFieldDocs mergedTopDocs = (TopFieldDocs) manager.reduce(collectors); |
| 157 | + // Lucene sets shards indexes during merging of topDocs from different collectors |
| 158 | + // We need to reset shard index; ES will set shard index later during reduce stage |
| 159 | + for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) { |
| 160 | + scoreDoc.shardIndex = -1; |
| 161 | + } |
| 162 | + if (totalHits != null) { // we have already precalculated totalHits for the whole index |
| 163 | + mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields); |
| 164 | + } |
| 165 | + result.topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), formats); |
| 166 | + } |
| 167 | + |
134 | 168 | @Override
|
135 | 169 | protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
|
136 |
| - final Weight cancellableWeight; |
137 |
| - if (checkCancelled != null) { |
138 |
| - cancellableWeight = new Weight(weight.getQuery()) { |
| 170 | + for (LeafReaderContext ctx : leaves) { // search each subreader |
| 171 | + searchLeaf(ctx, weight, collector); |
| 172 | + } |
| 173 | + } |
| 174 | + |
| 175 | + /** |
| 176 | + * Lower-level search API. |
| 177 | + * |
| 178 | + * {@link LeafCollector#collect(int)} is called for every matching document in |
| 179 | + * the provided <code>ctx</code>. |
| 180 | + */ |
| 181 | + private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException { |
| 182 | + checkCancelled(); |
| 183 | + weight = wrapWeight(weight); |
| 184 | + final LeafCollector leafCollector; |
| 185 | + try { |
| 186 | + leafCollector = collector.getLeafCollector(ctx); |
| 187 | + } catch (CollectionTerminatedException e) { |
| 188 | + // there is no doc of interest in this reader context |
| 189 | + // continue with the following leaf |
| 190 | + return; |
| 191 | + } |
| 192 | + Bits liveDocs = ctx.reader().getLiveDocs(); |
| 193 | + BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs); |
| 194 | + if (liveDocsBitSet == null) { |
| 195 | + BulkScorer bulkScorer = weight.bulkScorer(ctx); |
| 196 | + if (bulkScorer != null) { |
| 197 | + try { |
| 198 | + bulkScorer.score(leafCollector, liveDocs); |
| 199 | + } catch (CollectionTerminatedException e) { |
| 200 | + // collection was terminated prematurely |
| 201 | + // continue with the following leaf |
| 202 | + } |
| 203 | + } |
| 204 | + } else { |
| 205 | + // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing: |
| 206 | + Scorer scorer = weight.scorer(ctx); |
| 207 | + if (scorer != null) { |
| 208 | + try { |
| 209 | + intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector, |
| 210 | + checkCancelled == null ? () -> { } : checkCancelled); |
| 211 | + } catch (CollectionTerminatedException e) { |
| 212 | + // collection was terminated prematurely |
| 213 | + // continue with the following leaf |
| 214 | + } |
| 215 | + } |
| 216 | + } |
| 217 | + } |
139 | 218 |
|
| 219 | + private Weight wrapWeight(Weight weight) { |
| 220 | + if (checkCancelled != null) { |
| 221 | + return new Weight(weight.getQuery()) { |
140 | 222 | @Override
|
141 | 223 | public void extractTerms(Set<Term> terms) {
|
142 | 224 | throw new UnsupportedOperationException();
|
@@ -168,48 +250,10 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
|
168 | 250 | }
|
169 | 251 | };
|
170 | 252 | } else {
|
171 |
| - cancellableWeight = weight; |
| 253 | + return weight; |
172 | 254 | }
|
173 |
| - searchInternal(leaves, cancellableWeight, collector); |
174 | 255 | }
|
175 | 256 |
|
176 |
| - private void searchInternal(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException { |
177 |
| - for (LeafReaderContext ctx : leaves) { // search each subreader |
178 |
| - final LeafCollector leafCollector; |
179 |
| - try { |
180 |
| - leafCollector = collector.getLeafCollector(ctx); |
181 |
| - } catch (CollectionTerminatedException e) { |
182 |
| - // there is no doc of interest in this reader context |
183 |
| - // continue with the following leaf |
184 |
| - continue; |
185 |
| - } |
186 |
| - Bits liveDocs = ctx.reader().getLiveDocs(); |
187 |
| - BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs); |
188 |
| - if (liveDocsBitSet == null) { |
189 |
| - BulkScorer bulkScorer = weight.bulkScorer(ctx); |
190 |
| - if (bulkScorer != null) { |
191 |
| - try { |
192 |
| - bulkScorer.score(leafCollector, liveDocs); |
193 |
| - } catch (CollectionTerminatedException e) { |
194 |
| - // collection was terminated prematurely |
195 |
| - // continue with the following leaf |
196 |
| - } |
197 |
| - } |
198 |
| - } else { |
199 |
| - // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing: |
200 |
| - Scorer scorer = weight.scorer(ctx); |
201 |
| - if (scorer != null) { |
202 |
| - try { |
203 |
| - intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector, |
204 |
| - checkCancelled == null ? () -> {} : checkCancelled); |
205 |
| - } catch (CollectionTerminatedException e) { |
206 |
| - // collection was terminated prematurely |
207 |
| - // continue with the following leaf |
208 |
| - } |
209 |
| - } |
210 |
| - } |
211 |
| - } |
212 |
| - } |
213 | 257 |
|
214 | 258 | private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
|
215 | 259 | if (liveDocs instanceof SparseFixedBitSet) {
|
|
0 commit comments