5
5
*/
6
6
package org .elasticsearch .upgrades ;
7
7
8
+ import org .elasticsearch .Version ;
8
9
import org .elasticsearch .client .Request ;
9
10
import org .elasticsearch .client .ResponseException ;
10
11
import org .elasticsearch .client .RestClient ;
11
12
import org .elasticsearch .common .Strings ;
12
13
import org .elasticsearch .common .settings .Settings ;
14
+ import org .elasticsearch .common .xcontent .ObjectPath ;
13
15
import org .elasticsearch .common .xcontent .support .XContentMapValues ;
14
16
15
17
import java .io .IOException ;
@@ -88,6 +90,123 @@ public void testIndexFollowing() throws Exception {
88
90
}
89
91
}
90
92
93
+ public void testAutoFollowing () throws Exception {
94
+ String leaderIndex1 = "logs-20200101" ;
95
+ String leaderIndex2 = "logs-20200102" ;
96
+ String leaderIndex3 = "logs-20200103" ;
97
+
98
+ if (clusterName == ClusterName .LEADER ) {
99
+ switch (upgradeState ) {
100
+ case NONE :
101
+ case ONE_THIRD :
102
+ case TWO_THIRD :
103
+ break ;
104
+ case ALL :
105
+ index (leaderClient (), leaderIndex1 , 64 );
106
+ assertBusy (() -> {
107
+ String followerIndex = "copy-" + leaderIndex1 ;
108
+ assertTotalHitCount (followerIndex , 320 , followerClient ());
109
+ });
110
+ index (leaderClient (), leaderIndex2 , 64 );
111
+ assertBusy (() -> {
112
+ String followerIndex = "copy-" + leaderIndex2 ;
113
+ assertTotalHitCount (followerIndex , 256 , followerClient ());
114
+ });
115
+ index (leaderClient (), leaderIndex3 , 64 );
116
+ assertBusy (() -> {
117
+ String followerIndex = "copy-" + leaderIndex3 ;
118
+ assertTotalHitCount (followerIndex , 192 , followerClient ());
119
+ });
120
+
121
+ deleteAutoFollowPattern (followerClient (), "test_pattern" );
122
+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex1 );
123
+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex2 );
124
+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex3 );
125
+ break ;
126
+ default :
127
+ throw new AssertionError ("unexpected upgrade_state [" + upgradeState + "]" );
128
+ }
129
+ } else if (clusterName == ClusterName .FOLLOWER ) {
130
+ switch (upgradeState ) {
131
+ case NONE :
132
+ putAutoFollowPattern (followerClient (), "test_pattern" , "leader" , "logs-*" );
133
+ createLeaderIndex (leaderClient (), leaderIndex1 );
134
+ index (leaderClient (), leaderIndex1 , 64 );
135
+ assertBusy (() -> {
136
+ String followerIndex = "copy-" + leaderIndex1 ;
137
+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (1 ));
138
+ assertTotalHitCount (followerIndex , 64 , followerClient ());
139
+ });
140
+ break ;
141
+ case ONE_THIRD :
142
+ index (leaderClient (), leaderIndex1 , 64 );
143
+ assertBusy (() -> {
144
+ String followerIndex = "copy-" + leaderIndex1 ;
145
+ assertTotalHitCount (followerIndex , 128 , followerClient ());
146
+ });
147
+ // Auto follow stats are kept in-memory on master elected node
148
+ // and if this node get updated then auto follow stats are reset
149
+ {
150
+ int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices ();
151
+ createLeaderIndex (leaderClient (), leaderIndex2 );
152
+ index (leaderClient (), leaderIndex2 , 64 );
153
+ assertBusy (() -> {
154
+ String followerIndex = "copy-" + leaderIndex2 ;
155
+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (previousNumberOfSuccessfulFollowedIndices + 1 ));
156
+ assertTotalHitCount (followerIndex , 64 , followerClient ());
157
+ });
158
+ }
159
+ break ;
160
+ case TWO_THIRD :
161
+ index (leaderClient (), leaderIndex1 , 64 );
162
+ assertBusy (() -> {
163
+ String followerIndex = "copy-" + leaderIndex1 ;
164
+ assertTotalHitCount (followerIndex , 192 , followerClient ());
165
+ });
166
+ index (leaderClient (), leaderIndex2 , 64 );
167
+ assertBusy (() -> {
168
+ String followerIndex = "copy-" + leaderIndex2 ;
169
+ assertTotalHitCount (followerIndex , 128 , followerClient ());
170
+ });
171
+
172
+ // Auto follow stats are kept in-memory on master elected node
173
+ // and if this node get updated then auto follow stats are reset
174
+ {
175
+ int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices ();
176
+ createLeaderIndex (leaderClient (), leaderIndex3 );
177
+ index (leaderClient (), leaderIndex3 , 64 );
178
+ assertBusy (() -> {
179
+ String followerIndex = "copy-" + leaderIndex3 ;
180
+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (previousNumberOfSuccessfulFollowedIndices + 1 ));
181
+ assertTotalHitCount (followerIndex , 64 , followerClient ());
182
+ });
183
+ }
184
+ break ;
185
+ case ALL :
186
+ index (leaderClient (), leaderIndex1 , 64 );
187
+ assertBusy (() -> {
188
+ String followerIndex = "copy-" + leaderIndex1 ;
189
+ assertTotalHitCount (followerIndex , 256 , followerClient ());
190
+ });
191
+ index (leaderClient (), leaderIndex2 , 64 );
192
+ assertBusy (() -> {
193
+ String followerIndex = "copy-" + leaderIndex2 ;
194
+ assertTotalHitCount (followerIndex , 192 , followerClient ());
195
+ });
196
+ index (leaderClient (), leaderIndex3 , 64 );
197
+ assertBusy (() -> {
198
+ String followerIndex = "copy-" + leaderIndex3 ;
199
+ assertTotalHitCount (followerIndex , 128 , followerClient ());
200
+ });
201
+ break ;
202
+ default :
203
+ throw new UnsupportedOperationException ("unexpected upgrade state [" + upgradeState + "]" );
204
+ }
205
+ } else {
206
+ throw new AssertionError ("unexpected cluster_name [" + clusterName + "]" );
207
+ }
208
+ }
209
+
91
210
public void testCannotFollowLeaderInUpgradedCluster () throws Exception {
92
211
assumeTrue ("Tests only runs with upgrade_state [all]" , upgradeState == UpgradeState .ALL );
93
212
@@ -113,12 +232,13 @@ public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
113
232
}
114
233
115
234
private static void createLeaderIndex (RestClient client , String indexName ) throws IOException {
116
- Settings indexSettings = Settings .builder ()
117
- .put ("index.soft_deletes.enabled" , true )
235
+ Settings .Builder indexSettings = Settings .builder ()
118
236
.put ("index.number_of_shards" , 1 )
119
- .put ("index.number_of_replicas" , 0 )
120
- .build ();
121
- createIndex (client , indexName , indexSettings );
237
+ .put ("index.number_of_replicas" , 0 );
238
+ if (UPGRADE_FROM_VERSION .before (Version .V_7_0_0 ) || randomBoolean ()) {
239
+ indexSettings .put ("index.soft_deletes.enabled" , true );
240
+ }
241
+ createIndex (client , indexName , indexSettings .build ());
122
242
}
123
243
124
244
private static void createIndex (RestClient client , String name , Settings settings ) throws IOException {
@@ -134,6 +254,29 @@ private static void followIndex(RestClient client, String leaderCluster, String
134
254
assertOK (client .performRequest (request ));
135
255
}
136
256
257
+ private static void putAutoFollowPattern (RestClient client , String name , String remoteCluster , String pattern ) throws IOException {
258
+ Request request = new Request ("PUT" , "/_ccr/auto_follow/" + name );
259
+ request .setJsonEntity ("{\" leader_index_patterns\" : [\" " + pattern + "\" ], \" remote_cluster\" : \" " + remoteCluster + "\" ," +
260
+ "\" follow_index_pattern\" : \" copy-{{leader_index}}\" , \" read_poll_timeout\" : \" 10ms\" }" );
261
+ assertOK (client .performRequest (request ));
262
+ }
263
+
264
+ private static void deleteAutoFollowPattern (RestClient client , String patternName ) throws IOException {
265
+ Request request = new Request ("DELETE" , "/_ccr/auto_follow/" + patternName );
266
+ assertOK (client .performRequest (request ));
267
+ }
268
+
269
+ private int getNumberOfSuccessfulFollowedIndices () throws IOException {
270
+ Request statsRequest = new Request ("GET" , "/_ccr/stats" );
271
+ Map <?, ?> response = toMap (client ().performRequest (statsRequest ));
272
+ Integer actualSuccessfulFollowedIndices = ObjectPath .eval ("auto_follow_stats.number_of_successful_follow_indices" , response );
273
+ if (actualSuccessfulFollowedIndices != null ) {
274
+ return actualSuccessfulFollowedIndices ;
275
+ } else {
276
+ return -1 ;
277
+ }
278
+ }
279
+
137
280
private static void index (RestClient client , String index , int numDocs ) throws IOException {
138
281
for (int i = 0 ; i < numDocs ; i ++) {
139
282
final Request request = new Request ("POST" , "/" + index + "/_doc/" );
@@ -162,4 +305,10 @@ private static void verifyTotalHitCount(final String index,
162
305
assertThat (totalHits , equalTo (expectedTotalHits ));
163
306
}
164
307
308
+ private static void stopIndexFollowing (RestClient client , String followerIndex ) throws IOException {
309
+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_ccr/pause_follow" )));
310
+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_close" )));
311
+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_ccr/unfollow" )));
312
+ }
313
+
165
314
}
0 commit comments