55
55
import java .util .Objects ;
56
56
import java .util .Set ;
57
57
import java .util .TreeMap ;
58
+ import java .util .concurrent .Executor ;
58
59
import java .util .function .BiConsumer ;
59
60
import java .util .function .Consumer ;
60
61
import java .util .function .Function ;
@@ -78,6 +79,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
78
79
private final CcrLicenseChecker ccrLicenseChecker ;
79
80
private final LongSupplier relativeMillisTimeProvider ;
80
81
private final LongSupplier absoluteMillisTimeProvider ;
82
+ private final Executor executor ;
81
83
82
84
private volatile TimeValue waitForMetadataTimeOut ;
83
85
private volatile Map <String , AutoFollower > autoFollowers = Collections .emptyMap ();
@@ -89,18 +91,20 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
89
91
private final LinkedHashMap <String , Tuple <Long , ElasticsearchException >> recentAutoFollowErrors ;
90
92
91
93
public AutoFollowCoordinator (
92
- Settings settings ,
93
- Client client ,
94
- ClusterService clusterService ,
95
- CcrLicenseChecker ccrLicenseChecker ,
96
- LongSupplier relativeMillisTimeProvider ,
97
- LongSupplier absoluteMillisTimeProvider ) {
94
+ final Settings settings ,
95
+ final Client client ,
96
+ final ClusterService clusterService ,
97
+ final CcrLicenseChecker ccrLicenseChecker ,
98
+ final LongSupplier relativeMillisTimeProvider ,
99
+ final LongSupplier absoluteMillisTimeProvider ,
100
+ final Executor executor ) {
98
101
99
102
this .client = client ;
100
103
this .clusterService = clusterService ;
101
104
this .ccrLicenseChecker = Objects .requireNonNull (ccrLicenseChecker , "ccrLicenseChecker" );
102
105
this .relativeMillisTimeProvider = relativeMillisTimeProvider ;
103
106
this .absoluteMillisTimeProvider = absoluteMillisTimeProvider ;
107
+ this .executor = Objects .requireNonNull (executor );
104
108
this .recentAutoFollowErrors = new LinkedHashMap <String , Tuple <Long , ElasticsearchException >>() {
105
109
@ Override
106
110
protected boolean removeEldestEntry (final Map .Entry <String , Tuple <Long , ElasticsearchException >> eldest ) {
@@ -210,7 +214,7 @@ void updateAutoFollowers(ClusterState followerClusterState) {
210
214
Map <String , AutoFollower > newAutoFollowers = new HashMap <>(newRemoteClusters .size ());
211
215
for (String remoteCluster : newRemoteClusters ) {
212
216
AutoFollower autoFollower =
213
- new AutoFollower (remoteCluster , this ::updateStats , clusterService ::state , relativeMillisTimeProvider ) {
217
+ new AutoFollower (remoteCluster , this ::updateStats , clusterService ::state , relativeMillisTimeProvider , executor ) {
214
218
215
219
@ Override
216
220
void getRemoteClusterState (final String remoteCluster ,
@@ -332,6 +336,7 @@ abstract static class AutoFollower {
332
336
private final Consumer <List <AutoFollowResult >> statsUpdater ;
333
337
private final Supplier <ClusterState > followerClusterStateSupplier ;
334
338
private final LongSupplier relativeTimeProvider ;
339
+ private final Executor executor ;
335
340
336
341
private volatile long lastAutoFollowTimeInMillis = -1 ;
337
342
private volatile long metadataVersion = 0 ;
@@ -344,11 +349,13 @@ abstract static class AutoFollower {
344
349
/**
 * Creates an auto-follower for a single remote cluster.
 *
 * @param remoteCluster                the remote cluster whose cluster state is requested via
 *                                     {@code getRemoteClusterState}
 * @param statsUpdater                 receives the list of per-pattern results once every pattern of a
 *                                     round has been finalised
 * @param followerClusterStateSupplier supplies the follower-side {@code ClusterState}
 * @param relativeTimeProvider         supplies relative time values
 * @param executor                     used to re-run {@code start()} on another thread when a round
 *                                     completes on the thread that started it; must not be {@code null}
 * @throws NullPointerException if {@code executor} is {@code null}
 */
AutoFollower(final String remoteCluster,
             final Consumer<List<AutoFollowResult>> statsUpdater,
             final Supplier<ClusterState> followerClusterStateSupplier,
             final LongSupplier relativeTimeProvider,
             final Executor executor) {
    this.remoteCluster = remoteCluster;
    this.statsUpdater = statsUpdater;
    this.followerClusterStateSupplier = followerClusterStateSupplier;
    this.relativeTimeProvider = relativeTimeProvider;
    // Name the argument so that a null executor yields a self-explanatory NullPointerException.
    this.executor = Objects.requireNonNull(executor, "executor");
}
353
360
354
361
void start () {
@@ -387,6 +394,7 @@ void start() {
387
394
this .autoFollowPatternsCountDown = new CountDown (patterns .size ());
388
395
this .autoFollowResults = new AtomicArray <>(patterns .size ());
389
396
397
+ final Thread thread = Thread .currentThread ();
390
398
getRemoteClusterState (remoteCluster , metadataVersion + 1 , (remoteClusterStateResponse , remoteError ) -> {
391
399
// Also check removed flag here, as it may take a while for this remote cluster state api call to return:
392
400
if (removed ) {
@@ -403,7 +411,7 @@ void start() {
403
411
}
404
412
ClusterState remoteClusterState = remoteClusterStateResponse .getState ();
405
413
metadataVersion = remoteClusterState .metaData ().version ();
406
- autoFollowIndices (autoFollowMetadata , clusterState , remoteClusterState , patterns );
414
+ autoFollowIndices (autoFollowMetadata , clusterState , remoteClusterState , patterns , thread );
407
415
} else {
408
416
assert remoteError != null ;
409
417
if (remoteError instanceof NoSuchRemoteClusterException ) {
@@ -414,7 +422,7 @@ void start() {
414
422
415
423
for (int i = 0 ; i < patterns .size (); i ++) {
416
424
String autoFollowPatternName = patterns .get (i );
417
- finalise (i , new AutoFollowResult (autoFollowPatternName , remoteError ));
425
+ finalise (i , new AutoFollowResult (autoFollowPatternName , remoteError ), thread );
418
426
}
419
427
}
420
428
});
@@ -428,7 +436,8 @@ void stop() {
428
436
private void autoFollowIndices (final AutoFollowMetadata autoFollowMetadata ,
429
437
final ClusterState clusterState ,
430
438
final ClusterState remoteClusterState ,
431
- final List <String > patterns ) {
439
+ final List <String > patterns ,
440
+ final Thread thread ) {
432
441
int i = 0 ;
433
442
for (String autoFollowPatternName : patterns ) {
434
443
final int slot = i ;
@@ -439,7 +448,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
439
448
final List <Index > leaderIndicesToFollow =
440
449
getLeaderIndicesToFollow (autoFollowPattern , remoteClusterState , followedIndices );
441
450
if (leaderIndicesToFollow .isEmpty ()) {
442
- finalise (slot , new AutoFollowResult (autoFollowPatternName ));
451
+ finalise (slot , new AutoFollowResult (autoFollowPatternName ), thread );
443
452
} else {
444
453
List <Tuple <String , AutoFollowPattern >> patternsForTheSameRemoteCluster = autoFollowMetadata .getPatterns ()
445
454
.entrySet ().stream ()
@@ -448,7 +457,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
448
457
.map (item -> new Tuple <>(item .getKey (), item .getValue ()))
449
458
.collect (Collectors .toList ());
450
459
451
- Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
460
+ Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result , thread );
452
461
checkAutoFollowPattern (autoFollowPatternName , remoteCluster , autoFollowPattern , leaderIndicesToFollow , headers ,
453
462
patternsForTheSameRemoteCluster , remoteClusterState .metaData (), clusterState .metaData (), resultHandler );
454
463
}
@@ -561,11 +570,23 @@ private void followLeaderIndex(String autoFollowPattenName,
561
570
createAndFollow (headers , request , successHandler , onResult );
562
571
}
563
572
564
- private void finalise (int slot , AutoFollowResult result ) {
573
+ private void finalise (int slot , AutoFollowResult result , final Thread thread ) {
565
574
assert autoFollowResults .get (slot ) == null ;
566
575
autoFollowResults .set (slot , result );
567
576
if (autoFollowPatternsCountDown .countDown ()) {
568
577
statsUpdater .accept (autoFollowResults .asList ());
578
+ /*
579
+ * In the face of a failure, we could be called back on the same thread. That is, it could be that we
580
+ * never fired off the asynchronous remote cluster state call, instead failing beforehand. In this case,
581
+ * we will recurse on the same thread. If there are repeated failures, we could blow the stack and
582
+ * overflow. A real-world scenario in which this can occur is if the local connect queue is full. To
583
+ * avoid this, if we are called back on the same thread, then we truncate the stack by forking to
584
+ * another thread.
585
+ */
586
+ if (thread == Thread .currentThread ()) {
587
+ executor .execute (this ::start );
588
+ return ;
589
+ }
569
590
start ();
570
591
}
571
592
}
0 commit comments