Skip to content

move replicas action functionality into AllocateAction #32523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testSetIndexLifecyclePolicy() throws Exception {
" \"cold\": {\n" +
" \"after\": \"2000s\",\n" +
" \"actions\": {\n" +
" \"replicas\": {\n" +
" \"allocate\": {\n" +
" \"number_of_replicas\": 0\n" +
" }\n" +
" }\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
import org.elasticsearch.xpack.core.indexlifecycle.ReplicasAction;
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
Expand Down Expand Up @@ -408,7 +407,6 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReplicasAction.NAME, ReplicasAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)
Expand Down Expand Up @@ -452,7 +450,6 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReplicasAction.NAME), ReplicasAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@
public class AllocateAction implements LifecycleAction {

public static final String NAME = "allocate";
public static final ParseField NUMBER_OF_REPLICAS_FIELD = new ParseField("number_of_replicas");
public static final ParseField INCLUDE_FIELD = new ParseField("include");
public static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
public static final ParseField REQUIRE_FIELD = new ParseField("require");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<AllocateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new AllocateAction((Map<String, String>) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2]));
a -> new AllocateAction((Integer) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2], (Map<String, String>) a[3]));

static {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), NUMBER_OF_REPLICAS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), INCLUDE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), EXCLUDE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), REQUIRE_FIELD);
}

private final Integer numberOfReplicas;
private final Map<String, String> include;
private final Map<String, String> exclude;
private final Map<String, String> require;
Expand All @@ -49,7 +52,7 @@ public static AllocateAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

public AllocateAction(Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
public AllocateAction(Integer numberOfReplicas, Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
if (include == null) {
this.include = Collections.emptyMap();
} else {
Expand All @@ -65,19 +68,27 @@ public AllocateAction(Map<String, String> include, Map<String, String> exclude,
} else {
this.require = require;
}
if (this.include.isEmpty() && this.exclude.isEmpty() && this.require.isEmpty()) {
if (this.include.isEmpty() && this.exclude.isEmpty() && this.require.isEmpty() && numberOfReplicas == null) {
throw new IllegalArgumentException(
"At least one of " + INCLUDE_FIELD.getPreferredName() + ", " + EXCLUDE_FIELD.getPreferredName() + " or "
+ REQUIRE_FIELD.getPreferredName() + "must contain attributes for action " + NAME);
}
if (numberOfReplicas != null && numberOfReplicas < 0) {
throw new IllegalArgumentException("[" + NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0");
}
this.numberOfReplicas = numberOfReplicas;
}

@SuppressWarnings("unchecked")
public AllocateAction(StreamInput in) throws IOException {
this((Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
this(in.readOptionalVInt(), (Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
(Map<String, String>) in.readGenericValue());
}

public Integer getNumberOfReplicas() {
return numberOfReplicas;
}

public Map<String, String> getInclude() {
return include;
}
Expand All @@ -92,6 +103,7 @@ public Map<String, String> getRequire() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalVInt(numberOfReplicas);
out.writeGenericValue(include);
out.writeGenericValue(exclude);
out.writeGenericValue(require);
Expand All @@ -105,6 +117,9 @@ public String getWriteableName() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (numberOfReplicas != null) {
builder.field(NUMBER_OF_REPLICAS_FIELD.getPreferredName(), numberOfReplicas);
}
builder.field(INCLUDE_FIELD.getPreferredName(), include);
builder.field(EXCLUDE_FIELD.getPreferredName(), exclude);
builder.field(REQUIRE_FIELD.getPreferredName(), require);
Expand All @@ -123,6 +138,9 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey allocationRoutedKey = new StepKey(phase, NAME, AllocationRoutedStep.NAME);

Settings.Builder newSettings = Settings.builder();
if (numberOfReplicas != null) {
newSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas);
}
include.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value));
exclude.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
require.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
Expand All @@ -140,7 +158,7 @@ public List<StepKey> toStepKeys(String phase) {

@Override
public int hashCode() {
return Objects.hash(include, exclude, require);
return Objects.hash(numberOfReplicas, include, exclude, require);
}

@Override
Expand All @@ -152,7 +170,8 @@ public boolean equals(Object obj) {
return false;
}
AllocateAction other = (AllocateAction) obj;
return Objects.equals(include, other.include) && Objects.equals(exclude, other.exclude) && Objects.equals(require, other.require);
return Objects.equals(numberOfReplicas, other.numberOfReplicas) && Objects.equals(include, other.include)
&& Objects.equals(exclude, other.exclude) && Objects.equals(require, other.require);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we put each of these on its own line as it makes debugging much easier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ public boolean getWaitOnAllShardCopies() {

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
getKey().getAction(), index.getName());
return new Result(false, new Info(-1, false));
}
IndexMetaData idxMeta = clusterState.metaData().index(index);
if (idxMeta == null) {
throw new IndexNotFoundException("Index not found when executing " + getKey().getAction() + " lifecycle action.",
index.getName());
index.getName());
}
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
getKey().getAction(), index.getName());
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), -1, false));
}
// All the allocation attributes are already set so just need to check
// if the allocation has happened
Expand Down Expand Up @@ -94,7 +94,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
logger.debug(
"[{}] lifecycle action for index [{}] waiting for [{}] shards " + "to be allocated to nodes matching the given filters",
getKey().getAction(), index, allocationPendingAllShards);
return new Result(false, new Info(allocationPendingAllShards, true));
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true));
} else {
logger.debug("[{}] lifecycle action for index [{}] complete", getKey().getAction(), index);
return new Result(true, null);
Expand All @@ -105,7 +105,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
public int hashCode() {
return Objects.hash(super.hashCode(), waitOnAllShardCopies);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
Expand All @@ -115,29 +115,33 @@ public boolean equals(Object obj) {
return false;
}
AllocationRoutedStep other = (AllocationRoutedStep) obj;
return super.equals(obj) &&
return super.equals(obj) &&
Objects.equals(waitOnAllShardCopies, other.waitOnAllShardCopies);
}

public static final class Info implements ToXContentObject {

private final long actualReplicas;
private final long numberShardsLeftToAllocate;
private final boolean allShardsActive;
private final String message;

static final ParseField ACTUAL_REPLICAS = new ParseField("actual_replicas");
static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate");
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
static final ParseField MESSAGE = new ParseField("message");
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("allocation_routed_step_info",
a -> new Info((long) a[0], (boolean) a[1]));
a -> new Info((long) a[0], (long) a[1], (boolean) a[2]));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), ACTUAL_REPLICAS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
PARSER.declareString((i, s) -> {}, MESSAGE);
}

