Skip to content

Commit 479a2eb

Browse files
Van0SSjavanna
authored andcommitted
REST high-level client: add Cluster Health API (#29331)
Relates to #27205
1 parent 8953adc commit 479a2eb

File tree

21 files changed

+1587
-75
lines changed

21 files changed

+1587
-75
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@
2121

2222
import org.apache.http.Header;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
25+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2426
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2527
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
28+
import org.elasticsearch.rest.RestStatus;
2629

2730
import java.io.IOException;
2831

2932
import static java.util.Collections.emptySet;
33+
import static java.util.Collections.singleton;
3034

3135
/**
3236
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Cluster API.
@@ -95,4 +99,34 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR
9599
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings,
96100
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
97101
}
102+
103+
/**
104+
* Get cluster health using the Cluster Health API.
105+
* See
106+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
107+
* <p>
108+
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
109+
* @param healthRequest the request
110+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
111+
* @return the response
112+
* @throws IOException in case there is a problem sending the request or parsing back the response
113+
*/
114+
public ClusterHealthResponse health(ClusterHealthRequest healthRequest, RequestOptions options) throws IOException {
115+
return restHighLevelClient.performRequestAndParseEntity(healthRequest, RequestConverters::clusterHealth, options,
116+
ClusterHealthResponse::fromXContent, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
117+
}
118+
119+
/**
120+
* Asynchronously get cluster health using the Cluster Health API.
121+
* See
122+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
123+
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
124+
* @param healthRequest the request
125+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
126+
* @param listener the listener to be notified upon request completion
127+
*/
128+
public void healthAsync(ClusterHealthRequest healthRequest, RequestOptions options, ActionListener<ClusterHealthResponse> listener) {
129+
restHighLevelClient.performRequestAsyncAndParseEntity(healthRequest, RequestConverters::clusterHealth, options,
130+
ClusterHealthResponse::fromXContent, listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
131+
}
98132
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.http.entity.ContentType;
3030
import org.apache.lucene.util.BytesRef;
3131
import org.elasticsearch.action.DocWriteRequest;
32+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
3233
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
3334
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
3435
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
@@ -74,7 +75,9 @@
7475
import org.elasticsearch.action.support.IndicesOptions;
7576
import org.elasticsearch.action.support.WriteRequest;
7677
import org.elasticsearch.action.update.UpdateRequest;
78+
import org.elasticsearch.cluster.health.ClusterHealthStatus;
7779
import org.elasticsearch.common.Nullable;
80+
import org.elasticsearch.common.Priority;
7881
import org.elasticsearch.common.Strings;
7982
import org.elasticsearch.common.SuppressForbidden;
8083
import org.elasticsearch.common.bytes.BytesReference;
@@ -717,6 +720,28 @@ static Request listTasks(ListTasksRequest listTaskRequest) {
717720
return request;
718721
}
719722

723+
static Request clusterHealth(ClusterHealthRequest healthRequest) {
724+
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
725+
String endpoint = new EndpointBuilder()
726+
.addPathPartAsIs("_cluster/health")
727+
.addCommaSeparatedPathParts(indices)
728+
.build();
729+
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
730+
731+
new Params(request)
732+
.withWaitForStatus(healthRequest.waitForStatus())
733+
.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards())
734+
.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards())
735+
.withWaitForActiveShards(healthRequest.waitForActiveShards(), ActiveShardCount.NONE)
736+
.withWaitForNodes(healthRequest.waitForNodes())
737+
.withWaitForEvents(healthRequest.waitForEvents())
738+
.withTimeout(healthRequest.timeout())
739+
.withMasterTimeout(healthRequest.masterNodeTimeout())
740+
.withLocal(healthRequest.local())
741+
.withLevel(healthRequest.level());
742+
return request;
743+
}
744+
720745
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
721746
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
722747
.addPathPart(rolloverRequest.getNewIndexName()).build();
@@ -1146,6 +1171,42 @@ Params withVerify(boolean verify) {
11461171
}
11471172
return this;
11481173
}
1174+
1175+
Params withWaitForStatus(ClusterHealthStatus status) {
1176+
if (status != null) {
1177+
return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT));
1178+
}
1179+
return this;
1180+
}
1181+
1182+
Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) {
1183+
if (waitNoRelocatingShards) {
1184+
return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString());
1185+
}
1186+
return this;
1187+
}
1188+
1189+
Params withWaitForNoInitializingShards(boolean waitNoInitShards) {
1190+
if (waitNoInitShards) {
1191+
return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString());
1192+
}
1193+
return this;
1194+
}
1195+
1196+
Params withWaitForNodes(String waitForNodes) {
1197+
return putParam("wait_for_nodes", waitForNodes);
1198+
}
1199+
1200+
Params withLevel(ClusterHealthRequest.Level level) {
1201+
return putParam("level", level.name().toLowerCase(Locale.ROOT));
1202+
}
1203+
1204+
Params withWaitForEvents(Priority waitForEvents) {
1205+
if (waitForEvents != null) {
1206+
return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT));
1207+
}
1208+
return this;
1209+
}
11491210
}
11501211

