9
9
10
10
package org .elasticsearch .search .aggregations .bucket .sampler .random ;
11
11
12
+ import org .apache .lucene .search .BooleanClause ;
13
+ import org .apache .lucene .search .BooleanQuery ;
12
14
import org .apache .lucene .search .CollectionTerminatedException ;
13
15
import org .apache .lucene .search .DocIdSetIterator ;
16
+ import org .apache .lucene .search .Query ;
17
+ import org .apache .lucene .search .ScoreMode ;
14
18
import org .apache .lucene .search .Scorer ;
15
19
import org .apache .lucene .search .Weight ;
16
20
import org .apache .lucene .util .Bits ;
17
- import org .elasticsearch .common .CheckedSupplier ;
18
21
import org .elasticsearch .search .aggregations .AggregationExecutionContext ;
19
22
import org .elasticsearch .search .aggregations .Aggregator ;
20
23
import org .elasticsearch .search .aggregations .AggregatorFactories ;
21
24
import org .elasticsearch .search .aggregations .CardinalityUpperBound ;
22
25
import org .elasticsearch .search .aggregations .InternalAggregation ;
23
26
import org .elasticsearch .search .aggregations .LeafBucketCollector ;
27
+ import org .elasticsearch .search .aggregations .LeafBucketCollectorBase ;
24
28
import org .elasticsearch .search .aggregations .bucket .BucketsAggregator ;
25
29
import org .elasticsearch .search .aggregations .bucket .SingleBucketAggregator ;
26
30
import org .elasticsearch .search .aggregations .support .AggregationContext ;
@@ -33,14 +37,13 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single
33
37
private final int seed ;
34
38
private final Integer shardSeed ;
35
39
private final double probability ;
36
- private final CheckedSupplier < Weight , IOException > weightSupplier ;
40
+ private Weight weight ;
37
41
38
42
RandomSamplerAggregator (
39
43
String name ,
40
44
int seed ,
41
45
Integer shardSeed ,
42
46
double probability ,
43
- CheckedSupplier <Weight , IOException > weightSupplier ,
44
47
AggregatorFactories factories ,
45
48
AggregationContext context ,
46
49
Aggregator parent ,
@@ -55,10 +58,33 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single
55
58
RandomSamplerAggregationBuilder .NAME + " aggregation [" + name + "] must have sub aggregations configured"
56
59
);
57
60
}
58
- this .weightSupplier = weightSupplier ;
59
61
this .shardSeed = shardSeed ;
60
62
}
61
63
64
+ /**
65
+ * Lazily creates and caches the query {@link Weight} used by this aggregator.
66
+ *
67
+ * The weight wraps a boolean query of the configured top level query of the search and, when the probability is below 1.0, a {@link RandomSamplingQuery} filter. This allows
68
+ * the aggregation to iterate the documents directly, thus sampling in the background instead of the foreground.
69
+ * @return the weight to use; it is built on the first call and cached for subsequent calls
70
+ * @throws IOException when rewriting the query or creating the weight fails
71
+ */
72
+ private Weight getWeight () throws IOException {
73
+ if (weight == null ) {
74
+ ScoreMode scoreMode = scoreMode ();
75
+ BooleanQuery .Builder fullQuery = new BooleanQuery .Builder ().add (
76
+ context .query (),
77
+ scoreMode .needsScores () ? BooleanClause .Occur .MUST : BooleanClause .Occur .FILTER
78
+ );
79
+ if (probability < 1.0 ) {
80
+ Query sampleQuery = new RandomSamplingQuery (probability , seed , shardSeed == null ? context .shardRandomSeed () : shardSeed );
81
+ fullQuery .add (sampleQuery , BooleanClause .Occur .FILTER );
82
+ }
83
+ weight = context .searcher ().createWeight (context .searcher ().rewrite (fullQuery .build ()), scoreMode , 1f );
84
+ }
85
+ return weight ;
86
+ }
87
+
62
88
@ Override
63
89
public InternalAggregation [] buildAggregations (long [] owningBucketOrds ) throws IOException {
64
90
return buildAggregationsForSingleBucket (
@@ -100,22 +126,26 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
100
126
if (sub .isNoop ()) {
101
127
return LeafBucketCollector .NO_OP_COLLECTOR ;
102
128
}
129
+
130
+ Scorer scorer = getWeight ().scorer (aggCtx .getLeafReaderContext ());
131
+ // This means there are no docs to iterate, possibly due to the fields not existing
132
+ if (scorer == null ) {
133
+ return LeafBucketCollector .NO_OP_COLLECTOR ;
134
+ }
135
+ sub .setScorer (scorer );
136
+
103
137
// No sampling is being done, collect all docs
138
+ // TODO know when sampling would be much slower and skip sampling: https://github.com/elastic/elasticsearch/issues/84353
104
139
if (probability >= 1.0 ) {
105
140
grow (1 );
106
- return new LeafBucketCollector ( ) {
141
+ return new LeafBucketCollectorBase ( sub , null ) {
107
142
@ Override
108
143
public void collect (int doc , long owningBucketOrd ) throws IOException {
109
144
collectExistingBucket (sub , doc , 0 );
110
145
}
111
146
};
112
147
}
113
- // TODO know when sampling would be much slower and skip sampling: https://github.com/elastic/elasticsearch/issues/84353
114
- Scorer scorer = weightSupplier .get ().scorer (aggCtx .getLeafReaderContext ());
115
- // This means there are no docs to iterate, possibly due to the fields not existing
116
- if (scorer == null ) {
117
- return LeafBucketCollector .NO_OP_COLLECTOR ;
118
- }
148
+
119
149
final DocIdSetIterator docIt = scorer .iterator ();
120
150
final Bits liveDocs = aggCtx .getLeafReaderContext ().reader ().getLiveDocs ();
121
151
try {
@@ -135,5 +165,4 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
135
165
// Since we have done our own collection, there is nothing for the leaf collector to do
136
166
return LeafBucketCollector .NO_OP_COLLECTOR ;
137
167
}
138
-
139
168
}
0 commit comments