Skip to content

Commit b1c1daa

Browse files
committed
Add get file chunk timeouts with listener timeouts (#38758)
This commit adds a `ListenerTimeouts` class that will wrap a `ActionListener` in a listener with a timeout scheduled on the generic thread pool. If the timeout expires before the listener is completed, `onFailure` will be called with an `ElasticsearchTimeoutException`. Timeouts for the get ccr file chunk action are implemented using this functionality. Additionally, this commit attempts to fix #38027 by also blocking proxied get ccr file chunk actions. This test being un-muted is useful to verify the timeout functionality.
1 parent d80325f commit b1c1daa

File tree

4 files changed

+245
-30
lines changed

4 files changed

+245
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.support;
21+
22+
import org.elasticsearch.ElasticsearchTimeoutException;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.common.unit.TimeValue;
25+
import org.elasticsearch.threadpool.Scheduler;
26+
import org.elasticsearch.threadpool.ThreadPool;
27+
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
public class ListenerTimeouts {
31+
32+
/**
33+
* Wraps a listener with a listener that can timeout. After the timeout period the
34+
* {@link ActionListener#onFailure(Exception)} will be called with a
35+
* {@link ElasticsearchTimeoutException} if the listener has not already been completed.
36+
*
37+
* @param threadPool used to schedule the timeout
38+
* @param listener to that can timeout
39+
* @param timeout period before listener failed
40+
* @param executor to use for scheduling timeout
41+
* @param listenerName name of the listener for timeout exception
42+
* @return the wrapped listener that will timeout
43+
*/
44+
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, ActionListener<Response> listener,
45+
TimeValue timeout, String executor, String listenerName) {
46+
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName);
47+
wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor);
48+
return wrappedListener;
49+
}
50+
51+
private static class TimeoutableListener<Response> implements ActionListener<Response>, Runnable {
52+
53+
private final AtomicBoolean isDone = new AtomicBoolean(false);
54+
private final ActionListener<Response> delegate;
55+
private final TimeValue timeout;
56+
private final String listenerName;
57+
private volatile Scheduler.ScheduledCancellable cancellable;
58+
59+
private TimeoutableListener(ActionListener<Response> delegate, TimeValue timeout, String listenerName) {
60+
this.delegate = delegate;
61+
this.timeout = timeout;
62+
this.listenerName = listenerName;
63+
}
64+
65+
@Override
66+
public void onResponse(Response response) {
67+
if (isDone.compareAndSet(false, true)) {
68+
cancellable.cancel();
69+
delegate.onResponse(response);
70+
}
71+
}
72+
73+
@Override
74+
public void onFailure(Exception e) {
75+
if (isDone.compareAndSet(false, true)) {
76+
cancellable.cancel();
77+
delegate.onFailure(e);
78+
}
79+
}
80+
81+
@Override
82+
public void run() {
83+
if (isDone.compareAndSet(false, true)) {
84+
String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
85+
delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
86+
}
87+
}
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.support;
21+
22+
import org.elasticsearch.ElasticsearchTimeoutException;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.test.ESTestCase;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.junit.Before;
30+
31+
import java.io.IOException;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
35+
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
36+
import static org.hamcrest.core.IsInstanceOf.instanceOf;
37+
38+
public class ListenerTimeoutsTests extends ESTestCase {
39+
40+
private final TimeValue timeout = TimeValue.timeValueMillis(10);
41+
private final String generic = ThreadPool.Names.GENERIC;
42+
private DeterministicTaskQueue taskQueue;
43+
44+
@Before
45+
public void setUp() throws Exception {
46+
super.setUp();
47+
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
48+
taskQueue = new DeterministicTaskQueue(settings, random());
49+
}
50+
51+
public void testListenerTimeout() {
52+
AtomicBoolean success = new AtomicBoolean(false);
53+
AtomicReference<Exception> exception = new AtomicReference<>();
54+
ActionListener<Void> listener = wrap(success, exception);
55+
56+
ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
57+
assertTrue(taskQueue.hasDeferredTasks());
58+
taskQueue.advanceTime();
59+
taskQueue.runAllRunnableTasks();
60+
61+
wrapped.onResponse(null);
62+
wrapped.onFailure(new IOException("incorrect exception"));
63+
64+
assertFalse(success.get());
65+
assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class));
66+
}
67+
68+
public void testFinishNormallyBeforeTimeout() {
69+
AtomicBoolean success = new AtomicBoolean(false);
70+
AtomicReference<Exception> exception = new AtomicReference<>();
71+
ActionListener<Void> listener = wrap(success, exception);
72+
73+
ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
74+
wrapped.onResponse(null);
75+
wrapped.onFailure(new IOException("boom"));
76+
wrapped.onResponse(null);
77+
78+
assertTrue(taskQueue.hasDeferredTasks());
79+
taskQueue.advanceTime();
80+
taskQueue.runAllRunnableTasks();
81+
82+
assertTrue(success.get());
83+
assertNull(exception.get());
84+
}
85+
86+
public void testFinishExceptionallyBeforeTimeout() {
87+
AtomicBoolean success = new AtomicBoolean(false);
88+
AtomicReference<Exception> exception = new AtomicReference<>();
89+
ActionListener<Void> listener = wrap(success, exception);
90+
91+
ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
92+
wrapped.onFailure(new IOException("boom"));
93+
94+
assertTrue(taskQueue.hasDeferredTasks());
95+
taskQueue.advanceTime();
96+
taskQueue.runAllRunnableTasks();
97+
98+
assertFalse(success.get());
99+
assertThat(exception.get(), instanceOf(IOException.class));
100+
}
101+
102+
private ActionListener<Void> wrap(AtomicBoolean success, AtomicReference<Exception> exception) {
103+
return new ActionListener<Void>() {
104+
105+
private final AtomicBoolean completed = new AtomicBoolean();
106+
107+
@Override
108+
public void onResponse(Void aVoid) {
109+
assertTrue(completed.compareAndSet(false, true));
110+
assertTrue(success.compareAndSet(false, true));
111+
}
112+
113+
@Override
114+
public void onFailure(Exception e) {
115+
assertTrue(completed.compareAndSet(false, true));
116+
assertTrue(exception.compareAndSet(null, e));
117+
}
118+
};
119+
}
120+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1717
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1818
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
19+
import org.elasticsearch.action.support.ListenerTimeouts;
1920
import org.elasticsearch.action.support.PlainActionFuture;
2021
import org.elasticsearch.client.Client;
2122
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -103,7 +104,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
103104
private final ThreadPool threadPool;
104105

