3
3
import io .fabric8 .kubernetes .client .CustomResource ;
4
4
import io .javaoperatorsdk .operator .processing .KubernetesResourceUtils ;
5
5
import io .javaoperatorsdk .operator .processing .event .AbstractEventSource ;
6
+ import java .io .IOException ;
6
7
import java .util .Map ;
7
8
import java .util .Timer ;
8
9
import java .util .TimerTask ;
9
10
import java .util .concurrent .ConcurrentHashMap ;
11
+ import java .util .concurrent .atomic .AtomicBoolean ;
10
12
import org .slf4j .Logger ;
11
13
import org .slf4j .LoggerFactory ;
12
14
13
15
public class TimerEventSource extends AbstractEventSource {
14
16
15
- private Logger log = LoggerFactory .getLogger (TimerEventSource .class );
16
-
17
17
private final Timer timer = new Timer ();
18
-
18
+ private final AtomicBoolean running = new AtomicBoolean ();
19
19
private final Map <String , EventProducerTimeTask > onceTasks = new ConcurrentHashMap <>();
20
20
private final Map <String , EventProducerTimeTask > timerTasks = new ConcurrentHashMap <>();
21
+ private Logger log = LoggerFactory .getLogger (TimerEventSource .class );
21
22
22
23
public void schedule (CustomResource customResource , long delay , long period ) {
24
+ if (!running .get ()) {
25
+ throw new IllegalStateException ("The TimerEventSource is not running" );
26
+ }
27
+
23
28
String resourceUid = KubernetesResourceUtils .getUID (customResource );
24
29
if (timerTasks .containsKey (resourceUid )) {
25
30
return ;
@@ -30,6 +35,10 @@ public void schedule(CustomResource customResource, long delay, long period) {
30
35
}
31
36
32
37
public void scheduleOnce (CustomResource customResource , long delay ) {
38
+ if (!running .get ()) {
39
+ throw new IllegalStateException ("The TimerEventSource is not running" );
40
+ }
41
+
33
42
String resourceUid = KubernetesResourceUtils .getUID (customResource );
34
43
if (onceTasks .containsKey (resourceUid )) {
35
44
cancelOnceSchedule (resourceUid );
@@ -59,6 +68,19 @@ public void cancelOnceSchedule(String customResourceUid) {
59
68
}
60
69
}
61
70
71
+ @ Override
72
+ public void start () {
73
+ running .set (true );
74
+ }
75
+
76
+ @ Override
77
+ public void close () throws IOException {
78
+ running .set (false );
79
+ onceTasks .keySet ().forEach (this ::cancelOnceSchedule );
80
+ timerTasks .keySet ().forEach (this ::cancelSchedule );
81
+ timer .cancel ();
82
+ }
83
+
62
84
public class EventProducerTimeTask extends TimerTask {
63
85
64
86
protected final String customResourceUid ;
@@ -69,8 +91,10 @@ public EventProducerTimeTask(String customResourceUid) {
69
91
70
92
@ Override
71
93
public void run () {
72
- log .debug ("Producing event for custom resource id: {}" , customResourceUid );
73
- eventHandler .handleEvent (new TimerEvent (customResourceUid , TimerEventSource .this ));
94
+ if (running .get ()) {
95
+ log .debug ("Producing event for custom resource id: {}" , customResourceUid );
96
+ eventHandler .handleEvent (new TimerEvent (customResourceUid , TimerEventSource .this ));
97
+ }
74
98
}
75
99
}
76
100
}
0 commit comments