@@ -206,8 +206,10 @@ open class RedisClient : RedisCommandTarget {
206
206
var subscribedChannels = Set < String > ( )
207
207
var subscribedPatterns = Set < String > ( )
208
208
209
- private var subscribeListeners = EventLoopEventListenerSet < ( String , Int ) > ( )
210
- private var messageListeners = EventLoopEventListenerSet < ( String , String ) > ( )
209
+ private var subscribeListeners = EventLoopEventListenerSet < ( String , Int ) > ( )
210
+ private var psubscribeListeners = EventLoopEventListenerSet < ( String , Int ) > ( )
211
+ private var messageListeners =
212
+ EventLoopEventListenerSet < ( String , String , String ? ) > ( )
211
213
212
214
open func subscribe( _ channels: String ... ) {
213
215
_subscribe ( channels: channels)
@@ -282,7 +284,7 @@ open class RedisClient : RedisCommandTarget {
282
284
283
285
let newPatterns = Set ( patterns) . subtracting ( subscribedPatterns)
284
286
if !newPatterns. isEmpty {
285
- subscribedPatterns. formUnion ( newChannels )
287
+ subscribedPatterns. formUnion ( newPatterns )
286
288
287
289
if state. isConnected {
288
290
let call = RedisCommandCall ( [ " PSUBSCRIBE " ] + newPatterns,
@@ -308,7 +310,35 @@ open class RedisClient : RedisCommandTarget {
308
310
}
309
311
310
312
@discardableResult
313
+ open func onPSubscribe( _ cb: @escaping ( String , Int ) -> Void ) -> Self {
314
+ if eventLoop. inEventLoop {
315
+ psubscribeListeners. append ( cb)
316
+ }
317
+ else {
318
+ eventLoop. execute {
319
+ self . psubscribeListeners. append ( cb)
320
+ }
321
+ }
322
+ return self
323
+ }
324
+
325
+
326
+ @discardableResult
327
+ /// Executes the callback when the pub/sub system receives a message.
328
+ /// The callback parameters are channel and message.
311
329
open func onMessage( _ cb: @escaping ( String , String ) -> Void ) -> Self {
330
+ return onMessage { ( channel, message, _) in
331
+ cb ( channel, message)
332
+ }
333
+ }
334
+
335
+ /// Executes the callback when the pub/sub system receives a message.
336
+ /// The callback parameters are channel, message and the pattern in case
337
+ /// of a pattern subscription.
338
+ @discardableResult
339
+ open func onMessage( _ cb: @escaping ( String , String , String ? )
340
+ -> Void ) -> Self
341
+ {
312
342
if eventLoop. inEventLoop {
313
343
messageListeners. append ( cb)
314
344
}
@@ -423,13 +453,27 @@ open class RedisClient : RedisCommandTarget {
423
453
if let channel = channel. stringValue,
424
454
let message = message. stringValue
425
455
{
426
- return messageListeners. emit ( ( channel, message ) )
456
+ return messageListeners. emit ( ( channel, message, nil ) )
457
+ }
458
+ case ( " pmessage " , let pattern, let channel ) :
459
+ if items. count > 3 ,
460
+ let pattern = pattern. stringValue,
461
+ let channel = channel. stringValue,
462
+ let message = items [ 3 ] . stringValue
463
+ {
464
+ return messageListeners. emit ( ( channel, message, pattern ) )
427
465
}
428
- case ( " subscribe " , let channel, let count ) :
466
+ case ( " subscribe " , let channel, let count ) ,
467
+ ( " psubscribe " , let channel, let count ) :
429
468
if let channel = channel. stringValue, let count = count. intValue {
430
- return subscribeListeners. emit ( ( channel, count ) )
469
+ return subscribeListeners. emit ( ( channel, count ) )
470
+ }
471
+ case ( " psubscribe " , let pattern, let count ) :
472
+ if let pattern = pattern. stringValue, let count = count. intValue {
473
+ return psubscribeListeners. emit ( ( pattern, count ) )
431
474
}
432
475
case ( " unsubscribe " , _, _ ) : return
476
+ case ( " punsubscribe " , _, _ ) : return
433
477
default : break
434
478
}
435
479
}
0 commit comments