Skip to content

Commit fea1f5e

Browse files
committed
move replicas action functionality into AllocateAction
1 parent 0a9c3ae commit fea1f5e

File tree

15 files changed

+122
-595
lines changed

15 files changed

+122
-595
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void testSetIndexLifecyclePolicy() throws Exception {
6868
" \"cold\": {\n" +
6969
" \"after\": \"2000s\",\n" +
7070
" \"actions\": {\n" +
71-
" \"replicas\": {\n" +
71+
" \"allocate\": {\n" +
7272
" \"number_of_replicas\": 0\n" +
7373
" }\n" +
7474
" }\n" +

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
5050
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
5151
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
52-
import org.elasticsearch.xpack.core.indexlifecycle.ReplicasAction;
5352
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
5453
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
5554
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
@@ -408,7 +407,6 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
408407
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
409408
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
410409
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
411-
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReplicasAction.NAME, ReplicasAction::new),
412410
new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new),
413411
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
414412
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)
@@ -452,7 +450,6 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
452450
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
453451
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
454452
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
455-
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReplicasAction.NAME), ReplicasAction::parse),
456453
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
457454
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
458455
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocateAction.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,23 @@
2727
public class AllocateAction implements LifecycleAction {
2828

2929
public static final String NAME = "allocate";
30+
public static final ParseField NUMBER_OF_REPLICAS_FIELD = new ParseField("number_of_replicas");
3031
public static final ParseField INCLUDE_FIELD = new ParseField("include");
3132
public static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
3233
public static final ParseField REQUIRE_FIELD = new ParseField("require");
3334

3435
@SuppressWarnings("unchecked")
3536
private static final ConstructingObjectParser<AllocateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
36-
a -> new AllocateAction((Map<String, String>) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2]));
37+
a -> new AllocateAction((Integer) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2], (Map<String, String>) a[3]));
3738

3839
static {
40+
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), NUMBER_OF_REPLICAS_FIELD);
3941
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), INCLUDE_FIELD);
4042
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), EXCLUDE_FIELD);
4143
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), REQUIRE_FIELD);
4244
}
4345

46+
private final Integer numberOfReplicas;
4447
private final Map<String, String> include;
4548
private final Map<String, String> exclude;
4649
private final Map<String, String> require;
@@ -49,7 +52,7 @@ public static AllocateAction parse(XContentParser parser) {
4952
return PARSER.apply(parser, null);
5053
}
5154

