Skip to content

Commit 981807f

Browse files
committed
First commit of the implementation for string_stats aggregation
1 parent 50ad802 commit 981807f

File tree

7 files changed

+928
-11
lines changed

7 files changed

+928
-11
lines changed
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,15 @@
66
package org.elasticsearch.xpack.analytics;
77

88
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
9+
import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
910

10-
public class DataScienceAggregationBuilders {
11+
public class AnalyticsAggregationBuilders {
1112

12-
public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCaardinality(String name, String bucketsPath) {
13+
public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCardinality(String name, String bucketsPath) {
1314
return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath);
1415
}
16+
17+
public static StringStatsAggregationBuilder stringStats(String name) {
18+
return new StringStatsAggregationBuilder(name);
19+
}
1520
}

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@
1111
import org.elasticsearch.plugins.ActionPlugin;
1212
import org.elasticsearch.plugins.Plugin;
1313
import org.elasticsearch.plugins.SearchPlugin;
14-
import org.elasticsearch.xpack.core.XPackPlugin;
15-
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
16-
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
17-
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
1814
import org.elasticsearch.xpack.analytics.action.AnalyticsInfoTransportAction;
1915
import org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction;
2016
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
2117
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
2218
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregator;
19+
import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
20+
import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
21+
import org.elasticsearch.xpack.core.XPackPlugin;
22+
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
23+
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
24+
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
2325

2426
import java.util.Arrays;
2527
import java.util.List;
@@ -38,11 +40,23 @@ public AnalyticsPlugin() { }
3840

3941
@Override
4042
public List<PipelineAggregationSpec> getPipelineAggregations() {
41-
return singletonList(new PipelineAggregationSpec(
42-
CumulativeCardinalityPipelineAggregationBuilder.NAME,
43-
CumulativeCardinalityPipelineAggregationBuilder::new,
44-
CumulativeCardinalityPipelineAggregator::new,
45-
CumulativeCardinalityPipelineAggregationBuilder::parse));
43+
return singletonList(
44+
new PipelineAggregationSpec(
45+
CumulativeCardinalityPipelineAggregationBuilder.NAME,
46+
CumulativeCardinalityPipelineAggregationBuilder::new,
47+
CumulativeCardinalityPipelineAggregator::new,
48+
CumulativeCardinalityPipelineAggregationBuilder::parse)
49+
);
50+
}
51+
52+
@Override
53+
public List<AggregationSpec> getAggregations() {
54+
return singletonList(
55+
new AggregationSpec(
56+
StringStatsAggregationBuilder.NAME,
57+
StringStatsAggregationBuilder::new,
58+
StringStatsAggregationBuilder::parse).addResultReader(InternalStringStats::new)
59+
);
4660
}
4761

4862
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.analytics.stringstats;
7+
8+
import org.elasticsearch.common.io.stream.StreamInput;
9+
import org.elasticsearch.common.io.stream.StreamOutput;
10+
import org.elasticsearch.common.xcontent.XContentBuilder;
11+
import org.elasticsearch.search.DocValueFormat;
12+
import org.elasticsearch.search.aggregations.InternalAggregation;
13+
import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
14+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
15+
16+
import java.io.IOException;
17+
import java.util.HashMap;
18+
import java.util.LinkedHashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.stream.Collectors;
23+
24+
public class InternalStringStats extends InternalAggregation {
25+
26+
enum Metrics {
27+
count, min_length, max_length, avg_length, entropy;
28+
29+
public static Metrics resolve(String name) {
30+
return Metrics.valueOf(name);
31+
}
32+
}
33+
34+
private static final DocValueFormat DEFAULT_FORMAT = DocValueFormat.RAW;
35+
36+
private DocValueFormat format = DEFAULT_FORMAT;
37+
private final boolean showDistribution;
38+
private final long count;
39+
private final long totalLength;
40+
private final int minLength;
41+
private final int maxLength;
42+
private final Map<String, Long> charOccurrences;
43+
44+
public InternalStringStats(String name, long count, long totalLength, int minLength, int maxLength,
45+
Map<String, Long> charOccurences, boolean showDistribution,
46+
DocValueFormat formatter,
47+
List<PipelineAggregator> pipelineAggregators,
48+
Map<String, Object> metaData) {
49+
super(name, pipelineAggregators, metaData);
50+
this.format = formatter;
51+
this.showDistribution = showDistribution;
52+
this.count = count;
53+
this.totalLength = totalLength;
54+
this.minLength = minLength;
55+
this.maxLength = maxLength;
56+
this.charOccurrences = charOccurences;
57+
}
58+
59+
/** Read from a stream. */
60+
public InternalStringStats(StreamInput in) throws IOException {
61+
super(in);
62+
format = in.readNamedWriteable(DocValueFormat.class);
63+
showDistribution = in.readBoolean();
64+
count = in.readVLong();
65+
totalLength = in.readVLong();
66+
minLength = in.readVInt();
67+
maxLength = in.readVInt();
68+
charOccurrences = in.<String, Long>readMap(StreamInput::readString, StreamInput::readLong);
69+
}
70+
71+
@Override
72+
protected final void doWriteTo(StreamOutput out) throws IOException {
73+
out.writeNamedWriteable(format);
74+
out.writeBoolean(showDistribution);
75+
out.writeVLong(count);
76+
out.writeVLong(totalLength);
77+
out.writeVInt(minLength);
78+
out.writeVInt(maxLength);
79+
out.writeMap(charOccurrences, StreamOutput::writeString, StreamOutput::writeLong);
80+
}
81+
82+
public String getWriteableName() {
83+
return StatsAggregationBuilder.NAME;
84+
}
85+
86+
public long getCount() {
87+
return count;
88+
}
89+
90+
public int getMinLength() {
91+
return minLength;
92+
}
93+
94+
public int getMaxLength() {
95+
return maxLength;
96+
}
97+
98+
public double getAvgLength() {
99+
return (double) totalLength / count;
100+
}
101+
102+
public double getEntropy() {
103+
double sum = 0.0;
104+
double compensation = 0.0;
105+
for (double p : getDistribution().values()) {
106+
if (p > 0) {
107+
// Compute the sum of double values with Kahan summation algorithm which is more
108+
// accurate than naive summation.
109+
double value = p * log2(p);
110+
if (Double.isFinite(value) == false) {
111+
sum += value;
112+
} else if (Double.isFinite(sum)) {
113+
double corrected = value - compensation;
114+
double newSum = sum + corrected;
115+
compensation = (newSum - sum) - corrected;
116+
sum = newSum;
117+
}
118+
}
119+
}
120+
return -sum;
121+
}
122+
123+
/**
124+
* Convert the character occurrences map to character frequencies.
125+
*
126+
* @return A map with the character as key and the probability of
127+
* this character to occur as value. The map is ordered by frequency descending.
128+
*/
129+
public Map<String, Double> getDistribution() {
130+
return charOccurrences.entrySet().stream()
131+
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
132+
.collect(
133+
Collectors.toMap(e -> e.getKey(), e -> (double) e.getValue() / totalLength,
134+
(e1, e2) -> e2, LinkedHashMap::new)
135+
);
136+
}
137+
138+
/** Calculate base 2 logarithm */
139+
static double log2(double d) {
140+
return Math.log(d) / Math.log(2.0);
141+
}
142+
143+
public String getCountAsString() {
144+
return format.format(getCount()).toString();
145+
}
146+
147+
public String getMinLengthAsString() {
148+
return format.format(getMinLength()).toString();
149+
}
150+
151+
public String getMaxLengthAsString() {
152+
return format.format(getMaxLength()).toString();
153+
}
154+
155+
public String getAvgLengthAsString() {
156+
return format.format(getAvgLength()).toString();
157+
}
158+
159+
public String getEntropyAsString() {
160+
return format.format(getEntropy()).toString();
161+
}
162+
163+
public Object value(String name) {
164+
Metrics metrics = Metrics.valueOf(name);
165+
switch (metrics) {
166+
case count: return this.count;
167+
case min_length: return this.minLength;
168+
case max_length: return this.maxLength;
169+
case avg_length: return this.getAvgLength();
170+
case entropy: return this.getEntropy();
171+
default:
172+
throw new IllegalArgumentException("Unknown value [" + name + "] in common stats aggregation");
173+
}
174+
}
175+
176+
@Override
177+
public InternalStringStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
178+
long count = 0;
179+
long totalLength = 0;
180+
int minLength = Integer.MAX_VALUE;
181+
int maxLength = Integer.MIN_VALUE;
182+
Map<String, Long> occurs = new HashMap<>();
183+
184+
for (InternalAggregation aggregation : aggregations) {
185+
InternalStringStats stats = (InternalStringStats) aggregation;
186+
count += stats.getCount();
187+
minLength = Math.min(minLength, stats.getMinLength());
188+
maxLength = Math.max(maxLength, stats.getMaxLength());
189+
totalLength += stats.totalLength;
190+
stats.charOccurrences.forEach((k, v) ->
191+
occurs.merge(k, v, (oldValue, newValue) -> oldValue + newValue)
192+
);
193+
}
194+
195+
return new InternalStringStats(name, count, totalLength, minLength, maxLength, occurs,
196+
showDistribution, format, pipelineAggregators(), getMetaData());
197+
}
198+
199+
@Override
200+
public Object getProperty(List<String> path) {
201+
if (path.isEmpty()) {
202+
return this;
203+
} else if (path.size() == 1) {
204+
return value(path.get(0));
205+
} else {
206+
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
207+
}
208+
}
209+
210+
static class Fields {
211+
public static final String COUNT = "count";
212+
public static final String MIN_LENGTH = "min_length";
213+
public static final String MIN_LENGTH_AS_STRING = "min_length_as_string";
214+
public static final String MAX_LENGTH = "max_length";
215+
public static final String MAX_LENGTH_AS_STRING = "max_as_string";
216+
public static final String AVG_LENGTH = "avg_length";
217+
public static final String AVG_LENGTH_AS_STRING = "avg_length_as_string";
218+
public static final String ENTROPY = "entropy";
219+
public static final String ENTROPY_AS_STRING = "entropy_string";
220+
public static final String DISTRIBUTION = "distribution";
221+
public static final String DISTRIBUTION_AS_STRING = "distribution_string";
222+
}
223+
224+
@Override
225+
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
226+
builder.field(Fields.COUNT, count);
227+
if (count > 0) {
228+
builder.field(Fields.MIN_LENGTH, minLength);
229+
builder.field(Fields.MAX_LENGTH, maxLength);
230+
builder.field(Fields.AVG_LENGTH, getAvgLength());
231+
builder.field(Fields.ENTROPY, getEntropy());
232+
if (showDistribution == true) {
233+
builder.field(Fields.DISTRIBUTION, getDistribution());
234+
}
235+
if (format != DocValueFormat.RAW) {
236+
builder.field(Fields.MIN_LENGTH_AS_STRING, format.format(getMinLength()));
237+
builder.field(Fields.MAX_LENGTH_AS_STRING, format.format(getMaxLength()));
238+
builder.field(Fields.AVG_LENGTH_AS_STRING, format.format(getAvgLength()));
239+
builder.field(Fields.ENTROPY_AS_STRING, format.format(getEntropy()));
240+
if (showDistribution == true) {
241+
builder.startObject(Fields.DISTRIBUTION_AS_STRING);
242+
for (Map.Entry<String, Double> e: getDistribution().entrySet()) {
243+
builder.field(e.getKey(), format.format(e.getValue()).toString());
244+
}
245+
builder.endObject();
246+
}
247+
}
248+
} else {
249+
builder.nullField(Fields.MIN_LENGTH);
250+
builder.nullField(Fields.MAX_LENGTH);
251+
builder.nullField(Fields.AVG_LENGTH);
252+
builder.field(Fields.ENTROPY, 0.0d);
253+
254+
if (showDistribution == true) {
255+
builder.nullField(Fields.DISTRIBUTION);
256+
}
257+
}
258+
return builder;
259+
}
260+
261+
@Override
262+
public int hashCode() {
263+
return Objects.hash(super.hashCode(), count, minLength, maxLength, totalLength, charOccurrences, showDistribution);
264+
}
265+
266+
@Override
267+
public boolean equals(Object obj) {
268+
if (this == obj) return true;
269+
if (obj == null || getClass() != obj.getClass()) return false;
270+
if (super.equals(obj) == false) return false;
271+
272+
InternalStringStats other = (InternalStringStats) obj;
273+
return count == other.count &&
274+
minLength == other.minLength &&
275+
maxLength == other.maxLength &&
276+
totalLength == other.totalLength &&
277+
showDistribution == other.showDistribution;
278+
}
279+
}

0 commit comments

Comments
 (0)