diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index c9246223f0923..e20f21b4cecfc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -19,9 +19,9 @@ package org.elasticsearch.cluster; +import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.cluster.DiffableUtils.KeyedReader; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -469,6 +469,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endArray(); + builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS); + for (IntObjectCursor> cursor : indexMetaData.getActiveAllocationIds()) { + builder.startArray(String.valueOf(cursor.key)); + for (String allocationId : cursor.value) { + builder.value(allocationId); + } + builder.endArray(); + } + builder.endObject(); + builder.endObject(); } builder.endObject(); @@ -584,6 +594,7 @@ public Builder nodes(DiscoveryNodes nodes) { public Builder routingResult(RoutingAllocation.Result routingResult) { this.routingTable = routingResult.routingTable(); + this.metaData = routingResult.metaData(); return this; } @@ -759,7 +770,7 @@ public ClusterStateDiff(ClusterState before, ClusterState after) { nodes = after.nodes.diff(before.nodes); metaData = after.metaData.diff(before.metaData); blocks = after.blocks.diff(before.blocks); - customs = DiffableUtils.diff(before.customs, after.customs); + customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer()); } public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException { @@ -771,14 +782,15 @@ public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException { nodes = proto.nodes.readDiffFrom(in); metaData = proto.metaData.readDiffFrom(in); blocks = proto.blocks.readDiffFrom(in); - customs = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader() { + customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), + new DiffableUtils.DiffableValueSerializer() { @Override - public Custom readFrom(StreamInput in, String key) throws IOException { + public Custom read(StreamInput in, String key) throws IOException { return lookupPrototypeSafe(key).readFrom(in); } @Override - public Diff readDiffFrom(StreamInput in, String key) throws IOException { + public Diff readDiff(StreamInput in, String key) throws IOException { return lookupPrototypeSafe(key).readDiffFrom(in); } }); diff --git a/core/src/main/java/org/elasticsearch/cluster/Diff.java b/core/src/main/java/org/elasticsearch/cluster/Diff.java index 1a9fff246a984..76535a4b763c8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/Diff.java +++ b/core/src/main/java/org/elasticsearch/cluster/Diff.java @@ -29,7 +29,7 @@ public interface Diff { /** - * Applies difference to the specified part and retunrs the resulted part + * Applies difference to the specified part and returns the resulted part */ T apply(T part); diff --git a/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java index 84e0021ee00ff..1488f0594379b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java +++ b/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java @@ -19,263 +19,630 @@ package org.elasticsearch.cluster; +import com.carrotsearch.hppc.cursors.IntCursor; +import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public final class DiffableUtils { private DiffableUtils() { } + /** + * Returns a map key serializer for String keys + */ + public static KeySerializer getStringKeySerializer() { + return StringKeySerializer.INSTANCE; + } + + /** + * Returns a map key serializer for Integer keys. Encodes as Int. + */ + public static KeySerializer getIntKeySerializer() { + return IntKeySerializer.INSTANCE; + } + + /** + * Returns a map key serializer for Integer keys. Encodes as VInt. + */ + public static KeySerializer getVIntKeySerializer() { + return VIntKeySerializer.INSTANCE; + } + /** * Calculates diff between two ImmutableOpenMaps of Diffable objects */ - public static > Diff> diff(ImmutableOpenMap before, ImmutableOpenMap after) { + public static > MapDiff> diff(ImmutableOpenMap before, ImmutableOpenMap after, KeySerializer keySerializer) { assert after != null && before != null; - return new ImmutableOpenMapDiff<>(before, after); + return new ImmutableOpenMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance()); + } + + /** + * Calculates diff between two ImmutableOpenMaps of non-diffable objects + */ + public static MapDiff> diff(ImmutableOpenMap before, ImmutableOpenMap after, KeySerializer keySerializer, NonDiffableValueSerializer valueSerializer) { + assert after != null && before != null; + return new ImmutableOpenMapDiff<>(before, after, keySerializer, valueSerializer); + } + + /** + * Calculates diff between two ImmutableOpenIntMaps of Diffable objects + */ + public static > MapDiff> diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after, KeySerializer keySerializer) { + assert after != null && before != null; + return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance()); + } + + /** + * Calculates diff between two ImmutableOpenIntMaps of non-diffable objects + */ + public static MapDiff> diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after, KeySerializer keySerializer, NonDiffableValueSerializer valueSerializer) { + assert after != null && before != null; + return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, valueSerializer); } /** * Calculates diff between two Maps of Diffable objects. */ - public static > Diff> diff(Map before, Map after) { + public static > MapDiff> diff(Map before, Map after, KeySerializer keySerializer) { assert after != null && before != null; - return new JdkMapDiff<>(before, after); + return new JdkMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance()); } /** - * Loads an object that represents difference between two ImmutableOpenMaps + * Calculates diff between two Maps of non-diffable objects */ - public static > Diff> readImmutableOpenMapDiff(StreamInput in, KeyedReader keyedReader) throws IOException { - return new ImmutableOpenMapDiff<>(in, keyedReader); + public static MapDiff> diff(Map before, Map after, KeySerializer keySerializer, NonDiffableValueSerializer valueSerializer) { + assert after != null && before != null; + return new JdkMapDiff<>(before, after, keySerializer, valueSerializer); } /** - * Loads an object that represents difference between two Maps. + * Loads an object that represents difference between two ImmutableOpenMaps */ - public static > Diff> readJdkMapDiff(StreamInput in, KeyedReader keyedReader) throws IOException { - return new JdkMapDiff<>(in, keyedReader); + public static MapDiff> readImmutableOpenMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + return new ImmutableOpenMapDiff<>(in, keySerializer, valueSerializer); } /** * Loads an object that represents difference between two ImmutableOpenMaps */ - public static > Diff> readImmutableOpenMapDiff(StreamInput in, T proto) throws IOException { - return new ImmutableOpenMapDiff<>(in, new PrototypeReader<>(proto)); + public static MapDiff> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + return new ImmutableOpenIntMapDiff<>(in, keySerializer, valueSerializer); } /** - * Loads an object that represents difference between two Maps. + * Loads an object that represents difference between two Maps of Diffable objects */ - public static > Diff> readJdkMapDiff(StreamInput in, T proto) throws IOException { - return new JdkMapDiff<>(in, new PrototypeReader<>(proto)); + public static MapDiff> readJdkMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + return new JdkMapDiff<>(in, keySerializer, valueSerializer); } /** - * A reader that can deserialize an object. The reader can select the deserialization type based on the key. It's - * used in custom metadata deserialization. + * Loads an object that represents difference between two ImmutableOpenMaps of Diffable objects using Diffable proto object */ - public interface KeyedReader { + public static > MapDiff> readImmutableOpenMapDiff(StreamInput in, KeySerializer keySerializer, T proto) throws IOException { + return new ImmutableOpenMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto)); + } - /** - * reads an object of the type T from the stream input - */ - T readFrom(StreamInput in, String key) throws IOException; + /** + * Loads an object that represents difference between two ImmutableOpenIntMaps of Diffable objects using Diffable proto object + */ + public static > MapDiff> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer keySerializer, T proto) throws IOException { + return new ImmutableOpenIntMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto)); + } - /** - * reads an object that respresents differences between two objects with the type T from the stream input - */ - Diff readDiffFrom(StreamInput in, String key) throws IOException; + /** + * Loads an object that represents difference between two Maps of Diffable objects using Diffable proto object + */ + public static > MapDiff> readJdkMapDiff(StreamInput in, KeySerializer keySerializer, T proto) throws IOException { + return new JdkMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto)); } /** - * Implementation of the KeyedReader that is using a prototype object for reading operations + * Represents differences between two Maps of (possibly diffable) objects. * - * Note: this implementation is ignoring the key. + * @param the diffable object */ - public static class PrototypeReader> implements KeyedReader { - private T proto; + private static class JdkMapDiff extends MapDiff> { - public PrototypeReader(T proto) { - this.proto = proto; + protected JdkMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + super(in, keySerializer, valueSerializer); } - @Override - public T readFrom(StreamInput in, String key) throws IOException { - return proto.readFrom(in); + public JdkMapDiff(Map before, Map after, + KeySerializer keySerializer, ValueSerializer valueSerializer) { + super(keySerializer, valueSerializer); + assert after != null && before != null; + + for (K key : before.keySet()) { + if (!after.containsKey(key)) { + deletes.add(key); + } + } + + for (Map.Entry partIter : after.entrySet()) { + T beforePart = before.get(partIter.getKey()); + if (beforePart == null) { + upserts.put(partIter.getKey(), partIter.getValue()); + } else if (partIter.getValue().equals(beforePart) == false) { + if (valueSerializer.supportsDiffableValues()) { + diffs.put(partIter.getKey(), valueSerializer.diff(partIter.getValue(), beforePart)); + } else { + upserts.put(partIter.getKey(), partIter.getValue()); + } + } + } } @Override - public Diff readDiffFrom(StreamInput in, String key) throws IOException { - return proto.readDiffFrom(in); + public Map apply(Map map) { + Map builder = new HashMap<>(); + builder.putAll(map); + + for (K part : deletes) { + builder.remove(part); + } + + for (Map.Entry> diff : diffs.entrySet()) { + builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey()))); + } + + for (Map.Entry upsert : upserts.entrySet()) { + builder.put(upsert.getKey(), upsert.getValue()); + } + return builder; } } /** - * Represents differences between two Maps of Diffable objects. + * Represents differences between two ImmutableOpenMap of (possibly diffable) objects * - * @param the diffable object + * @param the object type */ - private static class JdkMapDiff> extends MapDiff> { + private static class ImmutableOpenMapDiff extends MapDiff> { - protected JdkMapDiff(StreamInput in, KeyedReader reader) throws IOException { - super(in, reader); + protected ImmutableOpenMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + super(in, keySerializer, valueSerializer); } - public JdkMapDiff(Map before, Map after) { + public ImmutableOpenMapDiff(ImmutableOpenMap before, ImmutableOpenMap after, + KeySerializer keySerializer, ValueSerializer valueSerializer) { + super(keySerializer, valueSerializer); assert after != null && before != null; - for (String key : before.keySet()) { - if (!after.containsKey(key)) { - deletes.add(key); + + for (ObjectCursor key : before.keys()) { + if (!after.containsKey(key.value)) { + deletes.add(key.value); } } - for (Map.Entry partIter : after.entrySet()) { - T beforePart = before.get(partIter.getKey()); + + for (ObjectObjectCursor partIter : after) { + T beforePart = before.get(partIter.key); if (beforePart == null) { - adds.put(partIter.getKey(), partIter.getValue()); - } else if (partIter.getValue().equals(beforePart) == false) { - diffs.put(partIter.getKey(), partIter.getValue().diff(beforePart)); + upserts.put(partIter.key, partIter.value); + } else if (partIter.value.equals(beforePart) == false) { + if (valueSerializer.supportsDiffableValues()) { + diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart)); + } else { + upserts.put(partIter.key, partIter.value); + } } } } @Override - public Map apply(Map map) { - Map builder = new HashMap<>(); + public ImmutableOpenMap apply(ImmutableOpenMap map) { + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); builder.putAll(map); - for (String part : deletes) { + for (K part : deletes) { builder.remove(part); } - for (Map.Entry> diff : diffs.entrySet()) { + for (Map.Entry> diff : diffs.entrySet()) { builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey()))); } - for (Map.Entry additon : adds.entrySet()) { - builder.put(additon.getKey(), additon.getValue()); + for (Map.Entry upsert : upserts.entrySet()) { + builder.put(upsert.getKey(), upsert.getValue()); } - return builder; + return builder.build(); } } /** - * Represents differences between two ImmutableOpenMap of diffable objects + * Represents differences between two ImmutableOpenIntMap of (possibly diffable) objects * - * @param the diffable object + * @param the object type */ - private static class ImmutableOpenMapDiff> extends MapDiff> { + private static class ImmutableOpenIntMapDiff extends MapDiff> { - protected ImmutableOpenMapDiff(StreamInput in, KeyedReader reader) throws IOException { - super(in, reader); + protected ImmutableOpenIntMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + super(in, keySerializer, valueSerializer); } - public ImmutableOpenMapDiff(ImmutableOpenMap before, ImmutableOpenMap after) { + public ImmutableOpenIntMapDiff(ImmutableOpenIntMap before, ImmutableOpenIntMap after, + KeySerializer keySerializer, ValueSerializer valueSerializer) { + super(keySerializer, valueSerializer); assert after != null && before != null; - for (ObjectCursor key : before.keys()) { + + for (IntCursor key : before.keys()) { if (!after.containsKey(key.value)) { deletes.add(key.value); } } - for (ObjectObjectCursor partIter : after) { + + for (IntObjectCursor partIter : after) { T beforePart = before.get(partIter.key); if (beforePart == null) { - adds.put(partIter.key, partIter.value); + upserts.put(partIter.key, partIter.value); } else if (partIter.value.equals(beforePart) == false) { - diffs.put(partIter.key, partIter.value.diff(beforePart)); + if (valueSerializer.supportsDiffableValues()) { + diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart)); + } else { + upserts.put(partIter.key, partIter.value); + } } } } @Override - public ImmutableOpenMap apply(ImmutableOpenMap map) { - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + public ImmutableOpenIntMap apply(ImmutableOpenIntMap map) { + ImmutableOpenIntMap.Builder builder = ImmutableOpenIntMap.builder(); builder.putAll(map); - for (String part : deletes) { + for (Integer part : deletes) { builder.remove(part); } - for (Map.Entry> diff : diffs.entrySet()) { + for (Map.Entry> diff : diffs.entrySet()) { builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey()))); } - for (Map.Entry additon : adds.entrySet()) { - builder.put(additon.getKey(), additon.getValue()); + for (Map.Entry upsert : upserts.entrySet()) { + builder.put(upsert.getKey(), upsert.getValue()); } return builder.build(); } } /** - * Represents differences between two maps of diffable objects + * Represents differences between two maps of objects and is used as base class for different map implementations. * - * This class is used as base class for different map implementations + * Implements serialization. How differences are applied is left to subclasses. * - * @param the diffable object + * @param the type of map keys + * @param the type of map values + * @param the map implementation type */ - private static abstract class MapDiff, M> implements Diff { + public static abstract class MapDiff implements Diff { - protected final List deletes; - protected final Map> diffs; - protected final Map adds; + protected final List deletes; + protected final Map> diffs; // incremental updates + protected final Map upserts; // additions or full updates + protected final KeySerializer keySerializer; + protected final ValueSerializer valueSerializer; - protected MapDiff() { + protected MapDiff(KeySerializer keySerializer, ValueSerializer valueSerializer) { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; deletes = new ArrayList<>(); diffs = new HashMap<>(); - adds = new HashMap<>(); + upserts = new HashMap<>(); } - protected MapDiff(StreamInput in, KeyedReader reader) throws IOException { + protected MapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; deletes = new ArrayList<>(); diffs = new HashMap<>(); - adds = new HashMap<>(); + upserts = new HashMap<>(); int deletesCount = in.readVInt(); for (int i = 0; i < deletesCount; i++) { - deletes.add(in.readString()); + deletes.add(keySerializer.readKey(in)); } - int diffsCount = in.readVInt(); for (int i = 0; i < diffsCount; i++) { - String key = in.readString(); - Diff diff = reader.readDiffFrom(in, key); + K key = keySerializer.readKey(in); + Diff diff = valueSerializer.readDiff(in, key); diffs.put(key, diff); } - - int addsCount = in.readVInt(); - for (int i = 0; i < addsCount; i++) { - String key = in.readString(); - T part = reader.readFrom(in, key); - adds.put(key, part); + int upsertsCount = in.readVInt(); + for (int i = 0; i < upsertsCount; i++) { + K key = keySerializer.readKey(in); + T newValue = valueSerializer.read(in, key); + upserts.put(key, newValue); } } + + /** + * The keys that, when this diff is applied to a map, should be removed from the map. + * + * @return the list of keys that are deleted + */ + public List getDeletes() { + return deletes; + } + + /** + * Map entries that, when this diff is applied to a map, should be + * incrementally updated. The incremental update is represented using + * the {@link Diff} interface. + * + * @return the map entries that are incrementally updated + */ + public Map> getDiffs() { + return diffs; + } + + /** + * Map entries that, when this diff is applied to a map, should be + * added to the map or fully replace the previous value. + * + * @return the map entries that are additions or full updates + */ + public Map getUpserts() { + return upserts; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(deletes.size()); - for (String delete : deletes) { - out.writeString(delete); + for (K delete : deletes) { + keySerializer.writeKey(delete, out); } - out.writeVInt(diffs.size()); - for (Map.Entry> entry : diffs.entrySet()) { - out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + for (Map.Entry> entry : diffs.entrySet()) { + keySerializer.writeKey(entry.getKey(), out); + valueSerializer.writeDiff(entry.getValue(), out); + } + out.writeVInt(upserts.size()); + for (Map.Entry entry : upserts.entrySet()) { + keySerializer.writeKey(entry.getKey(), out); + valueSerializer.write(entry.getValue(), out); } + } + } + + /** + * Provides read and write operations to serialize keys of map + * @param type of key + */ + public interface KeySerializer { + void writeKey(K key, StreamOutput out) throws IOException; + K readKey(StreamInput in) throws IOException; + } + + /** + * Serializes String keys of a map + */ + private static final class StringKeySerializer implements KeySerializer { + private static final StringKeySerializer INSTANCE = new StringKeySerializer(); + + @Override + public void writeKey(String key, StreamOutput out) throws IOException { + out.writeString(key); + } - out.writeVInt(adds.size()); - for (Map.Entry entry : adds.entrySet()) { - out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + @Override + public String readKey(StreamInput in) throws IOException { + return in.readString(); + } + } + + /** + * Serializes Integer keys of a map as an Int + */ + private static final class IntKeySerializer implements KeySerializer { + public static final IntKeySerializer INSTANCE = new IntKeySerializer(); + + @Override + public void writeKey(Integer key, StreamOutput out) throws IOException { + out.writeInt(key); + } + + @Override + public Integer readKey(StreamInput in) throws IOException { + return in.readInt(); + } + } + + /** + * Serializes Integer keys of a map as a VInt. Requires keys to be positive. + */ + private static final class VIntKeySerializer implements KeySerializer { + public static final IntKeySerializer INSTANCE = new IntKeySerializer(); + + @Override + public void writeKey(Integer key, StreamOutput out) throws IOException { + if (key < 0) { + throw new IllegalArgumentException("Map key [" + key + "] must be positive"); } + out.writeVInt(key); + } + + @Override + public Integer readKey(StreamInput in) throws IOException { + return in.readVInt(); + } + } + + /** + * Provides read and write operations to serialize map values. + * Reading of values can be made dependent on map key. + * + * Also provides operations to distinguish whether map values are diffable. + * + * Should not be directly implemented, instead implement either + * {@link DiffableValueSerializer} or {@link NonDiffableValueSerializer}. + * + * @param key type of map + * @param value type of map + */ + public interface ValueSerializer { + + /** + * Writes value to stream + */ + void write(V value, StreamOutput out) throws IOException; + + /** + * Reads value from stream. Reading operation can be made dependent on map key. + */ + V read(StreamInput in, K key) throws IOException; + + /** + * Whether this serializer supports diffable values + */ + boolean supportsDiffableValues(); + + /** + * Computes diff if this serializer supports diffable values + */ + Diff diff(V value, V beforePart); + + /** + * Writes value as diff to stream if this serializer supports diffable values + */ + void writeDiff(Diff value, StreamOutput out) throws IOException; + + /** + * Reads value as diff from stream if this serializer supports diffable values. + * Reading operation can be made dependent on map key. + */ + Diff readDiff(StreamInput in, K key) throws IOException; + } + + /** + * Serializer for Diffable map values. Needs to implement read and readDiff methods. + * + * @param type of map keys + * @param type of map values + */ + public static abstract class DiffableValueSerializer> implements ValueSerializer { + private static final DiffableValueSerializer WRITE_ONLY_INSTANCE = new DiffableValueSerializer() { + @Override + public Object read(StreamInput in, Object key) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Diff readDiff(StreamInput in, Object key) throws IOException { + throw new UnsupportedOperationException(); + } + }; + + private static > DiffableValueSerializer getWriteOnlyInstance() { + return WRITE_ONLY_INSTANCE; + } + + @Override + public boolean supportsDiffableValues() { + return true; + } + + @Override + public Diff diff(V value, V beforePart) { + return value.diff(beforePart); + } + + @Override + public void write(V value, StreamOutput out) throws IOException { + value.writeTo(out); + } + + public void writeDiff(Diff value, StreamOutput out) throws IOException { + value.writeTo(out); + } + } + + /** + * Serializer for non-diffable map values + * + * @param type of map keys + * @param type of map values + */ + public static abstract class NonDiffableValueSerializer implements ValueSerializer { + @Override + public boolean supportsDiffableValues() { + return false; + } + + @Override + public Diff diff(V value, V beforePart) { + throw new UnsupportedOperationException(); + } + + @Override + public void writeDiff(Diff value, StreamOutput out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Diff readDiff(StreamInput in, K key) throws IOException { + throw new UnsupportedOperationException(); + } + } + + /** + * Implementation of the ValueSerializer that uses a prototype object for reading operations + * + * Note: this implementation is ignoring the key. + */ + public static class DiffablePrototypeValueReader> extends DiffableValueSerializer { + private final V proto; + + public DiffablePrototypeValueReader(V proto) { + this.proto = proto; + } + + @Override + public V read(StreamInput in, K key) throws IOException { + return proto.readFrom(in); + } + + @Override + public Diff readDiff(StreamInput in, K key) throws IOException { + return proto.readDiffFrom(in); + } + } + + /** + * Implementation of ValueSerializer that serializes immutable sets + * + * @param type of map key + */ + public static class StringSetValueSerializer extends NonDiffableValueSerializer> { + private static final StringSetValueSerializer INSTANCE = new StringSetValueSerializer(); + + public static StringSetValueSerializer getInstance() { + return INSTANCE; + } + + @Override + public void write(Set value, StreamOutput out) throws IOException { + out.writeStringArray(value.toArray(new String[value.size()])); + } + + @Override + public Set read(StreamInput in, K key) throws IOException { + return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(in.readStringArray()))); } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 42e9a4b2244b5..669d71477caaf 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; +import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; @@ -30,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeFilters; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.compress.CompressedXContent; @@ -46,10 +48,13 @@ import java.io.IOException; import java.text.ParseException; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Set; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND; import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; @@ -168,6 +173,8 @@ public static State fromString(String state) { public static final String SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE = "index.shared_filesystem.recover_on_any_node"; public static final String INDEX_UUID_NA_VALUE = "_na_"; + public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations"; + private final int numberOfShards; private final int numberOfReplicas; @@ -184,6 +191,8 @@ public static State fromString(String state) { private final ImmutableOpenMap customs; + private final ImmutableOpenIntMap> activeAllocationIds; + private transient final int totalNumberOfShards; private final DiscoveryNodeFilters requireFilters; @@ -194,65 +203,29 @@ public static State fromString(String state) { private final Version indexUpgradedVersion; private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion; - private IndexMetaData(String index, long version, State state, Settings settings, ImmutableOpenMap mappings, ImmutableOpenMap aliases, ImmutableOpenMap customs) { - Integer maybeNumberOfShards = settings.getAsInt(SETTING_NUMBER_OF_SHARDS, null); - if (maybeNumberOfShards == null) { - throw new IllegalArgumentException("must specify numberOfShards for index [" + index + "]"); - } - int numberOfShards = maybeNumberOfShards; - if (numberOfShards <= 0) { - throw new IllegalArgumentException("must specify positive number of shards for index [" + index + "]"); - } + private IndexMetaData(String index, long version, State state, int numberOfShards, int numberOfReplicas, Settings settings, + ImmutableOpenMap mappings, ImmutableOpenMap aliases, + ImmutableOpenMap customs, ImmutableOpenIntMap> activeAllocationIds, + DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters, + Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion) { - Integer maybeNumberOfReplicas = settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, null); - if (maybeNumberOfReplicas == null) { - throw new IllegalArgumentException("must specify numberOfReplicas for index [" + index + "]"); - } - int numberOfReplicas = maybeNumberOfReplicas; - if (numberOfReplicas < 0) { - throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]"); - } this.index = index; this.version = version; this.state = state; - this.settings = settings; - this.mappings = mappings; - this.customs = customs; this.numberOfShards = numberOfShards; this.numberOfReplicas = numberOfReplicas; this.totalNumberOfShards = numberOfShards * (numberOfReplicas + 1); + this.settings = settings; + this.mappings = mappings; + this.customs = customs; this.aliases = aliases; - - Map requireMap = settings.getByPrefix("index.routing.allocation.require.").getAsMap(); - if (requireMap.isEmpty()) { - requireFilters = null; - } else { - requireFilters = DiscoveryNodeFilters.buildFromKeyValue(AND, requireMap); - } - Map includeMap = settings.getByPrefix("index.routing.allocation.include.").getAsMap(); - if (includeMap.isEmpty()) { - includeFilters = null; - } else { - includeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, includeMap); - } - Map excludeMap = settings.getByPrefix("index.routing.allocation.exclude.").getAsMap(); - if (excludeMap.isEmpty()) { - excludeFilters = null; - } else { - excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap); - } - indexCreatedVersion = Version.indexCreated(settings); - indexUpgradedVersion = settings.getAsVersion(IndexMetaData.SETTING_VERSION_UPGRADED, indexCreatedVersion); - String stringLuceneVersion = settings.get(SETTING_VERSION_MINIMUM_COMPATIBLE); - if (stringLuceneVersion != null) { - try { - this.minimumCompatibleLuceneVersion = org.apache.lucene.util.Version.parse(stringLuceneVersion); - } catch (ParseException ex) { - throw new IllegalStateException("Cannot parse lucene version [" + stringLuceneVersion + "] in the [" + SETTING_VERSION_MINIMUM_COMPATIBLE +"] setting", ex); - } - } else { - this.minimumCompatibleLuceneVersion = null; - } + this.activeAllocationIds = activeAllocationIds; + this.requireFilters = requireFilters; + this.includeFilters = includeFilters; + this.excludeFilters = excludeFilters; + this.indexCreatedVersion = indexCreatedVersion; + this.indexUpgradedVersion = indexUpgradedVersion; + this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion; } public String getIndex() { @@ -364,6 +337,15 @@ public T custom(String type) { return (T) customs.get(type); } + public ImmutableOpenIntMap> getActiveAllocationIds() { + return activeAllocationIds; + } + + public Set activeAllocationIds(int shardId) { + assert shardId >= 0 && shardId < numberOfShards; + return activeAllocationIds.get(shardId); + } + @Nullable public DiscoveryNodeFilters requireFilters() { return requireFilters; @@ -408,6 +390,9 @@ public boolean equals(Object o) { if (!customs.equals(that.customs)) { return false; } + if (!activeAllocationIds.equals(that.activeAllocationIds)) { + return false; + } return true; } @@ -418,6 +403,7 @@ public int hashCode() { result = 31 * result + aliases.hashCode(); result = 31 * result + settings.hashCode(); result = 31 * result + mappings.hashCode(); + result = 31 * result + activeAllocationIds.hashCode(); return result; } @@ -450,16 +436,19 @@ private static class IndexMetaDataDiff implements Diff { private final Settings settings; private final Diff> mappings; private final Diff> aliases; - private Diff> customs; + private final Diff> customs; + private final Diff>> activeAllocationIds; public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) { index = after.index; version = after.version; state = after.state; settings = after.settings; - mappings = DiffableUtils.diff(before.mappings, after.mappings); - aliases = DiffableUtils.diff(before.aliases, after.aliases); - customs = DiffableUtils.diff(before.customs, after.customs); + mappings = DiffableUtils.diff(before.mappings, after.mappings, DiffableUtils.getStringKeySerializer()); + aliases = DiffableUtils.diff(before.aliases, after.aliases, DiffableUtils.getStringKeySerializer()); + customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer()); + activeAllocationIds = DiffableUtils.diff(before.activeAllocationIds, after.activeAllocationIds, + DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance()); } public IndexMetaDataDiff(StreamInput in) throws IOException { @@ -467,19 +456,22 @@ public IndexMetaDataDiff(StreamInput in) throws IOException { version = in.readLong(); state = State.fromId(in.readByte()); settings = Settings.readSettingsFromStream(in); - mappings = DiffableUtils.readImmutableOpenMapDiff(in, MappingMetaData.PROTO); - aliases = DiffableUtils.readImmutableOpenMapDiff(in, AliasMetaData.PROTO); - customs = DiffableUtils.readImmutableOpenMapDiff(in, new DiffableUtils.KeyedReader() { + mappings = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), MappingMetaData.PROTO); + aliases = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), AliasMetaData.PROTO); + customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), + new DiffableUtils.DiffableValueSerializer() { @Override - public Custom readFrom(StreamInput in, String key) throws IOException { + public Custom read(StreamInput in, String key) throws IOException { return lookupPrototypeSafe(key).readFrom(in); } @Override - public Diff readDiffFrom(StreamInput in, String key) throws IOException { + public Diff readDiff(StreamInput in, String key) throws IOException { return lookupPrototypeSafe(key).readDiffFrom(in); } }); + activeAllocationIds = DiffableUtils.readImmutableOpenIntMapDiff(in, DiffableUtils.getVIntKeySerializer(), + DiffableUtils.StringSetValueSerializer.getInstance()); } @Override @@ -491,6 +483,7 @@ public void writeTo(StreamOutput out) throws IOException { mappings.writeTo(out); aliases.writeTo(out); customs.writeTo(out); + activeAllocationIds.writeTo(out); } @Override @@ -502,6 +495,7 @@ public IndexMetaData apply(IndexMetaData part) { builder.mappings.putAll(mappings.apply(part.mappings)); builder.aliases.putAll(aliases.apply(part.aliases)); builder.customs.putAll(customs.apply(part.customs)); + builder.activeAllocationIds.putAll(activeAllocationIds.apply(part.activeAllocationIds)); return builder.build(); } } @@ -528,6 +522,12 @@ public IndexMetaData readFrom(StreamInput in) throws IOException { Custom customIndexMetaData = lookupPrototypeSafe(type).readFrom(in); builder.putCustom(type, customIndexMetaData); } + int activeAllocationIdsSize = in.readVInt(); + for (int i = 0; i < activeAllocationIdsSize; i++) { + int key = in.readVInt(); + Set allocationIds = DiffableUtils.StringSetValueSerializer.getInstance().read(in, key); + builder.putActiveAllocationIds(key, allocationIds); + } return builder.build(); } @@ -550,6 +550,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(cursor.key); cursor.value.writeTo(out); } + out.writeVInt(activeAllocationIds.size()); + for (IntObjectCursor> cursor : activeAllocationIds) { + out.writeVInt(cursor.key); + DiffableUtils.StringSetValueSerializer.getInstance().write(cursor.value, out); + } } public static Builder builder(String index) { @@ -569,12 +574,14 @@ public static class Builder { private final ImmutableOpenMap.Builder mappings; private final ImmutableOpenMap.Builder aliases; private final ImmutableOpenMap.Builder customs; + private final ImmutableOpenIntMap.Builder> activeAllocationIds; public Builder(String index) { this.index = index; this.mappings = ImmutableOpenMap.builder(); this.aliases = ImmutableOpenMap.builder(); this.customs = ImmutableOpenMap.builder(); + this.activeAllocationIds = ImmutableOpenIntMap.builder(); } public Builder(IndexMetaData indexMetaData) { @@ -585,6 +592,7 @@ public Builder(IndexMetaData indexMetaData) { this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings); this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases); this.customs = ImmutableOpenMap.builder(indexMetaData.customs); + this.activeAllocationIds = ImmutableOpenIntMap.builder(indexMetaData.activeAllocationIds); } public String index() { @@ -693,6 +701,15 @@ public Custom getCustom(String type) { return this.customs.get(type); } + public Builder putActiveAllocationIds(int shardId, Set allocationIds) { + activeAllocationIds.put(shardId, new HashSet(allocationIds)); + return this; + } + + public Set getActiveAllocationIds(int shardId) { + return activeAllocationIds.get(shardId); + } + public long version() { return this.version; } @@ -714,7 +731,72 @@ public IndexMetaData build() { } } - return new IndexMetaData(index, version, state, tmpSettings, mappings.build(), tmpAliases.build(), customs.build()); + Integer maybeNumberOfShards = settings.getAsInt(SETTING_NUMBER_OF_SHARDS, null); + if (maybeNumberOfShards == null) { + throw new IllegalArgumentException("must specify numberOfShards for index [" + index + "]"); + } + int numberOfShards = maybeNumberOfShards; + if (numberOfShards <= 0) { + throw new IllegalArgumentException("must specify positive number of shards for index [" + index + "]"); + } + + Integer maybeNumberOfReplicas = settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, null); + if (maybeNumberOfReplicas == null) { + throw new IllegalArgumentException("must specify numberOfReplicas for index [" + index + "]"); + } + int numberOfReplicas = maybeNumberOfReplicas; + if (numberOfReplicas < 0) { + throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]"); + } + + // fill missing slots in activeAllocationIds with empty set if needed and make all entries immutable + ImmutableOpenIntMap.Builder> filledActiveAllocationIds = ImmutableOpenIntMap.builder(); + for (int i = 0; i < numberOfShards; i++) { + if (activeAllocationIds.containsKey(i)) { + filledActiveAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(activeAllocationIds.get(i)))); + } else { + filledActiveAllocationIds.put(i, Collections.emptySet()); + } + } + + Map requireMap = settings.getByPrefix("index.routing.allocation.require.").getAsMap(); + final DiscoveryNodeFilters requireFilters; + if (requireMap.isEmpty()) { + requireFilters = null; + } else { + requireFilters = DiscoveryNodeFilters.buildFromKeyValue(AND, requireMap); + } + Map includeMap = settings.getByPrefix("index.routing.allocation.include.").getAsMap(); + final DiscoveryNodeFilters includeFilters; + if (includeMap.isEmpty()) { + includeFilters = null; + } else { + includeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, includeMap); + } + Map excludeMap = settings.getByPrefix("index.routing.allocation.exclude.").getAsMap(); + final DiscoveryNodeFilters excludeFilters; + if (excludeMap.isEmpty()) { + excludeFilters = null; + } else { + excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap); + } + Version indexCreatedVersion = Version.indexCreated(settings); + Version indexUpgradedVersion = settings.getAsVersion(IndexMetaData.SETTING_VERSION_UPGRADED, indexCreatedVersion); + String stringLuceneVersion = settings.get(SETTING_VERSION_MINIMUM_COMPATIBLE); + final org.apache.lucene.util.Version minimumCompatibleLuceneVersion; + if (stringLuceneVersion != null) { + try { + minimumCompatibleLuceneVersion = org.apache.lucene.util.Version.parse(stringLuceneVersion); + } catch (ParseException ex) { + throw new IllegalStateException("Cannot parse lucene version [" + stringLuceneVersion + "] in the [" + SETTING_VERSION_MINIMUM_COMPATIBLE +"] setting", ex); + } + } else { + minimumCompatibleLuceneVersion = null; + } + + return new IndexMetaData(index, version, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(), + tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters, + indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion); } public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { @@ -757,6 +839,15 @@ public static void toXContent(IndexMetaData indexMetaData, XContentBuilder build } builder.endObject(); + builder.startObject(KEY_ACTIVE_ALLOCATIONS); + for (IntObjectCursor> cursor : indexMetaData.activeAllocationIds) { + builder.startArray(String.valueOf(cursor.key)); + for (String allocationId : cursor.value) { + builder.value(allocationId); + } + builder.endArray(); + } + builder.endObject(); builder.endObject(); } @@ -792,6 +883,21 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti while (parser.nextToken() != XContentParser.Token.END_OBJECT) { builder.putAlias(AliasMetaData.Builder.fromXContent(parser)); } + } else if (KEY_ACTIVE_ALLOCATIONS.equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + String shardId = currentFieldName; + Set allocationIds = new HashSet<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.VALUE_STRING) { + allocationIds.add(parser.text()); + } + } + builder.putActiveAllocationIds(Integer.valueOf(shardId), allocationIds); + } + } } else { // check if its a custom index metadata Custom proto = lookupPrototype(currentFieldName); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index d4e7baac79073..657259747dde6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.Diffable; import org.elasticsearch.cluster.DiffableUtils; -import org.elasticsearch.cluster.DiffableUtils.KeyedReader; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -55,7 +54,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.warmer.IndexWarmersMetaData; @@ -641,9 +639,9 @@ public MetaDataDiff(MetaData before, MetaData after) { version = after.version; transientSettings = after.transientSettings; persistentSettings = after.persistentSettings; - indices = DiffableUtils.diff(before.indices, after.indices); - templates = DiffableUtils.diff(before.templates, after.templates); - customs = DiffableUtils.diff(before.customs, after.customs); + indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer()); + templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer()); + customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer()); } public MetaDataDiff(StreamInput in) throws IOException { @@ -651,16 +649,17 @@ public MetaDataDiff(StreamInput in) throws IOException { version = in.readLong(); transientSettings = Settings.readSettingsFromStream(in); persistentSettings = Settings.readSettingsFromStream(in); - indices = DiffableUtils.readImmutableOpenMapDiff(in, IndexMetaData.PROTO); - templates = DiffableUtils.readImmutableOpenMapDiff(in, IndexTemplateMetaData.PROTO); - customs = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader() { + indices = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexMetaData.PROTO); + templates = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexTemplateMetaData.PROTO); + customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), + new DiffableUtils.DiffableValueSerializer() { @Override - public Custom readFrom(StreamInput in, String key) throws IOException { + public Custom read(StreamInput in, String key) throws IOException { return lookupPrototypeSafe(key).readFrom(in); } @Override - public Diff readDiffFrom(StreamInput in, String key) throws IOException { + public Diff readDiff(StreamInput in, String key) throws IOException { return lookupPrototypeSafe(key).readDiffFrom(in); } }); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 1a582e63ad2e4..c210539bc5839 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -314,12 +314,12 @@ private static class RoutingTableDiff implements Diff { public RoutingTableDiff(RoutingTable before, RoutingTable after) { version = after.version; - indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting); + indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting, DiffableUtils.getStringKeySerializer()); } public RoutingTableDiff(StreamInput in) throws IOException { version = in.readLong(); - indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, IndexRoutingTable.PROTO); + indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexRoutingTable.PROTO); } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f819d6fde0acf..feafb76a5f297 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -27,7 +27,14 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -39,6 +46,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -79,24 +88,83 @@ public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, Li StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards, clusterInfoService.getClusterInfo()); boolean changed = applyStartedShards(routingNodes, startedShards); if (!changed) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } shardsAllocators.applyStartedShards(allocation); if (withReroute) { reroute(allocation); } - RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); - RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString()); logClusterHealthStateChange( new ClusterStateHealth(clusterState), - new ClusterStateHealth(clusterState.metaData(), routingTable), + new ClusterStateHealth(clusterState.metaData(), result.routingTable()), "shards started [" + startedShardsAsString + "] ..." ); return result; } + + protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) { + return buildChangedResult(metaData, routingNodes, new RoutingExplanations()); + + } + protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) { + final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build(); + MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable); + return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations); + } + + /** + * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. + * + * @param currentMetaData {@link MetaData} object from before the routing table was changed. + * @param newRoutingTable new {@link RoutingTable} created by the allocation change + * @return adapted {@link MetaData}, potentially the original one if no change was needed. + */ + static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) { + // make sure index meta data and routing tables are in sync w.r.t active allocation ids + MetaData.Builder metaDataBuilder = null; + for (IndexRoutingTable indexRoutingTable : newRoutingTable) { + final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex()); + if (indexMetaData == null) { + throw new IllegalStateException("no metadata found for index [" + indexRoutingTable.index() + "]"); + } + IndexMetaData.Builder indexMetaDataBuilder = null; + for (IndexShardRoutingTable shardRoutings : indexRoutingTable) { + Set activeAllocationIds = shardRoutings.activeShards().stream() + .map(ShardRouting::allocationId) + .filter(Objects::nonNull) + .map(AllocationId::getId) + .collect(Collectors.toSet()); + // only update active allocation ids if there is an active shard + if (activeAllocationIds.isEmpty() == false) { + // get currently stored allocation ids + Set storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id()); + if (activeAllocationIds.equals(storedAllocationIds) == false) { + if (indexMetaDataBuilder == null) { + indexMetaDataBuilder = IndexMetaData.builder(indexMetaData); + } + + indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds); + } + } + } + if (indexMetaDataBuilder != null) { + if (metaDataBuilder == null) { + metaDataBuilder = MetaData.builder(currentMetaData); + } + metaDataBuilder.put(indexMetaDataBuilder); + } + } + if (metaDataBuilder != null) { + return metaDataBuilder.build(); + } else { + return currentMetaData; + } + } + public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null))); } @@ -117,16 +185,15 @@ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, Lis System.nanoTime(), System.currentTimeMillis())); } if (!changed) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } shardsAllocators.applyFailedShards(allocation); reroute(allocation); - RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); - RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString()); logClusterHealthStateChange( new ClusterStateHealth(clusterState), - new ClusterStateHealth(clusterState.getMetaData(), routingTable), + new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), "shards failed [" + failedShardsAsString + "] ..." ); return result; @@ -169,11 +236,10 @@ public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCom // the assumption is that commands will move / act on shards (or fail through exceptions) // so, there will always be shard "movements", so no need to check on reroute reroute(allocation); - RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); - RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable, explanations); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations); logClusterHealthStateChange( new ClusterStateHealth(clusterState), - new ClusterStateHealth(clusterState.getMetaData(), routingTable), + new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), "reroute commands" ); return result; @@ -200,13 +266,12 @@ protected RoutingAllocation.Result reroute(ClusterState clusterState, String rea RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime()); allocation.debugDecision(debug); if (!reroute(allocation)) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } - RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()); - RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); logClusterHealthStateChange( new ClusterStateHealth(clusterState), - new ClusterStateHealth(clusterState.getMetaData(), routingTable), + new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), reason ); return result; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index a8a546d946edd..4e6ba0fb5ada0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -52,29 +52,33 @@ public static class Result { private final RoutingTable routingTable; + private final MetaData metaData; + private RoutingExplanations explanations = new RoutingExplanations(); /** * Creates a new {@link RoutingAllocation.Result} - * * @param changed a flag to determine whether the actual {@link RoutingTable} has been changed * @param routingTable the {@link RoutingTable} this Result references + * @param metaData the {@link MetaData} this Result references */ - public Result(boolean changed, RoutingTable routingTable) { + public Result(boolean changed, RoutingTable routingTable, MetaData metaData) { this.changed = changed; this.routingTable = routingTable; + this.metaData = metaData; } /** * Creates a new {@link RoutingAllocation.Result} - * * @param changed a flag to determine whether the actual {@link RoutingTable} has been changed * @param routingTable the {@link RoutingTable} this Result references + * @param metaData the {@link MetaData} this Result references * @param explanations Explanation for the reroute actions */ - public Result(boolean changed, RoutingTable routingTable, RoutingExplanations explanations) { + public Result(boolean changed, RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) { this.changed = changed; this.routingTable = routingTable; + this.metaData = metaData; this.explanations = explanations; } @@ -85,6 +89,14 @@ public boolean changed() { return this.changed; } + /** + * Get the {@link MetaData} referenced by this result + * @return referenced {@link MetaData} + */ + public MetaData metaData() { + return metaData; + } + /** * Get the {@link RoutingTable} referenced by this result * @return referenced {@link RoutingTable} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ActiveAllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ActiveAllocationIdTests.java new file mode 100644 index 0000000000000..7a7f4722e9759 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ActiveAllocationIdTests.java @@ -0,0 +1,81 @@ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.test.ESAllocationTestCase; + +import java.util.Arrays; +import java.util.HashSet; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.hamcrest.Matchers.equalTo; + +public class ActiveAllocationIdTests extends ESAllocationTestCase { + + public void testActiveAllocationIdsUpdated() { + AllocationService allocation = createAllocationService(); + + logger.info("creating an index with 1 shard, 2 replicas"); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + // add index metadata where we have no routing nodes to check that allocation ids are not removed + .put(IndexMetaData.builder("test-old").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2) + .putActiveAllocationIds(0, new HashSet<>(Arrays.asList("x", "y")))) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + logger.info("adding three nodes and performing rerouting"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put( + newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build(); + RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + + assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(0)); + assertThat(clusterState.metaData().index("test-old").activeAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y")))); + + logger.info("start primary shard"); + rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + + assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(1)); + assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).get(0).allocationId().getId(), + equalTo(clusterState.metaData().index("test").activeAllocationIds(0).iterator().next())); + assertThat(clusterState.metaData().index("test-old").activeAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y")))); + + logger.info("start replica shards"); + rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + + assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(3)); + + logger.info("remove a node"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .remove("node1")) + .build(); + rerouteResult = allocation.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + + assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(2)); + + logger.info("remove all remaining nodes"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .remove("node2").remove("node3")) + .build(); + rerouteResult = allocation.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + + // active allocation ids should not be updated + assertThat(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).size(), equalTo(3)); + assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(2)); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java b/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java index 6c120e7ac6cae..b3050392c981e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java @@ -22,68 +22,383 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; -import org.elasticsearch.cluster.DiffableUtils.KeyedReader; +import org.elasticsearch.cluster.DiffableUtils.MapDiff; +import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamableReader; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; public class DiffableTests extends ESTestCase { - public void testJdkMapDiff() throws IOException { - Map before = new HashMap<>(); - before.put("foo", new TestDiffable("1")); - before.put("bar", new TestDiffable("2")); - before.put("baz", new TestDiffable("3")); - before = unmodifiableMap(before); - Map map = new HashMap<>(); - map.putAll(before); - map.remove("bar"); - map.put("baz", new TestDiffable("4")); - map.put("new", new TestDiffable("5")); - Map after = unmodifiableMap(new HashMap<>(map)); - Diff diff = DiffableUtils.diff(before, after); - BytesStreamOutput out = new BytesStreamOutput(); - diff.writeTo(out); - StreamInput in = StreamInput.wrap(out.bytes()); - Map serialized = DiffableUtils.readJdkMapDiff(in, TestDiffable.PROTO).apply(before); - assertThat(serialized.size(), equalTo(3)); - assertThat(serialized.get("foo").value(), equalTo("1")); - assertThat(serialized.get("baz").value(), equalTo("4")); - assertThat(serialized.get("new").value(), equalTo("5")); + + public void testJKDMapDiff() throws IOException { + new JdkMapDriver() { + @Override + protected boolean diffableValues() { + return true; + } + + @Override + protected TestDiffable createValue(Integer key, boolean before) { + return new TestDiffable(String.valueOf(before ? key : key + 1)); + } + + @Override + protected MapDiff diff(Map before, Map after) { + return DiffableUtils.diff(before, after, keySerializer); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return useProtoForDiffableSerialization + ? DiffableUtils.readJdkMapDiff(in, keySerializer, TestDiffable.PROTO) + : DiffableUtils.readJdkMapDiff(in, keySerializer, diffableValueSerializer()); + } + }.execute(); + + new JdkMapDriver() { + @Override + protected boolean diffableValues() { + return false; + } + + @Override + protected String createValue(Integer key, boolean before) { + return String.valueOf(before ? key : key + 1); + } + + @Override + protected MapDiff diff(Map before, Map after) { + return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer()); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return DiffableUtils.readJdkMapDiff(in, keySerializer, nonDiffableValueSerializer()); + } + }.execute(); } public void testImmutableOpenMapDiff() throws IOException { - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); - builder.put("foo", new TestDiffable("1")); - builder.put("bar", new TestDiffable("2")); - builder.put("baz", new TestDiffable("3")); - ImmutableOpenMap before = builder.build(); - builder = ImmutableOpenMap.builder(before); - builder.remove("bar"); - builder.put("baz", new TestDiffable("4")); - builder.put("new", new TestDiffable("5")); - ImmutableOpenMap after = builder.build(); - Diff diff = DiffableUtils.diff(before, after); - BytesStreamOutput out = new BytesStreamOutput(); - diff.writeTo(out); - StreamInput in = StreamInput.wrap(out.bytes()); - ImmutableOpenMap serialized = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader() { - @Override - public TestDiffable readFrom(StreamInput in, String key) throws IOException { + new ImmutableOpenMapDriver() { + @Override + protected boolean diffableValues() { + return true; + } + + @Override + protected TestDiffable createValue(Integer key, boolean before) { + return new TestDiffable(String.valueOf(before ? key : key + 1)); + } + + @Override + protected MapDiff diff(ImmutableOpenMap before, ImmutableOpenMap after) { + return DiffableUtils.diff(before, after, keySerializer); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return useProtoForDiffableSerialization + ? DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, TestDiffable.PROTO) + : DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, diffableValueSerializer()); + } + }.execute(); + + new ImmutableOpenMapDriver() { + @Override + protected boolean diffableValues() { + return false; + } + + @Override + protected String createValue(Integer key, boolean before) { + return String.valueOf(before ? key : key + 1); + } + + @Override + protected MapDiff diff(ImmutableOpenMap before, ImmutableOpenMap after) { + return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer()); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, nonDiffableValueSerializer()); + } + }.execute(); + } + + public void testImmutableOpenIntMapDiff() throws IOException { + new ImmutableOpenIntMapDriver() { + @Override + protected boolean diffableValues() { + return true; + } + + @Override + protected TestDiffable createValue(Integer key, boolean before) { + return new TestDiffable(String.valueOf(before ? key : key + 1)); + } + + @Override + protected MapDiff diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after) { + return DiffableUtils.diff(before, after, keySerializer); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return useProtoForDiffableSerialization + ? DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, TestDiffable.PROTO) + : DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, diffableValueSerializer()); + } + }.execute(); + + new ImmutableOpenIntMapDriver() { + @Override + protected boolean diffableValues() { + return false; + } + + @Override + protected String createValue(Integer key, boolean before) { + return String.valueOf(before ? key : key + 1); + } + + @Override + protected MapDiff diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after) { + return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer()); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, nonDiffableValueSerializer()); + } + }.execute(); + } + + /** + * Class that abstracts over specific map implementation type and value kind (Diffable or not) + * @param map type + * @param value type + */ + public abstract class MapDriver { + protected final Set keys = randomPositiveIntSet(); + protected final Set keysToRemove = new HashSet<>(randomSubsetOf(randomInt(keys.size()), keys.toArray(new Integer[keys.size()]))); + protected final Set keysThatAreNotRemoved = Sets.difference(keys, keysToRemove); + protected final Set keysToOverride = new HashSet<>(randomSubsetOf(randomInt(keysThatAreNotRemoved.size()), + keysThatAreNotRemoved.toArray(new Integer[keysThatAreNotRemoved.size()]))); + protected final Set keysToAdd = Sets.difference(randomPositiveIntSet(), keys); // make sure keysToAdd does not contain elements in keys + protected final Set keysUnchanged = Sets.difference(keysThatAreNotRemoved, keysToOverride); + + protected final DiffableUtils.KeySerializer keySerializer = randomBoolean() + ? DiffableUtils.getIntKeySerializer() + : DiffableUtils.getVIntKeySerializer(); + + protected final boolean useProtoForDiffableSerialization = randomBoolean(); + + private Set randomPositiveIntSet() { + int maxSetSize = randomInt(6); + Set result = new HashSet<>(); + for (int i = 0; i < maxSetSize; i++) { + // due to duplicates, set size can be smaller than maxSetSize + result.add(randomIntBetween(0, 100)); + } + return result; + } + + /** + * whether we operate on {@link org.elasticsearch.cluster.Diffable} values + */ + protected abstract boolean diffableValues(); + + /** + * functions that determines value in "before" or "after" map based on key + */ + protected abstract V createValue(Integer key, boolean before); + + /** + * creates map based on JDK-based map + */ + protected abstract T createMap(Map values); + + /** + * calculates diff between two maps + */ + protected abstract MapDiff diff(T before, T after); + + /** + * reads diff of maps from stream + */ + protected abstract MapDiff readDiff(StreamInput in) throws IOException; + + /** + * gets element at key "key" in map "map" + */ + protected abstract V get(T map, Integer key); + + /** + * returns size of given map + */ + protected abstract int size(T map); + + /** + * executes the actual test + */ + public void execute() throws IOException { + logger.debug("Keys in 'before' map: {}", keys); + logger.debug("Keys to remove: {}", keysToRemove); + logger.debug("Keys to override: {}", keysToOverride); + logger.debug("Keys to add: {}", keysToAdd); + + logger.debug("--> creating 'before' map"); + Map before = new HashMap<>(); + for (Integer key : keys) { + before.put(key, createValue(key, true)); + } + T beforeMap = createMap(before); + + logger.debug("--> creating 'after' map"); + Map after = new HashMap<>(); + after.putAll(before); + for (Integer key : keysToRemove) { + after.remove(key); + } + for (Integer key : keysToOverride) { + after.put(key, createValue(key, false)); + } + for (Integer key : keysToAdd) { + after.put(key, createValue(key, false)); + } + T afterMap = createMap(unmodifiableMap(after)); + + MapDiff diffMap = diff(beforeMap, afterMap); + + // check properties of diffMap + assertThat(new HashSet(diffMap.getDeletes()), equalTo(keysToRemove)); + if (diffableValues()) { + assertThat(diffMap.getDiffs().keySet(), equalTo(keysToOverride)); + for (Integer key : keysToOverride) { + assertThat(diffMap.getDiffs().get(key).apply(get(beforeMap, key)), equalTo(get(afterMap, key))); + } + assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAdd)); + for (Integer key : keysToAdd) { + assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key))); + } + } else { + assertThat(diffMap.getDiffs(), equalTo(emptyMap())); + Set keysToAddAndOverride = Sets.union(keysToAdd, keysToOverride); + assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAddAndOverride)); + for (Integer key : keysToAddAndOverride) { + assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key))); + } + } + + if (randomBoolean()) { + logger.debug("--> serializing diff"); + BytesStreamOutput out = new BytesStreamOutput(); + diffMap.writeTo(out); + StreamInput in = StreamInput.wrap(out.bytes()); + logger.debug("--> reading diff back"); + diffMap = readDiff(in); + } + T appliedDiffMap = diffMap.apply(beforeMap); + + // check properties of appliedDiffMap + assertThat(size(appliedDiffMap), equalTo(keys.size() - keysToRemove.size() + keysToAdd.size())); + for (Integer key : keysToRemove) { + assertThat(get(appliedDiffMap, key), nullValue()); + } + for (Integer key : keysUnchanged) { + assertThat(get(appliedDiffMap, key), equalTo(get(beforeMap, key))); + } + for (Integer key : keysToOverride) { + assertThat(get(appliedDiffMap, key), not(equalTo(get(beforeMap, key)))); + assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key))); + } + for (Integer key : keysToAdd) { + assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key))); + } + } + } + + abstract class JdkMapDriver extends MapDriver, V> { + + @Override + protected Map createMap(Map values) { + return values; + } + + @Override + protected V get(Map map, Integer key) { + return map.get(key); + } + + @Override + protected int size(Map map) { + return map.size(); + } + } + + abstract class ImmutableOpenMapDriver extends MapDriver, V> { + + @Override + protected ImmutableOpenMap createMap(Map values) { + return ImmutableOpenMap.builder().putAll(values).build(); + } + + @Override + protected V get(ImmutableOpenMap map, Integer key) { + return map.get(key); + } + + @Override + protected int size(ImmutableOpenMap map) { + return map.size(); + } + } + + + abstract class ImmutableOpenIntMapDriver extends MapDriver, V> { + + @Override + protected ImmutableOpenIntMap createMap(Map values) { + return ImmutableOpenIntMap.builder().putAll(values).build(); + } + + @Override + protected V get(ImmutableOpenIntMap map, Integer key) { + return map.get(key); + } + + @Override + protected int size(ImmutableOpenIntMap map) { + return map.size(); + } + } + + private static DiffableUtils.DiffableValueSerializer diffableValueSerializer() { + return new DiffableUtils.DiffableValueSerializer() { + @Override + public TestDiffable read(StreamInput in, K key) throws IOException { return new TestDiffable(in.readString()); } @Override - public Diff readDiffFrom(StreamInput in, String key) throws IOException { + public Diff readDiff(StreamInput in, K key) throws IOException { return AbstractDiffable.readDiffFrom(new StreamableReader() { @Override public TestDiffable readFrom(StreamInput in) throws IOException { @@ -91,13 +406,23 @@ public TestDiffable readFrom(StreamInput in) throws IOException { } }, in); } - }).apply(before); - assertThat(serialized.size(), equalTo(3)); - assertThat(serialized.get("foo").value(), equalTo("1")); - assertThat(serialized.get("baz").value(), equalTo("4")); - assertThat(serialized.get("new").value(), equalTo("5")); + }; + } + + private static DiffableUtils.NonDiffableValueSerializer nonDiffableValueSerializer() { + return new DiffableUtils.NonDiffableValueSerializer() { + @Override + public void write(String value, StreamOutput out) throws IOException { + out.writeString(value); + } + @Override + public String read(StreamInput in, K key) throws IOException { + return in.readString(); + } + }; } + public static class TestDiffable extends AbstractDiffable { public static final TestDiffable PROTO = new TestDiffable(""); @@ -121,6 +446,22 @@ public TestDiffable readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(value); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestDiffable that = (TestDiffable) o; + + return !(value != null ? !value.equals(that.value) : that.value != null); + + } + + @Override + public int hashCode() { + return value != null ? value.hashCode() : 0; + } } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 76e6317c1fcda..ea590756a8b46 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -488,17 +488,17 @@ public NoopAllocationService(Settings settings) { @Override public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } @Override public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } @Override protected RoutingAllocation.Result reroute(ClusterState clusterState, String reason, boolean debug) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } }