Skip to content

Commit e43b954

Browse files
authored
fix: concurrent modification when getting event sources (#2573)
1 parent 07e744b commit e43b954

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

33
import java.util.Collections;
4-
import java.util.HashMap;
54
import java.util.List;
65
import java.util.Map;
76
import java.util.Objects;
7+
import java.util.concurrent.ConcurrentHashMap;
88
import java.util.concurrent.ConcurrentNavigableMap;
99
import java.util.concurrent.ConcurrentSkipListMap;
1010
import java.util.stream.Collectors;
@@ -93,7 +93,8 @@ public void add(NamedEventSource eventSource) {
9393
+ keyAsString(getResourceType(original), name)
9494
+ " class/name combination");
9595
}
96-
sources.computeIfAbsent(keyFor(original), k -> new HashMap<>()).put(name, eventSource);
96+
sources.computeIfAbsent(keyFor(original), k -> new ConcurrentHashMap<>()).put(name,
97+
eventSource);
9798
}
9899

99100
@SuppressWarnings("rawtypes")

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java

+47
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.util.ConcurrentModificationException;
4+
import java.util.concurrent.Phaser;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.stream.Collectors;
7+
import java.util.stream.IntStream;
8+
39
import org.junit.jupiter.api.Test;
410

511
import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -184,4 +190,45 @@ void getEventSourcesShouldWork() {
184190

185191
assertThat(eventSources.getEventSources(Service.class)).isEmpty();
186192
}
193+
194+
@Test
195+
void testConcurrentAddRemoveAndGet() throws InterruptedException {
196+
final var concurrentExceptionFound = new AtomicBoolean(false);
197+
for (int i = 0; i < 1000 && !concurrentExceptionFound.get(); i++) {
198+
final var eventSources = new EventSources();
199+
var eventSourceList =
200+
IntStream.range(1, 20).mapToObj(n -> {
201+
var mockResES = mock(ResourceEventSource.class);
202+
NamedEventSource eventSource = mock(NamedEventSource.class);
203+
when(eventSource.original()).thenReturn(mockResES);
204+
when(eventSource.name()).thenReturn("name" + n);
205+
when(mockResES.resourceType()).thenReturn(HasMetadata.class);
206+
return eventSource;
207+
}).collect(Collectors.toList());
208+
209+
IntStream.range(1, 10).forEach(n -> eventSources.add(eventSourceList.get(n - 1)));
210+
211+
var phaser = new Phaser(2);
212+
213+
var t1 = new Thread(() -> {
214+
phaser.arriveAndAwaitAdvance();
215+
IntStream.range(11, 20).forEach(n -> eventSources.add(eventSourceList.get(n - 1)));
216+
});
217+
var t2 = new Thread(() -> {
218+
phaser.arriveAndAwaitAdvance();
219+
try {
220+
eventSources.getEventSources(HasMetadata.class);
221+
} catch (ConcurrentModificationException e) {
222+
concurrentExceptionFound.set(true);
223+
}
224+
});
225+
t1.start();
226+
t2.start();
227+
t1.join();
228+
t2.join();
229+
}
230+
assertThat(concurrentExceptionFound)
231+
.withFailMessage("ConcurrentModificationException thrown")
232+
.isFalse();
233+
}
187234
}

0 commit comments

Comments
 (0)