Skip to content

Commit bdefcbd

Browse files
reroute API: log messages from commands (#25955)
Gives allocation commands from the cluster reroute API the ability to provide messages to be logged once the cluster state change has been committed. The purpose of this change is to create a record in the logs when allocation commands which could potentially be destructive are applied. The allocate_empty_primary and allocate_stale_primary commands are the only ones that currently provide log messages. Closes #22821
1 parent 0120448 commit bdefcbd

File tree

6 files changed

+142
-8
lines changed

6 files changed

+142
-8
lines changed

core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,18 @@ protected ClusterRerouteResponse newResponse() {
6868

6969
@Override
7070
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
71+
ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
72+
response -> {
73+
if (request.dryRun() == false) {
74+
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
75+
}
76+
listener.onResponse(response);
77+
},
78+
listener::onFailure
79+
);
80+
7181
clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
72-
allocationService, request, listener));
82+
allocationService, request, logWrapper));
7383
}
7484

7585
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {

core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingExplanations.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.cluster.routing.allocation;
2121

22+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.xcontent.ToXContent.Params;
@@ -28,6 +29,8 @@
2829
import java.io.IOException;
2930
import java.util.ArrayList;
3031
import java.util.List;
32+
import java.util.Optional;
33+
import java.util.stream.Collectors;
3134

3235
/**
3336
* Class used to encapsulate a number of {@link RerouteExplanation}
@@ -49,6 +52,18 @@ public List<RerouteExplanation> explanations() {
4952
return this.explanations;
5053
}
5154

55+
/**
56+
* Provides feedback from commands with a YES decision that should be displayed to the user after the command has been applied
57+
*/
58+
public List<String> getYesDecisionMessages() {
59+
return explanations().stream()
60+
.filter(explanation -> explanation.decisions().type().equals(Decision.Type.YES))
61+
.map(explanation -> explanation.command().getMessage())
62+
.filter(Optional::isPresent)
63+
.map(Optional::get)
64+
.collect(Collectors.toList());
65+
}
66+
5267
/**
5368
* Read in a RoutingExplanations object
5469
*/

core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.index.shard.ShardNotFoundException;
3939

4040
import java.io.IOException;
41+
import java.util.Optional;
4142

4243
/**
4344
* Allocates an unassigned empty primary shard to a specific node. Use with extreme care as this will result in data loss.
@@ -72,6 +73,11 @@ public String name() {
7273
return NAME;
7374
}
7475

76+
@Override
77+
public Optional<String> getMessage() {
78+
return Optional.of("allocated an empty primary for [" + index + "][" + shardId + "] on node [" + node + "] from user command");
79+
}
80+
7581
public static AllocateEmptyPrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException {
7682
return new Builder().parse(parser).build();
7783
}
@@ -115,19 +121,22 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
115121
}
116122

117123
if (shardRouting.recoverySource().getType() != RecoverySource.Type.EMPTY_STORE && acceptDataLoss == false) {
118-
return explainOrThrowRejectedCommand(explain, allocation,
119-
"allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm by setting the accept_data_loss parameter to true");
124+
String dataLossWarning = "allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm " +
125+
"by setting the accept_data_loss parameter to true";
126+
return explainOrThrowRejectedCommand(explain, allocation, dataLossWarning);
120127
}
121128

122129
UnassignedInfo unassignedInfoToUpdate = null;
123130
if (shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY) {
124-
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY,
125-
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
131+
String unassignedInfoMessage = "force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() +
132+
", " + shardRouting.unassignedInfo().getMessage();
133+
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage,
126134
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false,
127135
shardRouting.unassignedInfo().getLastAllocationStatus());
128136
}
129137

130-
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, StoreRecoverySource.EMPTY_STORE_INSTANCE);
138+
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate,
139+
StoreRecoverySource.EMPTY_STORE_INSTANCE);
131140

132141
return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
133142
}

core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.index.shard.ShardNotFoundException;
3636

3737
import java.io.IOException;
38+
import java.util.Optional;
3839

3940
/**
4041
* Allocates an unassigned stale primary shard to a specific node. Use with extreme care as this will result in data loss.
@@ -70,6 +71,11 @@ public String name() {
7071
return NAME;
7172
}
7273

74+
@Override
75+
public Optional<String> getMessage() {
76+
return Optional.of("allocated a stale primary for [" + index + "][" + shardId + "] on node [" + node + "] from user command");
77+
}
78+
7379
public static AllocateStalePrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException {
7480
return new Builder().parse(parser).build();
7581
}
@@ -113,8 +119,9 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
113119
}
114120

115121
if (acceptDataLoss == false) {
116-
return explainOrThrowRejectedCommand(explain, allocation,
117-
"allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm by setting the accept_data_loss parameter to true");
122+
String dataLossWarning = "allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please " +
123+
"confirm by setting the accept_data_loss parameter to true";
124+
return explainOrThrowRejectedCommand(explain, allocation, dataLossWarning);
118125
}
119126

120127
if (shardRouting.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE) {

core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.xcontent.XContentParser;
2828

2929
import java.io.IOException;
30+
import java.util.Optional;
3031

3132
/**
3233
* A command to move shards in some way.
@@ -61,4 +62,12 @@ interface Parser<T extends AllocationCommand> {
6162
default String getWriteableName() {
6263
return name();
6364
}
65+
66+
/**
67+
* Returns any feedback the command wants to provide for logging. This message should be appropriate to expose to the user after the
68+
* command has been applied
69+
*/
70+
default Optional<String> getMessage() {
71+
return Optional.empty();
72+
}
6473
}