105106
private final CounterMetric throttledTime = new CounterMetric();
106-
107+
107108
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
108109
CcrSettings ccrSettings, ThreadPool threadPool) {
109110
this.metadata = metadata;
@@ -377,7 +378,8 @@ void restoreFiles() throws IOException {
377378
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
378379
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
379380

380-
try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) {
381+
try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
382+
})) {
381383
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
382384
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
383385

@@ -403,8 +405,9 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
403405
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
404406
fileInfo.name(), offset, bytesRequested);
405407

406-
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
407-
ActionListener.wrap(
408+
TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
409+
ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
410+
ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
408411
r -> threadPool.generic().execute(new AbstractRunnable() {
409412
@Override
410413
public void onFailure(Exception e) {
@@ -428,7 +431,8 @@ protected void doRun() throws Exception {
428431
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
429432
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
430433
}
431-
));
434+
), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
435+
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
432436
} catch (Exception e) {
433437
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
434438
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

+27-25
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.snapshots.RestoreService;
3838
import org.elasticsearch.snapshots.Snapshot;
3939
import org.elasticsearch.test.transport.MockTransportService;
40+
import org.elasticsearch.transport.TransportActionProxy;
4041
import org.elasticsearch.transport.TransportService;
4142
import org.elasticsearch.xpack.CcrIntegTestCase;
4243
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
@@ -292,7 +293,6 @@ public void testRateLimitingIsEmployed() throws Exception {
292293
}
293294
}
294295

295-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38027")
296296
public void testIndividualActionsTimeout() throws Exception {
297297
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
298298
TimeValue timeValue = TimeValue.timeValueMillis(100);
@@ -315,7 +315,8 @@ public void testIndividualActionsTimeout() throws Exception {
315315
MockTransportService mockTransportService = (MockTransportService) transportService;
316316
transportServices.add(mockTransportService);
317317
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
318-
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) {
318+
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false &&
319+
action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) {
319320
connection.sendRequest(requestId, action, request, options);
320321
}
321322
});
@@ -337,33 +338,34 @@ public void testIndividualActionsTimeout() throws Exception {
337338
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
338339
.indexSettings(settingsBuilder);
339340

340-
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
341-
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
342-
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
343-
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
344-
345-
// Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
346-
// metadata this will throw an exception. If it times-out when restoring a shard, the shard will
347-
// be marked as failed. Either one is a success for the purpose of this test.
348341
try {
349-
RestoreInfo restoreInfo = future.actionGet();
350-
assertThat(restoreInfo.failedShards(), greaterThan(0));
351-
assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
352-
assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
353-
} catch (Exception e) {
354-
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
355-
}
342+
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
343+
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
344+
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
345+
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
356346

347+
// Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
348+
// metadata this will throw an exception. If it times-out when restoring a shard, the shard will
349+
// be marked as failed. Either one is a success for the purpose of this test.
350+
try {
351+
RestoreInfo restoreInfo = future.actionGet();
352+
assertThat(restoreInfo.failedShards(), greaterThan(0));
353+
assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
354+
assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
355+
} catch (Exception e) {
356+
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
357+
}
358+
} finally {
359+
for (MockTransportService transportService : transportServices) {
360+
transportService.clearAllRules();
361+
}
357362

358-
for (MockTransportService transportService : transportServices) {
359-
transportService.clearAllRules();
363+
settingsRequest = new ClusterUpdateSettingsRequest();
364+
TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
365+
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
366+
defaultValue));
367+
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
360368
}
361-
362-
settingsRequest = new ClusterUpdateSettingsRequest();
363-
TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
364-
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
365-
defaultValue));
366-
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
367369
}
368370

369371
public void testFollowerMappingIsUpdated() throws IOException {

0 commit comments

Comments
 (0)