39
39
import org .elasticsearch .action .update .UpdateResponse ;
40
40
import org .elasticsearch .common .Strings ;
41
41
import org .elasticsearch .common .bytes .BytesReference ;
42
- import org .elasticsearch .common .settings .Settings ;
43
42
import org .elasticsearch .common .unit .ByteSizeUnit ;
44
43
import org .elasticsearch .common .unit .ByteSizeValue ;
45
44
import org .elasticsearch .common .xcontent .XContentBuilder ;
50
49
import org .elasticsearch .script .Script ;
51
50
import org .elasticsearch .script .ScriptType ;
52
51
import org .elasticsearch .search .fetch .subphase .FetchSourceContext ;
53
- import org .elasticsearch .threadpool .ThreadPool ;
54
52
55
53
import java .io .IOException ;
56
54
import java .util .Collections ;
@@ -614,14 +612,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
614
612
}
615
613
};
616
614
617
- ThreadPool threadPool = new ThreadPool (Settings .builder ().put ("node.name" , getClass ().getName ()).build ());
618
615
// Pull the client to a variable to work around https://bugs.eclipse.org/bugs/show_bug.cgi?id=514884
619
616
RestHighLevelClient hlClient = highLevelClient ();
620
- try (BulkProcessor processor = new BulkProcessor .Builder (hlClient ::bulkAsync , listener , threadPool )
621
- .setConcurrentRequests (0 )
622
- .setBulkSize (new ByteSizeValue (5 , ByteSizeUnit .GB ))
623
- .setBulkActions (nbItems + 1 )
624
- .build ()) {
617
+
618
+ try (BulkProcessor processor = BulkProcessor .builder (hlClient ::bulkAsync , listener )
619
+ .setConcurrentRequests (0 )
620
+ .setBulkSize (new ByteSizeValue (5 , ByteSizeUnit .GB ))
621
+ .setBulkActions (nbItems + 1 )
622
+ .build ()) {
625
623
for (int i = 0 ; i < nbItems ; i ++) {
626
624
String id = String .valueOf (i );
627
625
boolean erroneous = randomBoolean ();
@@ -631,7 +629,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
631
629
if (opType == DocWriteRequest .OpType .DELETE ) {
632
630
if (erroneous == false ) {
633
631
assertEquals (RestStatus .CREATED ,
634
- highLevelClient ().index (new IndexRequest ("index" , "test" , id ).source ("field" , -1 )).status ());
632
+ highLevelClient ().index (new IndexRequest ("index" , "test" , id ).source ("field" , -1 )).status ());
635
633
}
636
634
DeleteRequest deleteRequest = new DeleteRequest ("index" , "test" , id );
637
635
processor .add (deleteRequest );
@@ -653,10 +651,10 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
653
651
654
652
} else if (opType == DocWriteRequest .OpType .UPDATE ) {
655
653
UpdateRequest updateRequest = new UpdateRequest ("index" , "test" , id )
656
- .doc (new IndexRequest ().source (xContentType , "id" , i ));
654
+ .doc (new IndexRequest ().source (xContentType , "id" , i ));
657
655
if (erroneous == false ) {
658
656
assertEquals (RestStatus .CREATED ,
659
- highLevelClient ().index (new IndexRequest ("index" , "test" , id ).source ("field" , -1 )).status ());
657
+ highLevelClient ().index (new IndexRequest ("index" , "test" , id ).source ("field" , -1 )).status ());
660
658
}
661
659
processor .add (updateRequest );
662
660
}
@@ -676,8 +674,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
676
674
assertNull (error .get ());
677
675
678
676
validateBulkResponses (nbItems , errors , bulkResponse , bulkRequest );
679
-
680
- terminate (threadPool );
681
677
}
682
678
683
679
private void validateBulkResponses (int nbItems , boolean [] errors , BulkResponse bulkResponse , BulkRequest bulkRequest ) {
0 commit comments