Skip to content

Commit 2adf334

Browse files
artembilangaryrussell
authored andcommitted
Some LockRegistryLeaderInitiator improvements (#8570)
* Some `LockRegistryLeaderInitiator` improvements It is better to not go to the target lock provider at all if the current thread is already interrupted. * Check for the `Thread.currentThread().isInterrupted()` in the `while` loop and `restartSelectorBecauseOfError()` immediately without checking for a lock * Fix some other simple typos in the `LockRegistryLeaderInitiator` **Cherry-pick to `6.0.x`** * * Introduce a `LeaderSelector.yielding` flag to revoke leader smoothly. Turns out just canceling the `Future` may lead to a broken lock where we cannot unlock it because the target lock repository may not work with interrupted threads. This way a new leader must wait until the lock is expired in the store
1 parent 5c96c0d commit 2adf334

File tree

1 file changed

+23
-13
lines changed

1 file changed

+23
-13
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

+23-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -344,6 +344,8 @@ protected class LeaderSelector implements Callable<Void> {
344344

345345
private volatile boolean locked = false;
346346

347+
private volatile boolean yielding = false;
348+
347349
LeaderSelector(String lockKey) {
348350
this.lock = LockRegistryLeaderInitiator.this.locks.obtain(lockKey);
349351
this.lockKey = lockKey;
@@ -353,11 +355,22 @@ protected class LeaderSelector implements Callable<Void> {
353355
public Void call() {
354356
try {
355357
while (isRunning()) {
358+
if (Thread.currentThread().isInterrupted()) {
359+
// No need to try to lock in the interrupted thread, and we might not be able to unlock
360+
restartSelectorBecauseOfError(new InterruptedException());
361+
return null;
362+
}
363+
if (this.yielding) {
364+
this.yielding = false;
365+
// When yielding, we have to unlock and continue after busyWaitMillis to elect
366+
unlockAndHandleException(null);
367+
continue;
368+
}
356369
try {
357370
tryAcquireLock();
358371
}
359372
catch (Exception e) {
360-
if (handleLockException(e)) {
373+
if (unlockAndHandleException(e)) {
361374
return null;
362375
}
363376
}
@@ -373,7 +386,7 @@ public Void call() {
373386
LOGGER.debug("Could not unlock during stop for " + this.context
374387
+ " - treat as broken. Revoking...", e);
375388
}
376-
// We are stopping, therefore not leading any more
389+
// We are stopping, therefore not leading anymore
377390
handleRevoked();
378391
}
379392
}
@@ -385,8 +398,8 @@ private void tryAcquireLock() throws InterruptedException {
385398
LOGGER.debug("Acquiring the lock for " + this.context);
386399
}
387400
// We always try to acquire the lock, in case it expired
388-
boolean acquired = this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis,
389-
TimeUnit.MILLISECONDS);
401+
boolean acquired =
402+
this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS);
390403
if (!this.locked) {
391404
if (acquired) {
392405
// Success: we are now leader
@@ -398,8 +411,7 @@ else if (isPublishFailedEvents()) {
398411
}
399412
}
400413
else if (acquired) {
401-
// If we were able to acquire it but we were already locked we
402-
// should release it
414+
// If we were able to acquire it, but we were already locked, we should release it
403415
this.lock.unlock();
404416
if (isRunning()) {
405417
// Give it a chance to expire.
@@ -408,7 +420,7 @@ else if (acquired) {
408420
}
409421
else {
410422
this.locked = false;
411-
// We were not able to acquire it, therefore not leading any more
423+
// We were not able to acquire it, therefore not leading anymore
412424
handleRevoked();
413425
if (isRunning()) {
414426
// Try again quickly in case the lock holder dropped it
@@ -417,7 +429,7 @@ else if (acquired) {
417429
}
418430
}
419431

420-
private boolean handleLockException(Exception ex) { // NOSONAR
432+
private boolean unlockAndHandleException(Exception ex) { // NOSONAR
421433
if (this.locked) {
422434
this.locked = false;
423435
try {
@@ -446,7 +458,7 @@ private boolean handleLockException(Exception ex) { // NOSONAR
446458
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
447459
}
448460
catch (InterruptedException e1) {
449-
// Ignore interruption and let it to be caught on the next cycle.
461+
// Ignore interruption and let it be caught on the next cycle.
450462
Thread.currentThread().interrupt();
451463
}
452464
}
@@ -534,9 +546,7 @@ public void yield() {
534546
if (LOGGER.isDebugEnabled()) {
535547
LOGGER.debug("Yielding leadership from " + this);
536548
}
537-
if (LockRegistryLeaderInitiator.this.future != null) {
538-
LockRegistryLeaderInitiator.this.future.cancel(true);
539-
}
549+
LockRegistryLeaderInitiator.this.leaderSelector.yielding = true;
540550
}
541551

542552
@Override

0 commit comments

Comments
 (0)