Skip to content

Commit c76203c

Browse files
committed
swift-server#103 -- Provide greater context to Pub/Sub Unsubscribe events
1 parent 555062c commit c76203c

File tree

6 files changed

+132
-70
lines changed

6 files changed

+132
-70
lines changed

Sources/RediStack/ChannelHandlers/RedisPubSubHandler.swift

+43-16
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2020 RediStack project authors
5+
// Copyright (c) 2020-2022 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -24,18 +24,44 @@ import NIO
2424
/// - message: The message data that was received from the `publisher`.
2525
public typealias RedisSubscriptionMessageReceiver = (_ publisher: RedisChannelName, _ message: RESPValue) -> Void
2626

27-
/// A closure handler invoked for Pub/Sub subscription changes.
27+
/// The details of the subscription change.
28+
/// - Parameters:
29+
/// - subscriptionKey: The subscribed channel or pattern that had its subscription status changed.
30+
/// - currentSubscriptionCount: The current total number of subscriptions the connection has.
31+
public typealias RedisSubscriptionChangeDetails = (subscriptionKey: String, currentSubscriptionCount: Int)
32+
33+
/// A closure handler invoked for Pub/Sub subscribe commands.
34+
///
35+
/// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
36+
/// even if it was done as a single PSUBSCRIBE or SUBSCRIBE command.
37+
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
38+
///
39+
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
40+
/// so as to not block further messages from being processed.
41+
/// - Parameter details: The details of the subscription.
42+
public typealias RedisSubscribeHandler = (_ details: RedisSubscriptionChangeDetails) -> Void
43+
44+
/// An enumeration of possible sources of Pub/Sub unsubscribe events.
45+
public enum RedisUnsubscribeEventSource {
46+
/// The client sent an unsubscribe command either as UNSUBSCRIBE or PUNSUBSCRIBE.
47+
case userInitiated
48+
/// The client encountered an error and had to unsubscribe.
49+
/// - Parameter _: The error the client encountered.
50+
case clientError(_ error: Error)
51+
}
52+
53+
/// A closure handler invoked for Pub/Sub unsubscribe commands.
2854
///
2955
/// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
30-
/// even if it was done as a single PSUBSCRIBE, SUBSCRIBE, PUNSUBSCRIBE, or UNSUBSCRIBE command.
56+
/// even if it was done as a single PUNSUBSCRIBE or UNSUBSCRIBE command.
3157
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
3258
///
3359
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
3460
/// so as to not block further messages from being processed.
3561
/// - Parameters:
36-
/// - subscriptionKey: The subscribed channel or pattern that had its subscription status changed.
37-
/// - currentSubscriptionCount: The current total number of subscriptions the connection has.
38-
public typealias RedisSubscriptionChangeHandler = (_ subscriptionKey: String, _ currentSubscriptionCount: Int) -> Void
62+
/// - details: The details of the subscription.
63+
/// - source: The source of the unsubscribe event.
64+
public typealias RedisUnsubscribeHandler = (_ details: RedisSubscriptionChangeDetails, _ source: RedisUnsubscribeEventSource) -> Void
3965

