log messages from allocation commands #25955

Merged
merged 15 commits into from
Aug 22, 2017
Changes from 5 commits
@@ -22,10 +22,14 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Response returned after a cluster reroute request
@@ -56,6 +60,15 @@ public RoutingExplanations getExplanations() {
return this.explanations;
}

public List<String> getMessages() {
Contributor

Maybe add some Javadocs? The relationship between Explanations and messages is confusing, IMO.

Member

Can you add javadocs for this method?

Contributor Author

Definitely, missed that

return explanations.explanations().stream()
.filter(explanation -> explanation.decisions().type().equals(Decision.Type.YES))
.map(explanation -> explanation.command().getMessage())
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

Contributor Author

It may make more sense to put this method in RoutingExplanations itself since that's what it operates on. My thinking here was that the logic of "which messages do we actually surface" is closer to the response handling.

Contributor

See above. I'm +0 to move it to RoutingExplanations. If you prefer it here, I'm good with leaving it here too.
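
For readers following the Javadoc discussion, here is a minimal sketch of how the messages relate to the explanations. The types `Explanation` and `DecisionType` are hypothetical stand-ins invented for this example, not the real `RerouteExplanation` or `Decision.Type` classes; only the filtering logic mirrors the method under review.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the Elasticsearch types; for illustration only.
class MessagesSketch {
    enum DecisionType { YES, NO, THROTTLE }

    static final class Explanation {
        final DecisionType type;
        final Optional<String> message;

        Explanation(DecisionType type, Optional<String> message) {
            this.type = type;
            this.message = message;
        }
    }

    /**
     * Returns the messages of commands that were accepted (decision YES) and
     * that chose to provide a message. Rejected commands and commands without
     * a message contribute nothing, so the result can be shorter than the
     * list of explanations.
     */
    static List<String> getMessages(List<Explanation> explanations) {
        return explanations.stream()
            .filter(e -> e.type == DecisionType.YES)
            .map(e -> e.message)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Explanation> explanations = Arrays.asList(
            new Explanation(DecisionType.YES, Optional.of("accepted, has a message")),
            new Explanation(DecisionType.NO, Optional.of("rejected, never surfaced")),
            new Explanation(DecisionType.YES, Optional.<String>empty()));
        System.out.println(getMessages(explanations));
    }
}
```

Every explanation has a decision, but only some accepted commands carry a message, which is the asymmetry the Javadoc would need to spell out.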

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@@ -68,8 +68,18 @@ protected ClusterRerouteResponse newResponse() {

@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getMessages().forEach(logger::info);
Contributor

I wonder if we can come up with some prefix for those. We log them after the operation has been done, which makes the tense a bit weird. See for example https://github.com/elastic/elasticsearch/pull/25955/files#diff-dc1ff30a0cef7322a73dd1a8e33a1e3aR52: "Allocating an empty primary for" should be "Allocated", as it is already done. I can't come up with a good prefix though, so by default I would suggest changing the messages to use past tense.
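
A rough sketch of what the past-tense suggestion could look like. The wording below is an assumption made for illustration, not the text that was eventually merged, and the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical sketch of a past-tense log message; the wording is assumed.
class PastTenseMessageSketch {
    // Note the trailing space before the concatenation so the sentence does
    // not run together at the line break.
    private static final String MESSAGE =
        "Allocated an empty primary for [%1$s][%2$s] on node [%3$s]. This action can cause " +
        "data loss. If the old primary rejoins the cluster, its copy of this shard will be deleted.";

    static String message(String index, int shardId, String node) {
        // Locale.ROOT keeps the formatting independent of the JVM's default locale.
        return String.format(Locale.ROOT, MESSAGE, index, shardId, node);
    }

    public static void main(String[] args) {
        System.out.println(message("fake_index", 0, "node_1"));
    }
}
```

Since the message is logged only after the reroute has succeeded, past tense matches what the log reader is actually being told.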

}
listener.onResponse(response);
Member

Should this go into a finally block in case an exception is thrown in getMessages?

Contributor Author
@andyb-elastic, Aug 2, 2017

Sounds good to me. If that were to happen without finally, would an error response with the exception be returned somewhere "upstream", or would the request handling just time out?

Member

I presume that the error would be logged and the connection would hang until the client timed out (just a guess)

Contributor

ActionListener#wrap protects against this:

    static <Response> ActionListener<Response> wrap(CheckedConsumer<Response, ? extends Exception> onResponse,
            Consumer<Exception> onFailure) {
        return new ActionListener<Response>() {
            @Override
            public void onResponse(Response response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }
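
To make that concrete, here is a small self-contained sketch of the same pattern. `Listener` and `CheckedConsumer` are simplified, hypothetical stand-ins for the Elasticsearch interfaces, written only so the example runs on its own; the `wrap` body follows the snippet quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for ActionListener and CheckedConsumer; illustration only.
class WrapSketch {
    interface CheckedConsumer<T> {
        void accept(T t) throws Exception;
    }

    interface Listener<R> {
        void onResponse(R response);

        void onFailure(Exception e);
    }

    static <R> Listener<R> wrap(CheckedConsumer<R> onResponse, Consumer<Exception> onFailure) {
        return new Listener<R>() {
            @Override
            public void onResponse(R response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    // Anything thrown by the response handler is routed to the failure path.
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }

    // Records what the wrapped listener observes for one good and one failing response.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Listener<String> wrapped = wrap(
            response -> {
                if (response.isEmpty()) {
                    throw new IllegalStateException("logging failed");
                }
                events.add("response: " + response);
            },
            e -> events.add("failure: " + e.getMessage()));
        wrapped.onResponse("ok");
        wrapped.onResponse("");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the caller always hears back through one of the two callbacks, so an exception in the response handler surfaces as a failure rather than leaving the request hanging.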

},
listener::onFailure
);

clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
allocationService, request, logWrapper)); // previously passed `listener` directly
}

static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {
@@ -38,6 +38,8 @@
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.io.IOException;
import java.util.Locale;
import java.util.Optional;

/**
* Allocates an unassigned empty primary shard to a specific node. Use with extreme care as this will result in data loss.
@@ -47,6 +49,9 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
public static final String NAME = "allocate_empty_primary";
public static final ParseField COMMAND_NAME_FIELD = new ParseField(NAME);

private static final String MESSAGE = "Allocating an empty primary for [%1$s][%2$s] on node [%3$s]. This action can cause " +
"data loss. If the old primary rejoins the cluster, its copy of this shard will be deleted.";

private static final ObjectParser<Builder, Void> EMPTY_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);

/**
@@ -131,4 +136,9 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)

return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}

@Override
public Optional<String> getMessage() {
return Optional.of(String.format(Locale.ROOT, MESSAGE, index, shardId, node));
}
}
@@ -35,6 +35,8 @@
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.io.IOException;
import java.util.Locale;
import java.util.Optional;

/**
* Allocates an unassigned stale primary shard to a specific node. Use with extreme care as this will result in data loss.
@@ -44,6 +46,9 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation
public static final String NAME = "allocate_stale_primary";
public static final ParseField COMMAND_NAME_FIELD = new ParseField(NAME);

private static final String MESSAGE = "Allocating a stale primary for [%1$s][%2$s] on node [%3$s]. This action can cause " +
"data loss. If the old primary rejoins the cluster, its copy of this shard will be overwritten.";

private static final ObjectParser<Builder, Void> STALE_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);

/**
@@ -126,4 +131,9 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}

@Override
public Optional<String> getMessage() {
return Optional.of(String.format(Locale.ROOT, MESSAGE, index, shardId, node));
}

}
@@ -27,6 +27,7 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Optional;

/**
* A command to move shards in some way.
@@ -61,4 +62,11 @@ interface Parser<T extends AllocationCommand> {
default String getWriteableName() {
return name();
}

/**
* Returns any feedback the command wants to provide for logging
*/
default Optional<String> getMessage() {
return Optional.empty();
}
}
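
The default-method approach above means existing commands need no changes and only the commands with something to say override `getMessage()`. A minimal sketch of that design, using hypothetical `Command`, `MoveCommand`, and `AllocateEmptyPrimary` types rather than the real command classes:

```java
import java.util.Optional;

// Hypothetical command types; only the default-method pattern mirrors the diff.
class DefaultMessageSketch {
    interface Command {
        String name();

        /** Returns any feedback the command wants to provide for logging. */
        default Optional<String> getMessage() {
            return Optional.empty();
        }
    }

    // Inherits the default: this command has nothing to log.
    static final class MoveCommand implements Command {
        @Override
        public String name() {
            return "move";
        }
    }

    // Overrides the default to surface a warning about a risky operation.
    static final class AllocateEmptyPrimary implements Command {
        @Override
        public String name() {
            return "allocate_empty_primary";
        }

        @Override
        public Optional<String> getMessage() {
            return Optional.of(name() + " can cause data loss");
        }
    }

    public static void main(String[] args) {
        System.out.println(new MoveCommand().getMessage().isPresent());
        System.out.println(new AllocateEmptyPrimary().getMessage().orElse("<none>"));
    }
}
```

Returning `Optional.empty()` by default lets callers treat "no message" uniformly without null checks.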
@@ -19,20 +19,24 @@

package org.elasticsearch.cluster.allocation;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -50,6 +54,7 @@
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLogAppender;

import java.nio.file.Path;
import java.util.Arrays;
@@ -63,6 +68,7 @@
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

@@ -304,6 +310,86 @@ public void testRerouteExplain() {
assertThat(explanation.decisions().type(), equalTo(Decision.Type.YES));
}

public void testMessages() throws Exception {
final Settings settings = Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.NONE.name())
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name())
.build();

final String nodeName1 = internalCluster().startNode(settings);
assertThat(cluster().size(), equalTo(1));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1")
.execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

final String nodeName2 = internalCluster().startNode(settings);
assertThat(cluster().size(), equalTo(2));
healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

final String indexName = "fake_index";
client().admin().indices().prepareCreate(indexName).setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1))
.execute().actionGet();

Logger actionLogger = Loggers.getLogger(TransportClusterRerouteAction.class);


MockLogAppender dryRunMockLog = new MockLogAppender();
dryRunMockLog.start();
dryRunMockLog.addExpectation(
new MockLogAppender.UnseenEventExpectation("don't warn about allocation on dry run",
TransportClusterRerouteAction.class.getName(), Level.INFO, "Allocating an empty primary*")
);
Loggers.addAppender(actionLogger, dryRunMockLog);

AllocationCommand dryRunAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
ClusterRerouteResponse dryRunResponse = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.setDryRun(true)
.add(dryRunAllocation)
.execute().actionGet();

// during a dry run, messages exist but are not logged or exposed
assertThat(dryRunResponse.getMessages(), hasSize(1));

dryRunMockLog.assertAllExpectationsMatched();
dryRunMockLog.stop();
Loggers.removeAppender(actionLogger, dryRunMockLog);

MockLogAppender allocateMockLog = new MockLogAppender();
allocateMockLog.start();
allocateMockLog.addExpectation(
new MockLogAppender.SeenEventExpectation("warning for first allocate empty primary",
TransportClusterRerouteAction.class.getName(), Level.INFO, "Allocating an empty primary*")
);
allocateMockLog.addExpectation(
new MockLogAppender.SeenEventExpectation("warning for second allocate empty primary",
TransportClusterRerouteAction.class.getName(), Level.INFO, "Allocating an empty primary*")
);
Loggers.addAppender(actionLogger, allocateMockLog);

AllocationCommand allocateEmptyPrimaryCommand1 = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
AllocationCommand allocateEmptyPrimaryCommand2 = new AllocateEmptyPrimaryAllocationCommand(indexName, 1, nodeName2, true);
ClusterRerouteResponse response = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(allocateEmptyPrimaryCommand1)
.add(allocateEmptyPrimaryCommand2)
.execute().actionGet();

ensureYellow(indexName);

assertThat(response.getMessages(), hasSize(2));
assertThat(response.getMessages().get(0), containsString("Allocating an empty primary"));
assertThat(response.getMessages().get(1), containsString("Allocating an empty primary"));

allocateMockLog.assertAllExpectationsMatched();
allocateMockLog.stop();
Loggers.removeAppender(actionLogger, allocateMockLog);
}
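
The test above checks two things: nothing is logged on a dry run, and the messages are logged on a real reroute. The same seen/unseen idea can be sketched outside Elasticsearch with `java.util.logging`, which is swapped in here purely so the example is self-contained; `MockLogAppender` is Log4j-based, and `CapturingHandler`, `reroute`, and `loggedDuring` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Illustration with java.util.logging, not the Log4j-based MockLogAppender.
class LogCaptureSketch {
    // Collects every message published to the logger it is attached to.
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            messages.add(record.getMessage());
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }

        boolean sawPrefix(String prefix) {
            return messages.stream().anyMatch(m -> m.startsWith(prefix));
        }
    }

    // Mirrors the shape of the change under review: log only when not a dry run.
    static void reroute(Logger logger, boolean dryRun, List<String> messages) {
        if (dryRun == false) {
            messages.forEach(m -> logger.info(m));
        }
    }

    // Returns true if the allocation message was logged for the given mode.
    static boolean loggedDuring(boolean dryRun) {
        Logger logger = Logger.getLogger("reroute-sketch");
        logger.setLevel(Level.INFO);
        logger.setUseParentHandlers(false);
        CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);
        try {
            reroute(logger, dryRun,
                Collections.singletonList("Allocating an empty primary for [fake_index][0]"));
        } finally {
            // Always detach the handler so later checks start clean.
            logger.removeHandler(handler);
        }
        return handler.sawPrefix("Allocating an empty primary");
    }

    public static void main(String[] args) {
        System.out.println("dry run logged: " + loggedDuring(true));
        System.out.println("real run logged: " + loggedDuring(false));
    }
}
```

Detaching the handler in a `finally` block plays the same role as the `removeAppender` calls in the test: one scenario's captured output cannot leak into the next.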

public void testClusterRerouteWithBlocks() throws Exception {
List<String> nodesIds = internalCluster().startNodes(2);
