27
27
import org .elasticsearch .action .ActionListener ;
28
28
import org .elasticsearch .action .DocWriteRequest ;
29
29
import org .elasticsearch .action .DocWriteResponse ;
30
+ import org .elasticsearch .action .LatchedActionListener ;
30
31
import org .elasticsearch .action .bulk .BackoffPolicy ;
31
32
import org .elasticsearch .action .bulk .BulkItemResponse ;
32
33
import org .elasticsearch .action .bulk .BulkProcessor ;
60
61
import org .elasticsearch .script .ScriptType ;
61
62
import org .elasticsearch .search .fetch .subphase .FetchSourceContext ;
62
63
63
- import java .io .IOException ;
64
64
import java .util .Collections ;
65
65
import java .util .Date ;
66
66
import java .util .HashMap ;
67
67
import java .util .Map ;
68
+ import java .util .concurrent .CountDownLatch ;
68
69
import java .util .concurrent .TimeUnit ;
69
70
70
71
import static java .util .Collections .emptyMap ;
@@ -227,9 +228,8 @@ public void testIndex() throws Exception {
227
228
}
228
229
{
229
230
IndexRequest request = new IndexRequest ("posts" , "doc" , "async" ).source ("field" , "value" );
230
-
231
- // tag::index-execute-async
232
- client .indexAsync (request , new ActionListener <IndexResponse >() {
231
+ // tag::index-execute-listener
232
+ ActionListener <IndexResponse > listener = new ActionListener <IndexResponse >() {
233
233
@ Override
234
234
public void onResponse (IndexResponse indexResponse ) {
235
235
// <1>
@@ -239,10 +239,18 @@ public void onResponse(IndexResponse indexResponse) {
239
239
public void onFailure (Exception e ) {
240
240
// <2>
241
241
}
242
- });
242
+ };
243
+ // end::index-execute-listener
244
+
245
+ // Replace the empty listener by a blocking listener in test
246
+ final CountDownLatch latch = new CountDownLatch (1 );
247
+ listener = new LatchedActionListener <>(listener , latch );
248
+
249
+ // tag::index-execute-async
250
+ client .indexAsync (request , listener ); // <1>
243
251
// end::index-execute-async
244
252
245
- assertBusy (() -> assertTrue (client . exists ( new GetRequest ( "posts" , "doc" , "async" )) ));
253
+ assertTrue (latch . await ( 30L , TimeUnit . SECONDS ));
246
254
}
247
255
}
248
256
@@ -490,8 +498,8 @@ public void testUpdate() throws Exception {
490
498
{
491
499
UpdateRequest request = new UpdateRequest ("posts" , "doc" , "async" ).doc ("reason" , "async update" ).docAsUpsert (true );
492
500
493
- // tag::update-execute-async
494
- client . updateAsync ( request , new ActionListener <UpdateResponse >() {
501
+ // tag::update-execute-listener
502
+ ActionListener < UpdateResponse > listener = new ActionListener <UpdateResponse >() {
495
503
@ Override
496
504
public void onResponse (UpdateResponse updateResponse ) {
497
505
// <1>
@@ -501,10 +509,18 @@ public void onResponse(UpdateResponse updateResponse) {
501
509
public void onFailure (Exception e ) {
502
510
// <2>
503
511
}
504
- });
512
+ };
513
+ // end::update-execute-listener
514
+
515
+ // Replace the empty listener by a blocking listener in test
516
+ final CountDownLatch latch = new CountDownLatch (1 );
517
+ listener = new LatchedActionListener <>(listener , latch );
518
+
519
+ // tag::update-execute-async
520
+ client .updateAsync (request , listener ); // <1>
505
521
// end::update-execute-async
506
522
507
- assertBusy (() -> assertTrue (client . exists ( new GetRequest ( "posts" , "doc" , "async" )) ));
523
+ assertTrue (latch . await ( 30L , TimeUnit . SECONDS ));
508
524
}
509
525
}
510
526
@@ -602,8 +618,8 @@ public void testDelete() throws Exception {
602
618
603
619
DeleteRequest request = new DeleteRequest ("posts" , "doc" , "async" );
604
620
605
- // tag::delete-execute-async
606
- client . deleteAsync ( request , new ActionListener <DeleteResponse >() {
621
+ // tag::delete-execute-listener
622
+ ActionListener < DeleteResponse > listener = new ActionListener <DeleteResponse >() {
607
623
@ Override
608
624
public void onResponse (DeleteResponse deleteResponse ) {
609
625
// <1>
@@ -613,14 +629,22 @@ public void onResponse(DeleteResponse deleteResponse) {
613
629
public void onFailure (Exception e ) {
614
630
// <2>
615
631
}
616
- });
632
+ };
633
+ // end::delete-execute-listener
634
+
635
+ // Replace the empty listener by a blocking listener in test
636
+ final CountDownLatch latch = new CountDownLatch (1 );
637
+ listener = new LatchedActionListener <>(listener , latch );
638
+
639
+ // tag::delete-execute-async
640
+ client .deleteAsync (request , listener ); // <1>
617
641
// end::delete-execute-async
618
642
619
- assertBusy (() -> assertFalse ( client . exists ( new GetRequest ( "posts" , "doc" , "async" )) ));
643
+ assertTrue ( latch . await ( 30L , TimeUnit . SECONDS ));
620
644
}
621
645
}
622
646
623
- public void testBulk () throws IOException {
647
+ public void testBulk () throws Exception {
624
648
RestHighLevelClient client = highLevelClient ();
625
649
{
626
650
// tag::bulk-request
@@ -696,8 +720,8 @@ public void testBulk() throws IOException {
696
720
request .waitForActiveShards (ActiveShardCount .ALL ); // <2>
697
721
// end::bulk-request-active-shards
698
722
699
- // tag::bulk-execute-async
700
- client . bulkAsync ( request , new ActionListener <BulkResponse >() {
723
+ // tag::bulk-execute-listener
724
+ ActionListener < BulkResponse > listener = new ActionListener <BulkResponse >() {
701
725
@ Override
702
726
public void onResponse (BulkResponse bulkResponse ) {
703
727
// <1>
@@ -707,12 +731,22 @@ public void onResponse(BulkResponse bulkResponse) {
707
731
public void onFailure (Exception e ) {
708
732
// <2>
709
733
}
710
- });
734
+ };
735
+ // end::bulk-execute-listener
736
+
737
+ // Replace the empty listener by a blocking listener in test
738
+ final CountDownLatch latch = new CountDownLatch (1 );
739
+ listener = new LatchedActionListener <>(listener , latch );
740
+
741
+ // tag::bulk-execute-async
742
+ client .bulkAsync (request , listener ); // <1>
711
743
// end::bulk-execute-async
744
+
745
+ assertTrue (latch .await (30L , TimeUnit .SECONDS ));
712
746
}
713
747
}
714
748
715
- public void testGet () throws IOException {
749
+ public void testGet () throws Exception {
716
750
RestHighLevelClient client = highLevelClient ();
717
751
{
718
752
String mappings = "{\n " +
@@ -839,8 +873,9 @@ public void testGet() throws IOException {
839
873
}
840
874
{
841
875
GetRequest request = new GetRequest ("posts" , "doc" , "1" );
842
- //tag::get-execute-async
843
- client .getAsync (request , new ActionListener <GetResponse >() {
876
+
877
+ //tag::get-execute-listener
878
+ ActionListener <GetResponse > listener = new ActionListener <GetResponse >() {
844
879
@ Override
845
880
public void onResponse (GetResponse getResponse ) {
846
881
// <1>
@@ -850,8 +885,18 @@ public void onResponse(GetResponse getResponse) {
850
885
public void onFailure (Exception e ) {
851
886
// <2>
852
887
}
853
- });
888
+ };
889
+ //end::get-execute-listener
890
+
891
+ // Replace the empty listener by a blocking listener in test
892
+ final CountDownLatch latch = new CountDownLatch (1 );
893
+ listener = new LatchedActionListener <>(listener , latch );
894
+
895
+ //tag::get-execute-async
896
+ client .getAsync (request , listener ); // <1>
854
897
//end::get-execute-async
898
+
899
+ assertTrue (latch .await (30L , TimeUnit .SECONDS ));
855
900
}
856
901
{
857
902
//tag::get-indexnotfound
@@ -879,7 +924,7 @@ public void onFailure(Exception e) {
879
924
}
880
925
}
881
926
882
- public void testBulkProcessor () throws InterruptedException , IOException {
927
+ public void testBulkProcessor () throws InterruptedException {
883
928
RestHighLevelClient client = highLevelClient ();
884
929
{
885
930
// tag::bulk-processor-init
0 commit comments