Skip to content

Commit fbaf8c4

Browse files
Fix Transport Stopped Exception (#48930)
When a node shuts down, `TransportService` moves to the stopped state and then closes its connections. If a request was sent in between, an exception was thrown that replication actions did not retry. Now a wrapped `NodeClosedException` is thrown instead, which replication actions handle correctly. Other usages were fixed too. Relates #42612
1 parent 42b3cd0 commit fbaf8c4

File tree

9 files changed

+254
-32
lines changed

9 files changed

+254
-32
lines changed

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

-9
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3030
import org.elasticsearch.index.Index;
3131
import org.elasticsearch.rest.RestStatus;
32-
import org.elasticsearch.transport.TransportException;
3332

3433
import java.io.IOException;
3534
import java.io.PrintWriter;
@@ -193,14 +192,6 @@ public static Throwable unwrap(Throwable t, Class<?>... clazzes) {
193192
return null;
194193
}
195194

196-
public static boolean isTransportStoppedForAction(final Throwable t, final String action) {
197-
final TransportException maybeTransport =
198-
(TransportException) ExceptionsHelper.unwrap(t, TransportException.class);
199-
return maybeTransport != null
200-
&& (maybeTransport.getMessage().equals("TransportService is closed stopped can't send request")
201-
|| maybeTransport.getMessage().equals("transport stopped, action: " + action));
202-
}
203-
204195
/**
205196
* Throws the specified exception. If null is specified then <code>true</code> is returned.
206197
*/

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,7 @@ public String toString() {
213213

214214
private void onNoLongerPrimary(Exception failure) {
215215
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
216-
final boolean nodeIsClosing =
217-
cause instanceof NodeClosedException
218-
|| ExceptionsHelper.isTransportStoppedForAction(cause, "internal:cluster/shard/failure");
216+
final boolean nodeIsClosing = cause instanceof NodeClosedException;
219217
final String message;
220218
if (nodeIsClosing) {
221219
message = String.format(Locale.ROOT,

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
7878
import org.elasticsearch.indices.recovery.RecoveryFailedException;
7979
import org.elasticsearch.indices.recovery.RecoveryState;
80+
import org.elasticsearch.node.NodeClosedException;
8081
import org.elasticsearch.repositories.RepositoriesService;
8182
import org.elasticsearch.search.SearchService;
8283
import org.elasticsearch.snapshots.SnapshotShardsService;
@@ -334,8 +335,8 @@ public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) {
334335
ActionListener.wrap(
335336
r -> {},
336337
e -> {
337-
if (ExceptionsHelper.isTransportStoppedForAction(e, RetentionLeaseBackgroundSyncAction.ACTION_NAME + "[p]")) {
338-
// we are likely shutting down
338+
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
339+
// node shutting down
339340
return;
340341
}
341342
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {

server/src/main/java/org/elasticsearch/transport/TransportService.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4545
import org.elasticsearch.common.util.concurrent.ThreadContext;
4646
import org.elasticsearch.core.internal.io.IOUtils;
47+
import org.elasticsearch.node.NodeClosedException;
4748
import org.elasticsearch.tasks.Task;
4849
import org.elasticsearch.tasks.TaskCancelledException;
4950
import org.elasticsearch.tasks.TaskManager;
@@ -265,8 +266,8 @@ public void onFailure(Exception e) {
265266
}
266267
@Override
267268
public void doRun() {
268-
// cf. ExceptionsHelper#isTransportStoppedForAction
269-
TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action());
269+
TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(),
270+
holderToNotify.action(), new NodeClosedException(localNode));
270271
holderToNotify.handler().handleException(ex);
271272
}
272273
});
@@ -621,11 +622,8 @@ private <T extends TransportResponse> void sendRequestInternal(final Transport.C
621622
/*
622623
* If we are not started the exception handling will remove the request holder again and call the handler to notify the
623624
* caller. It will only notify if toStop hasn't done the work yet.
624-
*
625-
* Do not edit this exception message, it is currently relied upon in production code!
626625
*/
627-
// TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
628-
throw new TransportException("TransportService is closed stopped can't send request");
626+
throw new NodeClosedException(localNode);
629627
}
630628
if (timeoutHandler != null) {
631629
assert options.timeout() != null;

server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.node.NodeClosedException;
4545
import org.elasticsearch.test.ESTestCase;
4646
import org.elasticsearch.transport.SendRequestTransportException;
47-
import org.elasticsearch.transport.TransportException;
4847

4948
import java.util.ArrayList;
5049
import java.util.Collections;
@@ -205,12 +204,9 @@ public void testNoLongerPrimary() throws Exception {
205204
if (randomBoolean()) {
206205
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
207206
} else if (randomBoolean()) {
207+
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
208208
shardActionFailure = new SendRequestTransportException(
209-
new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), ShardStateAction.SHARD_FAILED_ACTION_NAME,
210-
new TransportException("TransportService is closed stopped can't send request"));
211-
} else if (randomBoolean()) {
212-
shardActionFailure = new TransportException(
213-
"transport stopped, action: " + ShardStateAction.SHARD_FAILED_ACTION_NAME);
209+
node, ShardStateAction.SHARD_FAILED_ACTION_NAME, new NodeClosedException(node));
214210
} else {
215211
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
216212
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.support.replication;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.ActionRequest;
24+
import org.elasticsearch.action.ActionResponse;
25+
import org.elasticsearch.action.ActionType;
26+
import org.elasticsearch.action.support.ActionFilters;
27+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
28+
import org.elasticsearch.cluster.metadata.IndexMetaData;
29+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
30+
import org.elasticsearch.cluster.service.ClusterService;
31+
import org.elasticsearch.common.inject.Inject;
32+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
33+
import org.elasticsearch.common.io.stream.StreamInput;
34+
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.util.concurrent.ThreadContext;
36+
import org.elasticsearch.index.shard.IndexShard;
37+
import org.elasticsearch.index.shard.ShardId;
38+
import org.elasticsearch.indices.IndicesService;
39+
import org.elasticsearch.plugins.ActionPlugin;
40+
import org.elasticsearch.plugins.NetworkPlugin;
41+
import org.elasticsearch.plugins.Plugin;
42+
import org.elasticsearch.plugins.PluginsService;
43+
import org.elasticsearch.test.ESIntegTestCase;
44+
import org.elasticsearch.test.InternalTestCluster;
45+
import org.elasticsearch.test.transport.MockTransportService;
46+
import org.elasticsearch.threadpool.ThreadPool;
47+
import org.elasticsearch.transport.Transport;
48+
import org.elasticsearch.transport.TransportInterceptor;
49+
import org.elasticsearch.transport.TransportRequest;
50+
import org.elasticsearch.transport.TransportRequestOptions;
51+
import org.elasticsearch.transport.TransportResponse;
52+
import org.elasticsearch.transport.TransportResponseHandler;
53+
import org.elasticsearch.transport.TransportService;
54+
import org.hamcrest.Matchers;
55+
56+
import java.io.IOException;
57+
import java.util.Collection;
58+
import java.util.List;
59+
import java.util.concurrent.CountDownLatch;
60+
import java.util.concurrent.TimeUnit;
61+
import java.util.concurrent.atomic.AtomicReference;
62+
63+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
64+
65+
66+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
67+
public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCase {
68+
69+
@Override
70+
protected Collection<Class<? extends Plugin>> nodePlugins() {
71+
return List.of(TestPlugin.class, MockTransportService.TestPlugin.class);
72+
}
73+
74+
public static class Request extends ReplicationRequest<Request> {
75+
public Request(ShardId shardId) {
76+
super(shardId);
77+
}
78+
79+
public Request(StreamInput in) throws IOException {
80+
super(in);
81+
}
82+
83+
@Override
84+
public String toString() {
85+
return "test-request";
86+
}
87+
}
88+
89+
public static class Response extends ReplicationResponse {
90+
public Response() {
91+
}
92+
93+
public Response(StreamInput in) throws IOException {
94+
super(in);
95+
}
96+
}
97+
98+
public static class TestAction extends TransportReplicationAction<Request, Request, Response> {
99+
private static final String ACTION_NAME = "internal:test-replication-action";
100+
private static final ActionType<Response> TYPE = new ActionType<>(ACTION_NAME, Response::new);
101+
102+
@Inject
103+
public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
104+
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
105+
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
106+
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
107+
indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
108+
}
109+
110+
@Override
111+
protected Response newResponseInstance(StreamInput in) throws IOException {
112+
return new Response(in);
113+
}
114+
115+
@Override
116+
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
117+
ActionListener<PrimaryResult<Request, Response>> listener) {
118+
listener.onResponse(new PrimaryResult<>(shardRequest, new Response()));
119+
}
120+
121+
@Override
122+
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) {
123+
return new ReplicaResult();
124+
}
125+
}
126+
127+
public static class TestPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
128+
private CountDownLatch actionRunningLatch = new CountDownLatch(1);
129+
private CountDownLatch actionWaitLatch = new CountDownLatch(1);
130+
private volatile String testActionName;
131+
132+
public TestPlugin() {
133+
}
134+
135+
@Override
136+
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
137+
return List.of(new ActionHandler<>(TestAction.TYPE, TestAction.class));
138+
}
139+
140+
@Override
141+
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
142+
ThreadContext threadContext) {
143+
return List.of(new TransportInterceptor() {
144+
@Override
145+
public AsyncSender interceptSender(AsyncSender sender) {
146+
return new AsyncSender() {
147+
@Override
148+
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action,
149+
TransportRequest request, TransportRequestOptions options,
150+
TransportResponseHandler<T> handler) {
151+
// only activated on primary
152+
if (action.equals(testActionName)) {
153+
actionRunningLatch.countDown();
154+
try {
155+
actionWaitLatch.await(10, TimeUnit.SECONDS);
156+
} catch (InterruptedException e) {
157+
throw new AssertionError(e);
158+
}
159+
}
160+
sender.sendRequest(connection, action, request, options, handler);
161+
}
162+
};
163+
}
164+
});
165+
}
166+
}
167+
168+
public void testRetryOnStoppedTransportService() throws Exception {
169+
internalCluster().startMasterOnlyNodes(2);
170+
String primary = internalCluster().startDataOnlyNode();
171+
assertAcked(prepareCreate("test")
172+
.setSettings(Settings.builder()
173+
.put(indexSettings())
174+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
175+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
176+
));
177+
178+
String replica = internalCluster().startDataOnlyNode();
179+
String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
180+
ensureGreen("test");
181+
182+
TestPlugin primaryTestPlugin = getTestPlugin(primary);
183+
// this test only provoked an issue for the primary action, but for completeness, we pick the action randomly
184+
primaryTestPlugin.testActionName = TestAction.ACTION_NAME + (randomBoolean() ? "[p]" : "[r]");
185+
logger.info("--> Test action {}, primary {}, replica {}", primaryTestPlugin.testActionName, primary, replica);
186+
187+
AtomicReference<Object> response = new AtomicReference<>();
188+
CountDownLatch doneLatch = new CountDownLatch(1);
189+
client(coordinator).execute(TestAction.TYPE, new Request(new ShardId(resolveIndex("test"), 0)),
190+
ActionListener.runAfter(ActionListener.wrap(
191+
r -> assertTrue(response.compareAndSet(null, r)),
192+
e -> assertTrue(response.compareAndSet(null, e))),
193+
doneLatch::countDown));
194+
195+
assertTrue(primaryTestPlugin.actionRunningLatch.await(10, TimeUnit.SECONDS));
196+
197+
MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
198+
primary);
199+
// we pause node after TransportService has moved to stopped, but before closing connections, since if connections are closed
200+
// we would not hit the transport service closed case.
201+
primaryTransportService.addOnStopListener(() -> {
202+
primaryTestPlugin.actionWaitLatch.countDown();
203+
try {
204+
assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
205+
} catch (InterruptedException e) {
206+
throw new AssertionError(e);
207+
}
208+
});
209+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
210+
211+
assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
212+
if (response.get() instanceof Exception) {
213+
throw new AssertionError(response.get());
214+
}
215+
}
216+
217+
private TestPlugin getTestPlugin(String node) {
218+
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, node);
219+
List<TestPlugin> testPlugins = pluginsService.filterPlugins(TestPlugin.class);
220+
assertThat(testPlugins, Matchers.hasSize(1));
221+
return testPlugins.get(0);
222+
}
223+
}

