Skip to content

Commit ff0b6c7

Browse files
authored
Decouple ClusterStateTaskListener & ClusterApplier (#30809)
Today, the `ClusterApplier` and `MasterService` both use the `ClusterStateTaskListener` interface to notify their callers when asynchronous activities have completed. However, this is not wholly appropriate: none of the callers into the `ClusterApplier` care about the `ClusterState` arguments that they receive. This change introduces a dedicated ClusterApplyListener interface for callers into the `ClusterApplier`, to distinguish these listeners from the real `ClusterStateTaskListener`s that are waiting for responses from the `MasterService`.
1 parent 0bdfb5c commit ff0b6c7

File tree

9 files changed

+71
-61
lines changed

9 files changed

+71
-61
lines changed

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.cluster.service;
2121

2222
import org.elasticsearch.cluster.ClusterState;
23-
import org.elasticsearch.cluster.ClusterStateTaskListener;
2423

2524
import java.util.function.Supplier;
2625

@@ -38,11 +37,29 @@ public interface ClusterApplier {
3837
* @param clusterStateSupplier the cluster state supplier which provides the latest cluster state to apply
3938
* @param listener callback that is invoked after cluster state is applied
4039
*/
41-
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);
40+
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);
4241

4342
/**
4443
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
4544
*/
4645
ClusterState.Builder newClusterStateBuilder();
4746

47+
/**
48+
* Listener for results of cluster state application
49+
*/
50+
interface ClusterApplyListener {
51+
/**
52+
* Called on successful cluster state application
53+
* @param source information where the cluster state came from
54+
*/
55+
default void onSuccess(String source) {
56+
}
57+
58+
/**
59+
* Called on failure during cluster state application
60+
* @param source information where the cluster state came from
61+
* @param e exception that occurred
62+
*/
63+
void onFailure(String source, Exception e);
64+
}
4865
}

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.cluster.ClusterStateListener;
2828
import org.elasticsearch.cluster.ClusterStateObserver;
2929
import org.elasticsearch.cluster.ClusterStateTaskConfig;
30-
import org.elasticsearch.cluster.ClusterStateTaskListener;
3130
import org.elasticsearch.cluster.LocalNodeMasterListener;
3231
import org.elasticsearch.cluster.NodeConnectionsService;
3332
import org.elasticsearch.cluster.TimeoutClusterStateListener;
@@ -141,10 +140,10 @@ protected synchronized void doStart() {
141140
}
142141

