Skip to content

Commit c6b0f08

Browse files
authored
Add safety phase to CoordinatorTests (#34241)
Today's CoordinatorTests have a limited amount of randomisation in how things are scheduled. However, to be fully confident in Zen2's liveness we require the system to stabilise after any permitted sequence of events. We can achieve this by running the system in a much more random fashion for a while, with much larger variation in when things are scheduled (simulating GC pressure and network disruption) and then continuing to assert that the system stabilises as we expect. When running randomly, we do not expect to make significant progress and merely verify that no safety property is violated. This change introduces the runRandomly() test method which implements this idea. It also fixes a handful of liveness bugs that this first version of runRandomly() exposed.
1 parent cbe1cf9 commit c6b0f08

File tree

10 files changed

+283
-55
lines changed

10 files changed

+283
-55
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.settings.Setting;
4040
import org.elasticsearch.common.settings.Settings;
4141
import org.elasticsearch.common.unit.TimeValue;
42+
import org.elasticsearch.common.util.concurrent.EsExecutors;
4243
import org.elasticsearch.common.util.concurrent.ListenableFuture;
4344
import org.elasticsearch.discovery.Discovery;
4445
import org.elasticsearch.discovery.DiscoverySettings;
@@ -184,6 +185,13 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
184185
synchronized (mutex) {
185186
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();
186187
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
188+
189+
if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) {
190+
// Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication
191+
// is already failed so there is no point in proceeding.
192+
throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest);
193+
}
194+
187195
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
188196
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
189197

@@ -438,8 +446,10 @@ public void invariant() {
438446
= currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState());
439447
lastPublishedState.nodes().forEach(lastPublishedNodes::add);
440448
assert lastPublishedNodes.remove(getLocalNode());
449+
assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers
450+
+ " [becomingMaster=" + becomingMaster + ", publicationInProgress=" + publicationInProgress() + "]";
451+
// TODO instead assert that knownFollowers is updated appropriately at the end of each publication
441452
}
442-
assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers;
443453
} else if (mode == Mode.FOLLOWER) {
444454
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
445455
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
@@ -604,11 +614,6 @@ protected void onCompletion(boolean committed) {
604614
@Override
605615
public void onResponse(Void ignore) {
606616
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
607-
assert coordinationState.get().getLastAcceptedTerm() == publishRequest.getAcceptedState().term()
608-
&& coordinationState.get().getLastAcceptedVersion() == publishRequest.getAcceptedState().version()
609-
: "onPossibleCompletion: term or version mismatch when publishing [" + this
610-
+ "]: current version is now [" + coordinationState.get().getLastAcceptedVersion()
611-
+ "] in term [" + coordinationState.get().getLastAcceptedTerm() + "]";
612617
assert committed;
613618

614619
// TODO: send to applier
@@ -628,7 +633,7 @@ public void onFailure(Exception e) {
628633
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
629634
publishListener.onFailure(exception);
630635
}
631-
}, transportService.getThreadPool().generic());
636+
}, EsExecutors.newDirectExecutorService());
632637
}
633638

634639
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

