From 09aaa545030c5c5e63bb2125d25872a65621c8ed Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 18 Oct 2021 09:38:39 -0400 Subject: [PATCH 1/5] Validate tsdb's routing_path `routing_path`'s job is to put the same time series on the same node. It does that by extracting a String directly from the xcontent, hashing it, and feeding the hash into the shard selection algorithm. I'm happy with it! But it won't work properly if `routing_path` matches non-dimension fields or if it matches non-keyword dimensions. This change prevents us from mapping any fields that aren't keyword dimensions that "line up" with `routing_path`. Let's talk about why `routing_path` can't do its job if it matches any non-dimension fields. Well, imagine `routing_path` is `[dim, foo]` and only `dim` is a dimension. It'll *still* hash `foo`'s values into the routing key. So, say you get documents like: ``` {"dim": "a", "foo": "1"} {"dim": "a", "foo": "1"} {"dim": "a", "foo": "2"} ``` The third document could be routed to a different shard than the first two! Which would be a disaster because it'd cut the time series identified by `"dim":"a"` into pieces! Now let's talk about when `routing_path` matches a non-keyword dimension. Imagine `routing_path` is `[kwd, int]` and we send these documents: ``` {"kwd": "a", "int": "1"} {"kwd": "a", "int": "01"} ``` Both of these documents belong to the time series identified by `"kwd":"a","int":1` but the `routing_path` code just reads strings so it'll route them into separate shards. Also bad! Also forbidden by this change. 
--- .../rest-api-spec/test/tsdb/10_settings.yml | 42 +++++++++++ .../rest-api-spec/test/tsdb/20_mapping.yml | 57 +++++++++++++- .../org/elasticsearch/index/IndexMode.java | 2 + .../index/mapper/DocumentMapper.java | 8 ++ .../index/mapper/KeywordFieldMapper.java | 10 +++ .../elasticsearch/index/mapper/Mapper.java | 28 +++++++ .../index/TimeSeriesModeTests.java | 74 +++++++++++++++++++ .../index/mapper/KeywordFieldMapperTests.java | 14 ++++ .../index/mapper/MapperTestCase.java | 12 +++ 9 files changed, 246 insertions(+), 1 deletion(-) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/10_settings.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/10_settings.yml index 861b3292bd520..5ee3d8cc49bf7 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/10_settings.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/10_settings.yml @@ -168,3 +168,45 @@ routing required: mappings: _routing: required: true + +--- +bad routing_path: + - skip: + version: " - 7.99.99" + reason: introduced in 8.0.0 + + - do: + catch: /All fields that match routing_path must be keyword time_series_dimensions but \[@timestamp\] was \[date\]/ + indices.create: + index: test_index + body: + settings: + index: + mode: time_series + routing_path: [metricset, k8s.pod.uid, "@timestamp"] + number_of_replicas: 0 + number_of_shards: 2 + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + ip: + type: ip + network: + properties: + tx: + type: long + rx: + type: long diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml index 597c6488e6827..3f46f4bb4e359 100644 --- 
a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml @@ -1,4 +1,4 @@ -add time series mappings: +ecs style: - skip: version: " - 7.99.99" reason: introduced in 8.0.0 @@ -49,3 +49,58 @@ add time series mappings: latency: type: double time_series_metric: gauge + +--- +top level dim object: + - skip: + version: " - 7.99.99" + reason: introduced in 8.0.0 + + - do: + indices.create: + index: tsdb_index + body: + settings: + index: + mode: time_series + routing_path: [dim.*] + number_of_replicas: 0 + number_of_shards: 2 + mappings: + properties: + "@timestamp": + type: date + dim: + properties: + metricset: + type: keyword + time_series_dimension: true + uid: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + availability_zone: + type: short + time_series_dimension: true + name: + type: keyword + ip: + type: ip + time_series_dimension: true + network: + properties: + tx: + type: long + time_series_metric: counter + rx: + type: integer + time_series_metric: gauge + packets_dropped: + type: long + time_series_metric: gauge + latency: + type: double + time_series_metric: gauge diff --git a/server/src/main/java/org/elasticsearch/index/IndexMode.java b/server/src/main/java/org/elasticsearch/index/IndexMode.java index a5680f52e76c0..5d3600242a7e3 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexMode.java +++ b/server/src/main/java/org/elasticsearch/index/IndexMode.java @@ -39,6 +39,7 @@ void validateWithOtherSettings(Map, Object> settings) { } } + @Override public void validateMapping(MappingLookup lookup) {}; @Override @@ -66,6 +67,7 @@ private String error(Setting unsupported) { return tsdbMode() + " is incompatible with [" + unsupported.getKey() + "]"; } + @Override public void validateMapping(MappingLookup lookup) { if (((RoutingFieldMapper) lookup.getMapper(RoutingFieldMapper.NAME)).required()) { throw new 
IllegalArgumentException(routingRequiredBad()); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index 7a919c21e5dfd..fd4ef67792876 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -10,6 +10,10 @@ import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xcontent.support.filtering.FilterPathBasedFilter; + +import java.util.List; +import java.util.Set; public class DocumentMapper { private final String type; @@ -87,6 +91,10 @@ public void validate(IndexSettings settings, boolean checkLimits) { if (settings.getIndexSortConfig().hasIndexSort() && mappers().hasNested()) { throw new IllegalArgumentException("cannot have nested fields when index sort is activated"); } + List routingPaths = settings.getIndexMetadata().getRoutingPaths(); + if (false == routingPaths.isEmpty()) { + mappingLookup.getMapping().getRoot().validateRoutingPath(new FilterPathBasedFilter(Set.copyOf(routingPaths), true)); + } if (checkLimits) { this.mappingLookup.checkLimits(settings); } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java index 2197baffe598a..7a3dbf50e6049 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java @@ -584,4 +584,14 @@ public FieldMapper.Builder getMergeBuilder() { return new Builder(simpleName(), indexAnalyzers, scriptCompiler).dimension(dimension).init(this); } + @Override + protected void validateMatchedRoutingPath() { + if (false == fieldType().isDimension()) { + throw new IllegalArgumentException( + "All fields that match routing_path must be 
keyword time_series_dimensions but [" + + name() + + "] was not a time_series_dimension" + ); + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java index ebb743dca2402..657ff2597b231 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java @@ -8,6 +8,9 @@ package org.elasticsearch.index.mapper; +import com.fasterxml.jackson.core.filter.TokenFilter; + +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.xcontent.ToXContentFragment; import java.util.Map; @@ -66,4 +69,29 @@ public final String simpleName() { */ public abstract void validate(MappingLookup mappers); + /** + * Validate a {@link TokenFilter} made from {@link IndexMetadata#INDEX_ROUTING_PATH}. + */ + public final void validateRoutingPath(TokenFilter filter) { + if (filter == TokenFilter.INCLUDE_ALL) { + validateMatchedRoutingPath(); + } + for (Mapper m : this) { + TokenFilter next = filter.includeProperty(m.simpleName()); + if (next == null) { + // null means "do not include" + continue; + } + m.validateRoutingPath(next); + } + } + + /** + * Validate that this field can be the target of {@link IndexMetadata#INDEX_ROUTING_PATH}. 
+ */ + protected void validateMatchedRoutingPath() { + throw new IllegalArgumentException( + "All fields that match routing_path must be keyword time_series_dimensions but [" + name() + "] was [" + typeName() + "]" + ); + } } diff --git a/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java b/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java index 00d4bc38dfb25..fc878a89f3f71 100644 --- a/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java +++ b/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.MapperServiceTestCase; +import java.io.IOException; + import static org.hamcrest.Matchers.equalTo; public class TimeSeriesModeTests extends MapperServiceTestCase { @@ -103,4 +105,76 @@ public void testValidateAliasWithSearchRouting() { assertThat(e.getMessage(), equalTo("routing is forbidden on CRUD operations that target indices in [index.mode=time_series]")); } + public void testRoutingPathMatchesObject() { + Settings s = Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), randomBoolean() ? 
"dim.o" : "dim.*") + .build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> createMapperService(s, mapping(b -> { + b.startObject("dim").startObject("properties"); + { + b.startObject("o").startObject("properties"); + b.startObject("inner_dim").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.endObject().endObject(); + } + b.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.endObject().endObject(); + }))); + assertThat( + e.getMessage(), + equalTo("All fields that match routing_path must be keyword time_series_dimensions but [dim.o] was [object]") + ); + } + + public void testRoutingPathMatchesNonDimensionKeyword() { + Settings s = Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), randomBoolean() ? "dim.non_dim" : "dim.*") + .build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> createMapperService(s, mapping(b -> { + b.startObject("dim").startObject("properties"); + b.startObject("non_dim").field("type", "keyword").endObject(); + b.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.endObject().endObject(); + }))); + assertThat( + e.getMessage(), + equalTo( + "All fields that match routing_path must be keyword time_series_dimensions but " + + "[dim.non_dim] was not a time_series_dimension" + ) + ); + } + + public void testRoutingPathMatchesNonKeyword() { + Settings s = Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), randomBoolean() ? 
"dim.non_kwd" : "dim.*") + .build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> createMapperService(s, mapping(b -> { + b.startObject("dim").startObject("properties"); + b.startObject("non_kwd").field("type", "integer").field("time_series_dimension", true).endObject(); + b.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.endObject().endObject(); + }))); + assertThat( + e.getMessage(), + equalTo("All fields that match routing_path must be keyword time_series_dimensions but [dim.non_kwd] was [integer]") + ); + } + + public void testRoutingPathMatchesOnlyKeywordDimensions() throws IOException { + Settings s = Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), randomBoolean() ? "dim.metric_type,dim.server,dim.species,dim.uuid" : "dim.*") + .build(); + createMapperService(s, mapping(b -> { + b.startObject("dim").startObject("properties"); + b.startObject("metric_type").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.startObject("server").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.startObject("species").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.startObject("uuid").field("type", "keyword").field("time_series_dimension", true).endObject(); + b.endObject().endObject(); + })); // doesn't throw + } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java index 3e33f32020169..a5e438b1ad76f 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.index.mapper; +import com.fasterxml.jackson.core.filter.TokenFilter; + import 
org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.MockLowerCaseFilter; import org.apache.lucene.analysis.MockTokenizer; @@ -579,4 +581,16 @@ protected Object generateRandomInputValue(MappedFieldType ft) { protected boolean dedupAfterFetch() { return true; } + + @Override + protected String minimalIsInvalidRoutingPathErrorMessage(Mapper mapper) { + return "All fields that match routing_path must be keyword time_series_dimensions but [field] was not a time_series_dimension"; + } + + public void testDimensionInRoutingPath() throws IOException { + Mapper mapper = createMapperService(fieldMapping(b -> b.field("type", "keyword").field("time_series_dimension", true))) + .mappingLookup() + .getMapper("field"); + mapper.validateRoutingPath(TokenFilter.INCLUDE_ALL); // Doesn't throw + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java index 244aa80ab62d4..c59e6858c890c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java @@ -8,6 +8,8 @@ package org.elasticsearch.index.mapper; +import com.fasterxml.jackson.core.filter.TokenFilter; + import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexableField; @@ -750,4 +752,14 @@ public final void testNullInput() throws Exception { protected boolean allowsNullValues() { return true; } + + public final void testMinimalIsInvalidInRoutingPath() throws IOException { + Mapper mapper = createMapperService(fieldMapping(this::minimalMapping)).mappingLookup().getMapper("field"); + Exception e = expectThrows(IllegalArgumentException.class, () -> mapper.validateRoutingPath(TokenFilter.INCLUDE_ALL)); + assertThat(e.getMessage(), equalTo(minimalIsInvalidRoutingPathErrorMessage(mapper))); + } + + protected 
String minimalIsInvalidRoutingPathErrorMessage(Mapper mapper) { + return "All fields that match routing_path must be keyword time_series_dimensions but [field] was [" + mapper.typeName() + "]"; + } } From 9a040d421bf17a0731054a7d188514f285bd4260 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 18 Oct 2021 16:37:09 -0400 Subject: [PATCH 2/5] Move routing calculation This slightly moves the routing calculation out of a difficult to reason about `switch` statement and into a real OO method implementation. It's a tiny tiny change but it makes me feel much better about it. --- .../java/org/elasticsearch/action/DocWriteRequest.java | 7 +++++++ .../elasticsearch/action/bulk/TransportBulkAction.java | 10 +--------- .../org/elasticsearch/action/delete/DeleteRequest.java | 6 ++++++ .../org/elasticsearch/action/index/IndexRequest.java | 6 ++++++ .../org/elasticsearch/action/update/UpdateRequest.java | 6 ++++++ 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 4c5189a50a896..cdd3fcbac6dbc 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -139,6 +140,12 @@ public interface DocWriteRequest extends IndicesRequest, Accountable { * @return boolean flag, when true specifically requires an alias */ boolean isRequireAlias(); + + /** + * Pick the appropriate shard id to receive this request. 
+ */ + int route(IndexRouting indexRouting); + /** * Requested operation type to perform on the document */ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index bdd8c77f85f24..1b44235417505 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -491,7 +491,6 @@ protected void doRun() { IndexRouting indexRouting = concreteIndices.routing(concreteIndex); - int shardId; switch (docWriteRequest.opType()) { case CREATE: case INDEX: @@ -503,17 +502,10 @@ protected void doRun() { Version indexCreated = indexMetadata.getCreationVersion(); indexRequest.resolveRouting(metadata); indexRequest.process(indexCreated, mappingMd, concreteIndex.getName()); - shardId = indexRouting.indexShard( - docWriteRequest.id(), - docWriteRequest.routing(), - indexRequest.getContentType(), - indexRequest.source() - ); break; case UPDATE: TransportUpdateAction.resolveAndValidateRouting(metadata, concreteIndex.getName(), (UpdateRequest) docWriteRequest); - shardId = indexRouting.updateShard(docWriteRequest.id(), docWriteRequest.routing()); break; case DELETE: docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); @@ -521,10 +513,10 @@ protected void doRun() { if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) { throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.id()); } - shardId = indexRouting.deleteShard(docWriteRequest.id(), docWriteRequest.routing()); break; default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); } + int shardId = docWriteRequest.route(indexRouting); List shardRequests = requestsByShard.computeIfAbsent( new ShardId(concreteIndex, shardId), shard -> new ArrayList<>() diff --git 
a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index c30b418d44d68..930240449a2d6 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -230,6 +231,11 @@ public boolean isRequireAlias() { return false; } + @Override + public int route(IndexRouting indexRouting) { + return indexRouting.deleteShard(id, routing); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 945b89cb41edf..c422393b81aa2 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -720,6 +721,11 @@ public boolean isRequireAlias() { return requireAlias; } + @Override + public int route(IndexRouting indexRouting) { + return indexRouting.indexShard(id, routing, contentType, source); + } + public IndexRequest setRequireAlias(boolean requireAlias) { 
this.requireAlias = requireAlias; return this; diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 7b369dae46295..0ad328f19bc64 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; +import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.common.Strings; @@ -817,6 +818,11 @@ public boolean isRequireAlias() { return requireAlias; } + @Override + public int route(IndexRouting indexRouting) { + return indexRouting.updateShard(id, routing); + } + public UpdateRequest setRequireAlias(boolean requireAlias) { this.requireAlias = requireAlias; return this; From 95873eab28a6d3a340ddd579e0c567717e3afb3f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 18 Oct 2021 16:38:58 -0400 Subject: [PATCH 3/5] Revert "Move routing calculation" This reverts commit 9a040d421bf17a0731054a7d188514f285bd4260. 
--- .../java/org/elasticsearch/action/DocWriteRequest.java | 7 ------- .../elasticsearch/action/bulk/TransportBulkAction.java | 10 +++++++++- .../org/elasticsearch/action/delete/DeleteRequest.java | 6 ------ .../org/elasticsearch/action/index/IndexRequest.java | 6 ------ .../org/elasticsearch/action/update/UpdateRequest.java | 6 ------ 5 files changed, 9 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index cdd3fcbac6dbc..4c5189a50a896 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -140,12 +139,6 @@ public interface DocWriteRequest extends IndicesRequest, Accountable { * @return boolean flag, when true specifically requires an alias */ boolean isRequireAlias(); - - /** - * Pick the appropriate shard id to receive this request. 
- */ - int route(IndexRouting indexRouting); - /** * Requested operation type to perform on the document */ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 1b44235417505..bdd8c77f85f24 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -491,6 +491,7 @@ protected void doRun() { IndexRouting indexRouting = concreteIndices.routing(concreteIndex); + int shardId; switch (docWriteRequest.opType()) { case CREATE: case INDEX: @@ -502,10 +503,17 @@ protected void doRun() { Version indexCreated = indexMetadata.getCreationVersion(); indexRequest.resolveRouting(metadata); indexRequest.process(indexCreated, mappingMd, concreteIndex.getName()); + shardId = indexRouting.indexShard( + docWriteRequest.id(), + docWriteRequest.routing(), + indexRequest.getContentType(), + indexRequest.source() + ); break; case UPDATE: TransportUpdateAction.resolveAndValidateRouting(metadata, concreteIndex.getName(), (UpdateRequest) docWriteRequest); + shardId = indexRouting.updateShard(docWriteRequest.id(), docWriteRequest.routing()); break; case DELETE: docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); @@ -513,10 +521,10 @@ protected void doRun() { if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) { throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.id()); } + shardId = indexRouting.deleteShard(docWriteRequest.id(), docWriteRequest.routing()); break; default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); } - int shardId = docWriteRequest.route(indexRouting); List shardRequests = requestsByShard.computeIfAbsent( new ShardId(concreteIndex, shardId), shard -> new ArrayList<>() diff --git 
a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 930240449a2d6..c30b418d44d68 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; -import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -231,11 +230,6 @@ public boolean isRequireAlias() { return false; } - @Override - public int route(IndexRouting indexRouting) { - return indexRouting.deleteShard(id, routing); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index c422393b81aa2..945b89cb41edf 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -21,7 +21,6 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -721,11 +720,6 @@ public boolean isRequireAlias() { return requireAlias; } - @Override - public int route(IndexRouting indexRouting) { - return indexRouting.indexShard(id, routing, contentType, source); - } - public IndexRequest setRequireAlias(boolean requireAlias) { 
this.requireAlias = requireAlias; return this; diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 0ad328f19bc64..7b369dae46295 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -17,7 +17,6 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; -import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.common.Strings; @@ -818,11 +817,6 @@ public boolean isRequireAlias() { return requireAlias; } - @Override - public int route(IndexRouting indexRouting) { - return indexRouting.updateShard(id, routing); - } - public UpdateRequest setRequireAlias(boolean requireAlias) { this.requireAlias = requireAlias; return this; From c1423a5b8de4a54818f41f258ca4e966007f5e56 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 18 Oct 2021 16:56:05 -0400 Subject: [PATCH 4/5] privacy --- .../index/mapper/DocumentMapper.java | 4 +--- .../elasticsearch/index/mapper/Mapper.java | 12 +++++++++- .../index/mapper/KeywordFieldMapperTests.java | 20 ++++++++++------ .../index/mapper/MapperTestCase.java | 24 ++++++++++++------- 4 files changed, 41 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index fd4ef67792876..4d93660afe85f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -10,10 +10,8 @@ import 
org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.xcontent.support.filtering.FilterPathBasedFilter; import java.util.List; -import java.util.Set; public class DocumentMapper { private final String type; @@ -93,7 +91,7 @@ public void validate(IndexSettings settings, boolean checkLimits) { } List routingPaths = settings.getIndexMetadata().getRoutingPaths(); if (false == routingPaths.isEmpty()) { - mappingLookup.getMapping().getRoot().validateRoutingPath(new FilterPathBasedFilter(Set.copyOf(routingPaths), true)); + mappingLookup.getMapping().getRoot().validateRoutingPath(routingPaths); } if (checkLimits) { this.mappingLookup.checkLimits(settings); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java index 657ff2597b231..70a3c5330ca19 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java @@ -12,9 +12,12 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.support.filtering.FilterPathBasedFilter; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; public abstract class Mapper implements ToXContentFragment, Iterable { @@ -72,7 +75,14 @@ public final String simpleName() { /** * Validate a {@link TokenFilter} made from {@link IndexMetadata#INDEX_ROUTING_PATH}. */ - public final void validateRoutingPath(TokenFilter filter) { + public final void validateRoutingPath(List routingPaths) { + validateRoutingPath(new FilterPathBasedFilter(Set.copyOf(routingPaths), true)); + } + + /** + * Validate a {@link TokenFilter} made from {@link IndexMetadata#INDEX_ROUTING_PATH}. 
+ */ + private void validateRoutingPath(TokenFilter filter) { if (filter == TokenFilter.INCLUDE_ALL) { validateMatchedRoutingPath(); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java index a5e438b1ad76f..b7239efe4dc51 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/KeywordFieldMapperTests.java @@ -8,8 +8,6 @@ package org.elasticsearch.index.mapper; -import com.fasterxml.jackson.core.filter.TokenFilter; - import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.MockLowerCaseFilter; import org.apache.lucene.analysis.MockTokenizer; @@ -22,8 +20,10 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.IndexableFieldType; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.CharFilterFactory; @@ -38,6 +38,7 @@ import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.Arrays; @@ -588,9 +589,14 @@ protected String minimalIsInvalidRoutingPathErrorMessage(Mapper mapper) { } public void testDimensionInRoutingPath() throws IOException { - Mapper mapper = createMapperService(fieldMapping(b -> b.field("type", "keyword").field("time_series_dimension", true))) - .mappingLookup() - .getMapper("field"); - mapper.validateRoutingPath(TokenFilter.INCLUDE_ALL); // 
Doesn't throw + MapperService mapper = createMapperService(fieldMapping(b -> b.field("type", "keyword").field("time_series_dimension", true))); + IndexSettings settings = createIndexSettings( + Version.CURRENT, + Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "field") + .build() + ); + mapper.documentMapper().validate(settings, false); // Doesn't throw } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java index c59e6858c890c..9e8b69122c7b4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java @@ -8,8 +8,6 @@ package org.elasticsearch.index.mapper; -import com.fasterxml.jackson.core.filter.TokenFilter; - import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexableField; @@ -22,13 +20,13 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.xcontent.ToXContent; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.ScriptDocValues; import org.elasticsearch.index.query.SearchExecutionContext; @@ -37,6 +35,9 @@ import org.elasticsearch.search.lookup.LeafStoredFieldsLookup; import 
org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.search.lookup.SourceLookup; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; import java.util.ArrayList; @@ -754,9 +755,16 @@ protected boolean allowsNullValues() { } public final void testMinimalIsInvalidInRoutingPath() throws IOException { - Mapper mapper = createMapperService(fieldMapping(this::minimalMapping)).mappingLookup().getMapper("field"); - Exception e = expectThrows(IllegalArgumentException.class, () -> mapper.validateRoutingPath(TokenFilter.INCLUDE_ALL)); - assertThat(e.getMessage(), equalTo(minimalIsInvalidRoutingPathErrorMessage(mapper))); + MapperService mapper = createMapperService(fieldMapping(this::minimalMapping)); + IndexSettings settings = createIndexSettings( + Version.CURRENT, + Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "field") + .build() + ); + Exception e = expectThrows(IllegalArgumentException.class, () -> mapper.documentMapper().validate(settings, false)); + assertThat(e.getMessage(), equalTo(minimalIsInvalidRoutingPathErrorMessage(mapper.mappingLookup().getMapper("field")))); } protected String minimalIsInvalidRoutingPathErrorMessage(Mapper mapper) { From 2f3d5fd19d674a16c9043fefc90b3505527bdd36 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 18 Oct 2021 17:15:50 -0400 Subject: [PATCH 5/5] Handle warnings --- .../index/mapper/MapperTestCase.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java index 9e8b69122c7b4..069793cda0169 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java @@ -756,15 +756,19 @@ protected boolean allowsNullValues() { public final void testMinimalIsInvalidInRoutingPath() throws IOException { MapperService mapper = createMapperService(fieldMapping(this::minimalMapping)); - IndexSettings settings = createIndexSettings( - Version.CURRENT, - Settings.builder() - .put(IndexSettings.MODE.getKey(), "time_series") - .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "field") - .build() - ); - Exception e = expectThrows(IllegalArgumentException.class, () -> mapper.documentMapper().validate(settings, false)); - assertThat(e.getMessage(), equalTo(minimalIsInvalidRoutingPathErrorMessage(mapper.mappingLookup().getMapper("field")))); + try { + IndexSettings settings = createIndexSettings( + Version.CURRENT, + Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "field") + .build() + ); + Exception e = expectThrows(IllegalArgumentException.class, () -> mapper.documentMapper().validate(settings, false)); + assertThat(e.getMessage(), equalTo(minimalIsInvalidRoutingPathErrorMessage(mapper.mappingLookup().getMapper("field")))); + } finally { + assertParseMinimalWarnings(); + } } protected String minimalIsInvalidRoutingPathErrorMessage(Mapper mapper) {