6
6
7
7
package org .elasticsearch .xpack .ccr ;
8
8
9
- import org .apache .http .HttpHost ;
10
- import org .apache .http .util .EntityUtils ;
11
- import org .elasticsearch .client .Request ;
12
- import org .elasticsearch .client .Response ;
13
9
import org .elasticsearch .client .RestClient ;
14
- import org .elasticsearch .common .Strings ;
15
10
import org .elasticsearch .common .settings .Settings ;
16
- import org .elasticsearch .common .xcontent .XContentBuilder ;
17
- import org .elasticsearch .common .xcontent .XContentHelper ;
18
- import org .elasticsearch .common .xcontent .json .JsonXContent ;
19
- import org .elasticsearch .common .xcontent .support .XContentMapValues ;
20
- import org .elasticsearch .test .rest .ESRestTestCase ;
21
11
22
- import java .io .IOException ;
23
- import java .util .List ;
24
- import java .util .Map ;
25
-
26
- import static org .elasticsearch .common .xcontent .XContentFactory .jsonBuilder ;
27
- import static org .hamcrest .Matchers .equalTo ;
28
-
29
- public class ChainIT extends ESRestTestCase {
30
-
31
- private final String targetCluster = System .getProperty ("tests.target_cluster" );
32
-
33
- @ Override
34
- protected boolean preserveClusterUponCompletion () {
35
- return true ;
36
- }
12
+ public class ChainIT extends ESCCRRestTestCase {
37
13
38
14
public void testFollowIndex () throws Exception {
39
15
final int numDocs = 128 ;
@@ -60,23 +36,23 @@ public void testFollowIndex() throws Exception {
60
36
index (client (), leaderIndexName , Integer .toString (i ), "field" , i , "filtered_field" , "true" );
61
37
}
62
38
refresh (leaderIndexName );
63
- verifyDocuments (leaderIndexName , numDocs );
39
+ verifyDocuments (leaderIndexName , numDocs , "filtered_field:true" );
64
40
} else if ("middle" .equals (targetCluster )) {
65
41
logger .info ("Running against middle cluster" );
66
42
followIndex ("leader_cluster" , leaderIndexName , middleIndexName );
67
- assertBusy (() -> verifyDocuments (middleIndexName , numDocs ));
43
+ assertBusy (() -> verifyDocuments (middleIndexName , numDocs , "filtered_field:true" ));
68
44
try (RestClient leaderClient = buildLeaderClient ()) {
69
45
int id = numDocs ;
70
46
index (leaderClient , leaderIndexName , Integer .toString (id ), "field" , id , "filtered_field" , "true" );
71
47
index (leaderClient , leaderIndexName , Integer .toString (id + 1 ), "field" , id + 1 , "filtered_field" , "true" );
72
48
index (leaderClient , leaderIndexName , Integer .toString (id + 2 ), "field" , id + 2 , "filtered_field" , "true" );
73
49
}
74
- assertBusy (() -> verifyDocuments (middleIndexName , numDocs + 3 ));
50
+ assertBusy (() -> verifyDocuments (middleIndexName , numDocs + 3 , "filtered_field:true" ));
75
51
} else if ("follow" .equals (targetCluster )) {
76
52
logger .info ("Running against follow cluster" );
77
53
final String followIndexName = "follow" ;
78
54
followIndex ("middle_cluster" , middleIndexName , followIndexName );
79
- assertBusy (() -> verifyDocuments (followIndexName , numDocs + 3 ));
55
+ assertBusy (() -> verifyDocuments (followIndexName , numDocs + 3 , "filtered_field:true" ));
80
56
81
57
try (RestClient leaderClient = buildLeaderClient ()) {
82
58
int id = numDocs + 3 ;
@@ -86,82 +62,13 @@ public void testFollowIndex() throws Exception {
86
62
}
87
63
88
64
try (RestClient middleClient = buildMiddleClient ()) {
89
- assertBusy (() -> verifyDocuments (middleIndexName , numDocs + 6 , middleClient ));
65
+ assertBusy (() -> verifyDocuments (middleIndexName , numDocs + 6 , "filtered_field:true" , middleClient ));
90
66
}
91
67
92
- assertBusy (() -> verifyDocuments (followIndexName , numDocs + 6 ));
68
+ assertBusy (() -> verifyDocuments (followIndexName , numDocs + 6 , "filtered_field:true" ));
93
69
} else {
94
70
fail ("unexpected target cluster [" + targetCluster + "]" );
95
71
}
96
72
}
97
73
98
- private static void index (RestClient client , String index , String id , Object ... fields ) throws IOException {
99
- XContentBuilder document = jsonBuilder ().startObject ();
100
- for (int i = 0 ; i < fields .length ; i += 2 ) {
101
- document .field ((String ) fields [i ], fields [i + 1 ]);
102
- }
103
- document .endObject ();
104
- final Request request = new Request ("POST" , "/" + index + "/_doc/" + id );
105
- request .setJsonEntity (Strings .toString (document ));
106
- assertOK (client .performRequest (request ));
107
- }
108
-
109
- private static void refresh (String index ) throws IOException {
110
- assertOK (client ().performRequest (new Request ("POST" , "/" + index + "/_refresh" )));
111
- }
112
-
113
- private static void followIndex (String leaderCluster , String leaderIndex , String followIndex ) throws IOException {
114
- final Request request = new Request ("PUT" , "/" + followIndex + "/_ccr/follow" );
115
- request .setJsonEntity (
116
- "{\" leader_cluster\" : \" " + leaderCluster + "\" , \" leader_index\" : \" " + leaderIndex + "\" , \" poll_timeout\" : \" 10ms\" }" );
117
- assertOK (client ().performRequest (request ));
118
- }
119
-
120
- private static void verifyDocuments (String index , int expectedNumDocs ) throws IOException {
121
- verifyDocuments (index , expectedNumDocs , client ());
122
- }
123
-
124
- private static void verifyDocuments (final String index , final int expectedNumDocs , final RestClient client ) throws IOException {
125
- final Request request = new Request ("GET" , "/" + index + "/_search" );
126
- request .addParameter ("size" , Integer .toString (expectedNumDocs ));
127
- request .addParameter ("sort" , "field:asc" );
128
- request .addParameter ("q" , "filtered_field:true" );
129
- Map <String , ?> response = toMap (client .performRequest (request ));
130
-
131
- int numDocs = (int ) XContentMapValues .extractValue ("hits.total" , response );
132
- assertThat (numDocs , equalTo (expectedNumDocs ));
133
-
134
- List <?> hits = (List <?>) XContentMapValues .extractValue ("hits.hits" , response );
135
- assertThat (hits .size (), equalTo (expectedNumDocs ));
136
- for (int i = 0 ; i < expectedNumDocs ; i ++) {
137
- int value = (int ) XContentMapValues .extractValue ("_source.field" , (Map <?, ?>) hits .get (i ));
138
- assertThat (i , equalTo (value ));
139
- }
140
- }
141
-
142
- private static Map <String , Object > toMap (Response response ) throws IOException {
143
- return toMap (EntityUtils .toString (response .getEntity ()));
144
- }
145
-
146
- private static Map <String , Object > toMap (String response ) {
147
- return XContentHelper .convertToMap (JsonXContent .jsonXContent , response , false );
148
- }
149
-
150
- private RestClient buildLeaderClient () throws IOException {
151
- assert "leader" .equals (targetCluster ) == false ;
152
- return buildClient (System .getProperty ("tests.leader_host" ));
153
- }
154
-
155
- private RestClient buildMiddleClient () throws IOException {
156
- assert "middle" .equals (targetCluster ) == false ;
157
- return buildClient (System .getProperty ("tests.middle_host" ));
158
- }
159
-
160
- private RestClient buildClient (final String url ) throws IOException {
161
- int portSeparator = url .lastIndexOf (':' );
162
- HttpHost httpHost = new HttpHost (url .substring (0 , portSeparator ),
163
- Integer .parseInt (url .substring (portSeparator + 1 )), getProtocol ());
164
- return buildClient (Settings .EMPTY , new HttpHost []{httpHost });
165
- }
166
-
167
74
}
0 commit comments