19
19
package org .elasticsearch .backwards ;
20
20
21
21
import org .apache .http .HttpHost ;
22
- import org .apache .http .entity .ContentType ;
23
- import org .apache .http .entity .StringEntity ;
24
22
import org .elasticsearch .Version ;
23
+ import org .elasticsearch .client .Request ;
25
24
import org .elasticsearch .client .Response ;
26
25
import org .elasticsearch .client .RestClient ;
27
26
import org .elasticsearch .cluster .metadata .IndexMetaData ;
34
33
35
34
import java .io .IOException ;
36
35
import java .util .ArrayList ;
37
- import java .util .Collections ;
38
36
import java .util .HashMap ;
39
37
import java .util .List ;
40
38
import java .util .Map ;
41
39
import java .util .stream .Collectors ;
42
40
43
- import static com .carrotsearch .randomizedtesting .RandomizedTest .randomAsciiOfLength ;
44
- import static java .util .Collections .emptyMap ;
45
- import static java .util .Collections .singletonMap ;
46
41
import static org .elasticsearch .index .seqno .SequenceNumbers .NO_OPS_PERFORMED ;
47
42
import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
48
- import static org .hamcrest .Matchers .empty ;
49
43
import static org .hamcrest .Matchers .equalTo ;
50
- import static org .hamcrest .Matchers .not ;
51
44
52
45
public class IndexingIT extends ESRestTestCase {
53
46
54
47
private int indexDocs (String index , final int idStart , final int numDocs ) throws IOException {
55
48
for (int i = 0 ; i < numDocs ; i ++) {
56
49
final int id = idStart + i ;
57
- assertOK (client ().performRequest ("PUT" , index + "/test/" + id , emptyMap (),
58
- new StringEntity ("{\" test\" : \" test_" + randomAsciiOfLength (2 ) + "\" }" , ContentType .APPLICATION_JSON )));
50
+ Request request = new Request ("PUT" , index + "/test/" + id );
51
+ request .setJsonEntity ("{\" test\" : \" test_" + randomAlphaOfLength (2 ) + "\" }" );
52
+ assertOK (client ().performRequest (request ));
59
53
}
60
54
return numDocs ;
61
55
}
@@ -108,7 +102,7 @@ public void testIndexVersionPropagation() throws Exception {
108
102
logger .info ("allowing shards on all nodes" );
109
103
updateIndexSettings (index , Settings .builder ().putNull ("index.routing.allocation.include._name" ));
110
104
ensureGreen (index );
111
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
105
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
112
106
List <Shard > shards = buildShards (index , nodes , newNodeClient );
113
107
Shard primary = buildShards (index , nodes , newNodeClient ).stream ().filter (Shard ::isPrimary ).findFirst ().get ();
114
108
logger .info ("primary resolved to: " + primary .getNode ().getNodeName ());
@@ -120,7 +114,7 @@ public void testIndexVersionPropagation() throws Exception {
120
114
nUpdates = randomIntBetween (minUpdates , maxUpdates );
121
115
logger .info ("indexing docs with [{}] concurrent updates after allowing shards on all nodes" , nUpdates );
122
116
final int finalVersionForDoc2 = indexDocWithConcurrentUpdates (index , 2 , nUpdates );
123
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
117
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
124
118
shards = buildShards (index , nodes , newNodeClient );
125
119
primary = shards .stream ().filter (Shard ::isPrimary ).findFirst ().get ();
126
120
logger .info ("primary resolved to: " + primary .getNode ().getNodeName ());
@@ -136,7 +130,7 @@ public void testIndexVersionPropagation() throws Exception {
136
130
nUpdates = randomIntBetween (minUpdates , maxUpdates );
137
131
logger .info ("indexing docs with [{}] concurrent updates after moving primary" , nUpdates );
138
132
final int finalVersionForDoc3 = indexDocWithConcurrentUpdates (index , 3 , nUpdates );
139
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
133
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
140
134
shards = buildShards (index , nodes , newNodeClient );
141
135
for (Shard shard : shards ) {
142
136
assertVersion (index , 3 , "_only_nodes:" + shard .getNode ().getNodeName (), finalVersionForDoc3 );
@@ -149,7 +143,7 @@ public void testIndexVersionPropagation() throws Exception {
149
143
nUpdates = randomIntBetween (minUpdates , maxUpdates );
150
144
logger .info ("indexing doc with [{}] concurrent updates after setting number of replicas to 0" , nUpdates );
151
145
final int finalVersionForDoc4 = indexDocWithConcurrentUpdates (index , 4 , nUpdates );
152
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
146
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
153
147
shards = buildShards (index , nodes , newNodeClient );
154
148
for (Shard shard : shards ) {
155
149
assertVersion (index , 4 , "_only_nodes:" + shard .getNode ().getNodeName (), finalVersionForDoc4 );
@@ -162,7 +156,7 @@ public void testIndexVersionPropagation() throws Exception {
162
156
nUpdates = randomIntBetween (minUpdates , maxUpdates );
163
157
logger .info ("indexing doc with [{}] concurrent updates after setting number of replicas to 1" , nUpdates );
164
158
final int finalVersionForDoc5 = indexDocWithConcurrentUpdates (index , 5 , nUpdates );
165
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
159
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
166
160
shards = buildShards (index , nodes , newNodeClient );
167
161
for (Shard shard : shards ) {
168
162
assertVersion (index , 5 , "_only_nodes:" + shard .getNode ().getNodeName (), finalVersionForDoc5 );
@@ -197,7 +191,7 @@ public void testSeqNoCheckpoints() throws Exception {
197
191
logger .info ("allowing shards on all nodes" );
198
192
updateIndexSettings (index , Settings .builder ().putNull ("index.routing.allocation.include._name" ));
199
193
ensureGreen (index );
200
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
194
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
201
195
for (final String bwcName : bwcNamesList ) {
202
196
assertCount (index , "_only_nodes:" + bwcName , numDocs );
203
197
}
@@ -228,7 +222,7 @@ public void testSeqNoCheckpoints() throws Exception {
228
222
logger .info ("setting number of replicas to 1" );
229
223
updateIndexSettings (index , Settings .builder ().put ("index.number_of_replicas" , 1 ));
230
224
ensureGreen (index );
231
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
225
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
232
226
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
233
227
assertCount (index , "_primary" , numDocs );
234
228
assertCount (index , "_replica" , numDocs );
@@ -242,20 +236,18 @@ public void testUpdateSnapshotStatus() throws Exception {
242
236
logger .info ("cluster discovered: {}" , nodes .toString ());
243
237
244
238
// Create the repository before taking the snapshot.
245
- String repoConfig = Strings
239
+ Request request = new Request ("PUT" , "/_snapshot/repo" );
240
+ request .setJsonEntity (Strings
246
241
.toString (JsonXContent .contentBuilder ()
247
242
.startObject ()
248
- .field ("type" , "fs" )
249
- .startObject ("settings" )
250
- .field ("compress" , randomBoolean ())
251
- .field ("location" , System .getProperty ("tests.path.repo" ))
252
- .endObject ()
253
- .endObject ());
254
-
255
- assertOK (
256
- client ().performRequest ("PUT" , "/_snapshot/repo" , emptyMap (),
257
- new StringEntity (repoConfig , ContentType .APPLICATION_JSON ))
258
- );
243
+ .field ("type" , "fs" )
244
+ .startObject ("settings" )
245
+ .field ("compress" , randomBoolean ())
246
+ .field ("location" , System .getProperty ("tests.path.repo" ))
247
+ .endObject ()
248
+ .endObject ()));
249
+
250
+ assertOK (client ().performRequest (request ));
259
251
260
252
String bwcNames = nodes .getBWCNodes ().stream ().map (Node ::getNodeName ).collect (Collectors .joining ("," ));
261
253
@@ -269,34 +261,36 @@ public void testUpdateSnapshotStatus() throws Exception {
269
261
createIndex (index , settings .build ());
270
262
indexDocs (index , 0 , between (50 , 100 ));
271
263
ensureGreen (index );
272
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
264
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
273
265
274
- assertOK (
275
- client (). performRequest ( "PUT" , "/_snapshot/repo/bwc-snapshot" , singletonMap ( " wait_for_completion" , "true" ),
276
- new StringEntity ("{\" indices\" : \" " + index + "\" }" , ContentType . APPLICATION_JSON ))
277
- );
266
+ request = new Request ( "PUT" , "/_snapshot/repo/bwc-snapshot" );
267
+ request . addParameter ( " wait_for_completion" , "true" );
268
+ request . setJsonEntity ("{\" indices\" : \" " + index + "\" }" );
269
+ assertOK ( client (). performRequest ( request ) );
278
270
279
271
// Allocating shards on all nodes, taking snapshots should happen on all nodes.
280
272
updateIndexSettings (index , Settings .builder ().putNull ("index.routing.allocation.include._name" ));
281
273
ensureGreen (index );
282
- assertOK (client ().performRequest ("POST" , index + "/_refresh" ));
274
+ assertOK (client ().performRequest (new Request ( "POST" , index + "/_refresh" ) ));
283
275
284
- assertOK (
285
- client ().performRequest ("PUT" , "/_snapshot/repo/mixed-snapshot" , singletonMap ("wait_for_completion" , "true" ),
286
- new StringEntity ("{\" indices\" : \" " + index + "\" }" , ContentType .APPLICATION_JSON ))
287
- );
276
+ request = new Request ("PUT" , "/_snapshot/repo/mixed-snapshot" );
277
+ request .addParameter ("wait_for_completion" , "true" );
278
+ request .setJsonEntity ("{\" indices\" : \" " + index + "\" }" );
288
279
}
289
280
290
281
private void assertCount (final String index , final String preference , final int expectedCount ) throws IOException {
291
- final Response response = client ().performRequest ("GET" , index + "/_count" , Collections .singletonMap ("preference" , preference ));
282
+ Request request = new Request ("GET" , index + "/_count" );
283
+ request .addParameter ("preference" , preference );
284
+ final Response response = client ().performRequest (request );
292
285
assertOK (response );
293
286
final int actualCount = Integer .parseInt (ObjectPath .createFromResponse (response ).evaluate ("count" ).toString ());
294
287
assertThat (actualCount , equalTo (expectedCount ));
295
288
}
296
289
297
290
private void assertVersion (final String index , final int docId , final String preference , final int expectedVersion ) throws IOException {
298
- final Response response = client ().performRequest ("GET" , index + "/test/" + docId ,
299
- Collections .singletonMap ("preference" , preference ));
291
+ Request request = new Request ("GET" , index + "/test/" + docId );
292
+ request .addParameter ("preference" , preference );
293
+ final Response response = client ().performRequest (request );
300
294
assertOK (response );
301
295
final int actualVersion = Integer .parseInt (ObjectPath .createFromResponse (response ).evaluate ("_version" ).toString ());
302
296
assertThat ("version mismatch for doc [" + docId + "] preference [" + preference + "]" , actualVersion , equalTo (expectedVersion ));
@@ -339,7 +333,9 @@ private void assertSeqNoOnShards(String index, Nodes nodes, int numDocs, RestCli
339
333
}
340
334
341
335
private List <Shard > buildShards (String index , Nodes nodes , RestClient client ) throws IOException {
342
- Response response = client .performRequest ("GET" , index + "/_stats" , singletonMap ("level" , "shards" ));
336
+ Request request = new Request ("GET" , index + "/_stats" );
337
+ request .addParameter ("level" , "shards" );
338
+ Response response = client .performRequest (request );
343
339
List <Object > shardStats = ObjectPath .createFromResponse (response ).evaluate ("indices." + index + ".shards.0" );
344
340
ArrayList <Shard > shards = new ArrayList <>();
345
341
for (Object shard : shardStats ) {
@@ -361,7 +357,7 @@ private List<Shard> buildShards(String index, Nodes nodes, RestClient client) th
361
357
}
362
358
363
359
private Nodes buildNodeAndVersions () throws IOException {
364
- Response response = client ().performRequest ("GET" , "_nodes" );
360
+ Response response = client ().performRequest (new Request ( "GET" , "_nodes" ) );
365
361
ObjectPath objectPath = ObjectPath .createFromResponse (response );
366
362
Map <String , Object > nodesAsMap = objectPath .evaluate ("nodes" );
367
363
Nodes nodes = new Nodes ();
@@ -372,7 +368,7 @@ private Nodes buildNodeAndVersions() throws IOException {
372
368
Version .fromString (objectPath .evaluate ("nodes." + id + ".version" )),
373
369
HttpHost .create (objectPath .evaluate ("nodes." + id + ".http.publish_address" ))));
374
370
}
375
- response = client ().performRequest ("GET" , "_cluster/state" );
371
+ response = client ().performRequest (new Request ( "GET" , "_cluster/state" ) );
376
372
nodes .setMasterNodeId (ObjectPath .createFromResponse (response ).evaluate ("master_node" ));
377
373
return nodes ;
378
374
}
0 commit comments