@@ -19,10 +19,37 @@ class EventSources<P extends HasMetadata> {
19
19
20
20
private final ConcurrentNavigableMap <String , Map <String , EventSource <?, P >>> sources =
21
21
new ConcurrentSkipListMap <>();
22
+ private final Map <String , EventSource > sourceByName = new HashMap <>();
23
+
22
24
private final TimerEventSource <P > retryAndRescheduleTimerEventSource =
23
25
new TimerEventSource <>("RetryAndRescheduleTimerEventSource" );
24
26
private ControllerEventSource <P > controllerEventSource ;
25
27
28
+ public void add (EventSource eventSource ) {
29
+ final var name = eventSource .name ();
30
+ var existing = sourceByName .get (name );
31
+ if (existing != null ) {
32
+ throw new IllegalArgumentException ("Event source " + existing
33
+ + " is already registered with name: " + name );
34
+ }
35
+ sourceByName .put (name , eventSource );
36
+ sources .computeIfAbsent (keyFor (eventSource ), k -> new HashMap <>()).put (name , eventSource );
37
+ }
38
+
39
+ public EventSource remove (String name ) {
40
+ var optionalMap = sources .values ().stream ().filter (m -> m .containsKey (name )).findFirst ();
41
+ sourceByName .remove (name );
42
+ return optionalMap .map (m -> m .remove (name )).orElse (null );
43
+ }
44
+
45
+ public void clear () {
46
+ sources .clear ();
47
+ sourceByName .clear ();
48
+ }
49
+
50
+ public EventSource existingEventSourceByName (String name ) {
51
+ return sourceByName .get (name );
52
+ }
26
53
27
54
void createControllerEventSource (Controller <P > controller ) {
28
55
controllerEventSource = new ControllerEventSource <>(controller );
@@ -54,30 +81,7 @@ Stream<EventSource<?, P>> flatMappedSources() {
54
81
return sources .values ().stream ().flatMap (c -> c .values ().stream ());
55
82
}
56
83
57
- public void clear () {
58
- sources .clear ();
59
- }
60
-
61
- @ SuppressWarnings ("unchecked" )
62
- public <R > EventSource <R , P > existingEventSourceOfSameNameAndType (EventSource <R , P > source ) {
63
- return (EventSource <R , P >) existingEventSourcesOfSameType (source ).get (source .name ());
64
- }
65
-
66
- private <R > Map <String , EventSource <?, P >> existingEventSourcesOfSameType (
67
- EventSource <R , P > source ) {
68
- return sources .getOrDefault (keyFor (source ), Collections .emptyMap ());
69
- }
70
84
71
- public <R > void add (EventSource <R , P > eventSource ) {
72
- final var name = eventSource .name ();
73
- final var existing = existingEventSourcesOfSameType (eventSource );
74
- if (existing .get (name ) != null ) {
75
- throw new IllegalArgumentException ("Event source " + existing
76
- + " is already registered with name: " + name );
77
- }
78
-
79
- sources .computeIfAbsent (keyFor (eventSource ), k -> new HashMap <>()).put (name , eventSource );
80
- }
81
85
82
86
private <R > String keyFor (EventSource <R , P > source ) {
83
87
return keyFor (source .resourceType ());
@@ -145,10 +149,4 @@ public <S> List<EventSource<S, P>> getEventSources(Class<S> dependentType) {
145
149
return sourcesForType .values ().stream ()
146
150
.map (es -> (EventSource <S , P >) es ).toList ();
147
151
}
148
-
149
- @ SuppressWarnings ("rawtypes" )
150
- public EventSource remove (String name ) {
151
- var optionalMap = sources .values ().stream ().filter (m -> m .containsKey (name )).findFirst ();
152
- return optionalMap .map (m -> m .remove (name )).orElse (null );
153
- }
154
152
}
0 commit comments