Skip to content

Commit 193740c

Browse files
Simplify ClusterStateUpdateTask Timeout Handling (#64117) (#64313)
It's confusing and slightly error prone (see #64116) to handle the timeouts via overrides but the priority via a field. This simplifies the code to to avoid future issues and save over 100 LOC. Also this fixes a bug in `TransportVotingConfigExclusionsAction` where trying to instantiate a time value with a negative time could throw and unexpected exception and as a result leak a listener.
1 parent 5b06581 commit 193740c

File tree

15 files changed

+81
-208
lines changed

15 files changed

+81
-208
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java

+11-44
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
4747
public class ClusterServiceIT extends ESIntegTestCase {
4848

49+
private static final TimeValue TEN_SECONDS = TimeValue.timeValueSeconds(10L);
50+
4951
public void testAckedUpdateTask() throws Exception {
5052
internalCluster().startNode();
5153
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
@@ -56,7 +58,8 @@ public void testAckedUpdateTask() throws Exception {
5658
final AtomicBoolean executed = new AtomicBoolean(false);
5759
final CountDownLatch latch = new CountDownLatch(1);
5860
final CountDownLatch processedLatch = new CountDownLatch(1);
59-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
61+
clusterService.submitStateUpdateTask("test",
62+
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
6063
@Override
6164
protected Void newResponse(boolean acknowledged) {
6265
return null;
@@ -79,16 +82,6 @@ public void onAckTimeout() {
7982
latch.countDown();
8083
}
8184

82-
@Override
83-
public TimeValue ackTimeout() {
84-
return TimeValue.timeValueSeconds(10);
85-
}
86-
87-
@Override
88-
public TimeValue timeout() {
89-
return TimeValue.timeValueSeconds(10);
90-
}
91-
9285
@Override
9386
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
9487
processedLatch.countDown();
@@ -129,7 +122,8 @@ public void testAckedUpdateTaskSameClusterState() throws Exception {
129122
final AtomicBoolean executed = new AtomicBoolean(false);
130123
final CountDownLatch latch = new CountDownLatch(1);
131124
final CountDownLatch processedLatch = new CountDownLatch(1);
132-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
125+
clusterService.submitStateUpdateTask("test",
126+
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
133127
@Override
134128
protected Void newResponse(boolean acknowledged) {
135129
return null;
@@ -147,16 +141,6 @@ public void onAckTimeout() {
147141
latch.countDown();
148142
}
149143

150-
@Override
151-
public TimeValue ackTimeout() {
152-
return TimeValue.timeValueSeconds(10);
153-
}
154-
155-
@Override
156-
public TimeValue timeout() {
157-
return TimeValue.timeValueSeconds(10);
158-
}
159-
160144
@Override
161145
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
162146
processedLatch.countDown();
@@ -196,7 +180,9 @@ public void testAckedUpdateTaskNoAckExpected() throws Exception {
196180
final AtomicBoolean onFailure = new AtomicBoolean(false);
197181
final AtomicBoolean executed = new AtomicBoolean(false);
198182
final CountDownLatch latch = new CountDownLatch(1);
199-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
183+
184+
clusterService.submitStateUpdateTask(
185+
"test", new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) {
200186
@Override
201187
protected Void newResponse(boolean acknowledged) {
202188
return null;
@@ -219,16 +205,6 @@ public void onAckTimeout() {
219205
latch.countDown();
220206
}
221207

222-
@Override
223-
public TimeValue ackTimeout() {
224-
return TimeValue.timeValueSeconds(10);
225-
}
226-
227-
@Override
228-
public TimeValue timeout() {
229-
return TimeValue.timeValueSeconds(10);
230-
}
231-
232208
@Override
233209
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
234210
}
@@ -266,7 +242,8 @@ public void testAckedUpdateTaskTimeoutZero() throws Exception {
266242
final AtomicBoolean executed = new AtomicBoolean(false);
267243
final CountDownLatch latch = new CountDownLatch(1);
268244
final CountDownLatch processedLatch = new CountDownLatch(1);
269-
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask<Void>(null, null) {
245+
clusterService.submitStateUpdateTask("test",
246+
new AckedClusterStateUpdateTask<Void>(MasterServiceTests.ackedRequest(TimeValue.ZERO, TEN_SECONDS), null) {
270247
@Override
271248
protected Void newResponse(boolean acknowledged) {
272249
return null;
@@ -289,16 +266,6 @@ public void onAckTimeout() {
289266
latch.countDown();
290267
}
291268

292-
@Override
293-
public TimeValue ackTimeout() {
294-
return TimeValue.timeValueSeconds(0);
295-
}
296-
297-
@Override
298-
public TimeValue timeout() {
299-
return TimeValue.timeValueSeconds(10);
300-
}
301-
302269
@Override
303270
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
304271
processedLatch.countDown();

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ public void onTimeout(TimeValue timeout) {
104104

105105
private void submitClearVotingConfigExclusionsTask(ClearVotingConfigExclusionsRequest request, long startTimeMillis,
106106
ActionListener<ClearVotingConfigExclusionsResponse> listener) {
107-
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) {
107+
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT,
108+
TimeValue.timeValueMillis(
109+
Math.max(0, request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis()))) {
108110
@Override
109111
public ClusterState execute(ClusterState currentState) {
110112
final CoordinationMetadata newCoordinationMetadata =
@@ -119,11 +121,6 @@ public void onFailure(String source, Exception e) {
119121
listener.onFailure(e);
120122
}
121123

122-
@Override
123-
public TimeValue timeout() {
124-
return TimeValue.timeValueMillis(request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis());
125-
}
126-
127124
@Override
128125
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
129126
listener.onResponse(new ClearVotingConfigExclusionsResponse());

server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -127,17 +127,12 @@ public void onFailure(String source, Exception e) {
127127
} else {
128128
final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
129129
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
130-
new ClusterStateUpdateTask(request.waitForEvents()) {
130+
new ClusterStateUpdateTask(request.waitForEvents(), taskTimeout) {
131131
@Override
132132
public ClusterState execute(ClusterState currentState) {
133133
return currentState;
134134
}
135135

136-
@Override
137-
public TimeValue timeout() {
138-
return taskTimeout;
139-
}
140-
141136
@Override
142137
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
143138
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());

server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo
3939
}
4040

4141
protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener<Response> listener) {
42-
super(priority);
42+
super(priority, request.masterNodeTimeout());
4343
this.listener = listener;
4444
this.request = request;
4545
}
@@ -82,12 +82,7 @@ public void onFailure(String source, Exception e) {
8282
/**
8383
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
8484
*/
85-
public TimeValue ackTimeout() {
85+
public final TimeValue ackTimeout() {
8686
return request.ackTimeout();
8787
}
88-
89-
@Override
90-
public TimeValue timeout() {
91-
return request.masterNodeTimeout();
92-
}
9388
}

server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,24 @@ public abstract class ClusterStateUpdateTask
3333

3434
private final Priority priority;
3535

36+
@Nullable
37+
private final TimeValue timeout;
38+
3639
public ClusterStateUpdateTask() {
3740
this(Priority.NORMAL);
3841
}
3942

4043
public ClusterStateUpdateTask(Priority priority) {
44+
this(priority, null);
45+
}
46+
47+
public ClusterStateUpdateTask(TimeValue timeout) {
48+
this(Priority.NORMAL, timeout);
49+
}
50+
51+
public ClusterStateUpdateTask(Priority priority, TimeValue timeout) {
4152
this.priority = priority;
53+
this.timeout = timeout;
4254
}
4355

4456
@Override
@@ -75,12 +87,12 @@ public final void clusterStatePublished(ClusterChangedEvent clusterChangedEvent)
7587
* {@link ClusterStateTaskListener#onFailure(String, Exception)}. May return null to indicate no timeout is needed (default).
7688
*/
7789
@Nullable
78-
public TimeValue timeout() {
79-
return null;
90+
public final TimeValue timeout() {
91+
return timeout;
8092
}
8193

8294
@Override
83-
public Priority priority() {
95+
public final Priority priority() {
8496
return priority;
8597
}
8698

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

+2-13
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import org.elasticsearch.common.inject.Inject;
6464
import org.elasticsearch.common.settings.Setting;
6565
import org.elasticsearch.common.settings.Settings;
66-
import org.elasticsearch.common.unit.TimeValue;
6766
import org.elasticsearch.common.util.concurrent.AtomicArray;
6867
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
6968
import org.elasticsearch.common.util.concurrent.CountDown;
@@ -160,7 +159,7 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina
160159
}
161160

162161
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
163-
new ClusterStateUpdateTask(Priority.URGENT) {
162+
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
164163

165164
private final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
166165

@@ -235,11 +234,6 @@ public void clusterStateProcessed(final String source,
235234
public void onFailure(final String source, final Exception e) {
236235
listener.onFailure(e);
237236
}
238-
239-
@Override
240-
public TimeValue timeout() {
241-
return request.masterNodeTimeout();
242-
}
243237
}
244238
);
245239
}
@@ -411,7 +405,7 @@ public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request,
411405
}
412406

413407
clusterService.submitStateUpdateTask("add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices),
414-
new ClusterStateUpdateTask(Priority.URGENT) {
408+
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
415409

416410
private Map<Index, ClusterBlock> blockedIndices;
417411

@@ -473,11 +467,6 @@ public void clusterStateProcessed(final String source,
473467
public void onFailure(final String source, final Exception e) {
474468
listener.onFailure(e);
475469
}
476-
477-
@Override
478-
public TimeValue timeout() {
479-
return request.masterNodeTimeout();
480-
}
481470
}
482471
);
483472
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

+7-36
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,8 @@ public MetadataIndexTemplateService(ClusterService clusterService,
120120
}
121121

122122
public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
123-
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT) {
124-
125-
@Override
126-
public TimeValue timeout() {
127-
return request.masterTimeout;
128-
}
123+
clusterService.submitStateUpdateTask(
124+
"remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {
129125

130126
@Override
131127
public void onFailure(String source, Exception e) {
@@ -171,12 +167,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
171167
public void putComponentTemplate(final String cause, final boolean create, final String name, final TimeValue masterTimeout,
172168
final ComponentTemplate template, final ActionListener<AcknowledgedResponse> listener) {
173169
clusterService.submitStateUpdateTask("create-component-template [" + name + "], cause [" + cause + "]",
174-
new ClusterStateUpdateTask(Priority.URGENT) {
175-
176-
@Override
177-
public TimeValue timeout() {
178-
return masterTimeout;
179-
}
170+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
180171

181172
@Override
182173
public void onFailure(String source, Exception e) {
@@ -305,12 +296,7 @@ public void removeComponentTemplate(final String name, final TimeValue masterTim
305296
final ActionListener<AcknowledgedResponse> listener) {
306297
validateNotInUse(clusterService.state().metadata(), name);
307298
clusterService.submitStateUpdateTask("remove-component-template [" + name + "]",
308-
new ClusterStateUpdateTask(Priority.URGENT) {
309-
310-
@Override
311-
public TimeValue timeout() {
312-
return masterTimeout;
313-
}
299+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
314300

315301
@Override
316302
public void onFailure(String source, Exception e) {
@@ -384,12 +370,7 @@ public void putIndexTemplateV2(final String cause, final boolean create, final S
384370
final ComposableIndexTemplate template, final ActionListener<AcknowledgedResponse> listener) {
385371
validateV2TemplateRequest(clusterService.state().metadata(), name, template);
386372
clusterService.submitStateUpdateTask("create-index-template-v2 [" + name + "], cause [" + cause + "]",
387-
new ClusterStateUpdateTask(Priority.URGENT) {
388-
389-
@Override
390-
public TimeValue timeout() {
391-
return masterTimeout;
392-
}
373+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
393374

394375
@Override
395376
public void onFailure(String source, Exception e) {
@@ -639,12 +620,7 @@ static Map<String, List<String>> findConflictingV2Templates(final ClusterState s
639620
public void removeIndexTemplateV2(final String name, final TimeValue masterTimeout,
640621
final ActionListener<AcknowledgedResponse> listener) {
641622
clusterService.submitStateUpdateTask("remove-index-template-v2 [" + name + "]",
642-
new ClusterStateUpdateTask(Priority.URGENT) {
643-
644-
@Override
645-
public TimeValue timeout() {
646-
return masterTimeout;
647-
}
623+
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
648624

649625
@Override
650626
public void onFailure(String source, Exception e) {
@@ -734,12 +710,7 @@ public void putTemplate(final PutRequest request, final PutListener listener) {
734710
final IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder(request.name);
735711

736712
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]",
737-
new ClusterStateUpdateTask(Priority.URGENT) {
738-
739-
@Override
740-
public TimeValue timeout() {
741-
return request.masterTimeout;
742-
}
713+
new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {
743714

744715
@Override
745716
public void onFailure(String source, Exception e) {

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import org.elasticsearch.common.settings.Settings;
7878
import org.elasticsearch.common.unit.ByteSizeUnit;
7979
import org.elasticsearch.common.unit.ByteSizeValue;
80-
import org.elasticsearch.common.unit.TimeValue;
8180
import org.elasticsearch.common.util.BigArrays;
8281
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
8382
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -361,7 +360,7 @@ public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUp
361360
final RepositoryMetadata repositoryMetadataStart = metadata;
362361
getRepositoryData(ActionListener.wrap(repositoryData -> {
363362
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
364-
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
363+
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority(), updateTask.timeout()) {
365364

366365
private boolean executedTask = false;
367366

@@ -397,11 +396,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
397396
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
398397
}
399398
}
400-
401-
@Override
402-
public TimeValue timeout() {
403-
return updateTask.timeout();
404-
}
405399
});
406400
}, onFailure));
407401
}

0 commit comments

Comments
 (0)