52-
public AllocateAction(Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
55+
public AllocateAction(Integer numberOfReplicas, Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
5356
if (include == null) {
5457
this.include = Collections.emptyMap();
5558
} else {
@@ -65,19 +68,27 @@ public AllocateAction(Map<String, String> include, Map<String, String> exclude,
6568
} else {
6669
this.require = require;
6770
}
68-
if (this.include.isEmpty() && this.exclude.isEmpty() && this.require.isEmpty()) {
71+
if (this.include.isEmpty() && this.exclude.isEmpty() && this.require.isEmpty() && numberOfReplicas == null) {
6972
throw new IllegalArgumentException(
7073
"At least one of " + INCLUDE_FIELD.getPreferredName() + ", " + EXCLUDE_FIELD.getPreferredName() + " or "
7174
+ REQUIRE_FIELD.getPreferredName() + "must contain attributes for action " + NAME);
7275
}
76+
if (numberOfReplicas != null && numberOfReplicas < 0) {
77+
throw new IllegalArgumentException("[" + NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0");
78+
}
79+
this.numberOfReplicas = numberOfReplicas;
7380
}
7481

7582
@SuppressWarnings("unchecked")
7683
public AllocateAction(StreamInput in) throws IOException {
77-
this((Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
84+
this(in.readOptionalVInt(), (Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
7885
(Map<String, String>) in.readGenericValue());
7986
}
8087

88+
public Integer getNumberOfReplicas() {
89+
return numberOfReplicas;
90+
}
91+
8192
public Map<String, String> getInclude() {
8293
return include;
8394
}
@@ -92,6 +103,7 @@ public Map<String, String> getRequire() {
92103

93104
@Override
94105
public void writeTo(StreamOutput out) throws IOException {
106+
out.writeOptionalVInt(numberOfReplicas);
95107
out.writeGenericValue(include);
96108
out.writeGenericValue(exclude);
97109
out.writeGenericValue(require);
@@ -105,6 +117,9 @@ public String getWriteableName() {
105117
@Override
106118
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
107119
builder.startObject();
120+
if (numberOfReplicas != null) {
121+
builder.field(NUMBER_OF_REPLICAS_FIELD.getPreferredName(), numberOfReplicas);
122+
}
108123
builder.field(INCLUDE_FIELD.getPreferredName(), include);
109124
builder.field(EXCLUDE_FIELD.getPreferredName(), exclude);
110125
builder.field(REQUIRE_FIELD.getPreferredName(), require);
@@ -123,6 +138,9 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
123138
StepKey allocationRoutedKey = new StepKey(phase, NAME, AllocationRoutedStep.NAME);
124139

125140
Settings.Builder newSettings = Settings.builder();
141+
if (numberOfReplicas != null) {
142+
newSettings.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas);
143+
}
126144
include.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value));
127145
exclude.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
128146
require.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
@@ -140,7 +158,7 @@ public List<StepKey> toStepKeys(String phase) {
140158

141159
@Override
142160
public int hashCode() {
143-
return Objects.hash(include, exclude, require);
161+
return Objects.hash(numberOfReplicas, include, exclude, require);
144162
}
145163

146164
@Override
@@ -152,7 +170,8 @@ public boolean equals(Object obj) {
152170
return false;
153171
}
154172
AllocateAction other = (AllocateAction) obj;
155-
return Objects.equals(include, other.include) && Objects.equals(exclude, other.exclude) && Objects.equals(require, other.require);
173+
return Objects.equals(numberOfReplicas, other.numberOfReplicas) && Objects.equals(include, other.include)
174+
&& Objects.equals(exclude, other.exclude) && Objects.equals(require, other.require);
156175
}
157176

158177
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStep.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@ public boolean getWaitOnAllShardCopies() {
5454

5555
@Override
5656
public Result isConditionMet(Index index, ClusterState clusterState) {
57-
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
58-
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
59-
getKey().getAction(), index.getName());
60-
return new Result(false, new Info(-1, false));
61-
}
6257
IndexMetaData idxMeta = clusterState.metaData().index(index);
6358
if (idxMeta == null) {
6459
throw new IndexNotFoundException("Index not found when executing " + getKey().getAction() + " lifecycle action.",
65-
index.getName());
60+
index.getName());
61+
}
62+
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
63+
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
64+
getKey().getAction(), index.getName());
65+
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), -1, false));
6666
}
6767
// All the allocation attributes are already set so just need to check
6868
// if the allocation has happened
@@ -94,7 +94,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
9494
logger.debug(
9595
"[{}] lifecycle action for index [{}] waiting for [{}] shards " + "to be allocated to nodes matching the given filters",
9696
getKey().getAction(), index, allocationPendingAllShards);
97-
return new Result(false, new Info(allocationPendingAllShards, true));
97+
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true));
9898
} else {
9999
logger.debug("[{}] lifecycle action for index [{}] complete", getKey().getAction(), index);
100100
return new Result(true, null);
@@ -105,7 +105,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
105105
public int hashCode() {
106106
return Objects.hash(super.hashCode(), waitOnAllShardCopies);
107107
}
108-
108+
109109
@Override
110110
public boolean equals(Object obj) {
111111
if (obj == null) {
@@ -115,29 +115,33 @@ public boolean equals(Object obj) {
115115
return false;
116116
}
117117
AllocationRoutedStep other = (AllocationRoutedStep) obj;
118-
return super.equals(obj) &&
118+
return super.equals(obj) &&
119119
Objects.equals(waitOnAllShardCopies, other.waitOnAllShardCopies);
120120
}
121-
121+
122122
public static final class Info implements ToXContentObject {
123123

124+
private final long actualReplicas;
124125
private final long numberShardsLeftToAllocate;
125126
private final boolean allShardsActive;
126127
private final String message;
127128

129+
static final ParseField ACTUAL_REPLICAS = new ParseField("actual_replicas");
128130
static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate");
129131
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
130132
static final ParseField MESSAGE = new ParseField("message");
131133
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("allocation_routed_step_info",
132-
a -> new Info((long) a[0], (boolean) a[1]));
134+
a -> new Info((long) a[0], (long) a[1], (boolean) a[2]));
133135
static {
136+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), ACTUAL_REPLICAS);
134137
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE);
135138
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
136139
PARSER.declareString((i, s) -> {}, MESSAGE);
137140
}
138141

139-
public Info(long numberShardsLeftToMerge, boolean allShardsActive) {
140-
this.numberShardsLeftToAllocate = numberShardsLeftToMerge;
142+
public Info(long actualReplicas, long numberShardsLeftToAllocate, boolean allShardsActive) {
143+
this.actualReplicas = actualReplicas;
144+
this.numberShardsLeftToAllocate = numberShardsLeftToAllocate;
141145
this.allShardsActive = allShardsActive;
142146
if (allShardsActive == false) {
143147
message = "Waiting for all shard copies to be active";
@@ -147,10 +151,14 @@ public Info(long numberShardsLeftToMerge, boolean allShardsActive) {
147151
}
148152
}
149153

154+
public long getActualReplicas() {
155+
return actualReplicas;
156+
}
157+
150158
public long getNumberShardsLeftToAllocate() {
151159
return numberShardsLeftToAllocate;
152160
}
153-
161+
154162
public boolean allShardsActive() {
155163
return allShardsActive;
156164
}
@@ -161,13 +169,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
161169
builder.field(MESSAGE.getPreferredName(), message);
162170
builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate);
163171
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
172+
builder.field(ACTUAL_REPLICAS.getPreferredName(), actualReplicas);
164173
builder.endObject();
165174
return builder;
166175
}
167176

168177
@Override
169178
public int hashCode() {
170-
return Objects.hash(numberShardsLeftToAllocate, allShardsActive);
179+
return Objects.hash(actualReplicas, numberShardsLeftToAllocate, allShardsActive);
171180
}
172181

173182
@Override
@@ -179,8 +188,9 @@ public boolean equals(Object obj) {
179188
return false;
180189
}
181190
Info other = (Info) obj;
182-
return Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
183-
Objects.equals(allShardsActive, other.allShardsActive);
191+
return Objects.equals(actualReplicas, other.actualReplicas) &&
192+
Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
193+
Objects.equals(allShardsActive, other.allShardsActive);
184194
}
185195

186196
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ReplicasAction.java

Lines changed: 0 additions & 123 deletions
This file was deleted.

0 commit comments

Comments
 (0)