diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java index a646dfeeabc..1e5323cdc9d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -344,6 +344,8 @@ protected class LeaderSelector implements Callable { private volatile boolean locked = false; + private volatile boolean yielding = false; + LeaderSelector(String lockKey) { this.lock = LockRegistryLeaderInitiator.this.locks.obtain(lockKey); this.lockKey = lockKey; @@ -353,11 +355,22 @@ protected class LeaderSelector implements Callable { public Void call() { try { while (isRunning()) { + if (Thread.currentThread().isInterrupted()) { + // No need to try to lock in the interrupted thread, and we might not be able to unlock + restartSelectorBecauseOfError(new InterruptedException()); + return null; + } + if (this.yielding) { + this.yielding = false; + // When yielding, we have to unlock and continue after busyWaitMillis to elect + unlockAndHandleException(null); + continue; + } try { tryAcquireLock(); } catch (Exception e) { - if (handleLockException(e)) { + if (unlockAndHandleException(e)) { return null; } } @@ -373,7 +386,7 @@ public Void call() { LOGGER.debug("Could not unlock during stop for " + this.context + " - treat as broken. Revoking...", e); } - // We are stopping, therefore not leading any more + // We are stopping, therefore not leading anymore handleRevoked(); } } @@ -385,8 +398,8 @@ private void tryAcquireLock() throws InterruptedException { LOGGER.debug("Acquiring the lock for " + this.context); } // We always try to acquire the lock, in case it expired - boolean acquired = this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, - TimeUnit.MILLISECONDS); + boolean acquired = + this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS); if (!this.locked) { if (acquired) { // Success: we are now leader @@ -398,8 +411,7 @@ else if (isPublishFailedEvents()) { } } else if (acquired) { - // If we were able to acquire it but we were already locked we - // should release it + // If we were able to acquire it, but we were already locked, we should release it this.lock.unlock(); if (isRunning()) { // Give it a chance to expire. @@ -408,7 +420,7 @@ else if (acquired) { } else { this.locked = false; - // We were not able to acquire it, therefore not leading any more + // We were not able to acquire it, therefore not leading anymore handleRevoked(); if (isRunning()) { // Try again quickly in case the lock holder dropped it @@ -417,7 +429,7 @@ else if (acquired) { } } - private boolean handleLockException(Exception ex) { // NOSONAR + private boolean unlockAndHandleException(Exception ex) { // NOSONAR if (this.locked) { this.locked = false; try { @@ -446,7 +458,7 @@ private boolean handleLockException(Exception ex) { // NOSONAR Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis); } catch (InterruptedException e1) { - // Ignore interruption and let it to be caught on the next cycle. + // Ignore interruption and let it be caught on the next cycle. Thread.currentThread().interrupt(); } } @@ -534,9 +546,7 @@ public void yield() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Yielding leadership from " + this); } - if (LockRegistryLeaderInitiator.this.future != null) { - LockRegistryLeaderInitiator.this.future.cancel(true); - } + LockRegistryLeaderInitiator.this.leaderSelector.yielding = true; } @Override