Skip to content

Commit ffd500e

Browse files
talevyjasontedor
authored andcommitted
move replicas action functionality into AllocateAction (#32523)
Since replica counts and allocation rules are set separately, it is not always clear how many replicas are to be allocated in the allocate action. Moving the replicas action to occur at the same time as the allocate action, resolves this confusion that could end an undesired state. This means that the ReplicasAction is removed, and a new optional replicas parameter is added to AllocateAction.
1 parent 6c93c29 commit ffd500e

File tree

15 files changed

+164
-595
lines changed

15 files changed

+164
-595
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void testSetIndexLifecyclePolicy() throws Exception {
6868
" \"cold\": {\n" +
6969
" \"after\": \"2000s\",\n" +
7070
" \"actions\": {\n" +
71-
" \"replicas\": {\n" +
71+
" \"allocate\": {\n" +
7272
" \"number_of_replicas\": 0\n" +
7373
" }\n" +
7474
" }\n" +

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
4949
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
5050
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
51-
import org.elasticsearch.xpack.core.indexlifecycle.ReplicasAction;
5251
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
5352
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
5453
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
@@ -405,7 +404,6 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
405404
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
406405
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
407406
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
408-
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReplicasAction.NAME, ReplicasAction::new),
409407
new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new),
410408
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
411409
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)
@@ -449,7 +447,6 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
449447
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
450448
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
451449
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
452-
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReplicasAction.NAME), ReplicasAction::parse),
453450
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
454451
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
455452
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocateAction.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,23 @@
2727
public class AllocateAction implements LifecycleAction {
2828

2929
public static final String NAME = "allocate";
30+
public static final ParseField NUMBER_OF_REPLICAS_FIELD = new ParseField("number_of_replicas");
3031
public static final ParseField INCLUDE_FIELD = new ParseField("include");
3132
public static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
3233
public static final ParseField REQUIRE_FIELD = new ParseField("require");
3334

3435
@SuppressWarnings("unchecked")
3536
private static final ConstructingObjectParser<AllocateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
36-
a -> new AllocateAction((Map<String, String>) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2]));
37+
a -> new AllocateAction((Integer) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2], (Map<String, String>) a[3]));
3738

3839
static {
40+
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), NUMBER_OF_REPLICAS_FIELD);
3941
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), INCLUDE_FIELD);
4042
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), EXCLUDE_FIELD);
4143
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), REQUIRE_FIELD);
4244
}
4345

46+
private final Integer numberOfReplicas;
4447
private final Map<String, String> include;
4548
private final Map<String, String> exclude;
4649
private final Map<String, String> require;
@@ -49,7 +52,7 @@ public static AllocateAction parse(XContentParser parser) {
4952
return PARSER.apply(parser, null);
5053
}
5154