11511212
/**

client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@
2020
package org.elasticsearch.client;
2121

2222
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
24+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2325
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2426
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
27+
import org.elasticsearch.cluster.health.ClusterHealthStatus;
28+
import org.elasticsearch.cluster.health.ClusterIndexHealth;
29+
import org.elasticsearch.cluster.health.ClusterShardHealth;
2530
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
2631
import org.elasticsearch.common.settings.Settings;
2732
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -34,6 +39,7 @@
3439
import java.util.HashMap;
3540
import java.util.Map;
3641

42+
import static java.util.Collections.emptyMap;
3743
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3844
import static org.hamcrest.Matchers.equalTo;
3945
import static org.hamcrest.Matchers.notNullValue;
@@ -108,4 +114,136 @@ public void testClusterUpdateSettingNonExistent() {
108114
assertThat(exception.getMessage(), equalTo(
109115
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
110116
}
117+
118+
public void testClusterHealthGreen() throws IOException {
119+
ClusterHealthRequest request = new ClusterHealthRequest();
120+
request.timeout("5s");
121+
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
122+
123+
assertThat(response, notNullValue());
124+
assertThat(response.isTimedOut(), equalTo(false));
125+
assertThat(response.status(), equalTo(RestStatus.OK));
126+
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
127+
assertNoIndices(response);
128+
}
129+
130+
public void testClusterHealthYellowClusterLevel() throws IOException {
131+
createIndex("index", Settings.EMPTY);
132+
createIndex("index2", Settings.EMPTY);
133+
ClusterHealthRequest request = new ClusterHealthRequest();
134+
request.timeout("5s");
135+
request.level(ClusterHealthRequest.Level.CLUSTER);
136+
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
137+
138+
assertYellowShards(response);
139+
assertThat(response.getIndices().size(), equalTo(0));
140+
}
141+
142+
public void testClusterHealthYellowIndicesLevel() throws IOException {
143+
createIndex("index", Settings.EMPTY);
144+
createIndex("index2", Settings.EMPTY);
145+
ClusterHealthRequest request = new ClusterHealthRequest();
146+
request.timeout("5s");
147+
request.level(ClusterHealthRequest.Level.INDICES);
148+
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
149+
150+
assertYellowShards(response);
151+
assertThat(response.getIndices().size(), equalTo(2));
152+
for (Map.Entry<String, ClusterIndexHealth> entry : response.getIndices().entrySet()) {
153+
assertYellowIndex(entry.getKey(), entry.getValue(), true);
154+
}
155+
}
156+
157+
private static void assertYellowShards(ClusterHealthResponse response) {
158+
assertThat(response, notNullValue());
159+
assertThat(response.isTimedOut(), equalTo(false));
160+
assertThat(response.status(), equalTo(RestStatus.OK));
161+
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
162+
assertThat(response.getActivePrimaryShards(), equalTo(10));
163+
assertThat(response.getNumberOfDataNodes(), equalTo(1));
164+
assertThat(response.getNumberOfNodes(), equalTo(1));
165+
assertThat(response.getActiveShards(), equalTo(10));
166+
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
167+
assertThat(response.getInitializingShards(), equalTo(0));
168+
assertThat(response.getUnassignedShards(), equalTo(10));
169+
assertThat(response.getActiveShardsPercent(), equalTo(50d));
170+
}
171+
172+
public void testClusterHealthYellowSpecificIndex() throws IOException {
173+
createIndex("index", Settings.EMPTY);
174+
createIndex("index2", Settings.EMPTY);
175+
ClusterHealthRequest request = new ClusterHealthRequest("index");
176+
request.timeout("5s");
177+
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
178+
179+
assertThat(response, notNullValue());
180+
assertThat(response.isTimedOut(), equalTo(false));
181+
assertThat(response.status(), equalTo(RestStatus.OK));
182+
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
183+
assertThat(response.getActivePrimaryShards(), equalTo(5));
184+
assertThat(response.getNumberOfDataNodes(), equalTo(1));
185+
assertThat(response.getNumberOfNodes(), equalTo(1));
186+
assertThat(response.getActiveShards(), equalTo(5));
187+
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
188+
assertThat(response.getInitializingShards(), equalTo(0));
189+
assertThat(response.getUnassignedShards(), equalTo(5));
190+
assertThat(response.getActiveShardsPercent(), equalTo(50d));
191+
assertThat(response.getIndices().size(), equalTo(1));
192+
Map.Entry<String, ClusterIndexHealth> index = response.getIndices().entrySet().iterator().next();
193+
assertYellowIndex(index.getKey(), index.getValue(), false);
194+
}
195+
196+
private static void assertYellowIndex(String indexName, ClusterIndexHealth indexHealth, boolean emptyShards) {
197+
assertThat(indexHealth, notNullValue());
198+
assertThat(indexHealth.getIndex(),equalTo(indexName));
199+
assertThat(indexHealth.getActivePrimaryShards(),equalTo(5));
200+
assertThat(indexHealth.getActiveShards(),equalTo(5));
201+
assertThat(indexHealth.getNumberOfReplicas(),equalTo(1));
202+
assertThat(indexHealth.getInitializingShards(),equalTo(0));
203+
assertThat(indexHealth.getUnassignedShards(),equalTo(5));
204+
assertThat(indexHealth.getRelocatingShards(),equalTo(0));
205+
assertThat(indexHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
206+
if (emptyShards) {
207+
assertThat(indexHealth.getShards().size(), equalTo(0));
208+
} else {
209+
assertThat(indexHealth.getShards().size(), equalTo(5));
210+
for (Map.Entry<Integer, ClusterShardHealth> entry : indexHealth.getShards().entrySet()) {
211+
assertYellowShard(entry.getKey(), entry.getValue());
212+
}
213+
}
214+
}
215+
216+
private static void assertYellowShard(int shardId, ClusterShardHealth shardHealth) {
217+
assertThat(shardHealth, notNullValue());
218+
assertThat(shardHealth.getShardId(), equalTo(shardId));
219+
assertThat(shardHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
220+
assertThat(shardHealth.getActiveShards(), equalTo(1));
221+
assertThat(shardHealth.getInitializingShards(), equalTo(0));
222+
assertThat(shardHealth.getUnassignedShards(), equalTo(1));
223+
assertThat(shardHealth.getRelocatingShards(), equalTo(0));
224+
}
225+
226+
public void testClusterHealthNotFoundIndex() throws IOException {
227+
ClusterHealthRequest request = new ClusterHealthRequest("notexisted-index");
228+
request.timeout("5s");
229+
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
230+
231+
assertThat(response, notNullValue());
232+
assertThat(response.isTimedOut(), equalTo(true));
233+
assertThat(response.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
234+
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
235+
assertNoIndices(response);
236+
}
237+
238+
private static void assertNoIndices(ClusterHealthResponse response) {
239+
assertThat(response.getIndices(), equalTo(emptyMap()));
240+
assertThat(response.getActivePrimaryShards(), equalTo(0));
241+
assertThat(response.getNumberOfDataNodes(), equalTo(1));
242+
assertThat(response.getNumberOfNodes(), equalTo(1));
243+
assertThat(response.getActiveShards(), equalTo(0));
244+
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
245+
assertThat(response.getInitializingShards(), equalTo(0));
246+
assertThat(response.getUnassignedShards(), equalTo(0));
247+
assertThat(response.getActiveShardsPercent(), equalTo(100d));
248+
}
111249
}

0 commit comments

Comments
 (0)