Skip to content

Commit fc9fb64

Browse files
authored
[Rollup] Improve ID scheme for rollup documents (#32558)
Previously, we were using a simple CRC32 for the IDs of rollup documents. This is a very poor choice however, since 32-bit IDs lead to collisions between documents very quickly. This commit moves Rollups over to a 128-bit ID. The ID is a concatenation of all the keys in the document (similar to the rolling CRC before), hashed with 128-bit Murmur3, then base64 encoded. Finally, the job ID and a delimiter (`$`) are prepended to the ID. This guarantees that there are 128 bits per job. 128 bits should essentially remove all chances of collisions, and the prepended job ID means that _if_ there is a collision, it stays "within" the job. BWC notes: We can only upgrade the ID scheme after we know there has been a good checkpoint during indexing. We don't rely on a STARTED/STOPPED status since we can't guarantee that resulted from a real checkpoint, or other state. So we only upgrade the ID after we have reached a checkpoint state during an active index run, and only after the checkpoint has been confirmed. Once a job has been upgraded and checkpointed, the version increments and the new ID is used in the future. All new jobs use the new ID from the start.
1 parent 3d4c84f commit fc9fb64

File tree

21 files changed

+1054
-170
lines changed

21 files changed

+1054
-170
lines changed

x-pack/docs/en/rest-api/rollup/get-job.asciidoc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ Which will yield the following response:
9393
"page_size" : 1000
9494
},
9595
"status" : {
96-
"job_state" : "stopped"
96+
"job_state" : "stopped",
97+
"upgraded_doc_id": true
9798
},
9899
"stats" : {
99100
"pages_processed" : 0,
@@ -212,7 +213,8 @@ Which will yield the following response:
212213
"page_size" : 1000
213214
},
214215
"status" : {
215-
"job_state" : "stopped"
216+
"job_state" : "stopped",
217+
"upgraded_doc_id": true
216218
},
217219
"stats" : {
218220
"pages_processed" : 0,
@@ -260,7 +262,8 @@ Which will yield the following response:
260262
"page_size" : 1000
261263
},
262264
"status" : {
263-
"job_state" : "stopped"
265+
"job_state" : "stopped",
266+
"upgraded_doc_id": true
264267
},
265268
"stats" : {
266269
"pages_processed" : 0,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.core.rollup.job;
77

88

9+
import org.elasticsearch.Version;
910
import org.elasticsearch.common.Nullable;
1011
import org.elasticsearch.common.ParseField;
1112
import org.elasticsearch.common.io.stream.StreamInput;
@@ -39,12 +40,19 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState {
3940
@Nullable
4041
private final TreeMap<String, Object> currentPosition;
4142

43+
// Flag holds the state of the ID scheme, e.g. if it has been upgraded to the
44+
// concatenation scheme. See #32372 for more details
45+
private boolean upgradedDocumentID;
46+
4247
private static final ParseField STATE = new ParseField("job_state");
4348
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
49+
private static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
4450

4551
public static final ConstructingObjectParser<RollupJobStatus, Void> PARSER =
4652
new ConstructingObjectParser<>(NAME,
47-
args -> new RollupJobStatus((IndexerState) args[0], (HashMap<String, Object>) args[1]));
53+
args -> new RollupJobStatus((IndexerState) args[0],
54+
(HashMap<String, Object>) args[1],
55+
(Boolean)args[2]));
4856

4957
static {
5058
PARSER.declareField(constructorArg(), p -> {
@@ -62,16 +70,28 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState {
6270
}
6371
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
6472
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
73+
74+
// Optional to accommodate old versions of state
75+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), UPGRADED_DOC_ID);
6576
}
6677

67-
public RollupJobStatus(IndexerState state, @Nullable Map<String, Object> position) {
78+
public RollupJobStatus(IndexerState state, @Nullable Map<String, Object> position,
79+
@Nullable Boolean upgradedDocumentID) {
6880
this.state = state;
6981
this.currentPosition = position == null ? null : new TreeMap<>(position);
82+
this.upgradedDocumentID = upgradedDocumentID != null ? upgradedDocumentID : false; //default to false if missing
7083
}
7184

7285
public RollupJobStatus(StreamInput in) throws IOException {
7386
state = IndexerState.fromStream(in);
7487
currentPosition = in.readBoolean() ? new TreeMap<>(in.readMap()) : null;
88+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { //TODO change this after backport
89+
upgradedDocumentID = in.readBoolean();
90+
} else {
91+
// If we're getting this job from a pre-6.4.0 node,
92+
// it is using the old ID scheme
93+
upgradedDocumentID = false;
94+
}
7595
}
7696

7797
public IndexerState getIndexerState() {
@@ -82,6 +102,10 @@ public Map<String, Object> getPosition() {
82102
return currentPosition;
83103
}
84104

105+
public boolean isUpgradedDocumentID() {
106+
return upgradedDocumentID;
107+
}
108+
85109
public static RollupJobStatus fromXContent(XContentParser parser) {
86110
try {
87111
return PARSER.parse(parser, null);
@@ -97,6 +121,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
97121
if (currentPosition != null) {
98122
builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
99123
}
124+
builder.field(UPGRADED_DOC_ID.getPreferredName(), upgradedDocumentID);
100125
builder.endObject();
101126
return builder;
102127
}
@@ -113,6 +138,9 @@ public void writeTo(StreamOutput out) throws IOException {
113138
if (currentPosition != null) {
114139
out.writeMap(currentPosition);
115140
}
141+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { //TODO change this after backport
142+
out.writeBoolean(upgradedDocumentID);
143+
}
116144
}
117145

118146
@Override
@@ -128,11 +156,12 @@ public boolean equals(Object other) {
128156
RollupJobStatus that = (RollupJobStatus) other;
129157

130158
return Objects.equals(this.state, that.state)
131-
&& Objects.equals(this.currentPosition, that.currentPosition);
159+
&& Objects.equals(this.currentPosition, that.currentPosition)
160+
&& Objects.equals(this.upgradedDocumentID, that.upgradedDocumentID);
132161
}
133162

134163
@Override
135164
public int hashCode() {
136-
return Objects.hash(state, currentPosition);
165+
return Objects.hash(state, currentPosition, upgradedDocumentID);
137166
}
138167
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,6 @@ protected GetRollupJobsAction.JobWrapper createTestInstance() {
4141

4242
return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(),
4343
new RollupJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()),
44-
new RollupJobStatus(state, Collections.emptyMap()));
44+
new RollupJobStatus(state, Collections.emptyMap(), randomBoolean()));
4545
}
4646
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatusTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
import org.elasticsearch.common.io.stream.Writeable;
99
import org.elasticsearch.common.xcontent.XContentParser;
1010
import org.elasticsearch.test.AbstractSerializingTestCase;
11-
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
12-
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
1311

1412
import java.util.HashMap;
1513
import java.util.Map;
@@ -35,7 +33,7 @@ private Map<String, Object> randomPosition() {
3533

3634
@Override
3735
protected RollupJobStatus createTestInstance() {
38-
return new RollupJobStatus(randomFrom(IndexerState.values()), randomPosition());
36+
return new RollupJobStatus(randomFrom(IndexerState.values()), randomPosition(), randomBoolean());
3937
}
4038

4139
@Override

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,14 @@
8080
public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin {
8181

8282
public static final String BASE_PATH = "/_xpack/rollup/";
83-
public static final int ROLLUP_VERSION = 1;
83+
84+
// Introduced in ES version 6.3
85+
public static final int ROLLUP_VERSION_V1 = 1;
86+
// Introduced in ES Version 6.4
87+
// Bumped due to ID collision, see #32372
88+
public static final int ROLLUP_VERSION_V2 = 2;
89+
public static final int CURRENT_ROLLUP_VERSION = ROLLUP_VERSION_V2;
90+
8491
public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
8592
public static final String SCHEDULE_THREAD_POOL_NAME = RollupField.NAME + "_scheduler";
8693

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,9 @@ static MultiSearchRequest createMSearchRequest(SearchRequest request, NamedWrite
191191
copiedSource.query(new BoolQueryBuilder()
192192
.must(rewritten)
193193
.filter(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), id))
194-
.filter(new TermQueryBuilder(RollupField.formatMetaField(RollupField.VERSION_FIELD), Rollup.ROLLUP_VERSION)));
194+
// Both versions are acceptable right now since they are compatible at search time
195+
.filter(new TermsQueryBuilder(RollupField.formatMetaField(RollupField.VERSION_FIELD),
196+
new long[]{Rollup.ROLLUP_VERSION_V1, Rollup.ROLLUP_VERSION_V2})));
195197

196198
// And add a new msearch per JobID
197199
msearch.add(new SearchRequest(context.getRollupIndices(), copiedSource).types(request.types()));

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.apache.log4j.Logger;
99
import org.elasticsearch.ElasticsearchException;
1010
import org.elasticsearch.action.index.IndexRequest;
11-
import org.elasticsearch.common.Numbers;
1211
import org.elasticsearch.search.aggregations.Aggregation;
1312
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
1413
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
@@ -21,14 +20,12 @@
2120
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
2221
import org.elasticsearch.xpack.rollup.Rollup;
2322

24-
import java.nio.charset.StandardCharsets;
2523
import java.util.ArrayList;
2624
import java.util.HashMap;
2725
import java.util.List;
2826
import java.util.Map;
2927
import java.util.TreeMap;
3028
import java.util.stream.Collectors;
31-
import java.util.zip.CRC32;
3229

3330
/**
3431
* These utilities are used to convert agg responses into a set of rollup documents.
@@ -41,12 +38,16 @@ class IndexerUtils {
4138
* The only entry point in this class. You hand this method an aggregation and an index
4239
* pattern, and it returns a list of rolled documents that you can index
4340
*
44-
* @param agg The aggregation response that you want to rollup
45-
* @param rollupIndex The index that holds rollups for this job
41+
* @param agg The aggregation response that you want to rollup
42+
* @param rollupIndex The index that holds rollups for this job
43+
* @param stats The stats accumulator for this job's task
44+
* @param groupConfig The grouping configuration for the job
45+
* @param jobId The ID for the job
46+
* @param isUpgradedDocID `true` if this job is using the new ID scheme
4647
* @return A list of rolled documents derived from the response
4748
*/
4849
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats,
49-
GroupConfig groupConfig, String jobId) {
50+
GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) {
5051

5152
logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]");
5253
return agg.getBuckets().stream().map(b ->{
@@ -57,24 +58,30 @@ static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollup
5758
TreeMap<String, Object> keys = new TreeMap<>(b.getKey());
5859
List<Aggregation> metrics = b.getAggregations().asList();
5960

61+
RollupIDGenerator idGenerator;
62+
if (isUpgradedDocID) {
63+
idGenerator = new RollupIDGenerator.Murmur3(jobId);
64+
} else {
65+
idGenerator = new RollupIDGenerator.CRC();
66+
}
6067
Map<String, Object> doc = new HashMap<>(keys.size() + metrics.size());
61-
CRC32 docId = processKeys(keys, doc, b.getDocCount(), groupConfig);
62-
byte[] vs = jobId.getBytes(StandardCharsets.UTF_8);
63-
docId.update(vs, 0, vs.length);
68+
69+
processKeys(keys, doc, b.getDocCount(), groupConfig, idGenerator);
70+
idGenerator.add(jobId);
6471
processMetrics(metrics, doc);
6572

66-
doc.put(RollupField.ROLLUP_META + "." + RollupField.VERSION_FIELD, Rollup.ROLLUP_VERSION);
73+
doc.put(RollupField.ROLLUP_META + "." + RollupField.VERSION_FIELD,
74+
isUpgradedDocID ? Rollup.CURRENT_ROLLUP_VERSION : Rollup.ROLLUP_VERSION_V1);
6775
doc.put(RollupField.ROLLUP_META + "." + RollupField.ID.getPreferredName(), jobId);
6876

69-
IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, String.valueOf(docId.getValue()));
77+
IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, idGenerator.getID());
7078
request.source(doc);
7179
return request;
7280
}).collect(Collectors.toList());
7381
}
7482