core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,24 @@
1919

2020
package org.elasticsearch.cluster.allocation;
2121

22+
import org.apache.logging.log4j.Level;
2223
import org.apache.logging.log4j.Logger;
2324
import org.apache.lucene.util.IOUtils;
2425
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2526
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
27+
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
2628
import org.elasticsearch.action.support.ActiveShardCount;
2729
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
2830
import org.elasticsearch.cluster.ClusterState;
2931
import org.elasticsearch.cluster.health.ClusterHealthStatus;
32+
import org.elasticsearch.cluster.metadata.IndexMetaData;
3033
import org.elasticsearch.cluster.node.DiscoveryNode;
3134
import org.elasticsearch.cluster.routing.ShardRouting;
3235
import org.elasticsearch.cluster.routing.ShardRoutingState;
3336
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
3437
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
3538
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
39+
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
3640
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
3741
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
3842
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -50,6 +54,7 @@
5054
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
5155
import org.elasticsearch.test.ESIntegTestCase.Scope;
5256
import org.elasticsearch.test.InternalTestCluster;
57+
import org.elasticsearch.test.MockLogAppender;
5358

5459
import java.nio.file.Path;
5560
import java.util.Arrays;
@@ -63,6 +68,7 @@
6368
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
6469
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6570
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
71+
import static org.hamcrest.Matchers.containsString;
6672
import static org.hamcrest.Matchers.equalTo;
6773
import static org.hamcrest.Matchers.hasSize;
6874

@@ -304,6 +310,84 @@ public void testRerouteExplain() {
304310
assertThat(explanation.decisions().type(), equalTo(Decision.Type.YES));
305311
}
306312

313+
public void testMessageLogging() throws Exception{
314+
final Settings settings = Settings.builder()
315+
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.NONE.name())
316+
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name())
317+
.build();
318+
319+
final String nodeName1 = internalCluster().startNode(settings);
320+
assertThat(cluster().size(), equalTo(1));
321+
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1")
322+
.execute().actionGet();
323+
assertThat(healthResponse.isTimedOut(), equalTo(false));
324+
325+
final String nodeName2 = internalCluster().startNode(settings);
326+
assertThat(cluster().size(), equalTo(2));
327+
healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
328+
assertThat(healthResponse.isTimedOut(), equalTo(false));
329+
330+
final String indexName = "test_index";
331+
client().admin().indices().prepareCreate(indexName).setWaitForActiveShards(ActiveShardCount.NONE)
332+
.setSettings(Settings.builder()
333+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2)
334+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1))
335+
.execute().actionGet();
336+
337+
Logger actionLogger = Loggers.getLogger(TransportClusterRerouteAction.class);
338+
339+
MockLogAppender dryRunMockLog = new MockLogAppender();
340+
dryRunMockLog.start();
341+
dryRunMockLog.addExpectation(
342+
new MockLogAppender.UnseenEventExpectation("no completed message logged on dry run",
343+
TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*")
344+
);
345+
Loggers.addAppender(actionLogger, dryRunMockLog);
346+
347+
AllocationCommand dryRunAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
348+
ClusterRerouteResponse dryRunResponse = client().admin().cluster().prepareReroute()
349+
.setExplain(randomBoolean())
350+
.setDryRun(true)
351+
.add(dryRunAllocation)
352+
.execute().actionGet();
353+
354+
// during a dry run, messages exist but are not logged or exposed
355+
assertThat(dryRunResponse.getExplanations().getYesDecisionMessages(), hasSize(1));
356+
assertThat(dryRunResponse.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary"));
357+
358+
dryRunMockLog.assertAllExpectationsMatched();
359+
dryRunMockLog.stop();
360+
Loggers.removeAppender(actionLogger, dryRunMockLog);
361+
362+
MockLogAppender allocateMockLog = new MockLogAppender();
363+
allocateMockLog.start();
364+
allocateMockLog.addExpectation(
365+
new MockLogAppender.SeenEventExpectation("message for first allocate empty primary",
366+
TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*" + nodeName1 + "*")
367+
);
368+
allocateMockLog.addExpectation(
369+
new MockLogAppender.UnseenEventExpectation("no message for second allocate empty primary",
370+
TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*" + nodeName2 + "*")
371+
);
372+
Loggers.addAppender(actionLogger, allocateMockLog);
373+
374+
AllocationCommand yesDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
375+
AllocationCommand noDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand("noexist", 1, nodeName2, true);
376+
ClusterRerouteResponse response = client().admin().cluster().prepareReroute()
377+
.setExplain(true) // so we get a NO decision back rather than an exception
378+
.add(yesDecisionAllocation)
379+
.add(noDecisionAllocation)
380+
.execute().actionGet();
381+
382+
assertThat(response.getExplanations().getYesDecisionMessages(), hasSize(1));
383+
assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary"));
384+
assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString(nodeName1));
385+
386+
allocateMockLog.assertAllExpectationsMatched();
387+
allocateMockLog.stop();
388+
Loggers.removeAppender(actionLogger, allocateMockLog);
389+
}
390+
307391
public void testClusterRerouteWithBlocks() throws Exception {
308392
List<String> nodesIds = internalCluster().startNodes(2);
309393

0 commit comments

Comments
 (0)