server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.action.support.replication.ReplicationResponse;
3030
import org.elasticsearch.client.node.NodeClient;
3131
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
32+
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.common.settings.Settings;
3334
import org.elasticsearch.common.util.concurrent.ThreadContext;
3435
import org.elasticsearch.index.Index;
@@ -40,10 +41,12 @@
4041
import org.elasticsearch.index.shard.IndexShardClosedException;
4142
import org.elasticsearch.index.shard.ShardId;
4243
import org.elasticsearch.indices.IndicesService;
44+
import org.elasticsearch.node.NodeClosedException;
4345
import org.elasticsearch.tasks.Task;
4446
import org.elasticsearch.tasks.TaskManager;
4547
import org.elasticsearch.test.ESTestCase;
4648
import org.elasticsearch.threadpool.ThreadPool;
49+
import org.elasticsearch.transport.SendRequestTransportException;
4750
import org.elasticsearch.transport.TransportException;
4851
import org.elasticsearch.transport.TransportService;
4952
import org.junit.Before;
@@ -117,10 +120,11 @@ protected void doExecute(Task task, RetentionLeaseBackgroundSyncAction.Request r
117120
final Exception e = randomFrom(
118121
new AlreadyClosedException("closed"),
119122
new IndexShardClosedException(indexShard.shardId()),
120-
new TransportException(randomFrom(
121-
"failed",
122-
"TransportService is closed stopped can't send request",
123-
"transport stopped, action: indices:admin/seq_no/retention_lease_background_sync[p]")),
123+
new TransportException("failed"),
124+
new SendRequestTransportException(null, randomFrom(
125+
"some-action",
126+
"indices:admin/seq_no/retention_lease_background_sync[p]"
127+
), new NodeClosedException((DiscoveryNode) null)),
124128
new RuntimeException("failed"));
125129
listener.onFailure(e);
126130
if (e.getMessage().equals("failed")) {

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

+12
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public final class MockTransportService extends TransportService {
9292

9393
private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();
9494

95+
private final List<Runnable> onStopListeners = new CopyOnWriteArrayList<>();
96+
9597
public static class TestPlugin extends Plugin {
9698
@Override
9799
public List<Setting<?>> getSettings() {
@@ -527,6 +529,16 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi
527529
}));
528530
}
529531

532+
public void addOnStopListener(Runnable listener) {
533+
onStopListeners.add(listener);
534+
}
535+
536+
@Override
537+
protected void doStop() {
538+
onStopListeners.forEach(Runnable::run);
539+
super.doStop();
540+
}
541+
530542
@Override
531543
protected void doClose() throws IOException {
532544
super.doClose();

0 commit comments

Comments
 (0)