Skip to content

Commit 4c0a462

Browse files
authored
Merge pull request #92 from ContainerSolutions/generation-aware-scheduling
Generation aware scheduling
2 parents a25ef48 + 051bd5a commit 4c0a462

File tree

12 files changed

+203
-53
lines changed

12 files changed

+203
-53
lines changed

Diff for: operator-framework/src/main/java/com/github/containersolutions/operator/ControllerUtils.java

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ static String getDefaultFinalizer(ResourceController controller) {
2828
return getAnnotation(controller).finalizerName();
2929
}
3030

31+
static boolean getGenerationEventProcessing(ResourceController controller) {
32+
return getAnnotation(controller).generationAwareEventProcessing();
33+
}
34+
3135
static <R extends CustomResource> Class<R> getCustomResourceClass(ResourceController controller) {
3236
return (Class<R>) getAnnotation(controller).customResourceClass();
3337
}

Diff for: operator-framework/src/main/java/com/github/containersolutions/operator/Operator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private <R extends CustomResource> void registerController(ResourceController<R>
6060
MixedOperation client = k8sClient.customResources(crd, resClass, CustomResourceList.class, getCustomResourceDoneableClass(controller));
6161
EventDispatcher eventDispatcher = new EventDispatcher(controller,
6262
getDefaultFinalizer(controller), new EventDispatcher.CustomResourceReplaceFacade(client));
63-
EventScheduler eventScheduler = new EventScheduler(eventDispatcher, retry);
63+
EventScheduler eventScheduler = new EventScheduler(eventDispatcher, retry, ControllerUtils.getGenerationEventProcessing(controller));
6464
registerWatches(controller, client, resClass, watchAllNamespaces, targetNamespaces, eventScheduler);
6565
}
6666

Diff for: operator-framework/src/main/java/com/github/containersolutions/operator/api/Controller.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,12 @@
1818
Class<? extends CustomResource> customResourceClass();
1919

2020
String finalizerName() default DEFAULT_FINALIZER;
21-
}
21+
22+
/**
23+
* If true, will process new event only if generation increased since the last processing, otherwise will
24+
* process all events.
25+
* See generation meta attribute
26+
* <a href="https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#status-subresource">here</a>
27+
*/
28+
boolean generationAwareEventProcessing() default true;
29+
}

