7
7
8
8
import org .apache .logging .log4j .LogManager ;
9
9
import org .apache .logging .log4j .Logger ;
10
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
11
+ import org .elasticsearch .ElasticsearchException ;
10
12
import org .elasticsearch .action .ActionListener ;
11
13
import org .elasticsearch .action .admin .cluster .state .ClusterStateRequest ;
12
14
import org .elasticsearch .client .Client ;
17
19
import org .elasticsearch .cluster .metadata .IndexMetaData ;
18
20
import org .elasticsearch .cluster .metadata .MetaData ;
19
21
import org .elasticsearch .cluster .service .ClusterService ;
22
+ import org .elasticsearch .common .collect .Tuple ;
20
23
import org .elasticsearch .common .settings .Settings ;
21
24
import org .elasticsearch .common .unit .TimeValue ;
25
+ import org .elasticsearch .common .util .concurrent .AtomicArray ;
22
26
import org .elasticsearch .common .util .concurrent .CountDown ;
23
27
import org .elasticsearch .index .Index ;
24
28
import org .elasticsearch .license .LicenseUtils ;
27
31
import org .elasticsearch .xpack .ccr .CcrSettings ;
28
32
import org .elasticsearch .xpack .core .ccr .AutoFollowMetadata ;
29
33
import org .elasticsearch .xpack .core .ccr .AutoFollowMetadata .AutoFollowPattern ;
34
+ import org .elasticsearch .xpack .core .ccr .AutoFollowStats ;
30
35
import org .elasticsearch .xpack .core .ccr .action .CreateAndFollowIndexAction ;
31
36
import org .elasticsearch .xpack .core .ccr .action .FollowIndexAction ;
32
37
33
38
import java .util .ArrayList ;
39
+ import java .util .Collections ;
34
40
import java .util .HashMap ;
41
+ import java .util .LinkedHashMap ;
35
42
import java .util .List ;
36
43
import java .util .Map ;
37
44
import java .util .Objects ;
38
- import java .util .concurrent . atomic . AtomicReference ;
45
+ import java .util .TreeMap ;
39
46
import java .util .function .BiConsumer ;
40
47
import java .util .function .Consumer ;
41
48
import java .util .function .Function ;
47
54
public class AutoFollowCoordinator implements ClusterStateApplier {
48
55
49
56
private static final Logger LOGGER = LogManager .getLogger (AutoFollowCoordinator .class );
57
+ private static final int MAX_AUTO_FOLLOW_ERRORS = 256 ;
50
58
51
59
private final Client client ;
52
60
private final TimeValue pollInterval ;
@@ -56,6 +64,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
56
64
57
65
private volatile boolean localNodeMaster = false ;
58
66
67
+ // The following fields are read and updated under a lock:
68
+ private long numberOfSuccessfulIndicesAutoFollowed = 0 ;
69
+ private long numberOfFailedIndicesAutoFollowed = 0 ;
70
+ private long numberOfFailedRemoteClusterStateRequests = 0 ;
71
+ private final LinkedHashMap <String , ElasticsearchException > recentAutoFollowErrors ;
72
+
59
73
public AutoFollowCoordinator (
60
74
Settings settings ,
61
75
Client client ,
@@ -69,6 +83,47 @@ public AutoFollowCoordinator(
69
83
70
84
this .pollInterval = CcrSettings .CCR_AUTO_FOLLOW_POLL_INTERVAL .get (settings );
71
85
clusterService .addStateApplier (this );
86
+
87
+ this .recentAutoFollowErrors = new LinkedHashMap <String , ElasticsearchException >() {
88
+ @ Override
89
+ protected boolean removeEldestEntry (final Map .Entry <String , ElasticsearchException > eldest ) {
90
+ return size () > MAX_AUTO_FOLLOW_ERRORS ;
91
+ }
92
+ };
93
+ }
94
+
95
+ public synchronized AutoFollowStats getStats () {
96
+ return new AutoFollowStats (
97
+ numberOfFailedIndicesAutoFollowed ,
98
+ numberOfFailedRemoteClusterStateRequests ,
99
+ numberOfSuccessfulIndicesAutoFollowed ,
100
+ new TreeMap <>(recentAutoFollowErrors )
101
+ );
102
+ }
103
+
104
+ synchronized void updateStats (List <AutoFollowResult > results ) {
105
+ for (AutoFollowResult result : results ) {
106
+ if (result .clusterStateFetchException != null ) {
107
+ recentAutoFollowErrors .put (result .clusterAlias ,
108
+ new ElasticsearchException (result .clusterStateFetchException ));
109
+ numberOfFailedRemoteClusterStateRequests ++;
110
+ LOGGER .warn (new ParameterizedMessage ("failure occurred while fetching cluster state in leader cluster [{}]" ,
111
+ result .clusterAlias ), result .clusterStateFetchException );
112
+ } else {
113
+ for (Map .Entry <Index , Exception > entry : result .autoFollowExecutionResults .entrySet ()) {
114
+ if (entry .getValue () != null ) {
115
+ numberOfFailedIndicesAutoFollowed ++;
116
+ recentAutoFollowErrors .put (result .clusterAlias + ":" + entry .getKey ().getName (),
117
+ new ElasticsearchException (entry .getValue ()));
118
+ LOGGER .warn (new ParameterizedMessage ("failure occurred while auto following index [{}] in leader cluster [{}]" ,
119
+ entry .getKey (), result .clusterAlias ), entry .getValue ());
120
+ } else {
121
+ numberOfSuccessfulIndicesAutoFollowed ++;
122
+ }
123
+ }
124
+ }
125
+
126
+ }
72
127
}
73
128
74
129
private void doAutoFollow () {
@@ -94,10 +149,8 @@ private void doAutoFollow() {
94
149
return ;
95
150
}
96
151
97
- Consumer <Exception > handler = e -> {
98
- if (e != null ) {
99
- LOGGER .warn ("failure occurred during auto-follower coordination" , e );
100
- }
152
+ Consumer <List <AutoFollowResult >> handler = results -> {
153
+ updateStats (results );
101
154
threadPool .schedule (pollInterval , ThreadPool .Names .SAME , this ::doAutoFollow );
102
155
};
103
156
AutoFollower operation = new AutoFollower (handler , followerClusterState ) {
@@ -178,101 +231,97 @@ public void applyClusterState(ClusterChangedEvent event) {
178
231
179
232
abstract static class AutoFollower {
180
233
181
- private final Consumer <Exception > handler ;
234
+ private final Consumer <List < AutoFollowResult > > handler ;
182
235
private final ClusterState followerClusterState ;
183
236
private final AutoFollowMetadata autoFollowMetadata ;
184
237
185
238
private final CountDown autoFollowPatternsCountDown ;
186
- private final AtomicReference < Exception > autoFollowPatternsErrorHolder = new AtomicReference <>() ;
239
+ private final AtomicArray < AutoFollowResult > autoFollowResults ;
187
240
188
- AutoFollower (final Consumer <Exception > handler , final ClusterState followerClusterState ) {
241
+ AutoFollower (final Consumer <List < AutoFollowResult > > handler , final ClusterState followerClusterState ) {
189
242
this .handler = handler ;
190
243
this .followerClusterState = followerClusterState ;
191
244
this .autoFollowMetadata = followerClusterState .getMetaData ().custom (AutoFollowMetadata .TYPE );
192
245
this .autoFollowPatternsCountDown = new CountDown (autoFollowMetadata .getPatterns ().size ());
246
+ this .autoFollowResults = new AtomicArray <>(autoFollowMetadata .getPatterns ().size ());
193
247
}
194
248
195
249
void autoFollowIndices () {
250
+ int i = 0 ;
196
251
for (Map .Entry <String , AutoFollowPattern > entry : autoFollowMetadata .getPatterns ().entrySet ()) {
197
- String clusterAlias = entry . getKey () ;
198
- AutoFollowPattern autoFollowPattern = entry .getValue ();
199
- List < String > followedIndices = autoFollowMetadata . getFollowedLeaderIndexUUIDs (). get ( clusterAlias );
252
+ final int slot = i ;
253
+ final String clusterAlias = entry .getKey ();
254
+ final AutoFollowPattern autoFollowPattern = entry . getValue ( );
200
255
201
256
getLeaderClusterState (autoFollowPattern .getHeaders (), clusterAlias , (leaderClusterState , e ) -> {
202
257
if (leaderClusterState != null ) {
203
258
assert e == null ;
204
- handleClusterAlias (clusterAlias , autoFollowPattern , followedIndices , leaderClusterState );
259
+ final List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (clusterAlias );
260
+ final List <Index > leaderIndicesToFollow =
261
+ getLeaderIndicesToFollow (autoFollowPattern , leaderClusterState , followerClusterState , followedIndices );
262
+ if (leaderIndicesToFollow .isEmpty ()) {
263
+ finalise (slot , new AutoFollowResult (clusterAlias ));
264
+ } else {
265
+ Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
266
+ checkAutoFollowPattern (clusterAlias , autoFollowPattern , leaderIndicesToFollow , resultHandler );
267
+ }
205
268
} else {
206
- finalise (e );
269
+ finalise (slot , new AutoFollowResult ( clusterAlias , e ) );
207
270
}
208
271
});
272
+ i ++;
209
273
}
210
274
}
211
275
212
- private void handleClusterAlias (String clusterAlias , AutoFollowPattern autoFollowPattern ,
213
- List <String > followedIndexUUIDs , ClusterState leaderClusterState ) {
214
- final List <Index > leaderIndicesToFollow =
215
- getLeaderIndicesToFollow (autoFollowPattern , leaderClusterState , followerClusterState , followedIndexUUIDs );
216
- if (leaderIndicesToFollow .isEmpty ()) {
217
- finalise (null );
218
- } else {
219
- final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
220
- final AtomicReference <Exception > leaderIndicesErrorHolder = new AtomicReference <>();
221
- for (Index indexToFollow : leaderIndicesToFollow ) {
222
- final String leaderIndexName = indexToFollow .getName ();
223
- final String followIndexName = getFollowerIndexName (autoFollowPattern , leaderIndexName );
224
-
225
- String leaderIndexNameWithClusterAliasPrefix = clusterAlias .equals ("_local_" ) ? leaderIndexName :
226
- clusterAlias + ":" + leaderIndexName ;
227
- FollowIndexAction .Request followRequest =
228
- new FollowIndexAction .Request (leaderIndexNameWithClusterAliasPrefix , followIndexName ,
229
- autoFollowPattern .getMaxBatchOperationCount (), autoFollowPattern .getMaxConcurrentReadBatches (),
230
- autoFollowPattern .getMaxOperationSizeInBytes (), autoFollowPattern .getMaxConcurrentWriteBatches (),
231
- autoFollowPattern .getMaxWriteBufferSize (), autoFollowPattern .getMaxRetryDelay (),
232
- autoFollowPattern .getIdleShardRetryDelay ());
233
-
234
- // Execute if the create and follow api call succeeds:
235
- Runnable successHandler = () -> {
236
- LOGGER .info ("Auto followed leader index [{}] as follow index [{}]" , leaderIndexName , followIndexName );
237
-
238
- // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
239
- // (so that we do not try to follow it in subsequent auto follow runs)
240
- Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (clusterAlias , indexToFollow );
241
- // The coordinator always runs on the elected master node, so we can update cluster state here:
242
- updateAutoFollowMetadata (function , updateError -> {
243
- if (updateError != null ) {
244
- LOGGER .error ("Failed to mark leader index [" + leaderIndexName + "] as auto followed" , updateError );
245
- if (leaderIndicesErrorHolder .compareAndSet (null , updateError ) == false ) {
246
- leaderIndicesErrorHolder .get ().addSuppressed (updateError );
247
- }
248
- } else {
249
- LOGGER .debug ("Successfully marked leader index [{}] as auto followed" , leaderIndexName );
250
- }
251
- if (leaderIndicesCountDown .countDown ()) {
252
- finalise (leaderIndicesErrorHolder .get ());
253
- }
254
- });
255
- };
256
- // Execute if the create and follow apu call fails:
257
- Consumer <Exception > failureHandler = followError -> {
258
- assert followError != null ;
259
- LOGGER .warn ("Failed to auto follow leader index [" + leaderIndexName + "]" , followError );
260
- if (leaderIndicesCountDown .countDown ()) {
261
- finalise (followError );
262
- }
263
- };
264
- createAndFollow (autoFollowPattern .getHeaders (), followRequest , successHandler , failureHandler );
265
- }
276
+ private void checkAutoFollowPattern (String clusterAlias , AutoFollowPattern autoFollowPattern ,
277
+ List <Index > leaderIndicesToFollow , Consumer <AutoFollowResult > resultHandler ) {
278
+
279
+ final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
280
+ final AtomicArray <Tuple <Index , Exception >> results = new AtomicArray <>(leaderIndicesToFollow .size ());
281
+ for (int i = 0 ; i < leaderIndicesToFollow .size (); i ++) {
282
+ final Index indexToFollow = leaderIndicesToFollow .get (i );
283
+ final int slot = i ;
284
+ followLeaderIndex (clusterAlias , indexToFollow , autoFollowPattern , error -> {
285
+ results .set (slot , new Tuple <>(indexToFollow , error ));
286
+ if (leaderIndicesCountDown .countDown ()) {
287
+ resultHandler .accept (new AutoFollowResult (clusterAlias , results .asList ()));
288
+ }
289
+ });
266
290
}
267
291
}
268
292
269
- private void finalise (Exception failure ) {
270
- if (autoFollowPatternsErrorHolder .compareAndSet (null , failure ) == false ) {
271
- autoFollowPatternsErrorHolder .get ().addSuppressed (failure );
272
- }
293
+ private void followLeaderIndex (String clusterAlias , Index indexToFollow ,
294
+ AutoFollowPattern pattern , Consumer <Exception > onResult ) {
295
+ final String leaderIndexName = indexToFollow .getName ();
296
+ final String followIndexName = getFollowerIndexName (pattern , leaderIndexName );
297
+
298
+ String leaderIndexNameWithClusterAliasPrefix = clusterAlias .equals ("_local_" ) ? leaderIndexName :
299
+ clusterAlias + ":" + leaderIndexName ;
300
+ FollowIndexAction .Request request =
301
+ new FollowIndexAction .Request (leaderIndexNameWithClusterAliasPrefix , followIndexName ,
302
+ pattern .getMaxBatchOperationCount (), pattern .getMaxConcurrentReadBatches (),
303
+ pattern .getMaxOperationSizeInBytes (), pattern .getMaxConcurrentWriteBatches (),
304
+ pattern .getMaxWriteBufferSize (), pattern .getMaxRetryDelay (),
305
+ pattern .getIdleShardRetryDelay ());
306
+
307
+ // Execute if the create and follow api call succeeds:
308
+ Runnable successHandler = () -> {
309
+ LOGGER .info ("Auto followed leader index [{}] as follow index [{}]" , leaderIndexName , followIndexName );
310
+
311
+ // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
312
+ // (so that we do not try to follow it in subsequent auto follow runs)
313
+ Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (clusterAlias , indexToFollow );
314
+ // The coordinator always runs on the elected master node, so we can update cluster state here:
315
+ updateAutoFollowMetadata (function , onResult );
316
+ };
317
+ createAndFollow (pattern .getHeaders (), request , successHandler , onResult );
318
+ }
273
319
320
+ private void finalise (int slot , AutoFollowResult result ) {
321
+ assert autoFollowResults .get (slot ) == null ;
322
+ autoFollowResults .set (slot , result );
274
323
if (autoFollowPatternsCountDown .countDown ()) {
275
- handler .accept (autoFollowPatternsErrorHolder . get ());
324
+ handler .accept (autoFollowResults . asList ());
276
325
}
277
326
}
278
327
@@ -347,4 +396,33 @@ abstract void updateAutoFollowMetadata(
347
396
);
348
397
349
398
}
399
+
400
+ static class AutoFollowResult {
401
+
402
+ final String clusterAlias ;
403
+ final Exception clusterStateFetchException ;
404
+ final Map <Index , Exception > autoFollowExecutionResults ;
405
+
406
+ AutoFollowResult (String clusterAlias , List <Tuple <Index , Exception >> results ) {
407
+ this .clusterAlias = clusterAlias ;
408
+
409
+ Map <Index , Exception > autoFollowExecutionResults = new HashMap <>();
410
+ for (Tuple <Index , Exception > result : results ) {
411
+ autoFollowExecutionResults .put (result .v1 (), result .v2 ());
412
+ }
413
+
414
+ this .clusterStateFetchException = null ;
415
+ this .autoFollowExecutionResults = Collections .unmodifiableMap (autoFollowExecutionResults );
416
+ }
417
+
418
+ AutoFollowResult (String clusterAlias , Exception e ) {
419
+ this .clusterAlias = clusterAlias ;
420
+ this .clusterStateFetchException = e ;
421
+ this .autoFollowExecutionResults = Collections .emptyMap ();
422
+ }
423
+
424
+ AutoFollowResult (String clusterAlias ) {
425
+ this (clusterAlias , (Exception ) null );
426
+ }
427
+ }
350
428
}
0 commit comments