@@ -182,7 +182,7 @@ public void initClient() throws IOException {
182
182
assert hasXPack != null ;
183
183
assert nodeVersions != null ;
184
184
}
185
-
185
+
186
186
/**
187
187
* Helper class to check warnings in REST responses with sensitivity to versions
188
188
* used in the target cluster.
@@ -191,26 +191,26 @@ public static class VersionSensitiveWarningsHandler implements WarningsHandler {
191
191
Set <String > requiredSameVersionClusterWarnings = new HashSet <>();
192
192
Set <String > allowedWarnings = new HashSet <>();
193
193
final Set <Version > testNodeVersions ;
194
-
194
+
195
195
public VersionSensitiveWarningsHandler (Set <Version > nodeVersions ) {
196
196
this .testNodeVersions = nodeVersions ;
197
197
}
198
198
199
199
/**
200
200
* Adds to the set of warnings that are all required in responses if the cluster
201
- * is formed from nodes all running the exact same version as the client.
201
+ * is formed from nodes all running the exact same version as the client.
202
202
* @param requiredWarnings a set of required warnings
203
203
*/
204
204
public void current (String ... requiredWarnings ) {
205
205
requiredSameVersionClusterWarnings .addAll (Arrays .asList (requiredWarnings ));
206
206
}
207
207
208
208
/**
209
- * Adds to the set of warnings that are permissible (but not required) when running
209
+ * Adds to the set of warnings that are permissible (but not required) when running
210
210
* in mixed-version clusters or those that differ in version from the test client.
211
211
* @param allowedWarnings optional warnings that will be ignored if received
212
212
*/
213
- public void compatible (String ... allowedWarnings ) {
213
+ public void compatible (String ... allowedWarnings ) {
214
214
this .allowedWarnings .addAll (Arrays .asList (allowedWarnings ));
215
215
}
216
216
@@ -231,13 +231,13 @@ public boolean warningsShouldFailRequest(List<String> warnings) {
231
231
return false ;
232
232
}
233
233
}
234
-
234
+
235
235
private boolean isExclusivelyTargetingCurrentVersionCluster () {
236
236
assertFalse ("Node versions running in the cluster are missing" , testNodeVersions .isEmpty ());
237
- return testNodeVersions .size () == 1 &&
237
+ return testNodeVersions .size () == 1 &&
238
238
testNodeVersions .iterator ().next ().equals (Version .CURRENT );
239
- }
240
-
239
+ }
240
+
241
241
}
242
242
243
243
/**
@@ -250,14 +250,14 @@ private boolean isExclusivelyTargetingCurrentVersionCluster() {
250
250
public static RequestOptions expectWarnings (String ... warnings ) {
251
251
return expectVersionSpecificWarnings (consumer -> consumer .current (warnings ));
252
252
}
253
-
253
+
254
254
public static RequestOptions expectVersionSpecificWarnings (Consumer <VersionSensitiveWarningsHandler > expectationsSetter ) {
255
255
Builder builder = RequestOptions .DEFAULT .toBuilder ();
256
256
VersionSensitiveWarningsHandler warningsHandler = new VersionSensitiveWarningsHandler (nodeVersions );
257
257
expectationsSetter .accept (warningsHandler );
258
258
builder .setWarningsHandler (warningsHandler );
259
259
return builder .build ();
260
- }
260
+ }
261
261
262
262
/**
263
263
* Construct an HttpHost from the given host and port
@@ -442,7 +442,7 @@ private void wipeCluster() throws Exception {
442
442
// Cleanup rollup before deleting indices. A rollup job might have bulks in-flight,
443
443
// so we need to fully shut them down first otherwise a job might stall waiting
444
444
// for a bulk to finish against a non-existing index (and then fail tests)
445
- //
445
+ //
446
446
// Rollups were introduced in 6.3.0 so any cluster that contains older
447
447
// nodes won't be able to do *anything* with rollups, including cleanup.
448
448
if (hasXPack && nodeVersions .first ().onOrAfter (Version .V_6_3_0 )
@@ -777,17 +777,31 @@ protected static void assertOK(Response response) {
777
777
* @param index index to test for
778
778
**/
779
779
protected static void ensureGreen (String index ) throws IOException {
780
- Request request = new Request ("GET" , "/_cluster/health/" + index );
781
- request .addParameter ("wait_for_status" , "green" );
782
- request .addParameter ("wait_for_no_relocating_shards" , "true" );
783
- request .addParameter ("timeout" , "70s" );
784
- request .addParameter ("level" , "shards" );
780
+ ensureHealth (index , (request ) -> {
781
+ request .addParameter ("wait_for_status" , "green" );
782
+ request .addParameter ("wait_for_no_relocating_shards" , "true" );
783
+ request .addParameter ("timeout" , "70s" );
784
+ request .addParameter ("level" , "shards" );
785
+ });
786
+ }
787
+
788
+ protected static void ensureHealth (Consumer <Request > requestConsumer ) throws IOException {
789
+ ensureHealth ("" , requestConsumer );
790
+ }
791
+
792
+ protected static void ensureHealth (String index , Consumer <Request > requestConsumer ) throws IOException {
793
+ ensureHealth (client (), index , requestConsumer );
794
+ }
795
+
796
+ protected static void ensureHealth (RestClient client , String index , Consumer <Request > requestConsumer ) throws IOException {
797
+ Request request = new Request ("GET" , "/_cluster/health" + (index .trim ().isEmpty () ? "" : "/" + index ));
798
+ requestConsumer .accept (request );
785
799
try {
786
- client () .performRequest (request );
800
+ client .performRequest (request );
787
801
} catch (ResponseException e ) {
788
802
if (e .getResponse ().getStatusLine ().getStatusCode () == HttpStatus .SC_REQUEST_TIMEOUT ) {
789
803
try {
790
- final Response clusterStateResponse = client () .performRequest (new Request ("GET" , "/_cluster/state" ));
804
+ final Response clusterStateResponse = client .performRequest (new Request ("GET" , "/_cluster/state?pretty " ));
791
805
fail ("timed out waiting for green state for index [" + index + "] " +
792
806
"cluster state [" + EntityUtils .toString (clusterStateResponse .getEntity ()) + "]" );
793
807
} catch (Exception inner ) {
0 commit comments