4066
/// A list of patterns or channels that a Pub/Sub subscription change is targetting.
4167
///
@@ -146,7 +172,7 @@ extension RedisPubSubHandler {
146172

147173
guard let subscription = self.subscriptions[prefixedKey] else { return }
148174

149-
subscription.onSubscribe?(subscriptionKey, subscriptionCount)
175+
subscription.onSubscribe?((subscriptionKey, subscriptionCount))
150176
subscription.onSubscribe = nil // nil to free memory
151177
self.subscriptions[prefixedKey] = subscription
152178

@@ -161,8 +187,8 @@ extension RedisPubSubHandler {
161187
) {
162188
let prefixedKey = self.prefixKey(subscriptionKey, with: keyPrefix)
163189
guard let subscription = self.subscriptions.removeValue(forKey: prefixedKey) else { return }
164-
165-
subscription.onUnsubscribe?(subscriptionKey, subscriptionCount)
190+
191+
subscription.onUnsubscribe?((subscriptionKey, subscriptionCount), .userInitiated)
166192
subscription.type.gauge.decrement()
167193

168194
switch self.pendingUnsubscribes.removeValue(forKey: prefixedKey) {
@@ -208,8 +234,8 @@ extension RedisPubSubHandler {
208234
public func addSubscription(
209235
for target: RedisSubscriptionTarget,
210236
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
211-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
212-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
237+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
238+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
213239
) -> EventLoopFuture<Int> {
214240
guard self.eventLoop.inEventLoop else {
215241
return self.eventLoop.flatSubmit {
@@ -481,7 +507,8 @@ extension RedisPubSubHandler: ChannelInboundHandler {
481507
let receivers = self.subscriptions
482508
self.subscriptions.removeAll()
483509
receivers.forEach {
484-
$0.value.onUnsubscribe?($0.key, 0)
510+
let source: RedisUnsubscribeEventSource = error.map { .clientError($0) } ?? .userInitiated
511+
$0.value.onUnsubscribe?(($0.key, 0), source)
485512
$0.value.type.gauge.decrement()
486513
}
487514
}
@@ -521,14 +548,14 @@ extension RedisPubSubHandler {
521548
fileprivate final class Subscription {
522549
let type: SubscriptionType
523550
let onMessage: RedisSubscriptionMessageReceiver
524-
var onSubscribe: RedisSubscriptionChangeHandler? // will be set to nil after first call
525-
let onUnsubscribe: RedisSubscriptionChangeHandler?
551+
var onSubscribe: RedisSubscribeHandler? // will be set to nil after first call
552+
let onUnsubscribe: RedisUnsubscribeHandler?
526553

527554
init(
528555
type: SubscriptionType,
529556
messageReceiver: @escaping RedisSubscriptionMessageReceiver,
530-
subscribeHandler: RedisSubscriptionChangeHandler?,
531-
unsubscribeHandler: RedisSubscriptionChangeHandler?
557+
subscribeHandler: RedisSubscribeHandler?,
558+
unsubscribeHandler: RedisUnsubscribeHandler?
532559
) {
533560
self.type = type
534561
self.onMessage = messageReceiver

Sources/RediStack/RedisClient.swift

+12-12
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public protocol RedisClient {
7171
eventLoop: EventLoop?,
7272
logger: Logger?,
7373
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
74-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
75-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
74+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
75+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
7676
) -> EventLoopFuture<Void>
7777

7878
/// Subscribes the client to the specified Redis channel name patterns, invoking the provided message receiver each time a message is published to
@@ -100,8 +100,8 @@ public protocol RedisClient {
100100
eventLoop: EventLoop?,
101101
logger: Logger?,
102102
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
103-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
104-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
103+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
104+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
105105
) -> EventLoopFuture<Void>
106106

107107
/// Unsubscribes the client from a specific Redis channel from receiving any future published messages.
@@ -194,8 +194,8 @@ extension RedisClient {
194194
eventLoop: EventLoop? = nil,
195195
logger: Logger? = nil,
196196
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
197-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
198-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
197+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
198+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
199199
) -> EventLoopFuture<Void> {
200200
return self.subscribe(to: channels, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
201201
}
@@ -205,8 +205,8 @@ extension RedisClient {
205205
eventLoop: EventLoop? = nil,
206206
logger: Logger? = nil,
207207
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
208-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
209-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
208+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
209+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
210210
) -> EventLoopFuture<Void> {
211211
return self.subscribe(to: channels, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
212212
}
@@ -216,8 +216,8 @@ extension RedisClient {
216216
eventLoop: EventLoop? = nil,
217217
logger: Logger? = nil,
218218
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
219-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
220-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
219+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
220+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
221221
) -> EventLoopFuture<Void> {
222222
return self.psubscribe(to: patterns, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
223223
}
@@ -227,8 +227,8 @@ extension RedisClient {
227227
eventLoop: EventLoop? = nil,
228228
logger: Logger? = nil,
229229
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
230-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
231-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
230+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
231+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
232232
) -> EventLoopFuture<Void> {
233233
return self.psubscribe(to: patterns, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
234234
}

Sources/RediStack/RedisConnection.swift

+6-6
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,8 @@ extension RedisConnection {
381381
eventLoop: EventLoop? = nil,
382382
logger: Logger? = nil,
383383
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
384-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
385-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
384+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
385+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
386386
) -> EventLoopFuture<Void> {
387387
return self._subscribe(.channels(channels), receiver, subscribeHandler, unsubscribeHandler, eventLoop, logger)
388388
}
@@ -392,17 +392,17 @@ extension RedisConnection {
392392
eventLoop: EventLoop? = nil,
393393
logger: Logger? = nil,
394394
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
395-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
396-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
395+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
396+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
397397
) -> EventLoopFuture<Void> {
398398
return self._subscribe(.patterns(patterns), receiver, subscribeHandler, unsubscribeHandler, eventLoop, logger)
399399
}
400400

401401
private func _subscribe(
402402
_ target: RedisSubscriptionTarget,
403403
_ receiver: @escaping RedisSubscriptionMessageReceiver,
404-
_ onSubscribe: RedisSubscriptionChangeHandler?,
405-
_ onUnsubscribe: RedisSubscriptionChangeHandler?,
404+
_ onSubscribe: RedisSubscribeHandler?,
405+
_ onUnsubscribe: RedisUnsubscribeHandler?,
406406
_ eventLoop: EventLoop?,
407407
_ logger: Logger?
408408
) -> EventLoopFuture<Void> {

Sources/RediStack/RedisConnectionPool.swift

+9-9
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,8 @@ extension RedisConnectionPool: RedisClient {
355355
eventLoop: EventLoop? = nil,
356356
logger: Logger? = nil,
357357
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
358-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
359-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
358+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
359+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
360360
) -> EventLoopFuture<Void> {
361361
return self._subscribe(
362362
using: {
@@ -380,8 +380,8 @@ extension RedisConnectionPool: RedisClient {
380380
eventLoop: EventLoop? = nil,
381381
logger: Logger? = nil,
382382
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
383-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
384-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
383+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
384+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
385385
) -> EventLoopFuture<Void> {
386386
return self._subscribe(
387387
using: {
@@ -425,8 +425,8 @@ extension RedisConnectionPool: RedisClient {
425425
}
426426

427427
private func _subscribe(
428-
using operation: @escaping (RedisConnection, @escaping RedisSubscriptionChangeHandler, Logger) -> EventLoopFuture<Void>,
429-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?,
428+
using operation: @escaping (RedisConnection, @escaping RedisUnsubscribeHandler, Logger) -> EventLoopFuture<Void>,
429+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?,
430430
eventLoop: EventLoop?,
431431
taskLogger: Logger?
432432
) -> EventLoopFuture<Void> {
@@ -438,11 +438,11 @@ extension RedisConnectionPool: RedisClient {
438438
self.pubsubConnection = connection
439439
}
440440

441-
let onUnsubscribe: RedisSubscriptionChangeHandler = { channelName, subCount in
442-
defer { unsubscribeHandler?(channelName, subCount) }
441+
let onUnsubscribe: RedisUnsubscribeHandler = { subscriptionDetails, eventSource in
442+
defer { unsubscribeHandler?(subscriptionDetails, eventSource) }
443443

444444
guard
445-
subCount == 0,
445+
subscriptionDetails.currentSubscriptionCount == 0,
446446
let connection = self.pubsubConnection
447447
else { return }
448448

Sources/RediStack/RedisLogging.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ internal struct CustomLoggerRedisClient<Client: RedisClient>: RedisClient {
109109
return self.client.punsubscribe(from: patterns, eventLoop: eventLoop, logger: logger)
110110
}
111111

112-
internal func subscribe(to channels: [RedisChannelName], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?, onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?) -> EventLoopFuture<Void> {
112+
internal func subscribe(to channels: [RedisChannelName], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscribeHandler?, onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?) -> EventLoopFuture<Void> {
113113
let logger = logger ?? self.defaultLogger
114114
return self.client.subscribe(to: channels, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
115115
}
116116

117-
internal func psubscribe(to patterns: [String], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?, onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?) -> EventLoopFuture<Void> {
117+
internal func psubscribe(to patterns: [String], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscribeHandler?, onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?) -> EventLoopFuture<Void> {
118118
let logger = logger ?? self.defaultLogger
119119
return self.client.psubscribe(to: patterns, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
120120
}

0 commit comments

Comments
 (0)