52-
public AllocateAction(Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
55+
public AllocateAction(Integer numberOfReplicas, Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
5356
if (include == null) {
5457
this.include = Collections.emptyMap();
5558
} else {
@@ -65,19 +68,27 @@ public AllocateAction(Map<String, String> include, Map<String, String> exclude,
6568
} else {
6669
this.require = require;
6770
}
68-
if (this.include.isEmpty() && this.exclude.isEmpty() && this.require.isEmpty()) {
71+
if (this.include.isEmpty() && this.exclude.isEmpty() && this.require.isEmpty() && numberOfReplicas == null) {
6972
throw new IllegalArgumentException(
7073
"At least one of " + INCLUDE_FIELD.getPreferredName() + ", " + EXCLUDE_FIELD.getPreferredName() + " or "
7174
+ REQUIRE_FIELD.getPreferredName() + "must contain attributes for action " + NAME);
7275
}
76+
if (numberOfReplicas != null && numberOfReplicas < 0) {
77+
throw new IllegalArgumentException("[" + NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0");
78+
}
79+
this.numberOfReplicas = numberOfReplicas;
7380
}
7481

7582
@SuppressWarnings("unchecked")
7683
public AllocateAction(StreamInput in) throws IOException {
77-
this((Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
84+
this(in.readOptionalVInt(), (Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
7885
(Map<String, String>) in.readGenericValue());
7986
}
8087

88+
public Integer getNumberOfReplicas() {
89+
return numberOfReplicas;
90+
}
91+
8192
public Map<String, String> getInclude() {
8293
return include;
8394
}
@@ -92,6 +103,7 @@ public Map<String, String> getRequire() {
92103

93104
@Override
94105
public void writeTo(StreamOutput out) throws IOException {
106+
out.writeOptionalVInt(numberOfReplicas);
95107
out.writeGenericValue(include);
96108
out.writeGenericValue(exclude);
97109
out.writeGenericValue(require);
@@ -105,6 +117,9 @@ public String getWriteableName() {
105117
@Override
106118
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
107119
builder.startObject();
120+
if (numberOfReplicas != null) {
121+
builder.field(NUMBER_OF_REPLICAS_FIELD.getPreferredName(), numberOfReplicas);
122+
}
108123
builder.field(INCLUDE_FIELD.getPreferredName(), include);
109124
builder.field(EXCLUDE_FIELD.getPreferredName(), exclude);
110125
builder.field(REQUIRE_FIELD.getPreferredName(), require);
@@ -123,6 +138,9 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
123138
StepKey allocationRoutedKey = new StepKey(phase, NAME, AllocationRoutedStep.NAME);
124139

125140
Settings.Builder newSettings = Settings.builder();
141+
if (numberOfReplicas != null) {
142+
newSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas);
143+
}
126144
include.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value));
127145
exclude.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
128146
require.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
@@ -140,7 +158,7 @@ public List<StepKey> toStepKeys(String phase) {
140158

141159
@Override
142160
public int hashCode() {
143-
return Objects.hash(include, exclude, require);
161+
return Objects.hash(numberOfReplicas, include, exclude, require);
144162
}
145163

146164
@Override
@@ -152,7 +170,10 @@ public boolean equals(Object obj) {
152170
return false;
153171
}
154172
AllocateAction other = (AllocateAction) obj;
155-
return Objects.equals(include, other.include) && Objects.equals(exclude, other.exclude) && Objects.equals(require, other.require);
173+
return Objects.equals(numberOfReplicas, other.numberOfReplicas) &&
174+
Objects.equals(include, other.include) &&
175+
Objects.equals(exclude, other.exclude) &&
176+
Objects.equals(require, other.require);
156177
}
157178

158179
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStep.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@ public boolean getWaitOnAllShardCopies() {
5454

5555
@Override
5656
public Result isConditionMet(Index index, ClusterState clusterState) {
57-
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
58-
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
59-
getKey().getAction(), index.getName());
60-
return new Result(false, new Info(-1, false));
61-
}
6257
IndexMetaData idxMeta = clusterState.metaData().index(index);
6358
if (idxMeta == null) {
6459
throw new IndexNotFoundException("Index not found when executing " + getKey().getAction() + " lifecycle action.",
65-
index.getName());
60+
index.getName());
61+
}
62+
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
63+
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
64+
getKey().getAction(), index.getName());
65+
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), -1, false));
6666
}
6767
// All the allocation attributes are already set so just need to check
6868
// if the allocation has happened
@@ -94,7 +94,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
9494
logger.debug(
9595
"[{}] lifecycle action for index [{}] waiting for [{}] shards " + "to be allocated to nodes matching the given filters",
9696
getKey().getAction(), index, allocationPendingAllShards);
97-
return new Result(false, new Info(allocationPendingAllShards, true));
97+
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true));
9898
} else {
9999
logger.debug("[{}] lifecycle action for index [{}] complete", getKey().getAction(), index);
100100
return new Result(true, null);
@@ -105,7 +105,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
105105
public int hashCode() {
106106
return Objects.hash(super.hashCode(), waitOnAllShardCopies);
107107
}
108-
108+
109109
@Override
110110
public boolean equals(Object obj) {
111111
if (obj == null) {
@@ -115,29 +115,33 @@ public boolean equals(Object obj) {
115115
return false;
116116
}
117117
AllocationRoutedStep other = (AllocationRoutedStep) obj;
118-
return super.equals(obj) &&
118+
return super.equals(obj) &&
119119
Objects.equals(waitOnAllShardCopies, other.waitOnAllShardCopies);
120120
}
121-
121+
122122
public static final class Info implements ToXContentObject {
123123

124+
private final long actualReplicas;
124125
private final long numberShardsLeftToAllocate;
125126
private final boolean allShardsActive;
126127
private final String message;
127128

129+
static final ParseField ACTUAL_REPLICAS = new ParseField("actual_replicas");
128130
static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate");
129131
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
130132
static final ParseField MESSAGE = new ParseField("message");
131133
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("allocation_routed_step_info",
132-
a -> new Info((long) a[0], (boolean) a[1]));
134+
a -> new Info((long) a[0], (long) a[1], (boolean) a[2]));
133135
static {
136+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), ACTUAL_REPLICAS);
134137
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE);
135138
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
136139
PARSER.declareString((i, s) -> {}, MESSAGE);
137140
}
138141

139-
public Info(long numberShardsLeftToMerge, boolean allShardsActive) {
140-
this.numberShardsLeftToAllocate = numberShardsLeftToMerge;
142+
public Info(long actualReplicas, long numberShardsLeftToAllocate, boolean allShardsActive) {
143+
this.actualReplicas = actualReplicas;
144+
this.numberShardsLeftToAllocate = numberShardsLeftToAllocate;
141145
this.allShardsActive = allShardsActive;
142146
if (allShardsActive == false) {
143147
message = "Waiting for all shard copies to be active";
@@ -147,10 +151,14 @@ public Info(long numberShardsLeftToMerge, boolean allShardsActive) {
147151
}
148152
}
149153

154+
public long getActualReplicas() {
155+
return actualReplicas;
156+
}
157+
150158
public long getNumberShardsLeftToAllocate() {
151159
return numberShardsLeftToAllocate;
152160
}
153-
161+
154162
public boolean allShardsActive() {
155163
return allShardsActive;
156164
}
@@ -161,13 +169,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
161169
builder.field(MESSAGE.getPreferredName(), message);
162170
builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate);
163171
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
172+
builder.field(ACTUAL_REPLICAS.getPreferredName(), actualReplicas);
164173
builder.endObject();
165174
return builder;
166175
}
167176

168177
@Override
169178
public int hashCode() {
170-
return Objects.hash(numberShardsLeftToAllocate, allShardsActive);
179+
return Objects.hash(actualReplicas, numberShardsLeftToAllocate, allShardsActive);
171180
}
172181

173182
@Override
@@ -179,8 +188,9 @@ public boolean equals(Object obj) {
179188
return false;
180189
}
181190
Info other = (Info) obj;
182-
return Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
183-
Objects.equals(allShardsActive, other.allShardsActive);
191+
return Objects.equals(actualReplicas, other.actualReplicas) &&
192+
Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
193+
Objects.equals(allShardsActive, other.allShardsActive);
184194
}
185195

186196
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ReplicasAction.java

Lines changed: 0 additions & 123 deletions
This file was deleted.

0 commit comments

Comments
 (0)