143142
class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {
144-
final ClusterStateTaskListener listener;
143+
final ClusterApplyListener listener;
145144
final Function<ClusterState, ClusterState> updateFunction;
146145

147-
UpdateTask(Priority priority, String source, ClusterStateTaskListener listener,
146+
UpdateTask(Priority priority, String source, ClusterApplyListener listener,
148147
Function<ClusterState, ClusterState> updateFunction) {
149148
super(priority, source);
150149
this.listener = listener;
@@ -301,7 +300,7 @@ public void run() {
301300
}
302301

303302
public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
304-
final ClusterStateTaskListener listener, Priority priority) {
303+
final ClusterApplyListener listener, Priority priority) {
305304
submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority),
306305
(clusterState) -> {
307306
clusterStateConsumer.accept(clusterState);
@@ -311,13 +310,13 @@ public void runOnApplierThread(final String source, Consumer<ClusterState> clust
311310
}
312311

313312
public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
314-
final ClusterStateTaskListener listener) {
313+
final ClusterApplyListener listener) {
315314
runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
316315
}
317316

318317
@Override
319318
public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
320-
final ClusterStateTaskListener listener) {
319+
final ClusterApplyListener listener) {
321320
Function<ClusterState, ClusterState> applyFunction = currentState -> {
322321
ClusterState nextState = clusterStateSupplier.get();
323322
if (nextState != null) {
@@ -331,12 +330,12 @@ public void onNewClusterState(final String source, final Supplier<ClusterState>
331330

332331
private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
333332
final Function<ClusterState, ClusterState> executor,
334-
final ClusterStateTaskListener listener) {
333+
final ClusterApplyListener listener) {
335334
if (!lifecycle.started()) {
336335
return;
337336
}
338337
try {
339-
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
338+
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
340339
if (config.timeout() != null) {
341340
threadPoolExecutor.execute(updateTask, config.timeout(),
342341
() -> threadPool.generic().execute(
@@ -417,7 +416,7 @@ protected void runTask(UpdateTask task) {
417416
}
418417

419418
if (previousClusterState == newClusterState) {
420-
task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
419+
task.listener.onSuccess(task.source);
421420
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
422421
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
423422
warnAboutSlowTaskIfNeeded(executionTime, task.source);
@@ -486,7 +485,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
486485

487486
callClusterStateListeners(clusterChangedEvent);
488487

489-
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
488+
task.listener.onSuccess(task.source);
490489
}
491490

492491
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
@@ -511,11 +510,11 @@ private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent)
511510
});
512511
}
513512

514-
private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
515-
private final ClusterStateTaskListener listener;
513+
private static class SafeClusterApplyListener implements ClusterApplyListener {
514+
private final ClusterApplyListener listener;
516515
private final Logger logger;
517516

518-
SafeClusterStateTaskListener(ClusterStateTaskListener listener, Logger logger) {
517+
SafeClusterApplyListener(ClusterApplyListener listener, Logger logger) {
519518
this.listener = listener;
520519
this.logger = logger;
521520
}
@@ -532,14 +531,12 @@ public void onFailure(String source, Exception e) {
532531
}
533532

534533
@Override
535-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
534+
public void onSuccess(String source) {
536535
try {
537-
listener.clusterStateProcessed(source, oldState, newState);
536+
listener.onSuccess(source);
538537
} catch (Exception e) {
539538
logger.error(new ParameterizedMessage(
540-
"exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" +
541-
"{}\nnew cluster state:\n{}",
542-
source, oldState, newState), e);
539+
"exception thrown by listener while notifying of cluster state processed from [{}]", source), e);
543540
}
544541
}
545542
}

server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,16 @@
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.cluster.ClusterChangedEvent;
2424
import org.elasticsearch.cluster.ClusterState;
25-
import org.elasticsearch.cluster.ClusterStateTaskListener;
2625
import org.elasticsearch.cluster.block.ClusterBlocks;
2726
import org.elasticsearch.cluster.node.DiscoveryNode;
2827
import org.elasticsearch.cluster.node.DiscoveryNodes;
2928
import org.elasticsearch.cluster.service.ClusterApplier;
29+
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
3030
import org.elasticsearch.cluster.service.MasterService;
3131
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3232
import org.elasticsearch.common.settings.Settings;
3333
import org.elasticsearch.discovery.Discovery;
3434
import org.elasticsearch.discovery.DiscoveryStats;
35-
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
36-
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
3735
import org.elasticsearch.transport.TransportService;
3836

3937
import java.io.IOException;
@@ -65,9 +63,9 @@ public synchronized void publish(final ClusterChangedEvent event,
6563
clusterState = event.state();
6664
CountDownLatch latch = new CountDownLatch(1);
6765

68-
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
66+
ClusterApplyListener listener = new ClusterApplyListener() {
6967
@Override
70-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
68+
public void onSuccess(String source) {
7169
latch.countDown();
7270
ackListener.onNodeAck(transportService.getLocalNode(), null);
7371
}

server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
24-
import org.apache.logging.log4j.util.Supplier;
2524
import org.elasticsearch.core.internal.io.IOUtils;
2625
import org.elasticsearch.ElasticsearchException;
2726
import org.elasticsearch.ExceptionsHelper;
@@ -34,12 +33,11 @@
3433
import org.elasticsearch.cluster.ClusterStateTaskListener;
3534
import org.elasticsearch.cluster.NotMasterException;
3635
import org.elasticsearch.cluster.block.ClusterBlocks;
37-
import org.elasticsearch.cluster.metadata.IndexMetaData;
38-
import org.elasticsearch.cluster.metadata.MetaData;
3936
import org.elasticsearch.cluster.node.DiscoveryNode;
4037
import org.elasticsearch.cluster.node.DiscoveryNodes;
4138
import org.elasticsearch.cluster.routing.allocation.AllocationService;
4239
import org.elasticsearch.cluster.service.ClusterApplier;
40+
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
4341
import org.elasticsearch.cluster.service.MasterService;
4442
import org.elasticsearch.common.Priority;
4543
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -789,9 +787,9 @@ boolean processNextCommittedClusterState(String reason) {
789787

790788
clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
791789
this::clusterState,
792-
new ClusterStateTaskListener() {
790+
new ClusterApplyListener() {
793791
@Override
794-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
792+
public void onSuccess(String source) {
795793
try {
796794
pendingStatesQueue.markAsProcessed(newClusterState);
797795
} catch (Exception e) {

server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.block.ClusterBlocks;
3131
import org.elasticsearch.cluster.node.DiscoveryNode;
3232
import org.elasticsearch.cluster.node.DiscoveryNodes;
33+
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
3334
import org.elasticsearch.common.logging.Loggers;
3435
import org.elasticsearch.common.settings.ClusterSettings;
3536
import org.elasticsearch.common.settings.Settings;
@@ -135,9 +136,9 @@ public void testClusterStateUpdateLogging() throws Exception {
135136
clusterApplierService.currentTimeOverride = System.nanoTime();
136137
clusterApplierService.runOnApplierThread("test1",
137138
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
138-
new ClusterStateTaskListener() {
139+
new ClusterApplyListener() {
139140
@Override
140-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
141+
public void onSuccess(String source) {
141142
latch.countDown();
142143
}
143144

@@ -151,9 +152,9 @@ public void onFailure(String source, Exception e) {
151152
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
152153
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
153154
},
154-
new ClusterStateTaskListener() {
155+
new ClusterApplyListener() {
155156
@Override
156-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
157+
public void onSuccess(String source) {
157158
fail();
158159
}
159160

@@ -166,9 +167,9 @@ public void onFailure(String source, Exception e) {
166167
// We don't check logging for this on since there is no guarantee that it will occur before our check
167168
clusterApplierService.runOnApplierThread("test3",
168169
currentState -> {},
169-
new ClusterStateTaskListener() {
170+
new ClusterApplyListener() {
170171
@Override
171-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
172+
public void onSuccess(String source) {
172173
latch.countDown();
173174
}
174175

@@ -216,9 +217,9 @@ public void testLongClusterStateUpdateLogging() throws Exception {
216217
clusterApplierService.currentTimeOverride = System.nanoTime();
217218
clusterApplierService.runOnApplierThread("test1",
218219
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
219-
new ClusterStateTaskListener() {
220+
new ClusterApplyListener() {
220221
@Override
221-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
222+
public void onSuccess(String source) {
222223
latch.countDown();
223224
processedFirstTask.countDown();
224225
}
@@ -234,9 +235,9 @@ public void onFailure(String source, Exception e) {
234235
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
235236
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
236237
},
237-
new ClusterStateTaskListener() {
238+
new ClusterApplyListener() {
238239
@Override
239-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
240+
public void onSuccess(String source) {
240241
fail();
241242
}
242243

@@ -247,9 +248,9 @@ public void onFailure(String source, Exception e) {
247248
});
248249
clusterApplierService.runOnApplierThread("test3",
249250
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(),
250-
new ClusterStateTaskListener() {
251+
new ClusterApplyListener() {
251252
@Override
252-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
253+
public void onSuccess(String source) {
253254
latch.countDown();
254255
}
255256

@@ -262,9 +263,9 @@ public void onFailure(String source, Exception e) {
262263
// We don't check logging for this on since there is no guarantee that it will occur before our check
263264
clusterApplierService.runOnApplierThread("test4",
264265
currentState -> {},
265-
new ClusterStateTaskListener() {
266+
new ClusterApplyListener() {
266267
@Override
267-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
268+
public void onSuccess(String source) {
268269
latch.countDown();
269270
}
270271

@@ -340,10 +341,10 @@ public void testClusterStateApplierCantSampleClusterState() throws InterruptedEx
340341

341342
CountDownLatch latch = new CountDownLatch(1);
342343
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
343-
new ClusterStateTaskListener() {
344+
new ClusterApplyListener() {
344345

345346
@Override
346-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
347+
public void onSuccess(String source) {
347348
latch.countDown();
348349
}
349350

@@ -390,9 +391,9 @@ public void onTimeout(TimeValue timeout) {
390391

391392
CountDownLatch latch = new CountDownLatch(1);
392393
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
393-
new ClusterStateTaskListener() {
394+
new ClusterApplyListener() {
394395
@Override
395-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
396+
public void onSuccess(String source) {
396397
latch.countDown();
397398
}
398399

server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.Version;
2424
import org.elasticsearch.cluster.ClusterName;
2525
import org.elasticsearch.cluster.ClusterState;
26-
import org.elasticsearch.cluster.ClusterStateTaskListener;
2726
import org.elasticsearch.cluster.node.DiscoveryNode;
2827
import org.elasticsearch.cluster.node.DiscoveryNodes;
2928
import org.elasticsearch.cluster.service.ClusterApplier;
@@ -72,9 +71,9 @@ public ClusterState.Builder newClusterStateBuilder() {
7271

7372
@Override
7473
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier,
75-
ClusterStateTaskListener listener) {
74+
ClusterApplyListener listener) {
7675
clusterState.set(clusterStateSupplier.get());
77-
listener.clusterStateProcessed(source, clusterState.get(), clusterState.get());
76+
listener.onSuccess(source);
7877
}
7978
});
8079
discovery.start();

server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.cluster.ClusterModule;
2727
import org.elasticsearch.cluster.ClusterName;
2828
import org.elasticsearch.cluster.ClusterState;
29-
import org.elasticsearch.cluster.ClusterStateTaskListener;
3029
import org.elasticsearch.cluster.ESAllocationTestCase;
3130
import org.elasticsearch.cluster.metadata.IndexMetaData;
3231
import org.elasticsearch.cluster.metadata.MetaData;
@@ -314,8 +313,8 @@ public ClusterState.Builder newClusterStateBuilder() {
314313
}
315314

316315
@Override
317-
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener) {
318-
listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get());
316+
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
317+
listener.onSuccess(source);
319318
}
320319
};
321320
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),

0 commit comments

Comments
 (0)