Diff for: operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ public class EventScheduler implements Watcher<CustomResource> {
4040
private final ScheduledThreadPoolExecutor executor;
4141
private final EventStore eventStore = new EventStore();
4242
private final Retry retry;
43+
private final boolean generationAware;
4344

4445
private ReentrantLock lock = new ReentrantLock();
4546

46-
public EventScheduler(EventDispatcher eventDispatcher, Retry retry) {
47+
public EventScheduler(EventDispatcher eventDispatcher, Retry retry, boolean generationAware) {
4748
this.eventDispatcher = eventDispatcher;
4849
this.retry = retry;
50+
this.generationAware = generationAware;
4951
executor = new ScheduledThreadPoolExecutor(1);
5052
executor.setRemoveOnCancelPolicy(true);
5153
}
@@ -70,12 +72,19 @@ void scheduleEventFromApi(CustomResourceEvent event) {
7072
log.debug("Skipping delete event since deletion timestamp is present on resource, so finalizer was in place.");
7173
return;
7274
}
75+
// In case of generation aware processing, we want to replace this even if generation not increased,
76+
// to have the most recent copy of the event.
7377
if (eventStore.containsNotScheduledEvent(event.resourceUid())) {
7478
log.debug("Replacing not scheduled event with actual event." +
7579
" New event: {}", event);
7680
eventStore.addOrReplaceEventAsNotScheduled(event);
7781
return;
7882
}
83+
if (generationAware && !eventStore.hasLargerGenerationThanLastStored(event)) {
84+
log.debug("Skipping event, has not larger generation than last stored, actual generation: {}, last stored: {} ",
85+
event.getResource().getMetadata().getGeneration(), eventStore.getLastStoredGeneration(event));
86+
return;
87+
}
7988
if (eventStore.containsEventUnderProcessing(event.resourceUid())) {
8089
log.debug("Scheduling event for later processing since there is an event under processing for same kind." +
8190
" New event: {}", event);

Diff for: operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java

+19
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ public class EventStore {
77

88
private final Map<String, CustomResourceEvent> eventsNotScheduled = new HashMap<>();
99
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
10+
private final Map<String, Long> lastGeneration = new HashMap<>();
1011

1112
public boolean containsNotScheduledEvent(String uuid) {
1213
return eventsNotScheduled.containsKey(uuid);
@@ -18,6 +19,7 @@ public CustomResourceEvent removeEventNotScheduled(String uid) {
1819

1920
public void addOrReplaceEventAsNotScheduled(CustomResourceEvent event) {
2021
eventsNotScheduled.put(event.resourceUid(), event);
22+
updateLastGeneration(event);
2123
}
2224

2325
public boolean containsEventUnderProcessing(String uuid) {
@@ -26,10 +28,27 @@ public boolean containsEventUnderProcessing(String uuid) {
2628

2729
public void addEventUnderProcessing(CustomResourceEvent event) {
2830
eventsUnderProcessing.put(event.resourceUid(), event);
31+
updateLastGeneration(event);
2932
}
3033

3134
public CustomResourceEvent removeEventUnderProcessing(String uid) {
3235
return eventsUnderProcessing.remove(uid);
3336
}
3437

38+
private void updateLastGeneration(CustomResourceEvent event) {
39+
Long generation = event.getResource().getMetadata().getGeneration();
40+
Long storedGeneration = lastGeneration.get(event.getResource().getMetadata().getUid());
41+
if (storedGeneration == null || generation > storedGeneration) {
42+
lastGeneration.put(event.getResource().getMetadata().getUid(), generation);
43+
}
44+
}
45+
46+
public boolean hasLargerGenerationThanLastStored(CustomResourceEvent event) {
47+
return getLastStoredGeneration(event) == null || getLastStoredGeneration(event) <
48+
event.getResource().getMetadata().getGeneration();
49+
}
50+
51+
public Long getLastStoredGeneration(CustomResourceEvent event) {
52+
return lastGeneration.get(event.getResource().getMetadata().getUid());
53+
}
3554
}

Diff for: operator-framework/src/test/java/com/github/containersolutions/operator/ConcurrencyIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void cleanup() {
3838
}
3939

4040
@Test
41-
public void manyResourcesGetCreatedUpdatedAndDeleted() throws Exception {
41+
public void manyResourcesGetCreatedUpdatedAndDeleted() {
4242
integrationTest.teardownIfSuccess(() -> {
4343
log.info("Adding new resources.");
4444
for (int i = 0; i < NUMBER_OF_RESOURCES_CREATED; i++) {

Diff for: operator-framework/src/test/java/com/github/containersolutions/operator/ControllerExecutionIT.java

+55-33
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import com.github.containersolutions.operator.sample.TestCustomResourceSpec;
55
import io.fabric8.kubernetes.api.model.ConfigMap;
66
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
7-
import org.junit.jupiter.api.BeforeAll;
8-
import org.junit.jupiter.api.BeforeEach;
97
import org.junit.jupiter.api.Test;
108
import org.junit.jupiter.api.TestInstance;
119
import org.slf4j.Logger;
@@ -23,45 +21,69 @@ public class ControllerExecutionIT {
2321
private final static Logger log = LoggerFactory.getLogger(ControllerExecutionIT.class);
2422
private IntegrationTestSupport integrationTestSupport = new IntegrationTestSupport();
2523

26-
@BeforeAll
27-
public void setup() {
28-
integrationTestSupport.initialize();
24+
public void initAndCleanup(boolean controllerStatusUpdate) {
25+
integrationTestSupport.initialize(controllerStatusUpdate);
26+
integrationTestSupport.cleanup();
2927
}
3028

31-
@BeforeEach
32-
public void cleanup() {
33-
integrationTestSupport.cleanup();
29+
@Test
30+
public void configMapGetsCreatedForTestCustomResource() {
31+
initAndCleanup(true);
32+
integrationTestSupport.teardownIfSuccess(() -> {
33+
TestCustomResource resource = testCustomResource();
34+
35+
integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).create(resource);
36+
37+
awaitResourcesCreatedOrUpdated();
38+
awaitStatusUpdated();
39+
});
3440
}
3541

3642
@Test
37-
public void configMapGetsCreatedForTestCustomResource() throws Exception {
43+
public void eventIsSkippedChangedOnMetadataOnlyUpdate() {
44+
initAndCleanup(false);
3845
integrationTestSupport.teardownIfSuccess(() -> {
39-
TestCustomResource resource = new TestCustomResource();
40-
resource.setMetadata(new ObjectMetaBuilder()
41-
.withName("test-custom-resource")
42-
.withNamespace(TEST_NAMESPACE)
43-
.build());
44-
resource.setKind("CustomService");
45-
resource.setSpec(new TestCustomResourceSpec());
46-
resource.getSpec().setConfigMapName("test-config-map");
47-
resource.getSpec().setKey("test-key");
48-
resource.getSpec().setValue("test-value");
46+
TestCustomResource resource = testCustomResource();
47+
4948
integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).create(resource);
5049

51-
await("configmap created").atMost(5, TimeUnit.SECONDS)
52-
.untilAsserted(() -> {
53-
ConfigMap configMap = integrationTestSupport.getK8sClient().configMaps().inNamespace(TEST_NAMESPACE)
54-
.withName("test-config-map").get();
55-
assertThat(configMap).isNotNull();
56-
assertThat(configMap.getData().get("test-key")).isEqualTo("test-value");
57-
});
58-
await("cr status updated").atMost(5, TimeUnit.SECONDS)
59-
.untilAsserted(() -> {
60-
TestCustomResource cr = integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).withName("test-custom-resource").get();
61-
assertThat(cr).isNotNull();
62-
assertThat(cr.getStatus()).isNotNull();
63-
assertThat(cr.getStatus().getConfigMapStatus()).isEqualTo("ConfigMap Ready");
64-
});
50+
awaitResourcesCreatedOrUpdated();
51+
assertThat(integrationTestSupport.numberOfControllerExecutions()).isEqualTo(1);
6552
});
6653
}
54+
55+
void awaitResourcesCreatedOrUpdated() {
56+
await("configmap created").atMost(5, TimeUnit.SECONDS)
57+
.untilAsserted(() -> {
58+
ConfigMap configMap = integrationTestSupport.getK8sClient().configMaps().inNamespace(TEST_NAMESPACE)
59+
.withName("test-config-map").get();
60+
assertThat(configMap).isNotNull();
61+
assertThat(configMap.getData().get("test-key")).isEqualTo("test-value");
62+
});
63+
}
64+
65+
void awaitStatusUpdated() {
66+
await("cr status updated").atMost(5, TimeUnit.SECONDS)
67+
.untilAsserted(() -> {
68+
TestCustomResource cr = integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).withName("test-custom-resource").get();
69+
assertThat(cr).isNotNull();
70+
assertThat(cr.getStatus()).isNotNull();
71+
assertThat(cr.getStatus().getConfigMapStatus()).isEqualTo("ConfigMap Ready");
72+
});
73+
}
74+
75+
private TestCustomResource testCustomResource() {
76+
TestCustomResource resource = new TestCustomResource();
77+
resource.setMetadata(new ObjectMetaBuilder()
78+
.withName("test-custom-resource")
79+
.withNamespace(TEST_NAMESPACE)
80+
.build());
81+
resource.setKind("CustomService");
82+
resource.setSpec(new TestCustomResourceSpec());
83+
resource.getSpec().setConfigMapName("test-config-map");
84+
resource.getSpec().setKey("test-key");
85+
resource.getSpec().setValue("test-value");
86+
return resource;
87+
}
88+
6789
}

Diff for: operator-framework/src/test/java/com/github/containersolutions/operator/ControllerUtilsTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public void returnsValuesFromControllerAnnotationFinalizer() {
1717
assertEquals(DEFAULT_FINALIZER, ControllerUtils.getDefaultFinalizer(new TestCustomResourceController(null)));
1818
assertEquals(TestCustomResource.class, ControllerUtils.getCustomResourceClass(new TestCustomResourceController(null)));
1919
assertEquals(CRD_NAME, ControllerUtils.getCrdName(new TestCustomResourceController(null)));
20+
assertEquals(true, ControllerUtils.getGenerationEventProcessing(new TestCustomResourceController(null)));
2021
assertTrue(CustomResourceDoneable.class.isAssignableFrom(ControllerUtils.getCustomResourceDoneableClass(new TestCustomResourceController(null))));
2122
}
2223
}

Diff for: operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java

+61-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class EventSchedulerTest {
3232
@SuppressWarnings("unchecked")
3333
private EventDispatcher eventDispatcher = mock(EventDispatcher.class);
3434

35-
private EventScheduler eventScheduler = new EventScheduler(eventDispatcher, new GenericRetry().setMaxAttempts(MAX_RETRY_ATTEMPTS).withLinearRetry());
35+
private EventScheduler eventScheduler = initScheduler(true);
3636

3737
private List<EventProcessingDetail> eventProcessingList = Collections.synchronizedList(new ArrayList<>());
3838

@@ -55,6 +55,7 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
5555
CustomResource resource1 = sampleResource();
5656
CustomResource resource2 = sampleResource();
5757
resource2.getMetadata().setResourceVersion("2");
58+
resource2.getMetadata().setGeneration(2l);
5859

5960
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
6061
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
@@ -70,14 +71,55 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
7071
"Start time of event 2 is after end time of event 1");
7172
}
7273

74+
@Test
75+
public void generationAwareSchedulingSkipsEventsWithoutIncreasedGeneration() {
76+
normalDispatcherExecution();
77+
CustomResource resource1 = sampleResource();
78+
CustomResource resource2 = sampleResource();
79+
resource2.getMetadata().setResourceVersion("2");
80+
81+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
82+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
83+
84+
waitTimeForExecution(2);
85+
assertThat(eventProcessingList).hasSize(1)
86+
.matches(list ->
87+
eventProcessingList.get(0).getCustomResource().getMetadata().getResourceVersion().equals("1"));
88+
89+
}
90+
91+
@Test
92+
public void notGenerationAwareSchedulingProcessesAllEventsRegardlessOfGeneration() {
93+
generationUnAwareScheduler();
94+
normalDispatcherExecution();
95+
CustomResource resource1 = sampleResource();
96+
CustomResource resource2 = sampleResource();
97+
resource2.getMetadata().setResourceVersion("2");
98+
99+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
100+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
101+
102+
waitTimeForExecution(2);
103+
log.info("Event processing details 1.: {}. 2: {}", eventProcessingList.get(0), eventProcessingList.get(1));
104+
assertThat(eventProcessingList).hasSize(2)
105+
.matches(list -> eventProcessingList.get(0).getCustomResource().getMetadata().getResourceVersion().equals("1") &&
106+
eventProcessingList.get(1).getCustomResource().getMetadata().getResourceVersion().equals("2"),
107+
"Events processed in correct order")
108+
.matches(list -> eventExecutedBefore(0, 1),
109+
"Start time of event 2 is after end time of event 1");
110+
}
111+
112+
// note that this is true for generation aware scheduling
73113
@Test
74114
public void onlyLastEventIsScheduledIfMoreReceivedDuringAndExecution() {
75115
normalDispatcherExecution();
76116
CustomResource resource1 = sampleResource();
77117
CustomResource resource2 = sampleResource();
78118
resource2.getMetadata().setResourceVersion("2");
119+
resource2.getMetadata().setGeneration(2l);
79120
CustomResource resource3 = sampleResource();
80121
resource3.getMetadata().setResourceVersion("3");
122+
resource3.getMetadata().setGeneration(3l);
81123

82124
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
83125
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
@@ -89,8 +131,7 @@ public void onlyLastEventIsScheduledIfMoreReceivedDuringAndExecution() {
89131
.matches(list -> eventProcessingList.get(0).getCustomResource().getMetadata().getResourceVersion().equals("1") &&
90132
eventProcessingList.get(1).getCustomResource().getMetadata().getResourceVersion().equals("3"),
91133
"Events processed in correct order")
92-
.matches(list ->
93-
eventProcessingList.get(0).getEndTime().isBefore(eventProcessingList.get(1).startTime),
134+
.matches(list -> eventExecutedBefore(0, 1),
94135
"Start time of event 2 is after end time of event 1");
95136
}
96137

@@ -118,6 +159,7 @@ public void processesNewEventIfItIsReceivedAfterExecutionInError() {
118159
CustomResource resource1 = sampleResource();
119160
CustomResource resource2 = sampleResource();
120161
resource2.getMetadata().setResourceVersion("2");
162+
resource2.getMetadata().setGeneration(2l);
121163

122164
doAnswer(this::exceptionInExecution).when(eventDispatcher).handleEvent(any(Watcher.Action.class), eq(resource1));
123165
doAnswer(this::normalExecution).when(eventDispatcher).handleEvent(any(Watcher.Action.class), eq(resource2));
@@ -131,16 +173,15 @@ public void processesNewEventIfItIsReceivedAfterExecutionInError() {
131173
.matches(list -> eventProcessingList.get(0).getCustomResource().getMetadata().getResourceVersion().equals("1") &&
132174
eventProcessingList.get(1).getCustomResource().getMetadata().getResourceVersion().equals("2"),
133175
"Events processed in correct order")
134-
.matches(list ->
135-
eventProcessingList.get(0).getEndTime().isBefore(eventProcessingList.get(1).startTime),
176+
.matches(list -> eventExecutedBefore(0, 1),
136177
"Start time of event 2 is after end time of event 1");
137178

138179
assertThat(eventProcessingList.get(0).getException()).isNotNull();
139180
assertThat(eventProcessingList.get(1).getException()).isNull();
140181
}
141182

142183
@Test
143-
public void numberOfRetriesIsLimited() {
184+
public void numberOfRetriesCanBeLimited() {
144185
doAnswer(this::exceptionInExecution).when(eventDispatcher).handleEvent(any(Watcher.Action.class), any(CustomResource.class));
145186

146187
eventScheduler.eventReceived(Watcher.Action.MODIFIED, sampleResource());
@@ -166,6 +207,14 @@ private Object normalExecution(InvocationOnMock invocation) {
166207
}
167208
}
168209

210+
private void generationUnAwareScheduler() {
211+
eventScheduler = initScheduler(false);
212+
}
213+
214+
private EventScheduler initScheduler(boolean generationAware) {
215+
return new EventScheduler(eventDispatcher,
216+
new GenericRetry().setMaxAttempts(MAX_RETRY_ATTEMPTS).withLinearRetry(), generationAware);
217+
}
169218

170219
private Object exceptionInExecution(InvocationOnMock invocation) {
171220
try {
@@ -181,6 +230,11 @@ private Object exceptionInExecution(InvocationOnMock invocation) {
181230
}
182231
}
183232

233+
private boolean eventExecutedBefore(int event1Index, int event2Index) {
234+
return eventProcessingList.get(event1Index).getEndTime().isBefore(eventProcessingList.get(event2Index).startTime) ||
235+
eventProcessingList.get(event1Index).getEndTime().equals(eventProcessingList.get(event2Index).startTime);
236+
}
237+
184238
private void waitMinimalTimeForExecution() {
185239
waitTimeForExecution(1);
186240
}
@@ -203,7 +257,7 @@ CustomResource sampleResource() {
203257
resource.setMetadata(new ObjectMetaBuilder()
204258
.withCreationTimestamp("creationTimestamp")
205259
.withDeletionGracePeriodSeconds(10L)
206-
.withGeneration(10L)
260+
.withGeneration(1L)
207261
.withName("name")
208262
.withNamespace("namespace")
209263
.withResourceVersion("1")

0 commit comments

Comments
 (0)