75-
private static CRC32 processKeys(Map<String, Object> keys, Map<String, Object> doc, long count, GroupConfig groupConfig) {
76-
CRC32 docID = new CRC32();
77-
83+
private static void processKeys(Map<String, Object> keys, Map<String, Object> doc,
84+
long count, GroupConfig groupConfig, RollupIDGenerator idGenerator) {
7885
keys.forEach((k, v) -> {
7986
// Also add a doc count for each key. This will duplicate data, but makes search easier later
8087
doc.put(k + "." + RollupField.COUNT_FIELD, count);
@@ -83,37 +90,34 @@ private static CRC32 processKeys(Map<String, Object> keys, Map<String, Object> d
8390
assert v != null;
8491
doc.put(k + "." + RollupField.TIMESTAMP, v);
8592
doc.put(k + "." + RollupField.INTERVAL, groupConfig.getDateHisto().getInterval());
86-
doc.put(k + "." + DateHistogramGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone());
87-
docID.update(Numbers.longToBytes((Long)v), 0, 8);
93+
doc.put(k + "." + DateHistogramGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone().toString());
94+
idGenerator.add((Long)v);
8895
} else if (k.endsWith("." + HistogramAggregationBuilder.NAME)) {
8996
doc.put(k + "." + RollupField.VALUE, v);
9097
doc.put(k + "." + RollupField.INTERVAL, groupConfig.getHisto().getInterval());
9198
if (v == null) {
92-
// Arbitrary value to update the doc ID with for nulls
93-
docID.update(19);
99+
idGenerator.addNull();
94100
} else {
95-
docID.update(Numbers.doubleToBytes((Double) v), 0, 8);
101+
idGenerator.add((Double) v);
96102
}
97103
} else if (k.endsWith("." + TermsAggregationBuilder.NAME)) {
98104
doc.put(k + "." + RollupField.VALUE, v);
99105
if (v == null) {
100-
// Arbitrary value to update the doc ID with for nulls
101-
docID.update(19);
106+
idGenerator.addNull();
102107
} else if (v instanceof String) {
103-
byte[] vs = ((String) v).getBytes(StandardCharsets.UTF_8);
104-
docID.update(vs, 0, vs.length);
108+
idGenerator.add((String)v);
105109
} else if (v instanceof Long) {
106-
docID.update(Numbers.longToBytes((Long)v), 0, 8);
110+
idGenerator.add((Long)v);
107111
} else if (v instanceof Double) {
108-
docID.update(Numbers.doubleToBytes((Double)v), 0, 8);
112+
idGenerator.add((Double)v);
109113
} else {
110-
throw new RuntimeException("Encountered value of type [" + v.getClass() + "], which was unable to be processed.");
114+
throw new RuntimeException("Encountered value of type ["
115+
+ v.getClass() + "], which was unable to be processed.");
111116
}
112117
} else {
113118
throw new ElasticsearchException("Could not identify key in agg [" + k + "]");
114119
}
115120
});
116-
return docID;
117121
}
118122

119123
private static void processMetrics(List<Aggregation> metrics, Map<String, Object> doc) {

0 commit comments

Comments
 (0)