Skip to content

Persist currently started allocation IDs to index metadata #14964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions core/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -469,6 +469,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endArray();

builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
builder.startArray(String.valueOf(cursor.key));
for (String allocationId : cursor.value) {
builder.value(allocationId);
}
builder.endArray();
}
builder.endObject();

builder.endObject();
}
builder.endObject();
Expand Down Expand Up @@ -584,6 +594,7 @@ public Builder nodes(DiscoveryNodes nodes) {

public Builder routingResult(RoutingAllocation.Result routingResult) {
this.routingTable = routingResult.routingTable();
this.metaData = routingResult.metaData();
return this;
}

Expand Down Expand Up @@ -759,7 +770,7 @@ public ClusterStateDiff(ClusterState before, ClusterState after) {
nodes = after.nodes.diff(before.nodes);
metaData = after.metaData.diff(before.metaData);
blocks = after.blocks.diff(before.blocks);
customs = DiffableUtils.diff(before.customs, after.customs);
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
}

public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException {
Expand All @@ -771,14 +782,15 @@ public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException {
nodes = proto.nodes.readDiffFrom(in);
metaData = proto.metaData.readDiffFrom(in);
blocks = proto.blocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader<Custom>() {
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
@Override
public Custom readFrom(StreamInput in, String key) throws IOException {
public Custom read(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readFrom(in);
}

@Override
public Diff<Custom> readDiffFrom(StreamInput in, String key) throws IOException {
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readDiffFrom(in);
}
});
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/elasticsearch/cluster/Diff.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public interface Diff<T> {

/**
* Applies difference to the specified part and retunrs the resulted part
* Applies difference to the specified part and returns the resulted part
*/
T apply(T part);

Expand Down
577 changes: 472 additions & 105 deletions core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java

Large diffs are not rendered by default.

228 changes: 167 additions & 61 deletions core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -55,7 +54,6 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
Expand Down Expand Up @@ -641,26 +639,27 @@ public MetaDataDiff(MetaData before, MetaData after) {
version = after.version;
transientSettings = after.transientSettings;
persistentSettings = after.persistentSettings;
indices = DiffableUtils.diff(before.indices, after.indices);
templates = DiffableUtils.diff(before.templates, after.templates);
customs = DiffableUtils.diff(before.customs, after.customs);
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer());
templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer());
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
}

public MetaDataDiff(StreamInput in) throws IOException {
clusterUUID = in.readString();
version = in.readLong();
transientSettings = Settings.readSettingsFromStream(in);
persistentSettings = Settings.readSettingsFromStream(in);
indices = DiffableUtils.readImmutableOpenMapDiff(in, IndexMetaData.PROTO);
templates = DiffableUtils.readImmutableOpenMapDiff(in, IndexTemplateMetaData.PROTO);
customs = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader<Custom>() {
indices = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexMetaData.PROTO);
templates = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexTemplateMetaData.PROTO);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
@Override
public Custom readFrom(StreamInput in, String key) throws IOException {
public Custom read(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readFrom(in);
}

@Override
public Diff<Custom> readDiffFrom(StreamInput in, String key) throws IOException {
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readDiffFrom(in);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,12 @@ private static class RoutingTableDiff implements Diff<RoutingTable> {

public RoutingTableDiff(RoutingTable before, RoutingTable after) {
version = after.version;
indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting);
indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting, DiffableUtils.getStringKeySerializer());
}

public RoutingTableDiff(StreamInput in) throws IOException {
version = in.readLong();
indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, IndexRoutingTable.PROTO);
indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexRoutingTable.PROTO);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
Expand All @@ -39,6 +46,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -79,24 +88,83 @@ public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, Li
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards, clusterInfoService.getClusterInfo());
boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
shardsAllocators.applyStartedShards(allocation);
if (withReroute) {
reroute(allocation);
}
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable);
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);

String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.metaData(), routingTable),
new ClusterStateHealth(clusterState.metaData(), result.routingTable()),
"shards started [" + startedShardsAsString + "] ..."
);
return result;
}


protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) {
return buildChangedResult(metaData, routingNodes, new RoutingExplanations());

}
protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) {
final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build();
MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable);
return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations);
}

/**
* Updates the current {@link MetaData} based on the newly created {@link RoutingTable}.
*
* @param currentMetaData {@link MetaData} object from before the routing table was changed.
* @param newRoutingTable new {@link RoutingTable} created by the allocation change
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
*/
static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) {
// make sure index meta data and routing tables are in sync w.r.t active allocation ids
MetaData.Builder metaDataBuilder = null;
for (IndexRoutingTable indexRoutingTable : newRoutingTable) {
final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex());
if (indexMetaData == null) {
throw new IllegalStateException("no metadata found for index [" + indexRoutingTable.index() + "]");
}
IndexMetaData.Builder indexMetaDataBuilder = null;
for (IndexShardRoutingTable shardRoutings : indexRoutingTable) {
Set<String> activeAllocationIds = shardRoutings.activeShards().stream()
.map(ShardRouting::allocationId)
.filter(Objects::nonNull)
.map(AllocationId::getId)
.collect(Collectors.toSet());
// only update active allocation ids if there is an active shard
if (activeAllocationIds.isEmpty() == false) {
// get currently stored allocation ids
Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id());
if (activeAllocationIds.equals(storedAllocationIds) == false) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
}

indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds);
}
}
}
if (indexMetaDataBuilder != null) {
if (metaDataBuilder == null) {
metaDataBuilder = MetaData.builder(currentMetaData);
}
metaDataBuilder.put(indexMetaDataBuilder);
}
}
if (metaDataBuilder != null) {
return metaDataBuilder.build();
} else {
return currentMetaData;
}
}

public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null)));
}
Expand All @@ -117,16 +185,15 @@ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, Lis
System.nanoTime(), System.currentTimeMillis()));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
shardsAllocators.applyFailedShards(allocation);
reroute(allocation);
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable);
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), routingTable),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
"shards failed [" + failedShardsAsString + "] ..."
);
return result;
Expand Down Expand Up @@ -169,11 +236,10 @@ public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCom
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable, explanations);
RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), routingTable),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
"reroute commands"
);
return result;
Expand All @@ -200,13 +266,12 @@ protected RoutingAllocation.Result reroute(ClusterState clusterState, String rea
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime());
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable);
RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), routingTable),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
reason
);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,33 @@ public static class Result {

private final RoutingTable routingTable;

private final MetaData metaData;

private RoutingExplanations explanations = new RoutingExplanations();

/**
* Creates a new {@link RoutingAllocation.Result}
*
* @param changed a flag to determine whether the actual {@link RoutingTable} has been changed
* @param routingTable the {@link RoutingTable} this Result references
* @param metaData the {@link MetaData} this Result references
*/
public Result(boolean changed, RoutingTable routingTable) {
public Result(boolean changed, RoutingTable routingTable, MetaData metaData) {
this.changed = changed;
this.routingTable = routingTable;
this.metaData = metaData;
}

/**
* Creates a new {@link RoutingAllocation.Result}
*
* @param changed a flag to determine whether the actual {@link RoutingTable} has been changed
* @param routingTable the {@link RoutingTable} this Result references
* @param metaData the {@link MetaData} this Result references
* @param explanations Explanation for the reroute actions
*/
public Result(boolean changed, RoutingTable routingTable, RoutingExplanations explanations) {
public Result(boolean changed, RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) {
this.changed = changed;
this.routingTable = routingTable;
this.metaData = metaData;
this.explanations = explanations;
}

Expand All @@ -85,6 +89,14 @@ public boolean changed() {
return this.changed;
}

/**
* Get the {@link MetaData} referenced by this result
* @return referenced {@link MetaData}
*/
public MetaData metaData() {
return metaData;
}

/**
* Get the {@link RoutingTable} referenced by this result
* @return referenced {@link RoutingTable}
Expand Down
Loading