Skip to content

Commit 9e9cf7a

Browse files
Extract TransportRequestDeduplication from ShardStateAction (elastic#37870)
* Extracted the logic for master request duplication so it can be reused by the snapshotting logic * Removed custom listener used by `ShardStateAction` to not leak these into future users of this class * Changed semantics slightly to get rid of redundant instantiations of the composite listener * Relates elastic#37686
1 parent 6d9434f commit 9e9cf7a

File tree

7 files changed

+241
-206
lines changed

7 files changed

+241
-206
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,12 +1196,12 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R
11961196
onSuccess.run();
11971197
}
11981198

1199-
protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess,
1199+
protected final ActionListener<Void> createShardActionListener(final Runnable onSuccess,
12001200
final Consumer<Exception> onPrimaryDemoted,
12011201
final Consumer<Exception> onIgnoredFailure) {
1202-
return new ShardStateAction.Listener() {
1202+
return new ActionListener<Void>() {
12031203
@Override
1204-
public void onSuccess() {
1204+
public void onResponse(Void aVoid) {
12051205
onSuccess.run();
12061206
}
12071207

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 16 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.ElasticsearchException;
2626
import org.elasticsearch.ExceptionsHelper;
2727
import org.elasticsearch.Version;
28+
import org.elasticsearch.action.ActionListener;
2829
import org.elasticsearch.cluster.ClusterChangedEvent;
2930
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.ClusterStateObserver;
@@ -47,18 +48,17 @@
4748
import org.elasticsearch.common.io.stream.StreamInput;
4849
import org.elasticsearch.common.io.stream.StreamOutput;
4950
import org.elasticsearch.common.unit.TimeValue;
50-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
5151
import org.elasticsearch.discovery.Discovery;
5252
import org.elasticsearch.index.shard.ShardId;
5353
import org.elasticsearch.node.NodeClosedException;
5454
import org.elasticsearch.threadpool.ThreadPool;
5555
import org.elasticsearch.transport.ConnectTransportException;
5656
import org.elasticsearch.transport.EmptyTransportResponseHandler;
57-
import org.elasticsearch.transport.NodeDisconnectedException;
5857
import org.elasticsearch.transport.RemoteTransportException;
5958
import org.elasticsearch.transport.TransportChannel;
6059
import org.elasticsearch.transport.TransportException;
6160
import org.elasticsearch.transport.TransportRequest;
61+
import org.elasticsearch.transport.TransportRequestDeduplicator;
6262
import org.elasticsearch.transport.TransportRequestHandler;
6363
import org.elasticsearch.transport.TransportResponse;
6464
import org.elasticsearch.transport.TransportService;
@@ -70,7 +70,6 @@
7070
import java.util.Locale;
7171
import java.util.Objects;
7272
import java.util.Set;
73-
import java.util.concurrent.ConcurrentMap;
7473
import java.util.function.Predicate;
7574

7675
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -88,7 +87,7 @@ public class ShardStateAction {
8887

8988
// a list of shards that failed during replication
9089
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
91-
private final ConcurrentMap<FailedShardEntry, CompositeListener> remoteFailedShardsCache = ConcurrentCollections.newConcurrentMap();
90+
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
9291

9392
@Inject
9493
public ShardStateAction(ClusterService clusterService, TransportService transportService,
@@ -105,7 +104,7 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
105104
}
106105

107106
private void sendShardAction(final String actionName, final ClusterState currentState,
108-
final TransportRequest request, final Listener listener) {
107+
final TransportRequest request, final ActionListener<Void> listener) {
109108
ClusterStateObserver observer =
110109
new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
111110
DiscoveryNode masterNode = currentState.nodes().getMasterNode();
@@ -119,7 +118,7 @@ private void sendShardAction(final String actionName, final ClusterState current
119118
actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
120119
@Override
121120
public void handleResponse(TransportResponse.Empty response) {
122-
listener.onSuccess();
121+
listener.onResponse(null);
123122
}
124123

125124
@Override
@@ -162,60 +161,39 @@ private static boolean isMasterChannelException(TransportException exp) {
162161
* @param listener callback upon completion of the request
163162
*/
164163
public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message,
165-
@Nullable final Exception failure, Listener listener) {
164+
@Nullable final Exception failure, ActionListener<Void> listener) {
166165
assert primaryTerm > 0L : "primary term should be strictly positive";
167-
final FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale);
168-
final CompositeListener compositeListener = new CompositeListener(listener);
169-
final CompositeListener existingListener = remoteFailedShardsCache.putIfAbsent(shardEntry, compositeListener);
170-
if (existingListener == null) {
171-
sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, new Listener() {
172-
@Override
173-
public void onSuccess() {
174-
try {
175-
compositeListener.onSuccess();
176-
} finally {
177-
remoteFailedShardsCache.remove(shardEntry);
178-
}
179-
}
180-
@Override
181-
public void onFailure(Exception e) {
182-
try {
183-
compositeListener.onFailure(e);
184-
} finally {
185-
remoteFailedShardsCache.remove(shardEntry);
186-
}
187-
}
188-
});
189-
} else {
190-
existingListener.addListener(listener);
191-
}
166+
remoteFailedShardsDeduplicator.executeOnce(
167+
new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale), listener,
168+
(req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener));
192169
}
193170

194171
int remoteShardFailedCacheSize() {
195-
return remoteFailedShardsCache.size();
172+
return remoteFailedShardsDeduplicator.size();
196173
}
197174

198175
/**
199176
* Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
200177
*/
201178
public void localShardFailed(final ShardRouting shardRouting, final String message,
202-
@Nullable final Exception failure, Listener listener) {
179+
@Nullable final Exception failure, ActionListener<Void> listener) {
203180
localShardFailed(shardRouting, message, failure, listener, clusterService.state());
204181
}
205182

206183
/**
207184
* Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
208185
*/
209186
public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure,
210-
Listener listener, final ClusterState currentState) {
187+
ActionListener<Void> listener, final ClusterState currentState) {
211188
FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(),
212189
0L, message, failure, true);
213190
sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener);
214191
}
215192

216193
// visible for testing
217194
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer,
218-
TransportRequest request, Listener listener, Predicate<ClusterState> changePredicate) {
195+
TransportRequest request, ActionListener<Void> listener,
196+
Predicate<ClusterState> changePredicate) {
219197
observer.waitForNextChange(new ClusterStateObserver.Listener() {
220198
@Override
221199
public void onNewClusterState(ClusterState state) {
@@ -496,14 +474,14 @@ public int hashCode() {
496474
public void shardStarted(final ShardRouting shardRouting,
497475
final long primaryTerm,
498476
final String message,
499-
final Listener listener) {
477+
final ActionListener<Void> listener) {
500478
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
501479
}
502480

503481
public void shardStarted(final ShardRouting shardRouting,
504482
final long primaryTerm,
505483
final String message,
506-
final Listener listener,
484+
final ActionListener<Void> listener,
507485
final ClusterState currentState) {
508486
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
509487
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
@@ -669,97 +647,6 @@ public String toString() {
669647
}
670648
}
671649

672-
public interface Listener {
673-
674-
default void onSuccess() {
675-
}
676-
677-
/**
678-
* Notification for non-channel exceptions that are not handled
679-
* by {@link ShardStateAction}.
680-
*
681-
* The exceptions that are handled by {@link ShardStateAction}
682-
* are:
683-
* - {@link NotMasterException}
684-
* - {@link NodeDisconnectedException}
685-
* - {@link Discovery.FailedToCommitClusterStateException}
686-
*
687-
* Any other exception is communicated to the requester via
688-
* this notification.
689-
*
690-
* @param e the unexpected cause of the failure on the master
691-
*/
692-
default void onFailure(final Exception e) {
693-
}
694-
695-
}
696-
697-
/**
698-
* A composite listener that allows registering multiple listeners dynamically.
699-
*/
700-
static final class CompositeListener implements Listener {
701-
private boolean isNotified = false;
702-
private Exception failure = null;
703-
private final List<Listener> listeners = new ArrayList<>();
704-
705-
CompositeListener(Listener listener) {
706-
listeners.add(listener);
707-
}
708-
709-
void addListener(Listener listener) {
710-
final boolean ready;
711-
synchronized (this) {
712-
ready = this.isNotified;
713-
if (ready == false) {
714-
listeners.add(listener);
715-
}
716-
}
717-
if (ready) {
718-
if (failure != null) {
719-
listener.onFailure(failure);
720-
} else {
721-
listener.onSuccess();
722-
}
723-
}
724-
}
725-
726-
private void onCompleted(Exception failure) {
727-
synchronized (this) {
728-
this.failure = failure;
729-
this.isNotified = true;
730-
}
731-
RuntimeException firstException = null;
732-
for (Listener listener : listeners) {
733-
try {
734-
if (failure != null) {
735-
listener.onFailure(failure);
736-
} else {
737-
listener.onSuccess();
738-
}
739-
} catch (RuntimeException innerEx) {
740-
if (firstException == null) {
741-
firstException = innerEx;
742-
} else {
743-
firstException.addSuppressed(innerEx);
744-
}
745-
}
746-
}
747-
if (firstException != null) {
748-
throw firstException;
749-
}
750-
}
751-
752-
@Override
753-
public void onSuccess() {
754-
onCompleted(null);
755-
}
756-
757-
@Override
758-
public void onFailure(Exception failure) {
759-
onCompleted(failure);
760-
}
761-
}
762-
763650
public static class NoLongerPrimaryShardException extends ElasticsearchException {
764651

765652
public NoLongerPrimaryShardException(ShardId shardId, String msg) {

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
112112
private final ShardStateAction shardStateAction;
113113
private final NodeMappingRefreshAction nodeMappingRefreshAction;
114114

115-
private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {
116-
};
115+
private static final ActionListener<Void> SHARD_STATE_ACTION_LISTENER = ActionListener.wrap(() -> {});
117116

118117
private final Settings settings;
119118
// a list of shards that failed during recovery
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.ConcurrentMap;
28+
import java.util.function.BiConsumer;
29+
30+
/**
31+
* Deduplicator for {@link TransportRequest}s that keeps track of {@link TransportRequest}s that should
32+
* not be sent in parallel.
33+
* @param <T> Transport Request Class
34+
*/
35+
public final class TransportRequestDeduplicator<T extends TransportRequest> {
36+
37+
private final ConcurrentMap<T, CompositeListener> requests = ConcurrentCollections.newConcurrentMap();
38+
39+
/**
40+
* Ensures a given request not executed multiple times when another equal request is already in-flight.
41+
* If the request is not yet known to the deduplicator it will invoke the passed callback with an {@link ActionListener}
42+
* that must be completed by the caller when the request completes. Once that listener is completed the request will be removed from
43+
* the deduplicator's internal state. If the request is already known to the deduplicator it will keep
44+
* track of the given listener and invoke it when the listener passed to the callback on first invocation is completed.
45+
* @param request Request to deduplicate
46+
* @param listener Listener to invoke on request completion
47+
* @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator
48+
*/
49+
public void executeOnce(T request, ActionListener<Void> listener, BiConsumer<T, ActionListener<Void>> callback) {
50+
ActionListener<Void> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
51+
if (completionListener != null) {
52+
callback.accept(request, completionListener);
53+
}
54+
}
55+
56+
public int size() {
57+
return requests.size();
58+
}
59+
60+
private final class CompositeListener implements ActionListener<Void> {
61+
62+
private final List<ActionListener<Void>> listeners = new ArrayList<>();
63+
64+
private final T request;
65+
66+
private boolean isNotified;
67+
private Exception failure;
68+
69+
CompositeListener(T request) {
70+
this.request = request;
71+
}
72+
73+
CompositeListener addListener(ActionListener<Void> listener) {
74+
synchronized (this) {
75+
if (this.isNotified == false) {
76+
listeners.add(listener);
77+
return listeners.size() == 1 ? this : null;
78+
}
79+
}
80+
if (failure != null) {
81+
listener.onFailure(failure);
82+
} else {
83+
listener.onResponse(null);
84+
}
85+
return null;
86+
}
87+
88+
private void onCompleted(Exception failure) {
89+
synchronized (this) {
90+
this.failure = failure;
91+
this.isNotified = true;
92+
}
93+
try {
94+
if (failure == null) {
95+
ActionListener.onResponse(listeners, null);
96+
} else {
97+
ActionListener.onFailure(listeners, failure);
98+
}
99+
} finally {
100+
requests.remove(request);
101+
}
102+
}
103+
104+
@Override
105+
public void onResponse(final Void aVoid) {
106+
onCompleted(null);
107+
}
108+
109+
@Override
110+
public void onFailure(Exception failure) {
111+
onCompleted(failure);
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)