+32-20
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030
import org.elasticsearch.common.collect.Tuple;
3131
import org.elasticsearch.common.component.AbstractComponent;
3232
import org.elasticsearch.common.io.stream.StreamInput;
33+
import org.elasticsearch.common.settings.Setting;
3334
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.unit.TimeValue;
3436
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3537
import org.elasticsearch.threadpool.ThreadPool;
3638
import org.elasticsearch.threadpool.ThreadPool.Names;
3739
import org.elasticsearch.transport.TransportException;
40+
import org.elasticsearch.transport.TransportRequestOptions;
3841
import org.elasticsearch.transport.TransportResponse;
3942
import org.elasticsearch.transport.TransportResponse.Empty;
4043
import org.elasticsearch.transport.TransportResponseHandler;
@@ -56,9 +59,15 @@ public class JoinHelper extends AbstractComponent {
5659
public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
5760
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";
5861

62+
// the timeout for each join attempt
63+
public static final Setting<TimeValue> JOIN_TIMEOUT_SETTING =
64+
Setting.timeSetting("cluster.join.timeout",
65+
TimeValue.timeValueMillis(60000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
66+
5967
private final MasterService masterService;
6068
private final TransportService transportService;
6169
private final JoinTaskExecutor joinTaskExecutor;
70+
private final TimeValue joinTimeout;
6271

6372
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();
6473

@@ -68,6 +77,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master
6877
super(settings);
6978
this.masterService = masterService;
7079
this.transportService = transportService;
80+
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
7181
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
7282

7383
@Override
@@ -130,29 +140,31 @@ public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJo
130140
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
131141
if (pendingOutgoingJoins.add(dedupKey)) {
132142
logger.debug("attempting to join {} with {}", destination, joinRequest);
133-
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler<Empty>() {
134-
@Override
135-
public Empty read(StreamInput in) {
136-
return Empty.INSTANCE;
137-
}
143+
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
144+
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
145+
new TransportResponseHandler<Empty>() {
146+
@Override
147+
public Empty read(StreamInput in) {
148+
return Empty.INSTANCE;
149+
}
138150

139-
@Override
140-
public void handleResponse(Empty response) {
141-
pendingOutgoingJoins.remove(dedupKey);
142-
logger.debug("successfully joined {} with {}", destination, joinRequest);
143-
}
151+
@Override
152+
public void handleResponse(Empty response) {
153+
pendingOutgoingJoins.remove(dedupKey);
154+
logger.debug("successfully joined {} with {}", destination, joinRequest);
155+
}
144156

145-
@Override
146-
public void handleException(TransportException exp) {
147-
pendingOutgoingJoins.remove(dedupKey);
148-
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
149-
}
157+
@Override
158+
public void handleException(TransportException exp) {
159+
pendingOutgoingJoins.remove(dedupKey);
160+
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
161+
}
150162

151-
@Override
152-
public String executor() {
153-
return Names.SAME;
154-
}
155-
});
163+
@Override
164+
public String executor() {
165+
return Names.SAME;
166+
}
167+
});
156168
} else {
157169
logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
158170
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
3434
import org.elasticsearch.cluster.coordination.Coordinator;
3535
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
36+
import org.elasticsearch.cluster.coordination.JoinHelper;
3637
import org.elasticsearch.cluster.metadata.IndexGraveyard;
3738
import org.elasticsearch.cluster.metadata.MetaData;
3839
import org.elasticsearch.cluster.routing.OperationRouting;
@@ -445,10 +446,12 @@ public void apply(Settings value, Settings current, Settings previous) {
445446
IndexGraveyard.SETTING_MAX_TOMBSTONES,
446447
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
447448
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
449+
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
448450
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
449451
ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING,
450452
ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING,
451-
Coordinator.PUBLISH_TIMEOUT_SETTING
453+
Coordinator.PUBLISH_TIMEOUT_SETTING,
454+
JoinHelper.JOIN_TIMEOUT_SETTING
452455
)));
453456

454457
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(

server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ protected synchronized void done() {
8282

8383
private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
8484
try {
85-
executorService.submit(new Runnable() {
85+
executorService.execute(new Runnable() {
8686
@Override
8787
public void run() {
8888
try {

server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3737
import org.elasticsearch.threadpool.ThreadPool.Names;
3838
import org.elasticsearch.transport.TransportException;
39+
import org.elasticsearch.transport.TransportRequestOptions;
3940
import org.elasticsearch.transport.TransportResponseHandler;
4041
import org.elasticsearch.transport.TransportService;
4142

@@ -59,7 +60,12 @@ public abstract class PeerFinder extends AbstractComponent {
5960
Setting.timeSetting("discovery.find_peers_interval",
6061
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
6162

62-
private final TimeValue findPeersDelay;
63+
public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING =
64+
Setting.timeSetting("discovery.request_peers_timeout",
65+
TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
66+
67+
private final TimeValue findPeersInterval;
68+
private final TimeValue requestPeersTimeout;
6369

6470
private final Object mutex = new Object();
6571
private final TransportService transportService;
@@ -75,7 +81,8 @@ public abstract class PeerFinder extends AbstractComponent {
7581
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
7682
ConfiguredHostsResolver configuredHostsResolver) {
7783
super(settings);
78-
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
84+
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
85+
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
7986
this.transportService = transportService;
8087
this.transportAddressConnector = transportAddressConnector;
8188
this.configuredHostsResolver = configuredHostsResolver;
@@ -241,7 +248,7 @@ private boolean handleWakeUp() {
241248
}
242249
});
243250

244-
transportService.getThreadPool().schedule(findPeersDelay, Names.GENERIC, new AbstractRunnable() {
251+
transportService.getThreadPool().schedule(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
245252
@Override
246253
public boolean isForceExecution() {
247254
return true;
@@ -360,9 +367,11 @@ public void onFailure(Exception e) {
360367
});
361368
}
362369

363-
private void removePeer() {
370+
void removePeer() {
364371
final Peer removed = peersByAddress.remove(transportAddress);
365-
assert removed == Peer.this;
372+
// assert removed == Peer.this : removed + " != " + Peer.this;
373+
// ^ This assertion sometimes trips if we are deactivated and reactivated while a request is in flight.
374+
// TODO be more careful about avoiding multiple active Peer objects for each address
366375
}
367376

368377
private void requestPeers() {
@@ -380,6 +389,7 @@ private void requestPeers() {
380389

381390
transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME,
382391
new PeersRequest(getLocalNode(), knownNodes),
392+
TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(),
383393
new TransportResponseHandler<PeersResponse>() {
384394

385395
@Override

0 commit comments

Comments
 (0)