public Info(long numberShardsLeftToMerge, boolean allShardsActive) {
this.numberShardsLeftToAllocate = numberShardsLeftToMerge;
public Info(long actualReplicas, long numberShardsLeftToAllocate, boolean allShardsActive) {
this.actualReplicas = actualReplicas;
this.numberShardsLeftToAllocate = numberShardsLeftToAllocate;
this.allShardsActive = allShardsActive;
if (allShardsActive == false) {
message = "Waiting for all shard copies to be active";
Expand All @@ -147,10 +151,14 @@ public Info(long numberShardsLeftToMerge, boolean allShardsActive) {
}
}

public long getActualReplicas() {
return actualReplicas;
}

public long getNumberShardsLeftToAllocate() {
return numberShardsLeftToAllocate;
}

public boolean allShardsActive() {
return allShardsActive;
}
Expand All @@ -161,13 +169,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(MESSAGE.getPreferredName(), message);
builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate);
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
builder.field(ACTUAL_REPLICAS.getPreferredName(), actualReplicas);
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(numberShardsLeftToAllocate, allShardsActive);
return Objects.hash(actualReplicas, numberShardsLeftToAllocate, allShardsActive);
}

@Override
Expand All @@ -179,8 +188,9 @@ public boolean equals(Object obj) {
return false;
}
Info other = (Info) obj;
return Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
Objects.equals(allShardsActive, other.allShardsActive);
return Objects.equals(actualReplicas, other.actualReplicas) &&
Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
Objects.equals(allShardsActive, other.allShardsActive);
}

@Override
Expand Down

This file was deleted.

Loading