19
19
package org .elasticsearch .backwards ;
20
20
21
21
import org .apache .http .HttpHost ;
22
- import org .apache .http .entity .ContentType ;
23
- import org .apache .http .entity .StringEntity ;
24
22
import org .elasticsearch .Version ;
23
+ import org .elasticsearch .client .Request ;
25
24
import org .elasticsearch .client .Response ;
26
25
import org .elasticsearch .client .RestClient ;
27
26
import org .elasticsearch .cluster .metadata .IndexMetaData ;
34
33
35
34
import java .io .IOException ;
36
35
import java .util .ArrayList ;
37
- import java .util .Collections ;
38
36
import java .util .HashMap ;
39
37
import java .util .List ;
40
38
import java .util .Map ;
41
39
import java .util .stream .Collectors ;
42
40
43
- import static com .carrotsearch .randomizedtesting .RandomizedTest .randomAsciiOfLength ;
44
- import static java .util .Collections .emptyMap ;
45
- import static java .util .Collections .singletonMap ;
46
41
import static org .hamcrest .Matchers .equalTo ;
47
- import static org .hamcrest .Matchers .not ;
48
42
49
43
public class IndexingIT extends ESRestTestCase {
50
44
51
45
private int indexDocs (String index , final int idStart , final int numDocs ) throws IOException {
52
46
for (int i = 0 ; i < numDocs ; i ++) {
53
47
final int id = idStart + i ;
54
- assertOK (client ().performRequest ("PUT" , index + "/test/" + id , emptyMap (),
55
- new StringEntity ("{\" test\" : \" test_" + randomAsciiOfLength (2 ) + "\" }" , ContentType .APPLICATION_JSON )));
48
+ Request request = new Request ("PUT" , index + "/test/" + id );
49
+ request .setJsonEntity ("{\" test\" : \" test_" + randomAlphaOfLength (2 ) + "\" }" );
50
+ assertOK (client ().performRequest (request ));
56
51
}
57
52
return numDocs ;
58
53
}
@@ -105,7 +100,7 @@ public void testIndexVersionPropagation() throws Exception {
105
100
logger .info ("allowing shards on all nodes" );
106
101
updateIndexSettings (index , Settings .builder ().putNull ("index.routing.allocation.include._name" ));
107
102
ensureGreen (index );
108
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
103
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
109
104
List <Shard > shards = buildShards (index , nodes , newNodeClient );
110
105
Shard primary = buildShards (index , nodes , newNodeClient ).stream ().filter (Shard ::isPrimary ).findFirst ().get ();
111
106
logger .info ("primary resolved to: " + primary .getNode ().getNodeName ());
@@ -117,7 +112,7 @@ public void testIndexVersionPropagation() throws Exception {
117
112
nUpdates = randomIntBetween (minUpdates , maxUpdates );
118
113
logger .info ("indexing docs with [{}] concurrent updates after allowing shards on all nodes" , nUpdates );
119
114
final int finalVersionForDoc2 = indexDocWithConcurrentUpdates (index , 2 , nUpdates );
120
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
115
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
121
116
shards = buildShards (index , nodes , newNodeClient );
122
117
primary = shards .stream ().filter (Shard ::isPrimary ).findFirst ().get ();
123
118
logger .info ("primary resolved to: " + primary .getNode ().getNodeName ());
@@ -133,7 +128,7 @@ public void testIndexVersionPropagation() throws Exception {
133
128
nUpdates = randomIntBetween (minUpdates , maxUpdates );
134
129
logger .info ("indexing docs with [{}] concurrent updates after moving primary" , nUpdates );
135
130
final int finalVersionForDoc3 = indexDocWithConcurrentUpdates (index , 3 , nUpdates );
136
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
131
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
137
132
shards = buildShards (index , nodes , newNodeClient );
138
133
for (Shard shard : shards ) {
139
134
assertVersion (index , 3 , "_only_nodes:" + shard .getNode ().getNodeName (), finalVersionForDoc3 );
@@ -146,7 +141,7 @@ public void testIndexVersionPropagation() throws Exception {
146
141
nUpdates = randomIntBetween (minUpdates , maxUpdates );
147
142
logger .info ("indexing doc with [{}] concurrent updates after setting number of replicas to 0" , nUpdates );
148
143
final int finalVersionForDoc4 = indexDocWithConcurrentUpdates (index , 4 , nUpdates );
149
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
144
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
150
145
shards = buildShards (index , nodes , newNodeClient );
151
146
for (Shard shard : shards ) {
152
147
assertVersion (index , 4 , "_only_nodes:" + shard .getNode ().getNodeName (), finalVersionForDoc4 );
@@ -159,7 +154,7 @@ public void testIndexVersionPropagation() throws Exception {
159
154
nUpdates = randomIntBetween (minUpdates , maxUpdates );
160
155
logger .info ("indexing doc with [{}] concurrent updates after setting number of replicas to 1" , nUpdates );
161
156
final int finalVersionForDoc5 = indexDocWithConcurrentUpdates (index , 5 , nUpdates );
162
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
157
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
163
158
shards = buildShards (index , nodes , newNodeClient );
164
159
for (Shard shard : shards ) {
165
160
assertVersion (index , 5 , "_only_nodes:" + shard .getNode ().getNodeName (), finalVersionForDoc5 );
@@ -191,7 +186,7 @@ public void testSeqNoCheckpoints() throws Exception {
191
186
logger .info ("allowing shards on all nodes" );
192
187
updateIndexSettings (index , Settings .builder ().putNull ("index.routing.allocation.include._name" ));
193
188
ensureGreen (index );
194
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
189
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
195
190
for (final String bwcName : bwcNamesList ) {
196
191
assertCount (index , "_only_nodes:" + bwcName , numDocs );
197
192
}
@@ -222,7 +217,7 @@ public void testSeqNoCheckpoints() throws Exception {
222
217
logger .info ("setting number of replicas to 1" );
223
218
updateIndexSettings (index , Settings .builder ().put ("index.number_of_replicas" , 1 ));
224
219
ensureGreen (index );
225
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
220
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
226
221
227
222
for (Shard shard : buildShards (index , nodes , newNodeClient )) {
228
223
assertCount (index , "_only_nodes:" + shard .node .nodeName , numDocs );
@@ -237,20 +232,18 @@ public void testUpdateSnapshotStatus() throws Exception {
237
232
logger .info ("cluster discovered: {}" , nodes .toString ());
238
233
239
234
// Create the repository before taking the snapshot.
240
- String repoConfig = Strings
235
+ Request request = new Request ("PUT" , "/_snapshot/repo" );
236
+ request .setJsonEntity (Strings
241
237
.toString (JsonXContent .contentBuilder ()
242
238
.startObject ()
243
- .field ("type" , "fs" )
244
- .startObject ("settings" )
245
- .field ("compress" , randomBoolean ())
246
- .field ("location" , System .getProperty ("tests.path.repo" ))
247
- .endObject ()
248
- .endObject ());
249
-
250
- assertOK (
251
- client ().performRequest ("PUT" , "/_snapshot/repo" , emptyMap (),
252
- new StringEntity (repoConfig , ContentType .APPLICATION_JSON ))
253
- );
239
+ .field ("type" , "fs" )
240
+ .startObject ("settings" )
241
+ .field ("compress" , randomBoolean ())
242
+ .field ("location" , System .getProperty ("tests.path.repo" ))
243
+ .endObject ()
244
+ .endObject ()));
245
+
246
+ assertOK (client ().performRequest (request ));
254
247
255
248
String bwcNames = nodes .getBWCNodes ().stream ().map (Node ::getNodeName ).collect (Collectors .joining ("," ));
256
249
@@ -264,34 +257,36 @@ public void testUpdateSnapshotStatus() throws Exception {
264
257
createIndex (index , settings .build ());
265
258
indexDocs (index , 0 , between (50 , 100 ));
266
259
ensureGreen (index );
267
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
260
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
268
261
269
- assertOK (
270
- client (). performRequest ( "PUT" , "/_snapshot/repo/bwc-snapshot" , singletonMap ( " wait_for_completion" , "true" ),
271
- new StringEntity ("{\" indices\" : \" " + index + "\" }" , ContentType . APPLICATION_JSON ))
272
- );
262
+ request = new Request ( "PUT" , "/_snapshot/repo/bwc-snapshot" );
263
+ request . addParameter ( " wait_for_completion" , "true" );
264
+ request . setJsonEntity ("{\" indices\" : \" " + index + "\" }" );
265
+ assertOK ( client (). performRequest ( request ) );
273
266
274
267
// Allocating shards on all nodes, taking snapshots should happen on all nodes.
275
268
updateIndexSettings (index , Settings .builder ().putNull ("index.routing.allocation.include._name" ));
276
269
ensureGreen (index );
277
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
270
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
278
271
279
- assertOK (
280
- client ().performRequest ("PUT" , "/_snapshot/repo/mixed-snapshot" , singletonMap ("wait_for_completion" , "true" ),
281
- new StringEntity ("{\" indices\" : \" " + index + "\" }" , ContentType .APPLICATION_JSON ))
282
- );
272
+ request = new Request ("PUT" , "/_snapshot/repo/mixed-snapshot" );
273
+ request .addParameter ("wait_for_completion" , "true" );
274
+ request .setJsonEntity ("{\" indices\" : \" " + index + "\" }" );
283
275
}
284
276
285
277
private void assertCount (final String index , final String preference , final int expectedCount ) throws IOException {
286
- final Response response = client ().performRequest ("GET" , index + "/_count" , Collections .singletonMap ("preference" , preference ));
278
+ Request request = new Request ("GET" , index + "/_count" );
279
+ request .addParameter ("preference" , preference );
280
+ final Response response = client ().performRequest (request );
287
281
assertOK (response );
288
282
final int actualCount = Integer .parseInt (ObjectPath .createFromResponse (response ).evaluate ("count" ).toString ());
289
283
assertThat (actualCount , equalTo (expectedCount ));
290
284
}
291
285
292
286
private void assertVersion (final String index , final int docId , final String preference , final int expectedVersion ) throws IOException {
293
- final Response response = client ().performRequest ("GET" , index + "/test/" + docId ,
294
- Collections .singletonMap ("preference" , preference ));
287
+ Request request = new Request ("GET" , index + "/test/" + docId );
288
+ request .addParameter ("preference" , preference );
289
+ final Response response = client ().performRequest (request );
295
290
assertOK (response );
296
291
final int actualVersion = Integer .parseInt (ObjectPath .createFromResponse (response ).evaluate ("_version" ).toString ());
297
292
assertThat ("version mismatch for doc [" + docId + "] preference [" + preference + "]" , actualVersion , equalTo (expectedVersion ));
@@ -323,7 +318,9 @@ private void assertSeqNoOnShards(String index, Nodes nodes, int numDocs, RestCli
323
318
}
324
319
325
320
private List <Shard > buildShards (String index , Nodes nodes , RestClient client ) throws IOException {
326
- Response response = client .performRequest ("GET" , index + "/_stats" , singletonMap ("level" , "shards" ));
321
+ Request request = new Request ("GET" , index + "/_stats" );
322
+ request .addParameter ("level" , "shards" );
323
+ Response response = client .performRequest (request );
327
324
List <Object > shardStats = ObjectPath .createFromResponse (response ).evaluate ("indices." + index + ".shards.0" );
328
325
ArrayList <Shard > shards = new ArrayList <>();
329
326
for (Object shard : shardStats ) {
@@ -341,7 +338,7 @@ private List<Shard> buildShards(String index, Nodes nodes, RestClient client) th
341
338
}
342
339
343
340
private Nodes buildNodeAndVersions () throws IOException {
344
- Response response = client ().performRequest ("GET" , "_nodes" );
341
+ Response response = client ().performRequest (new Request ( "GET" , "_nodes" ) );
345
342
ObjectPath objectPath = ObjectPath .createFromResponse (response );
346
343
Map <String , Object > nodesAsMap = objectPath .evaluate ("nodes" );
347
344
Nodes nodes = new Nodes ();
@@ -352,7 +349,7 @@ private Nodes buildNodeAndVersions() throws IOException {
352
349
Version .fromString (objectPath .evaluate ("nodes." + id + ".version" )),
353
350
HttpHost .create (objectPath .evaluate ("nodes." + id + ".http.publish_address" ))));
354
351
}
355
- response = client ().performRequest ("GET" , "_cluster/state" );
352
+ response = client ().performRequest (new Request ( "GET" , "_cluster/state" ) );
356
353
nodes .setMasterNodeId (ObjectPath .createFromResponse (response ).evaluate ("master_node" ));
357
354
return nodes ;
358
355
}
0 commit comments