Skip to content

Commit 7681200

Browse files
committed
Introduce SimpleAsyncTaskScheduler (extending SimpleAsyncTaskExecutor)
Closes gh-30956
1 parent 5e4ed68 commit 7681200

File tree

9 files changed

+493
-106
lines changed

9 files changed

+493
-106
lines changed

spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java

+13-64
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import java.util.concurrent.ThreadFactory;
2424
import java.util.concurrent.ThreadPoolExecutor;
2525
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.locks.Condition;
27-
import java.util.concurrent.locks.ReentrantLock;
2826

2927
import org.apache.commons.logging.Log;
3028
import org.apache.commons.logging.LogFactory;
@@ -85,16 +83,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
8583
@Nullable
8684
private ExecutorService executor;
8785

88-
private final ReentrantLock pauseLock = new ReentrantLock();
89-
90-
private final Condition unpaused = this.pauseLock.newCondition();
91-
92-
private volatile boolean paused;
93-
94-
private int executingTaskCount = 0;
95-
9686
@Nullable
97-
private Runnable stopCallback;
87+
private ExecutorLifecycleDelegate lifecycleDelegate;
9888

9989

10090
/**
@@ -258,6 +248,7 @@ public void initialize() {
258248
setThreadNamePrefix(this.beanName + "-");
259249
}
260250
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
251+
this.lifecycleDelegate = new ExecutorLifecycleDelegate(this.executor);
261252
}
262253

263254
/**
@@ -372,13 +363,8 @@ private void awaitTerminationIfNecessary(ExecutorService executor) {
372363
*/
373364
@Override
374365
public void start() {
375-
this.pauseLock.lock();
376-
try {
377-
this.paused = false;
378-
this.unpaused.signalAll();
379-
}
380-
finally {
381-
this.pauseLock.unlock();
366+
if (this.lifecycleDelegate != null) {
367+
this.lifecycleDelegate.start();
382368
}
383369
}
384370

@@ -388,13 +374,8 @@ public void start() {
388374
*/
389375
@Override
390376
public void stop() {
391-
this.pauseLock.lock();
392-
try {
393-
this.paused = true;
394-
this.stopCallback = null;
395-
}
396-
finally {
397-
this.pauseLock.unlock();
377+
if (this.lifecycleDelegate != null) {
378+
this.lifecycleDelegate.stop();
398379
}
399380
}
400381

@@ -405,19 +386,8 @@ public void stop() {
405386
*/
406387
@Override
407388
public void stop(Runnable callback) {
408-
this.pauseLock.lock();
409-
try {
410-
this.paused = true;
411-
if (this.executingTaskCount == 0) {
412-
this.stopCallback = null;
413-
callback.run();
414-
}
415-
else {
416-
this.stopCallback = callback;
417-
}
418-
}
419-
finally {
420-
this.pauseLock.unlock();
389+
if (this.lifecycleDelegate != null) {
390+
this.lifecycleDelegate.stop(callback);
421391
}
422392
}
423393

@@ -429,7 +399,7 @@ public void stop(Runnable callback) {
429399
*/
430400
@Override
431401
public boolean isRunning() {
432-
return (this.executor != null && !this.executor.isShutdown() & !this.paused);
402+
return (this.lifecycleDelegate != null && this.lifecycleDelegate.isRunning());
433403
}
434404

435405
/**
@@ -442,18 +412,8 @@ public boolean isRunning() {
442412
* @see ThreadPoolExecutor#beforeExecute(Thread, Runnable)
443413
*/
444414
protected void beforeExecute(Thread thread, Runnable task) {
445-
this.pauseLock.lock();
446-
try {
447-
while (this.paused && this.executor != null && !this.executor.isShutdown()) {
448-
this.unpaused.await();
449-
}
450-
}
451-
catch (InterruptedException ex) {
452-
thread.interrupt();
453-
}
454-
finally {
455-
this.executingTaskCount++;
456-
this.pauseLock.unlock();
415+
if (this.lifecycleDelegate != null) {
416+
this.lifecycleDelegate.beforeExecute(thread);
457417
}
458418
}
459419

@@ -467,19 +427,8 @@ protected void beforeExecute(Thread thread, Runnable task) {
467427
* @see ThreadPoolExecutor#afterExecute(Runnable, Throwable)
468428
*/
469429
protected void afterExecute(Runnable task, @Nullable Throwable ex) {
470-
this.pauseLock.lock();
471-
try {
472-
this.executingTaskCount--;
473-
if (this.executingTaskCount == 0) {
474-
Runnable callback = this.stopCallback;
475-
if (callback != null) {
476-
callback.run();
477-
this.stopCallback = null;
478-
}
479-
}
480-
}
481-
finally {
482-
this.pauseLock.unlock();
430+
if (this.lifecycleDelegate != null) {
431+
this.lifecycleDelegate.afterExecute();
483432
}
484433
}
485434

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.scheduling.concurrent;
18+
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.locks.Condition;
21+
import java.util.concurrent.locks.ReentrantLock;
22+
23+
import org.springframework.context.SmartLifecycle;
24+
import org.springframework.lang.Nullable;
25+
26+
/**
27+
* An internal delegate for common {@link ExecutorService} lifecycle management
28+
* with pause/resume support.
29+
*
30+
* @author Juergen Hoeller
31+
* @since 6.1
32+
* @see ExecutorConfigurationSupport
33+
* @see SimpleAsyncTaskScheduler
34+
*/
35+
final class ExecutorLifecycleDelegate implements SmartLifecycle {
36+
37+
private final ExecutorService executor;
38+
39+
private final ReentrantLock pauseLock = new ReentrantLock();
40+
41+
private final Condition unpaused = this.pauseLock.newCondition();
42+
43+
private volatile boolean paused;
44+
45+
private int executingTaskCount = 0;
46+
47+
@Nullable
48+
private Runnable stopCallback;
49+
50+
51+
public ExecutorLifecycleDelegate(ExecutorService executor) {
52+
this.executor = executor;
53+
}
54+
55+
56+
@Override
57+
public void start() {
58+
this.pauseLock.lock();
59+
try {
60+
this.paused = false;
61+
this.unpaused.signalAll();
62+
}
63+
finally {
64+
this.pauseLock.unlock();
65+
}
66+
}
67+
68+
@Override
69+
public void stop() {
70+
this.pauseLock.lock();
71+
try {
72+
this.paused = true;
73+
this.stopCallback = null;
74+
}
75+
finally {
76+
this.pauseLock.unlock();
77+
}
78+
}
79+
80+
@Override
81+
public void stop(Runnable callback) {
82+
this.pauseLock.lock();
83+
try {
84+
this.paused = true;
85+
if (this.executingTaskCount == 0) {
86+
this.stopCallback = null;
87+
callback.run();
88+
}
89+
else {
90+
this.stopCallback = callback;
91+
}
92+
}
93+
finally {
94+
this.pauseLock.unlock();
95+
}
96+
}
97+
98+
@Override
99+
public boolean isRunning() {
100+
return (!this.executor.isShutdown() & !this.paused);
101+
}
102+
103+
void beforeExecute(Thread thread) {
104+
this.pauseLock.lock();
105+
try {
106+
while (this.paused && !this.executor.isShutdown()) {
107+
this.unpaused.await();
108+
}
109+
}
110+
catch (InterruptedException ex) {
111+
thread.interrupt();
112+
}
113+
finally {
114+
this.executingTaskCount++;
115+
this.pauseLock.unlock();
116+
}
117+
}
118+
119+
void afterExecute() {
120+
this.pauseLock.lock();
121+
try {
122+
this.executingTaskCount--;
123+
if (this.executingTaskCount == 0) {
124+
Runnable callback = this.stopCallback;
125+
if (callback != null) {
126+
callback.run();
127+
this.stopCallback = null;
128+
}
129+
}
130+
}
131+
finally {
132+
this.pauseLock.unlock();
133+
}
134+
}
135+
136+
}

0 commit comments

Comments
 (0)