Change the recommended tie-breaking fields from [_id] to [_seq_no, _shard]. #25797

Status: Closed (2 commits)
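The recommendation in question concerns sorted pagination: when paging with search_after, the final sort keys should now be _seq_no and _shard rather than _id, so the overall sort order stays total and stable across pages. Below is a minimal sketch of what that looks like from the Java client side, assuming the SearchSourceBuilder/SearchRequest API; the class name, the "logs" index, the "timestamp" field, and the concrete search_after values are made up for illustration.

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

public class TieBreakerSortExample {

    // Sorts on an application field and breaks ties with _seq_no and _shard
    // (instead of _id, as previously recommended).
    static SearchRequest nextPage(Object[] lastSortValues) {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .size(100)
            .sort("timestamp", SortOrder.DESC) // hypothetical application field
            .sort("_seq_no", SortOrder.ASC)    // tie-breaker
            .sort("_shard", SortOrder.ASC);    // tie-breaker introduced by this change
        if (lastSortValues != null) {
            // sort values of the last hit of the previous page
            source.searchAfter(lastSortValues);
        }
        return new SearchRequest("logs").source(source); // "logs" is a made-up index name
    }
}

To make _shard usable as a sort and query field, the diff below threads a shard id through SourceToParse and the fielddata builders and adds a doc-valued _shard metadata field.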
@@ -509,7 +509,7 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
final ShardId shardId = replica.shardId();
final SourceToParse sourceToParse =
SourceToParse.source(shardId.getIndexName(),
- indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
+ indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType(), replica.shardId().getId())
.routing(indexRequest.routing()).parent(indexRequest.parent());
return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
@@ -535,7 +535,7 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
final SourceToParse sourceToParse =
- SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType())
+ SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType(), primary.shardId().getId())
.routing(request.routing()).parent(request.parent());
try {
// if a mapping update is required to index this request, issue a mapping update on the master, and abort the
@@ -157,7 +157,7 @@ public IndexService(
// The sort order is validated right after the merge of the mapping later in the process.
this.indexSortSupplier = () -> indexSettings.getIndexSortConfig().buildIndexSort(
mapperService::fullName,
- indexFieldData::getForField
+ field -> indexFieldData.getForField(field, -1)
);
} else {
this.indexSortSupplier = () -> null;
@@ -134,7 +134,7 @@ public TerminationHandle warmReader(final IndexShard indexShard, final Engine.Se
executor.execute(() -> {
try {
final long start = System.nanoTime();
- IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
+ IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType, indexShard.shardId().getId());
DirectoryReader reader = searcher.getDirectoryReader();
IndexFieldData<?> global = ifd.loadGlobal(reader);
if (reader.leaves().isEmpty() == false) {
@@ -223,7 +223,7 @@ public Object missingValue(boolean reversed) {
interface Builder {

IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
- CircuitBreakerService breakerService, MapperService mapperService);
+ CircuitBreakerService breakerService, MapperService mapperService, int shardId);
}

interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {
@@ -106,7 +106,7 @@ public synchronized void clearField(final String fieldName) {
}

@SuppressWarnings("unchecked")
- public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType) {
+ public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType, int shardId) {
final String fieldName = fieldType.name();
IndexFieldData.Builder builder = fieldType.fielddataBuilder();

@@ -126,7 +126,7 @@ public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType
}
}

- return (IFD) builder.build(indexSettings, fieldType, cache, circuitBreakerService, mapperService);
+ return (IFD) builder.build(indexSettings, fieldType, cache, circuitBreakerService, mapperService, shardId);
}

/**
@@ -82,7 +82,7 @@ static void checkCompatible(FieldInfo fieldInfo) {
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
- CircuitBreakerService breakerService, MapperService mapperService) {
+ CircuitBreakerService breakerService, MapperService mapperService, int shardId) {
// ignore breaker
return new LatLonPointDVIndexFieldData(indexSettings.getIndex(), fieldType.name());
}
@@ -64,7 +64,7 @@ public static class Builder implements IndexFieldData.Builder {

@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
- CircuitBreakerService breakerService, MapperService mapperService) {
+ CircuitBreakerService breakerService, MapperService mapperService, int shardId) {
// Ignore breaker
final String fieldName = fieldType.name();
return new BytesBinaryDVIndexFieldData(indexSettings.getIndex(), fieldName);
@@ -44,22 +44,22 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
- import java.util.function.Function;
+ import java.util.function.BiFunction;

public class ConstantIndexFieldData extends AbstractIndexOrdinalsFieldData {

public static class Builder implements IndexFieldData.Builder {

- private final Function<MapperService, String> valueFunction;
+ private final BiFunction<MapperService, Integer, String> valueFunction;

- public Builder(Function<MapperService, String> valueFunction) {
+ public Builder(BiFunction<MapperService, Integer, String> valueFunction) {
this.valueFunction = valueFunction;
}

@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
- CircuitBreakerService breakerService, MapperService mapperService) {
-     return new ConstantIndexFieldData(indexSettings, fieldType.name(), valueFunction.apply(mapperService));
+ CircuitBreakerService breakerService, MapperService mapperService, int shardId) {
+     return new ConstantIndexFieldData(indexSettings, fieldType.name(), valueFunction.apply(mapperService, shardId));
}

}
@@ -89,7 +89,7 @@ public Builder scriptFunction(Function<SortedSetDocValues, ScriptDocValues<?>> s

@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
- CircuitBreakerService breakerService, MapperService mapperService) {
+ CircuitBreakerService breakerService, MapperService mapperService, int shardId) {
// Ignore Circuit Breaker
final String fieldName = fieldType.name();
if (BINARY_INDEX_FIELD_NAMES.contains(fieldName)) {
@@ -20,7 +20,6 @@

import org.apache.lucene.codecs.blocktree.FieldReader;
import org.apache.lucene.codecs.blocktree.Stats;
- import org.apache.lucene.index.Fields;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
@@ -66,7 +65,8 @@ public Builder(double minFrequency, double maxFrequency, int minSegmentSize) {

@Override
public IndexOrdinalsFieldData build(IndexSettings indexSettings, MappedFieldType fieldType,
- IndexFieldDataCache cache, CircuitBreakerService breakerService, MapperService mapperService) {
+ IndexFieldDataCache cache, CircuitBreakerService breakerService,
+     MapperService mapperService, int shardId) {
return new PagedBytesIndexFieldData(indexSettings, fieldType.name(), cache, breakerService,
minFrequency, maxFrequency, minSegmentSize);
}
@@ -164,8 +164,8 @@ public IndexFieldData.Builder fielddataBuilder() {
return new IndexFieldData.Builder() {
@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
- CircuitBreakerService breakerService, MapperService mapperService) {
-     final IndexFieldData<?> fieldData = fieldDataBuilder.build(indexSettings, fieldType, cache, breakerService, mapperService);
+ CircuitBreakerService breakerService, MapperService mapperService, int shardId) {
+     final IndexFieldData<?> fieldData = fieldDataBuilder.build(indexSettings, fieldType, cache, breakerService, mapperService, shardId);
if (indexSettings.getIndexVersionCreated().before(Version.V_6_0_0_beta1)) {
// ids were indexed as utf-8
return fieldData;
@@ -157,7 +157,7 @@ private boolean isSameIndex(Object value, String indexName) {

@Override
public IndexFieldData.Builder fielddataBuilder() {
- return new ConstantIndexFieldData.Builder(mapperService -> mapperService.index().getName());
+ return new ConstantIndexFieldData.Builder((mapperService, shardId) -> mapperService.index().getName());
}
}

@@ -264,10 +264,10 @@ public IndexFieldData.Builder fielddataBuilder() {
return new IndexFieldData.Builder() {
@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
- CircuitBreakerService breakerService, MapperService mapperService) {
+ CircuitBreakerService breakerService, MapperService mapperService, int shardId) {
final IndexNumericFieldData scaledValues = (IndexNumericFieldData) new DocValuesIndexFieldData.Builder()
.numericType(IndexNumericFieldData.NumericType.LONG)
- .build(indexSettings, fieldType, cache, breakerService, mapperService);
+ .build(indexSettings, fieldType, cache, breakerService, mapperService, shardId);
return new ScaledFloatIndexFieldData(scaledValues, scalingFactor);
}
};
@@ -0,0 +1,168 @@ (new file)
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.mapper;

import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.plain.ConstantIndexFieldData;
import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData;
import org.elasticsearch.index.mapper.IdFieldMapper.IdFieldType;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.indices.breaker.CircuitBreakerService;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* A mapper that stores the shard id that documents are assigned to.
* This field is doc-valued instead of being a view like _index so that (shard_id, seq_no) pairs are still unique after shrinking.
*/
public class ShardIdFieldMapper extends MetadataFieldMapper {

public static final String NAME = "_shard";

public static class TypeParser implements MetadataFieldMapper.TypeParser {
@Override
public MetadataFieldMapper.Builder<?,?> parse(String name, Map<String, Object> node,
ParserContext parserContext) throws MapperParsingException {
throw new MapperParsingException(NAME + " is not configurable");
}

@Override
public MetadataFieldMapper getDefault(MappedFieldType fieldType, ParserContext context) {
assert fieldType == null || fieldType.equals(new ShardIdFieldType());
final IndexSettings indexSettings = context.mapperService().getIndexSettings();
return new ShardIdFieldMapper(indexSettings);
}
}

static class ShardIdFieldType extends MappedFieldType {

ShardIdFieldType() {
setName(NAME);
setHasDocValues(true);
setIndexOptions(IndexOptions.NONE);
}

protected ShardIdFieldType(ShardIdFieldType ref) {
super(ref);
}

@Override
public MappedFieldType clone() {
return new ShardIdFieldType(this);
}

@Override
public String typeName() {
return NAME;
}

@Override
public Query termQuery(Object value, QueryShardContext context) {
BytesRef binaryValue;
if (value instanceof BytesRef) {
binaryValue = (BytesRef) value;
} else {
binaryValue = new BytesRef(value.toString());
}

if (context.indexVersionCreated().before(Version.V_6_0_0_beta1)) {
if (binaryValue.bytesEquals(new BytesRef(Integer.toString(context.getShardId())))) {
return new MatchAllDocsQuery();
} else {
return new MatchNoDocsQuery("Different shard than the requested one");
}
} else {
// We use doc values for querying.
// This is fine since in the common case either we are on the right shard
// and we will return the doc values iterator, or we are on a different shard
// and we will return a null scorer since the term can't be found in the terms
// dict.
return SortedDocValuesField.newExactQuery(NAME, binaryValue);
}
}

@Override
public IndexFieldData.Builder fielddataBuilder() {
IndexFieldData.Builder view = new ConstantIndexFieldData.Builder((mapperService, shardId) -> Integer.toString(shardId));
IndexFieldData.Builder dv = new DocValuesIndexFieldData.Builder();
return new IndexFieldData.Builder() {
@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
CircuitBreakerService breakerService, MapperService mapperService, int shardId) {
if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_6_0_0_beta1)) {
return dv.build(indexSettings, fieldType, cache, breakerService, mapperService, shardId);
} else {
if (shardId == -1) {
throw new IllegalArgumentException("The shard id is not usable with 5.x indices and index sorting or scripts");
}
return view.build(indexSettings, fieldType, cache, breakerService, mapperService, shardId);
}
}
};
}
}

private final boolean pre6x;

protected ShardIdFieldMapper(IndexSettings indexSettings) {
super(NAME, new ShardIdFieldType(), new ShardIdFieldType(), indexSettings.getSettings());
pre6x = indexSettings.getIndexVersionCreated().before(Version.V_6_0_0_beta1);
}

@Override
public void preParse(ParseContext context) throws IOException {
super.parse(context);
}

@Override
public void postParse(ParseContext context) throws IOException {}

@Override
protected void parseCreateField(ParseContext context, List<IndexableField> fields) throws IOException {
if (pre6x == false) {
fields.add(new SortedDocValuesField(NAME, new BytesRef(Integer.toString(context.sourceToParse().shardId()))));
}
}

@Override
protected String contentType() {
return NAME;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// nothing to serialize since it is not configurable
return builder;
}
}
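The class comment above is the key design point: _shard is stored as a doc value rather than derived at search time so that the (_shard, _seq_no) pair stays unique even after a _shrink merges several source shards into one target shard. A toy, Elasticsearch-independent sketch of that uniqueness argument follows; the class name, shard ids, and sequence numbers are invented for illustration.

import java.util.HashSet;
import java.util.Set;

public class ShrinkTieBreakSketch {
    public static void main(String[] args) {
        // Hypothetical documents from two source shards, each shard keeping
        // its own sequence-number counter: {shardId, seqNo}.
        long[][] docs = { {0, 0}, {0, 1}, {1, 0}, {1, 1} };

        Set<Long> seqNoOnly = new HashSet<>();
        Set<String> shardAndSeqNo = new HashSet<>();
        for (long[] doc : docs) {
            seqNoOnly.add(doc[1]);
            shardAndSeqNo.add(doc[0] + "/" + doc[1]);
        }

        // After shrinking, all four documents live in one shard; _seq_no alone
        // collides, while the original (_shard, _seq_no) pair stays unique.
        System.out.println("distinct _seq_no values:          " + seqNoOnly.size());     // 2
        System.out.println("distinct (_shard, _seq_no) pairs: " + shardAndSeqNo.size()); // 4
    }
}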
@@ -27,9 +27,9 @@

public class SourceToParse {

- public static SourceToParse source(String index, String type, String id, BytesReference source,
-                                    XContentType contentType) {
-     return new SourceToParse(index, type, id, source, contentType);
+ public static SourceToParse source(String index, String type, String id, BytesReference source,
+                                    XContentType contentType, int shardId) {
+     return new SourceToParse(index, type, id, source, contentType, shardId);
}

private final BytesReference source;
@@ -40,20 +40,23 @@ public static SourceToParse source(String index, String type, String id, BytesRe

private final String id;

+ private final int shardId;

private String routing;

private String parentId;

private XContentType xContentType;

- private SourceToParse(String index, String type, String id, BytesReference source, XContentType xContentType) {
+ private SourceToParse(String index, String type, String id, BytesReference source, XContentType xContentType, int shardId) {
this.index = Objects.requireNonNull(index);
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
// we always convert back to byte array, since we store it and Field only supports bytes..
// so, we might as well do it here, and improve the performance of working with direct byte arrays
this.source = new BytesArray(Objects.requireNonNull(source).toBytesRef());
this.xContentType = Objects.requireNonNull(xContentType);
+ this.shardId = shardId;
}

public BytesReference source() {
@@ -94,6 +97,10 @@ public SourceToParse routing(String routing) {
return this;
}

+ public int shardId() {
+     return shardId;
+ }

public enum Origin {
PRIMARY,
REPLICA