Skip to content

Commit ebded19

Browse files
committed
Restart API: Allow to restart one or more nodes, closes #155.
1 parent 97958c3 commit ebded19

File tree

16 files changed

+580
-17
lines changed

16 files changed

+580
-17
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
2323
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
24-
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdown;
24+
import org.elasticsearch.action.admin.cluster.node.restart.TransportNodesRestartAction;
25+
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdownAction;
2526
import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction;
2627
import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexReplicationPingAction;
2728
import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicationPingAction;
@@ -62,7 +63,8 @@ public class TransportActionModule extends AbstractModule {
6263
@Override protected void configure() {
6364

6465
bind(TransportNodesInfo.class).asEagerSingleton();
65-
bind(TransportNodesShutdown.class).asEagerSingleton();
66+
bind(TransportNodesShutdownAction.class).asEagerSingleton();
67+
bind(TransportNodesRestartAction.class).asEagerSingleton();
6668
bind(TransportClusterStateAction.class).asEagerSingleton();
6769
bind(TransportClusterHealthAction.class).asEagerSingleton();
6870

modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public static class Cluster {
7474
public static class Node {
7575
public static final String INFO = "/cluster/nodes/info";
7676
public static final String SHUTDOWN = "/cluster/nodes/shutdown";
77+
public static final String RESTART = "/cluster/nodes/restart";
7778
}
7879

7980
public static class Ping {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.admin.cluster.node.restart;
21+
22+
import org.elasticsearch.action.support.nodes.NodesOperationRequest;
23+
import org.elasticsearch.util.TimeValue;
24+
import org.elasticsearch.util.io.stream.StreamInput;
25+
import org.elasticsearch.util.io.stream.StreamOutput;
26+
27+
import java.io.IOException;
28+
29+
import static org.elasticsearch.util.TimeValue.*;
30+
31+
/**
32+
* A request to restart one ore more nodes (or the whole cluster).
33+
*
34+
* @author kimchy (shay.banon)
35+
*/
36+
public class NodesRestartRequest extends NodesOperationRequest {
37+
38+
TimeValue delay = TimeValue.timeValueSeconds(1);
39+
40+
protected NodesRestartRequest() {
41+
}
42+
43+
/**
44+
* Restarts down nodes based on the nodes ids specified. If none are passed, <b>all</b>
45+
* nodes will be shutdown.
46+
*/
47+
public NodesRestartRequest(String... nodesIds) {
48+
super(nodesIds);
49+
}
50+
51+
/**
52+
* The delay for the restart to occur. Defaults to <tt>1s</tt>.
53+
*/
54+
public NodesRestartRequest delay(TimeValue delay) {
55+
this.delay = delay;
56+
return this;
57+
}
58+
59+
public TimeValue delay() {
60+
return this.delay;
61+
}
62+
63+
@Override public void readFrom(StreamInput in) throws IOException {
64+
super.readFrom(in);
65+
delay = readTimeValue(in);
66+
}
67+
68+
@Override public void writeTo(StreamOutput out) throws IOException {
69+
super.writeTo(out);
70+
delay.writeTo(out);
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.admin.cluster.node.restart;
21+
22+
import org.elasticsearch.action.support.nodes.NodeOperationResponse;
23+
import org.elasticsearch.action.support.nodes.NodesOperationResponse;
24+
import org.elasticsearch.cluster.ClusterName;
25+
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.util.io.stream.StreamInput;
27+
import org.elasticsearch.util.io.stream.StreamOutput;
28+
29+
import java.io.IOException;
30+
31+
/**
32+
* @author kimchy (shay.banon)
33+
*/
34+
public class NodesRestartResponse extends NodesOperationResponse<NodesRestartResponse.NodeRestartResponse> {
35+
36+
NodesRestartResponse() {
37+
}
38+
39+
public NodesRestartResponse(ClusterName clusterName, NodeRestartResponse[] nodes) {
40+
super(clusterName, nodes);
41+
}
42+
43+
@Override public void readFrom(StreamInput in) throws IOException {
44+
super.readFrom(in);
45+
nodes = new NodeRestartResponse[in.readVInt()];
46+
for (int i = 0; i < nodes.length; i++) {
47+
nodes[i] = NodeRestartResponse.readNodeRestartResponse(in);
48+
}
49+
}
50+
51+
@Override public void writeTo(StreamOutput out) throws IOException {
52+
super.writeTo(out);
53+
out.writeVInt(nodes.length);
54+
for (NodeRestartResponse node : nodes) {
55+
node.writeTo(out);
56+
}
57+
}
58+
59+
public static class NodeRestartResponse extends NodeOperationResponse {
60+
61+
NodeRestartResponse() {
62+
}
63+
64+
public NodeRestartResponse(DiscoveryNode node) {
65+
super(node);
66+
}
67+
68+
public static NodeRestartResponse readNodeRestartResponse(StreamInput in) throws IOException {
69+
NodeRestartResponse res = new NodeRestartResponse();
70+
res.readFrom(in);
71+
return res;
72+
}
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.admin.cluster.node.restart;
21+
22+
import org.elasticsearch.ElasticSearchException;
23+
import org.elasticsearch.ElasticSearchIllegalStateException;
24+
import org.elasticsearch.action.TransportActions;
25+
import org.elasticsearch.action.support.nodes.NodeOperationRequest;
26+
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
27+
import org.elasticsearch.cluster.ClusterName;
28+
import org.elasticsearch.cluster.ClusterService;
29+
import org.elasticsearch.node.Node;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
import org.elasticsearch.transport.TransportService;
32+
import org.elasticsearch.util.TimeValue;
33+
import org.elasticsearch.util.guice.inject.Inject;
34+
import org.elasticsearch.util.io.stream.StreamInput;
35+
import org.elasticsearch.util.io.stream.StreamOutput;
36+
import org.elasticsearch.util.settings.Settings;
37+
38+
import java.io.IOException;
39+
import java.util.List;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicReferenceArray;
42+
43+
import static org.elasticsearch.util.TimeValue.*;
44+
import static org.elasticsearch.util.gcommon.collect.Lists.*;
45+
46+
/**
47+
* @author kimchy (shay.banon)
48+
*/
49+
public class TransportNodesRestartAction extends TransportNodesOperationAction<NodesRestartRequest, NodesRestartResponse, TransportNodesRestartAction.NodeRestartRequest, NodesRestartResponse.NodeRestartResponse> {
50+
51+
private final Node node;
52+
53+
private final boolean disabled;
54+
55+
@Inject public TransportNodesRestartAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
56+
ClusterService clusterService, TransportService transportService,
57+
Node node) {
58+
super(settings, clusterName, threadPool, clusterService, transportService);
59+
this.node = node;
60+
disabled = componentSettings.getAsBoolean("disabled", false);
61+
}
62+
63+
@Override protected String transportAction() {
64+
return TransportActions.Admin.Cluster.Node.RESTART;
65+
}
66+
67+
@Override protected String transportNodeAction() {
68+
return "/cluster/nodes/restart/node";
69+
}
70+
71+
@Override protected NodesRestartResponse newResponse(NodesRestartRequest nodesShutdownRequest, AtomicReferenceArray responses) {
72+
final List<NodesRestartResponse.NodeRestartResponse> nodeRestartResponses = newArrayList();
73+
for (int i = 0; i < responses.length(); i++) {
74+
Object resp = responses.get(i);
75+
if (resp instanceof NodesRestartResponse.NodeRestartResponse) {
76+
nodeRestartResponses.add((NodesRestartResponse.NodeRestartResponse) resp);
77+
}
78+
}
79+
return new NodesRestartResponse(clusterName, nodeRestartResponses.toArray(new NodesRestartResponse.NodeRestartResponse[nodeRestartResponses.size()]));
80+
}
81+
82+
@Override protected NodesRestartRequest newRequest() {
83+
return new NodesRestartRequest();
84+
}
85+
86+
@Override protected NodeRestartRequest newNodeRequest() {
87+
return new NodeRestartRequest();
88+
}
89+
90+
@Override protected NodeRestartRequest newNodeRequest(String nodeId, NodesRestartRequest request) {
91+
return new NodeRestartRequest(nodeId, request.delay);
92+
}
93+
94+
@Override protected NodesRestartResponse.NodeRestartResponse newNodeResponse() {
95+
return new NodesRestartResponse.NodeRestartResponse();
96+
}
97+
98+
@Override protected NodesRestartResponse.NodeRestartResponse nodeOperation(NodeRestartRequest request) throws ElasticSearchException {
99+
if (disabled) {
100+
throw new ElasticSearchIllegalStateException("Restart is disabled");
101+
}
102+
logger.info("Restarting in [{}]", request.delay);
103+
threadPool.schedule(new Runnable() {
104+
@Override public void run() {
105+
boolean restartWithWrapper = false;
106+
if (System.getProperty("elasticsearch-service") != null) {
107+
try {
108+
Class wrapperManager = settings.getClassLoader().loadClass("org.tanukisoftware.wrapper.WrapperManager");
109+
logger.info("Initiating requested restart (using service)");
110+
wrapperManager.getMethod("restartAndReturn").invoke(null);
111+
restartWithWrapper = true;
112+
} catch (Throwable e) {
113+
e.printStackTrace();
114+
}
115+
}
116+
if (!restartWithWrapper) {
117+
logger.info("Initiating requested restart");
118+
try {
119+
node.stop();
120+
node.start();
121+
} catch (Exception e) {
122+
logger.warn("Failed to restart", e);
123+
}
124+
}
125+
}
126+
}, request.delay.millis(), TimeUnit.MILLISECONDS);
127+
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
128+
}
129+
130+
@Override protected boolean accumulateExceptions() {
131+
return false;
132+
}
133+
134+
protected static class NodeRestartRequest extends NodeOperationRequest {
135+
136+
TimeValue delay;
137+
138+
private NodeRestartRequest() {
139+
}
140+
141+
private NodeRestartRequest(String nodeId, TimeValue delay) {
142+
super(nodeId);
143+
this.delay = delay;
144+
}
145+
146+
@Override public void readFrom(StreamInput in) throws IOException {
147+
super.readFrom(in);
148+
delay = readTimeValue(in);
149+
}
150+
151+
@Override public void writeTo(StreamOutput out) throws IOException {
152+
super.writeTo(out);
153+
delay.writeTo(out);
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)