@@ -49,7 +49,7 @@ class ApolloSubscriptionProtocolHandler(
49
49
private val objectMapper : ObjectMapper
50
50
) {
51
51
// Sessions are saved by web socket session id
52
- private val activeSessions = ConcurrentHashMap <String , Subscription >()
52
+ private val activeKeepAliveSessions = ConcurrentHashMap <String , Subscription >()
53
53
// Operations are saved by web socket session id, then operation id
54
54
private val activeOperations = ConcurrentHashMap <String , ConcurrentHashMap <String , Subscription >>()
55
55
@@ -71,7 +71,10 @@ class ApolloSubscriptionProtocolHandler(
71
71
// Send the GQL_CONNECTION_KEEP_ALIVE message every interval until the connection is closed or terminated
72
72
val keepAliveFlux = Flux .interval(Duration .ofMillis(keepAliveInterval))
73
73
.map { keepAliveMessage }
74
- .doOnSubscribe { activeSessions[session.id] = it }
74
+ .doOnSubscribe {
75
+ logger.debug(" GraphQL subscription INIT, sessionId=${session.id} activeSessions=${activeKeepAliveSessions.count()} " )
76
+ activeKeepAliveSessions[session.id] = it
77
+ }
75
78
76
79
return flux.concatWith(keepAliveFlux)
77
80
}
@@ -102,14 +105,14 @@ class ApolloSubscriptionProtocolHandler(
102
105
@Suppress(" Detekt.TooGenericExceptionCaught" )
103
106
private fun startSubscription (operationMessage : SubscriptionOperationMessage , session : WebSocketSession ): Flux <SubscriptionOperationMessage > {
104
107
if (operationMessage.id == null ) {
105
- logger.error(" Operation id is required" )
108
+ logger.error(" GraphQL subscription operation id is required" )
106
109
return Flux .just(basicConnectionErrorMessage)
107
110
}
108
111
109
112
val payload = operationMessage.payload
110
113
111
114
if (payload == null ) {
112
- logger.error(" Payload was null instead of a GraphQLRequest object" )
115
+ logger.error(" GraphQL subscription payload was null instead of a GraphQLRequest object" )
113
116
stopSubscription(operationMessage, session)
114
117
return Flux .just(SubscriptionOperationMessage (type = GQL_CONNECTION_ERROR .type, id = operationMessage.id))
115
118
}
@@ -126,11 +129,11 @@ class ApolloSubscriptionProtocolHandler(
126
129
}
127
130
.concatWith(Flux .just(SubscriptionOperationMessage (type = GQL_COMPLETE .type, id = operationMessage.id)))
128
131
.doOnSubscribe {
129
- logger.trace( " WebSocket GraphQL subscription subscribe, WebSocketSessionID =${session.id} OperationMessageID =${operationMessage.id} " )
132
+ logger.debug( " GraphQL subscription START, sessionId =${session.id} operationId =${operationMessage.id} " )
130
133
activeOperations[session.id]?.put(operationMessage.id, it)
131
134
}
132
- .doOnCancel { logger.trace( " WebSocket GraphQL subscription cancel, WebSocketSessionID =${session.id} OperationMessageID =${operationMessage.id} " ) }
133
- .doOnComplete { logger.trace( " WebSocket GraphQL subscription complete, WebSocketSessionID =${session.id} OperationMessageID =${operationMessage.id} " ) }
135
+ .doOnCancel { logger.debug( " GraphQL subscription CANCEL, sessionId =${session.id} operationId =${operationMessage.id} " ) }
136
+ .doOnComplete { logger.debug( " GraphQL subscription COMPELTE, sessionId =${session.id} operationId =${operationMessage.id} " ) }
134
137
} catch (exception: Exception ) {
135
138
logger.error(" Error running graphql subscription" , exception)
136
139
stopSubscription(operationMessage, session)
@@ -139,6 +142,7 @@ class ApolloSubscriptionProtocolHandler(
139
142
}
140
143
141
144
private fun stopSubscription (operationMessage : SubscriptionOperationMessage , session : WebSocketSession ) {
145
+ logger.debug(" GraphQL subscription STOP, sessionId=${session.id} operationId=${operationMessage.id} " )
142
146
if (operationMessage.id != null ) {
143
147
val operationsForSession = activeOperations[session.id]
144
148
operationsForSession?.get(operationMessage.id)?.cancel()
@@ -147,10 +151,11 @@ class ApolloSubscriptionProtocolHandler(
147
151
}
148
152
149
153
private fun terminateSession (session : WebSocketSession ) {
154
+ logger.debug(" GraphQL subscription TERMINATE, sessionId=${session.id} " )
150
155
activeOperations[session.id]?.forEach { _, subscription -> subscription.cancel() }
151
156
activeOperations.remove(session.id)
152
- activeSessions [session.id]?.cancel()
153
- activeSessions .remove(session.id)
157
+ activeKeepAliveSessions [session.id]?.cancel()
158
+ activeKeepAliveSessions .remove(session.id)
154
159
session.close()
155
160
}
156
161
}
0 commit comments