Skip to content

Commit 192f254

Browse files
committed
fix: concurrent modification when getting event sources (v5) (#2572)
1 parent f57c305 commit 192f254

File tree

3 files changed

+48
-3
lines changed

3 files changed

+48
-3
lines changed

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
34
import java.util.Collections;
45
import java.util.HashMap;
56
import java.util.List;
67
import java.util.Map;
78
import java.util.Objects;
9+
import java.util.concurrent.ConcurrentHashMap;
810
import java.util.concurrent.ConcurrentNavigableMap;
911
import java.util.concurrent.ConcurrentSkipListMap;
1012
import java.util.stream.Stream;
@@ -33,7 +35,8 @@ public void add(EventSource eventSource) {
3335
+ " is already registered with name: " + name);
3436
}
3537
sourceByName.put(name, eventSource);
36-
sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
38+
sources.computeIfAbsent(keyFor(eventSource), k -> new ConcurrentHashMap<>()).put(name,
39+
eventSource);
3740
}
3841

3942
public EventSource remove(String name) {
@@ -144,7 +147,6 @@ public <S> List<EventSource<S, P>> getEventSources(Class<S> dependentType) {
144147
if (sourcesForType == null) {
145148
return Collections.emptyList();
146149
}
147-
148150
return sourcesForType.values().stream()
149151
.map(es -> (EventSource<S, P>) es).toList();
150152
}

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class EventSourceManagerTest {
3636
public void registersEventSource() {
3737
EventSource eventSource = mock(EventSource.class);
3838
when(eventSource.resourceType()).thenReturn(EventSource.class);
39+
when(eventSource.name()).thenReturn("name1");
3940

4041
eventSourceManager.registerEventSource(eventSource);
4142

@@ -95,6 +96,7 @@ void retrievingEventSourceForClassShouldWork() {
9596

9697
ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class);
9798
when(eventSource.resourceType()).thenReturn(String.class);
99+
when(eventSource.name()).thenReturn("name1");
98100
manager.registerEventSource(eventSource);
99101

100102
var source = manager.getEventSourceFor(String.class);

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java

+42-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.util.ConcurrentModificationException;
4+
import java.util.concurrent.Phaser;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.stream.IntStream;
7+
38
import org.junit.jupiter.api.Test;
49

510
import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -177,7 +182,43 @@ void getEventSourcesShouldWork() {
177182
assertThat(eventSources.getEventSources(Service.class)).isEmpty();
178183
}
179184

180-
185+
@Test
186+
void testConcurrentAddRemoveAndGet() throws InterruptedException {
187+
188+
final var concurrentExceptionFound = new AtomicBoolean(false);
189+
190+
for (int i = 0; i < 1000 && !concurrentExceptionFound.get(); i++) {
191+
final var eventSources = new EventSources();
192+
var eventSourceList =
193+
IntStream.range(1, 20).mapToObj(n -> eventSourceMockWithName(EventSource.class,
194+
"name" + n, HasMetadata.class)).toList();
195+
196+
IntStream.range(1, 10).forEach(n -> eventSources.add(eventSourceList.get(n - 1)));
197+
198+
var phaser = new Phaser(2);
199+
200+
var t1 = new Thread(() -> {
201+
phaser.arriveAndAwaitAdvance();
202+
IntStream.range(11, 20).forEach(n -> eventSources.add(eventSourceList.get(n - 1)));
203+
});
204+
var t2 = new Thread(() -> {
205+
phaser.arriveAndAwaitAdvance();
206+
try {
207+
eventSources.getEventSources(eventSourceList.get(0).resourceType());
208+
} catch (ConcurrentModificationException e) {
209+
concurrentExceptionFound.set(true);
210+
}
211+
});
212+
t1.start();
213+
t2.start();
214+
t1.join();
215+
t2.join();
216+
}
217+
218+
assertThat(concurrentExceptionFound)
219+
.withFailMessage("ConcurrentModificationException thrown")
220+
.isFalse();
221+
}
181222

182223
<T extends EventSource> EventSource eventSourceMockWithName(Class<T> clazz, String name,
183224
Class resourceType) {

0 commit comments

Comments
 (0)