Commit 41ef1e6

Cluster health should await events plus other things (#44348)
Today a cluster health request can wait on a selection of conditions, but it does not guarantee that all of these conditions have ever held simultaneously when it returns. More specifically, if a request sets `waitForEvents()` along with some other conditions then Elasticsearch will respond when the master has processed all the expected pending tasks _and then_ the cluster satisfied the other conditions, but it may be that at the time the cluster satisfied the other conditions there were undesired pending tasks again. This commit adjusts the behaviour of `waitForEvents()` to wait for all the required events to be processed and then, if the resulting cluster state does not satisfy the other conditions, it will wait until there is a cluster state that does and then retry the wait-for-events too.
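For illustration, here is the kind of request this change affects, sketched with the same client calls that appear in the test below (the index name and timeout are placeholders). After this commit, a non-timed-out response implies that a single cluster state both satisfied the green-status condition and had no pending tasks at or above the requested priority:

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;

// Sketch: assumes an existing Client `client` and an index named "index".
ClusterHealthResponse health = client.admin().cluster()
        .prepareHealth("index")
        .setWaitForEvents(Priority.LANGUID)          // drain pending tasks at LANGUID and above
        .setWaitForGreenStatus()                     // and require the index to be green
        .setTimeout(TimeValue.timeValueSeconds(30))
        .get();
assert health.isTimedOut() == false;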
1 parent 156047f commit 41ef1e6

File tree

server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java
server/src/test/java/org/elasticsearch/cluster/ClusterHealthIT.java

2 files changed: +170 -108 lines changed


server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 125 additions & 103 deletions
@@ -19,6 +19,8 @@
 
 package org.elasticsearch.action.admin.cluster.health;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
@@ -44,10 +46,12 @@
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
-public class TransportClusterHealthAction
-    extends StreamableTransportMasterNodeReadAction<ClusterHealthRequest, ClusterHealthResponse> {
+public class TransportClusterHealthAction extends StreamableTransportMasterNodeReadAction<ClusterHealthRequest, ClusterHealthResponse> {
+
+    private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);
 
     private final GatewayAllocator gatewayAllocator;
 
@@ -78,129 +82,147 @@ protected ClusterHealthResponse newResponse() {
     }
 
     @Override
-    protected void masterOperation(Task task, final ClusterHealthRequest request, final ClusterState unusedState,
+    protected void masterOperation(final Task task,
+                                   final ClusterHealthRequest request,
+                                   final ClusterState unusedState,
                                    final ActionListener<ClusterHealthResponse> listener) {
+
+        final int waitCount = getWaitCount(request);
+
         if (request.waitForEvents() != null) {
-            final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
-            if (request.local()) {
-                clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
-                    new LocalClusterUpdateTask(request.waitForEvents()) {
-                        @Override
-                        public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
-                            return unchanged();
-                        }
-
-                        @Override
-                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                            final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
-                            final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
-                            request.timeout(newTimeout);
-                            executeHealth(request, listener);
-                        }
-
-                        @Override
-                        public void onFailure(String source, Exception e) {
-                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
-                            listener.onFailure(e);
-                        }
-                    });
-            } else {
-                clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
-                    new ClusterStateUpdateTask(request.waitForEvents()) {
-                        @Override
-                        public ClusterState execute(ClusterState currentState) {
-                            return currentState;
-                        }
-
-                        @Override
-                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                            final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
-                            final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
-                            request.timeout(newTimeout);
-                            executeHealth(request, listener);
-                        }
-
-                        @Override
-                        public void onNoLongerMaster(String source) {
-                            logger.trace("stopped being master while waiting for events with priority [{}]. retrying.",
-                                request.waitForEvents());
-                            // TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
-                            listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
-                        }
-
-                        @Override
-                        public void onFailure(String source, Exception e) {
-                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
-                            listener.onFailure(e);
-                        }
-                    });
-            }
+            waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis());
         } else {
-            executeHealth(request, listener);
+            executeHealth(request, clusterService.state(), listener, waitCount,
+                clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, false)));
         }
+    }
+
+    private void waitForEventsAndExecuteHealth(final ClusterHealthRequest request,
+                                               final ActionListener<ClusterHealthResponse> listener,
+                                               final int waitCount,
+                                               final long endTimeRelativeMillis) {
+        assert request.waitForEvents() != null;
+        if (request.local()) {
+            clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
+                new LocalClusterUpdateTask(request.waitForEvents()) {
+                    @Override
+                    public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
+                        return unchanged();
+                    }
+
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
+                        final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
+                        request.timeout(newTimeout);
+                        executeHealth(request, clusterService.state(), listener, waitCount,
+                            observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis));
+                    }
+
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                        logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
+                        listener.onFailure(e);
+                    }
+                });
+        } else {
+            clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
+                new ClusterStateUpdateTask(request.waitForEvents()) {
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        return currentState;
+                    }
+
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
+                        final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
+                        request.timeout(newTimeout);
+                        executeHealth(request, newState, listener, waitCount,
+                            observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis));
+                    }
+
+                    @Override
+                    public void onNoLongerMaster(String source) {
+                        logger.trace("stopped being master while waiting for events with priority [{}]. retrying.",
+                            request.waitForEvents());
+                        // TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
+                        listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
+                    }
+
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                        logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
+                        listener.onFailure(e);
+                    }
+                });
+        }
+    }
+
+    private void executeHealth(final ClusterHealthRequest request,
+                               final ClusterState currentState,
+                               final ActionListener<ClusterHealthResponse> listener,
+                               final int waitCount,
+                               final Consumer<ClusterState> onNewClusterStateAfterDelay) {
+
+        if (request.timeout().millis() == 0) {
+            listener.onResponse(getResponse(request, currentState, waitCount, true));
+            return;
+        }
+
+        final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
+        if (validationPredicate.test(currentState)) {
+            listener.onResponse(getResponse(request, currentState, waitCount, false));
+        } else {
+            final ClusterStateObserver observer
+                = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
+            final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() {
+                @Override
+                public void onNewClusterState(ClusterState newState) {
+                    onNewClusterStateAfterDelay.accept(newState);
+                }
+
+                @Override
+                public void onClusterServiceClose() {
+                    listener.onFailure(new IllegalStateException("ClusterService was close during health call"));
+                }
 
+                @Override
+                public void onTimeout(TimeValue timeout) {
+                    listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, true));
+                }
+            };
+            observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
+        }
     }
 
-    private void executeHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
-        int waitFor = 0;
+    private static int getWaitCount(ClusterHealthRequest request) {
+        int waitCount = 0;
         if (request.waitForStatus() != null) {
-            waitFor++;
+            waitCount++;
         }
         if (request.waitForNoRelocatingShards()) {
-            waitFor++;
+            waitCount++;
         }
         if (request.waitForNoInitializingShards()) {
-            waitFor++;
+            waitCount++;
         }
         if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
-            waitFor++;
+            waitCount++;
         }
         if (request.waitForNodes().isEmpty() == false) {
-            waitFor++;
+            waitCount++;
         }
         if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data
-            waitFor++;
-        }
-
-        final ClusterState state = clusterService.state();
-        final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService,
-            null, logger, threadPool.getThreadContext());
-        if (request.timeout().millis() == 0) {
-            listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));
-            return;
-        }
-        final int concreteWaitFor = waitFor;
-        final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, concreteWaitFor);
-
-        final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() {
-            @Override
-            public void onNewClusterState(ClusterState clusterState) {
-                listener.onResponse(getResponse(request, clusterState, concreteWaitFor, false));
-            }
-
-            @Override
-            public void onClusterServiceClose() {
-                listener.onFailure(new IllegalStateException("ClusterService was close during health call"));
-            }
-
-            @Override
-            public void onTimeout(TimeValue timeout) {
-                final ClusterHealthResponse response = getResponse(request, observer.setAndGetObservedState(), concreteWaitFor, true);
-                listener.onResponse(response);
-            }
-        };
-        if (validationPredicate.test(state)) {
-            stateListener.onNewClusterState(state);
-        } else {
-            observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
+            waitCount++;
         }
+        return waitCount;
     }
 
-    private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
+    private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitCount) {
         ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
             gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
-        int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
-        return readyCounter == waitFor;
+        return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
     }
 
     private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
@@ -220,7 +242,7 @@ private ClusterHealthResponse getResponse(final ClusterHealthRequest request, Cl
     }
 
     static int prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response,
-        final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) {
+                               final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) {
         int waitForCounter = 0;
         if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
            waitForCounter++;
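Taken together, waitForEventsAndExecuteHealth and executeHealth above form a loop: submit a task at the requested priority to wait out the pending events, check the remaining conditions against the state that results, and if they do not hold, wait for a state that satisfies them and then re-run the events wait with whatever remains of the original time budget. The following stripped-down sketch shows the shape of that loop; all names in it are illustrative, not Elasticsearch API:

import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical skeleton of the commit's retry loop, not production code.
final class WaitLoopSketch<S> {

    interface Cluster<S> {
        // Analogue of submitting the wait_for_events task: calls back once the
        // pending-task queue has drained past the requested priority.
        void afterPendingEvents(Consumer<S> onProcessed);

        // Analogue of ClusterStateObserver: calls back on the next state that
        // satisfies the predicate, or runs onTimeout when the budget expires.
        void waitForNextChange(Predicate<S> predicate, long timeoutMillis,
                               Consumer<S> onNewState, Runnable onTimeout);
    }

    void waitForEventsAndConditions(Cluster<S> cluster, Predicate<S> conditions,
                                    long endTimeMillis, Consumer<S> onSuccess, Runnable onTimeout) {
        cluster.afterPendingEvents(state -> {
            if (conditions.test(state)) {
                onSuccess.accept(state); // events drained AND conditions hold for this one state
                return;
            }
            final long remainingMillis = Math.max(0, endTimeMillis - System.currentTimeMillis());
            // Conditions failed after the events drained: wait for a state that
            // satisfies them, then re-check the events by restarting the loop.
            cluster.waitForNextChange(conditions, remainingMillis,
                newState -> waitForEventsAndConditions(cluster, conditions, endTimeMillis, onSuccess, onTimeout),
                onTimeout);
        });
    }
}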

server/src/test/java/org/elasticsearch/cluster/ClusterHealthIT.java

Lines changed: 45 additions & 5 deletions
@@ -19,10 +19,12 @@
 
 package org.elasticsearch.cluster;
 
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -38,14 +40,16 @@ public class ClusterHealthIT extends ESIntegTestCase {
 
     public void testSimpleLocalHealth() {
         createIndex("test");
-        ensureGreen(); // master should thing it's green now.
+        ensureGreen(); // master should think it's green now.
 
-        for (String node : internalCluster().getNodeNames()) {
+        for (final String node : internalCluster().getNodeNames()) {
             // a very high time out, which should never fire due to the local flag
-            ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true)
+            logger.info("--> getting cluster health on [{}]", node);
+            final ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true)
                 .setWaitForEvents(Priority.LANGUID).setTimeout("30s").get("10s");
-            assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
-            assertThat(health.isTimedOut(), equalTo(false));
+            logger.info("--> got cluster health on [{}]", node);
+            assertFalse("timed out on " + node, health.isTimedOut());
+            assertThat("health status on " + node, health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
         }
     }
 
@@ -254,4 +258,40 @@ public void run() {
         clusterHealthThread.join();
     }
 
+    public void testWaitForEventsRetriesIfOtherConditionsNotMet() throws Exception {
+        final ActionFuture<ClusterHealthResponse> healthResponseFuture
+            = client().admin().cluster().prepareHealth("index").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute();
+
+        final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
+        final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
+        clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                return currentState;
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                throw new AssertionError(source, e);
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                if (keepSubmittingTasks.get()) {
+                    clusterService.submitStateUpdateTask("looping task", this);
+                }
+            }
+        });
+
+        createIndex("index");
+        assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());
+
+        // at this point the original health response should not have returned: there was never a point where the index was green AND
+        // the master had processed all pending tasks above LANGUID priority.
+        assertFalse(healthResponseFuture.isDone());
+
+        keepSubmittingTasks.set(false);
+        assertFalse(healthResponseFuture.get().isTimedOut());
+    }
+
 }
