25
25
import java .io .Closeable ;
26
26
import java .io .StringReader ;
27
27
import java .time .Duration ;
28
- import java .util .ArrayList ;
29
28
import java .util .HashMap ;
30
29
import java .util .List ;
31
30
import java .util .Map ;
@@ -66,11 +65,12 @@ public class Connection implements Closeable {
66
65
return thread ;
67
66
});
68
67
private static final AtomicLong NEXT_ID = new AtomicLong (1L );
68
+ private final AtomicLong EVENT_CALLBACK_ID = new AtomicLong (1 );
69
69
private WebSocket socket ;
70
70
private final Map <Long , Consumer <Either <Throwable , JsonInput >>> methodCallbacks =
71
71
new ConcurrentHashMap <>();
72
72
private final ReadWriteLock callbacksLock = new ReentrantReadWriteLock (true );
73
- private final Map <Event <?>, List < Consumer <?>>> eventCallbacks = new HashMap <>();
73
+ private final Map <Event <?>, Map < Long , Consumer <?>>> eventCallbacks = new HashMap <>();
74
74
private final HttpClient client ;
75
75
private final AtomicBoolean underlyingSocketClosed = new AtomicBoolean ();
76
76
@@ -180,17 +180,26 @@ public <X> X sendAndWait(Command<X> command, Duration timeout) {
180
180
}
181
181
}
182
182
183
- public <X > void addListener (Event <X > event , Consumer <X > handler ) {
183
+ public <X > long addListener (Event <X > event , Consumer <X > handler ) {
184
184
Require .nonNull ("Event to listen for" , event );
185
185
Require .nonNull ("Handler to call" , handler );
186
186
187
+ long id = EVENT_CALLBACK_ID .getAndIncrement ();
188
+
187
189
Lock lock = callbacksLock .writeLock ();
188
190
lock .lock ();
189
191
try {
190
- eventCallbacks .computeIfAbsent (event , (key ) -> new ArrayList <>()).add (handler );
192
+ eventCallbacks .computeIfAbsent (
193
+ event ,
194
+ key -> {
195
+ HashMap <Long , Consumer <?>> map = new HashMap <>();
196
+ map .put (id , handler );
197
+ return map ;
198
+ });
191
199
} finally {
192
200
lock .unlock ();
193
201
}
202
+ return id ;
194
203
}
195
204
196
205
public <X > void clearListener (Event <X > event ) {
@@ -203,6 +212,23 @@ public <X> void clearListener(Event<X> event) {
203
212
}
204
213
}
205
214
215
+ public void removeListener (long id ) {
216
+ Lock lock = callbacksLock .writeLock ();
217
+ lock .lock ();
218
+ try {
219
+ eventCallbacks .forEach ((k , v ) -> v .remove (id ));
220
+ eventCallbacks .forEach (
221
+ (k , v ) -> {
222
+ v .remove (id );
223
+ if (v .isEmpty ()) {
224
+ eventCallbacks .remove (k );
225
+ }
226
+ });
227
+ } finally {
228
+ lock .unlock ();
229
+ }
230
+ }
231
+
206
232
public <X > boolean isEventSubscribed (Event <X > event ) {
207
233
Lock lock = callbacksLock .writeLock ();
208
234
lock .lock ();
@@ -354,7 +380,7 @@ private void handleEventResponse(Map<String, Object> rawDataMap) {
354
380
355
381
final Object finalValue = value ;
356
382
357
- for (Consumer <?> action : event .getValue ()) {
383
+ for (Consumer <?> action : event .getValue (). values () ) {
358
384
@ SuppressWarnings ("unchecked" )
359
385
Consumer <Object > obj = (Consumer <Object >) action ;
360
386
LOG .log (
0 commit comments