20
20
import io .javaoperatorsdk .operator .processing .event .DefaultEventSourceManager ;
21
21
import io .javaoperatorsdk .operator .processing .event .Event ;
22
22
import io .javaoperatorsdk .operator .processing .event .EventHandler ;
23
+ import io .javaoperatorsdk .operator .processing .event .internal .CustomResourceEvent ;
24
+ import io .javaoperatorsdk .operator .processing .event .internal .ResourceAction ;
23
25
import io .javaoperatorsdk .operator .processing .retry .GenericRetry ;
24
26
import io .javaoperatorsdk .operator .processing .retry .Retry ;
25
27
import io .javaoperatorsdk .operator .processing .retry .RetryExecution ;
26
28
27
- import static io .javaoperatorsdk .operator .EventListUtils .containsCustomResourceDeletedEvent ;
28
29
import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getName ;
29
30
30
31
/**
@@ -38,7 +39,6 @@ public class DefaultEventHandler<R extends CustomResource<?, ?>> implements Even
38
39
@ Deprecated
39
40
private static EventMonitor monitor = EventMonitor .NOOP ;
40
41
41
- private final EventBuffer eventBuffer ;
42
42
private final Set <CustomResourceID > underProcessing = new HashSet <>();
43
43
private final EventDispatcher <R > eventDispatcher ;
44
44
private final Retry retry ;
@@ -50,6 +50,7 @@ public class DefaultEventHandler<R extends CustomResource<?, ?>> implements Even
50
50
private volatile boolean running ;
51
51
private final ResourceCache <R > resourceCache ;
52
52
private DefaultEventSourceManager <R > eventSourceManager ;
53
+ private final EventMarker eventMarker ;
53
54
54
55
public DefaultEventHandler (ConfiguredController <R > controller , ResourceCache <R > resourceCache ) {
55
56
this (
@@ -58,18 +59,20 @@ public DefaultEventHandler(ConfiguredController<R> controller, ResourceCache<R>
58
59
controller .getConfiguration ().getName (),
59
60
new EventDispatcher <>(controller ),
60
61
GenericRetry .fromConfiguration (controller .getConfiguration ().getRetryConfiguration ()),
61
- controller .getConfiguration ().getConfigurationService ().getMetrics ().getEventMonitor ());
62
+ controller .getConfiguration ().getConfigurationService ().getMetrics ().getEventMonitor (),
63
+ new EventMarker ());
62
64
}
63
65
64
66
DefaultEventHandler (EventDispatcher <R > eventDispatcher , ResourceCache <R > resourceCache ,
65
67
String relatedControllerName ,
66
- Retry retry ) {
67
- this (resourceCache , null , relatedControllerName , eventDispatcher , retry , null );
68
+ Retry retry , EventMarker eventMarker ) {
69
+ this (resourceCache , null , relatedControllerName , eventDispatcher , retry , null , eventMarker );
68
70
}
69
71
70
72
private DefaultEventHandler (ResourceCache <R > resourceCache , ExecutorService executor ,
71
73
String relatedControllerName ,
72
- EventDispatcher <R > eventDispatcher , Retry retry , EventMonitor monitor ) {
74
+ EventDispatcher <R > eventDispatcher , Retry retry , EventMonitor monitor ,
75
+ EventMarker eventMarker ) {
73
76
this .running = true ;
74
77
this .executor =
75
78
executor == null
@@ -79,9 +82,9 @@ private DefaultEventHandler(ResourceCache<R> resourceCache, ExecutorService exec
79
82
this .controllerName = relatedControllerName ;
80
83
this .eventDispatcher = eventDispatcher ;
81
84
this .retry = retry ;
82
- eventBuffer = new EventBuffer ();
83
85
this .resourceCache = resourceCache ;
84
86
this .eventMonitor = monitor != null ? monitor : EventMonitor .NOOP ;
87
+ this .eventMarker = eventMarker ;
85
88
}
86
89
87
90
public void setEventSourceManager (DefaultEventSourceManager <R > eventSourceManager ) {
@@ -113,71 +116,75 @@ private EventMonitor monitor() {
113
116
114
117
@ Override
115
118
public void handleEvent (Event event ) {
119
+ lock .lock ();
116
120
try {
117
- lock .lock ();
118
121
log .debug ("Received event: {}" , event );
119
122
if (!this .running ) {
120
123
log .debug ("Skipping event: {} because the event handler is shutting down" , event );
121
124
return ;
122
125
}
123
126
final var monitor = monitor ();
124
- eventBuffer .addEvent (event .getRelatedCustomResourceID (), event );
125
127
monitor .processedEvent (event .getRelatedCustomResourceID (), event );
126
- executeBufferedEvents (event .getRelatedCustomResourceID ());
127
- } finally {
128
- lock .unlock ();
129
- }
130
- }
131
128
132
- @ Override
133
- public void close () {
134
- try {
135
- lock .lock ();
136
- this .running = false ;
129
+ handleEventMarking (event );
130
+ if (!eventMarker .deleteEventPresent (event .getRelatedCustomResourceID ())) {
131
+ submitReconciliationExecution (event .getRelatedCustomResourceID ());
132
+ } else {
133
+ cleanupForDeletedEvent (event .getRelatedCustomResourceID ());
134
+ }
137
135
} finally {
138
136
lock .unlock ();
139
137
}
140
138
}
141
139
142
- private boolean executeBufferedEvents (CustomResourceID customResourceUid ) {
143
- boolean newEventForResourceId = eventBuffer .containsEvents (customResourceUid );
140
+ private boolean submitReconciliationExecution (CustomResourceID customResourceUid ) {
144
141
boolean controllerUnderExecution = isControllerUnderExecution (customResourceUid );
145
142
Optional <R > latestCustomResource =
146
143
resourceCache .getCustomResource (customResourceUid );
147
144
148
- if (!controllerUnderExecution && newEventForResourceId && latestCustomResource .isPresent ()) {
145
+ if (!controllerUnderExecution
146
+ && latestCustomResource .isPresent ()) {
149
147
setUnderExecutionProcessing (customResourceUid );
150
148
ExecutionScope executionScope =
151
149
new ExecutionScope (
152
- eventBuffer .getAndRemoveEventsForExecution (customResourceUid ),
153
150
latestCustomResource .get (),
154
151
retryInfo (customResourceUid ));
152
+ eventMarker .unMarkEventReceived (customResourceUid );
155
153
log .debug ("Executing events for custom resource. Scope: {}" , executionScope );
156
154
executor .execute (new ControllerExecution (executionScope ));
157
155
return true ;
158
156
} else {
159
157
log .debug (
160
- "Skipping executing controller for resource id: {}. Events in queue: {}. "
158
+ "Skipping executing controller for resource id: {}."
161
159
+ " Controller in execution: {}. Latest CustomResource present: {}" ,
162
160
customResourceUid ,
163
- newEventForResourceId ,
164
161
controllerUnderExecution ,
165
162
latestCustomResource .isPresent ());
166
163
if (latestCustomResource .isEmpty ()) {
167
- log .warn ("no custom resource found in cache for CustomResourceID: {}" , customResourceUid );
164
+ log .warn ("no custom resource found in cache for CustomResourceID: {}" ,
165
+ customResourceUid );
168
166
}
169
167
return false ;
170
168
}
171
169
}
172
170
171
+ private void handleEventMarking (Event event ) {
172
+ if (event instanceof CustomResourceEvent &&
173
+ ((CustomResourceEvent ) event ).getAction () == ResourceAction .DELETED ) {
174
+ eventMarker .markDeleteEventReceived (event );
175
+ } else if (!eventMarker .deleteEventPresent (event .getRelatedCustomResourceID ())) {
176
+ eventMarker .markEventReceived (event );
177
+ }
178
+ }
179
+
173
180
private RetryInfo retryInfo (CustomResourceID customResourceUid ) {
174
181
return retryState .get (customResourceUid );
175
182
}
176
183
177
184
void eventProcessingFinished (
178
185
ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
186
+ lock .lock ();
179
187
try {
180
- lock .lock ();
181
188
if (!running ) {
182
189
return ;
183
190
}
@@ -188,23 +195,29 @@ void eventProcessingFinished(
188
195
postExecutionControl );
189
196
unsetUnderExecution (executionScope .getCustomResourceID ());
190
197
191
- if (retry != null && postExecutionControl .exceptionDuringExecution ()) {
198
+ // If a delete event present at this phase, it was received during reconciliation.
199
+ // So we either removed the finalizer during reconciliation or we don't use finalizers.
200
+ // Either way we don't want to retry.
201
+ if (retry != null && postExecutionControl .exceptionDuringExecution () &&
202
+ !eventMarker .deleteEventPresent (executionScope .getCustomResourceID ())) {
192
203
handleRetryOnException (executionScope );
193
- final var monitor = monitor ();
194
- executionScope .getEvents ()
195
- . forEach ( e -> monitor .failedEvent (executionScope .getCustomResourceID (), e ));
204
+ // todo revisit monitoring since events are not present anymore
205
+ // final var monitor = monitor(); executionScope.getEvents().forEach(e ->
206
+ // monitor.failedEvent(executionScope.getCustomResourceID(), e));
196
207
return ;
197
208
}
198
209
199
210
if (retry != null ) {
200
- markSuccessfulExecutionRegardingRetry (executionScope );
211
+ handleSuccessfulExecutionRegardingRetry (executionScope );
201
212
}
202
- if (containsCustomResourceDeletedEvent (executionScope .getEvents ())) {
203
- cleanupAfterDeletedEvent (executionScope .getCustomResourceID ());
213
+ if (eventMarker . deleteEventPresent (executionScope .getCustomResourceID ())) {
214
+ cleanupForDeletedEvent (executionScope .getCustomResourceID ());
204
215
} else {
205
- var executed = executeBufferedEvents (executionScope .getCustomResourceID ());
206
- if (!executed ) {
207
- reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getCustomResource ());
216
+ if (eventMarker .eventPresent (executionScope .getCustomResourceID ())) {
217
+ submitReconciliationExecution (executionScope .getCustomResourceID ());
218
+ } else {
219
+ reScheduleExecutionIfInstructed (postExecutionControl ,
220
+ executionScope .getCustomResource ());
208
221
}
209
222
}
210
223
} finally {
@@ -227,13 +240,13 @@ private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecuti
227
240
private void handleRetryOnException (ExecutionScope <R > executionScope ) {
228
241
RetryExecution execution = getOrInitRetryExecution (executionScope );
229
242
var customResourceID = executionScope .getCustomResourceID ();
230
- boolean newEventsExists = eventBuffer
231
- .newEventsExists (customResourceID );
232
- eventBuffer .putBackEvents (customResourceID , executionScope .getEvents ());
243
+ boolean eventPresent = eventMarker .eventPresent (customResourceID );
244
+ eventMarker .markEventReceived (customResourceID );
233
245
234
- if (newEventsExists ) {
235
- log .debug ("New events exists for for resource id: {}" , customResourceID );
236
- executeBufferedEvents (customResourceID );
246
+ if (eventPresent ) {
247
+ log .debug ("New events exists for for resource id: {}" ,
248
+ customResourceID );
249
+ submitReconciliationExecution (customResourceID );
237
250
return ;
238
251
}
239
252
Optional <Long > nextDelay = execution .nextDelay ();
@@ -251,7 +264,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope) {
251
264
() -> log .error ("Exhausted retries for {}" , executionScope ));
252
265
}
253
266
254
- private void markSuccessfulExecutionRegardingRetry (ExecutionScope <R > executionScope ) {
267
+ private void handleSuccessfulExecutionRegardingRetry (ExecutionScope <R > executionScope ) {
255
268
log .debug (
256
269
"Marking successful execution for resource: {}" ,
257
270
getName (executionScope .getCustomResource ()));
@@ -270,9 +283,9 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
270
283
return retryExecution ;
271
284
}
272
285
273
- private void cleanupAfterDeletedEvent (CustomResourceID customResourceUid ) {
286
+ private void cleanupForDeletedEvent (CustomResourceID customResourceUid ) {
274
287
eventSourceManager .cleanupForCustomResource (customResourceUid );
275
- eventBuffer .cleanup (customResourceUid );
288
+ eventMarker .cleanup (customResourceUid );
276
289
}
277
290
278
291
private boolean isControllerUnderExecution (CustomResourceID customResourceUid ) {
@@ -287,6 +300,15 @@ private void unsetUnderExecution(CustomResourceID customResourceUid) {
287
300
underProcessing .remove (customResourceUid );
288
301
}
289
302
303
+ @ Override
304
+ public void close () {
305
+ lock .lock ();
306
+ try {
307
+ this .running = false ;
308
+ } finally {
309
+ lock .unlock ();
310
+ }
311
+ }
290
312
291
313
private class ControllerExecution implements Runnable {
292
314
private final ExecutionScope <R > executionScope ;
0 commit comments