47
47
import java .util .function .BiConsumer ;
48
48
import java .util .function .Consumer ;
49
49
import java .util .function .Function ;
50
+ import java .util .stream .Collectors ;
50
51
51
52
/**
52
53
* A component that runs only on the elected master node and follows leader indices automatically
@@ -105,19 +106,19 @@ public synchronized AutoFollowStats getStats() {
105
106
synchronized void updateStats (List <AutoFollowResult > results ) {
106
107
for (AutoFollowResult result : results ) {
107
108
if (result .clusterStateFetchException != null ) {
108
- recentAutoFollowErrors .put (result .clusterAlias ,
109
+ recentAutoFollowErrors .put (result .autoFollowPatternName ,
109
110
new ElasticsearchException (result .clusterStateFetchException ));
110
111
numberOfFailedRemoteClusterStateRequests ++;
111
- LOGGER .warn (new ParameterizedMessage ("failure occurred while fetching cluster state in leader cluster [{}]" ,
112
- result .clusterAlias ), result .clusterStateFetchException );
112
+ LOGGER .warn (new ParameterizedMessage ("failure occurred while fetching cluster state for auto follow pattern [{}]" ,
113
+ result .autoFollowPatternName ), result .clusterStateFetchException );
113
114
} else {
114
115
for (Map .Entry <Index , Exception > entry : result .autoFollowExecutionResults .entrySet ()) {
115
116
if (entry .getValue () != null ) {
116
117
numberOfFailedIndicesAutoFollowed ++;
117
- recentAutoFollowErrors .put (result .clusterAlias + ":" + entry .getKey ().getName (),
118
+ recentAutoFollowErrors .put (result .autoFollowPatternName + ":" + entry .getKey ().getName (),
118
119
ExceptionsHelper .convertToElastic (entry .getValue ()));
119
- LOGGER .warn (new ParameterizedMessage ("failure occurred while auto following index [{}] in leader cluster [{}]" ,
120
- entry .getKey (), result .clusterAlias ), entry .getValue ());
120
+ LOGGER .warn (new ParameterizedMessage ("failure occurred while auto following index [{}] for auto follow " +
121
+ "pattern [{}]" , entry .getKey (), result .autoFollowPatternName ), entry .getValue ());
121
122
} else {
122
123
numberOfSuccessfulIndicesAutoFollowed ++;
123
124
}
@@ -243,51 +244,77 @@ void autoFollowIndices() {
243
244
int i = 0 ;
244
245
for (Map .Entry <String , AutoFollowPattern > entry : autoFollowMetadata .getPatterns ().entrySet ()) {
245
246
final int slot = i ;
246
- final String clusterAlias = entry .getKey ();
247
+ final String autoFollowPattenName = entry .getKey ();
247
248
final AutoFollowPattern autoFollowPattern = entry .getValue ();
249
+ final String leaderCluster = autoFollowPattern .getLeaderCluster ();
248
250
249
- Map <String , String > headers = autoFollowMetadata .getHeaders ().get (clusterAlias );
250
- getLeaderClusterState (headers , clusterAlias , (leaderClusterState , e ) -> {
251
+ Map <String , String > headers = autoFollowMetadata .getHeaders ().get (autoFollowPattenName );
252
+ getLeaderClusterState (headers , leaderCluster , (leaderClusterState , e ) -> {
251
253
if (leaderClusterState != null ) {
252
254
assert e == null ;
253
- final List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (clusterAlias );
254
- final List <Index > leaderIndicesToFollow = getLeaderIndicesToFollow (clusterAlias , autoFollowPattern ,
255
+ final List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (autoFollowPattenName );
256
+ final List <Index > leaderIndicesToFollow = getLeaderIndicesToFollow (leaderCluster , autoFollowPattern ,
255
257
leaderClusterState , followerClusterState , followedIndices );
256
258
if (leaderIndicesToFollow .isEmpty ()) {
257
- finalise (slot , new AutoFollowResult (clusterAlias ));
259
+ finalise (slot , new AutoFollowResult (autoFollowPattenName ));
258
260
} else {
261
+ List <Tuple <String , AutoFollowPattern >> patternsForTheSameLeaderCluster = autoFollowMetadata .getPatterns ()
262
+ .entrySet ().stream ()
263
+ .filter (item -> autoFollowPattenName .equals (item .getKey ()) == false )
264
+ .filter (item -> leaderCluster .equals (item .getValue ().getLeaderCluster ()))
265
+ .map (item -> new Tuple <>(item .getKey (), item .getValue ()))
266
+ .collect (Collectors .toList ());
267
+
259
268
Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
260
- checkAutoFollowPattern (clusterAlias , autoFollowPattern , leaderIndicesToFollow , headers , resultHandler );
269
+ checkAutoFollowPattern (autoFollowPattenName , leaderCluster , autoFollowPattern , leaderIndicesToFollow , headers ,
270
+ patternsForTheSameLeaderCluster , resultHandler );
261
271
}
262
272
} else {
263
- finalise (slot , new AutoFollowResult (clusterAlias , e ));
273
+ finalise (slot , new AutoFollowResult (autoFollowPattenName , e ));
264
274
}
265
275
});
266
276
i ++;
267
277
}
268
278
}
269
279
270
- private void checkAutoFollowPattern (String clusterAlias ,
280
+ private void checkAutoFollowPattern (String autoFollowPattenName ,
281
+ String clusterAlias ,
271
282
AutoFollowPattern autoFollowPattern ,
272
283
List <Index > leaderIndicesToFollow ,
273
284
Map <String , String > headers ,
285
+ List <Tuple <String , AutoFollowPattern >> patternsForTheSameLeaderCluster ,
274
286
Consumer <AutoFollowResult > resultHandler ) {
275
287
276
288
final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
277
289
final AtomicArray <Tuple <Index , Exception >> results = new AtomicArray <>(leaderIndicesToFollow .size ());
278
290
for (int i = 0 ; i < leaderIndicesToFollow .size (); i ++) {
279
291
final Index indexToFollow = leaderIndicesToFollow .get (i );
280
292
final int slot = i ;
281
- followLeaderIndex (clusterAlias , indexToFollow , autoFollowPattern , headers , error -> {
282
- results .set (slot , new Tuple <>(indexToFollow , error ));
293
+
294
+ List <String > otherMatchingPatterns = patternsForTheSameLeaderCluster .stream ()
295
+ .filter (otherPattern -> otherPattern .v2 ().match (indexToFollow .getName ()))
296
+ .map (Tuple ::v1 )
297
+ .collect (Collectors .toList ());
298
+ if (otherMatchingPatterns .size () != 0 ) {
299
+ results .set (slot , new Tuple <>(indexToFollow , new ElasticsearchException ("index to follow [" + indexToFollow .getName () +
300
+ "] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + "" )));
283
301
if (leaderIndicesCountDown .countDown ()) {
284
- resultHandler .accept (new AutoFollowResult (clusterAlias , results .asList ()));
302
+ resultHandler .accept (new AutoFollowResult (autoFollowPattenName , results .asList ()));
285
303
}
286
- });
304
+ } else {
305
+ followLeaderIndex (autoFollowPattenName , clusterAlias , indexToFollow , autoFollowPattern , headers , error -> {
306
+ results .set (slot , new Tuple <>(indexToFollow , error ));
307
+ if (leaderIndicesCountDown .countDown ()) {
308
+ resultHandler .accept (new AutoFollowResult (autoFollowPattenName , results .asList ()));
309
+ }
310
+ });
311
+ }
312
+
287
313
}
288
314
}
289
315
290
- private void followLeaderIndex (String clusterAlias ,
316
+ private void followLeaderIndex (String autoFollowPattenName ,
317
+ String clusterAlias ,
291
318
Index indexToFollow ,
292
319
AutoFollowPattern pattern ,
293
320
Map <String ,String > headers ,
@@ -313,7 +340,7 @@ private void followLeaderIndex(String clusterAlias,
313
340
314
341
// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
315
342
// (so that we do not try to follow it in subsequent auto follow runs)
316
- Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (clusterAlias , indexToFollow );
343
+ Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (autoFollowPattenName , indexToFollow );
317
344
// The coordinator always runs on the elected master node, so we can update cluster state here:
318
345
updateAutoFollowMetadata (function , onResult );
319
346
};
@@ -356,12 +383,12 @@ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String l
356
383
}
357
384
}
358
385
359
- static Function <ClusterState , ClusterState > recordLeaderIndexAsFollowFunction (String clusterAlias ,
386
+ static Function <ClusterState , ClusterState > recordLeaderIndexAsFollowFunction (String name ,
360
387
Index indexToFollow ) {
361
388
return currentState -> {
362
389
AutoFollowMetadata currentAutoFollowMetadata = currentState .metaData ().custom (AutoFollowMetadata .TYPE );
363
390
Map <String , List <String >> newFollowedIndexUUIDS = new HashMap <>(currentAutoFollowMetadata .getFollowedLeaderIndexUUIDs ());
364
- newFollowedIndexUUIDS .compute (clusterAlias , (key , existingUUIDs ) -> {
391
+ newFollowedIndexUUIDS .compute (name , (key , existingUUIDs ) -> {
365
392
assert existingUUIDs != null ;
366
393
List <String > newUUIDs = new ArrayList <>(existingUUIDs );
367
394
newUUIDs .add (indexToFollow .getUUID ());
@@ -405,12 +432,12 @@ abstract void updateAutoFollowMetadata(
405
432
406
433
static class AutoFollowResult {
407
434
408
- final String clusterAlias ;
435
+ final String autoFollowPatternName ;
409
436
final Exception clusterStateFetchException ;
410
437
final Map <Index , Exception > autoFollowExecutionResults ;
411
438
412
- AutoFollowResult (String clusterAlias , List <Tuple <Index , Exception >> results ) {
413
- this .clusterAlias = clusterAlias ;
439
+ AutoFollowResult (String autoFollowPatternName , List <Tuple <Index , Exception >> results ) {
440
+ this .autoFollowPatternName = autoFollowPatternName ;
414
441
415
442
Map <Index , Exception > autoFollowExecutionResults = new HashMap <>();
416
443
for (Tuple <Index , Exception > result : results ) {
@@ -421,14 +448,14 @@ static class AutoFollowResult {
421
448
this .autoFollowExecutionResults = Collections .unmodifiableMap (autoFollowExecutionResults );
422
449
}
423
450
424
- AutoFollowResult (String clusterAlias , Exception e ) {
425
- this .clusterAlias = clusterAlias ;
451
+ AutoFollowResult (String autoFollowPatternName , Exception e ) {
452
+ this .autoFollowPatternName = autoFollowPatternName ;
426
453
this .clusterStateFetchException = e ;
427
454
this .autoFollowExecutionResults = Collections .emptyMap ();
428
455
}
429
456
430
- AutoFollowResult (String clusterAlias ) {
431
- this (clusterAlias , (Exception ) null );
457
+ AutoFollowResult (String autoFollowPatternName ) {
458
+ this (autoFollowPatternName , (Exception ) null );
432
459
}
433
460
}
434
461
}
0 commit comments