Skip to content

Commit 2c6f087

Browse files
gklijssmyrick
authored andcommitted
Prevent starting second subscription with the same id, remove he complete message as it's not needed, and probably preventing flux from closing. (ExpediaGroup#520)
1 parent 178c953 commit 2c6f087

File tree

3 files changed

+80
-15
lines changed

3 files changed

+80
-15
lines changed

graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/execution/ApolloSubscriptionProtocolHandler.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,7 @@ class ApolloSubscriptionProtocolHandler(
6565
return ackowledgeMessageFlux.concatWith(keepAliveFlux)
6666
}
6767
GQL_START.type -> return startSubscription(operationMessage, session)
68-
GQL_STOP.type -> {
69-
sessionState.stopOperation(session, operationMessage)
70-
return Flux.empty()
71-
}
68+
GQL_STOP.type -> return sessionState.stopOperation(session, operationMessage)
7269
GQL_CONNECTION_TERMINATE.type -> {
7370
sessionState.terminateSession(session)
7471
return Flux.empty()
@@ -107,6 +104,11 @@ class ApolloSubscriptionProtocolHandler(
107104
return Flux.just(basicConnectionErrorMessage)
108105
}
109106

107+
if (sessionState.operationExists(session, operationMessage)) {
108+
logger.info("Already subscribed to operation ${operationMessage.id} for session ${session.id}")
109+
return Flux.empty()
110+
}
111+
110112
val payload = operationMessage.payload
111113

112114
if (payload == null) {
@@ -125,7 +127,6 @@ class ApolloSubscriptionProtocolHandler(
125127
SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it)
126128
}
127129
}
128-
.concatWith(Flux.just(SubscriptionOperationMessage(type = SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE.type, id = operationMessage.id)))
129130
.doOnSubscribe { sessionState.saveOperation(session, operationMessage, it) }
130131
} catch (exception: Exception) {
131132
logger.error("Error running graphql subscription", exception)

graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/execution/ApolloSubscriptionSessionState.kt

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package com.expediagroup.graphql.spring.execution
1818

1919
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage
20+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE
2021
import org.reactivestreams.Subscription
2122
import org.springframework.web.reactive.socket.WebSocketSession
23+
import reactor.core.publisher.Flux
2224
import java.util.concurrent.ConcurrentHashMap
2325

2426
internal class ApolloSubscriptionSessionState {
@@ -53,16 +55,19 @@ internal class ApolloSubscriptionSessionState {
5355
/**
5456
* Stop the subscription sending data. Does NOT terminate the session.
5557
*/
56-
fun stopOperation(session: WebSocketSession, operationMessage: SubscriptionOperationMessage) {
58+
fun stopOperation(session: WebSocketSession, operationMessage: SubscriptionOperationMessage): Flux<SubscriptionOperationMessage> {
5759
if (operationMessage.id != null) {
5860
val operationsForSession = activeOperations[session.id]
59-
operationsForSession?.get(operationMessage.id)?.cancel()
60-
operationsForSession?.remove(operationMessage.id)
61-
62-
if (operationsForSession?.isEmpty() == true) {
63-
activeOperations.remove(session.id)
61+
operationsForSession?.get(operationMessage.id)?.let {
62+
it.cancel()
63+
operationsForSession.remove(operationMessage.id)
64+
if (operationsForSession.isEmpty()) {
65+
activeOperations.remove(session.id)
66+
}
67+
return Flux.just(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = operationMessage.id))
6468
}
6569
}
70+
return Flux.empty()
6671
}
6772

6873
/**
@@ -75,4 +80,10 @@ internal class ApolloSubscriptionSessionState {
7580
activeKeepAliveSessions.remove(session.id)
7681
session.close()
7782
}
83+
84+
/**
85+
* Looks up the operation for the client, to check if it already exists
86+
*/
87+
fun operationExists(session: WebSocketSession, operationMessage: SubscriptionOperationMessage): Boolean =
88+
activeOperations[session.id]?.containsKey(operationMessage.id) ?: false
7889
}

graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/spring/execution/ApolloSubscriptionProtocolHandlerTest.kt

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,11 @@ class ApolloSubscriptionProtocolHandlerTest {
244244
fun `Return GQL_CONNECTION_ERROR when sending GQL_START but id is null`() {
245245
val config: GraphQLConfigurationProperties = mockk()
246246
val operationMessage = SubscriptionOperationMessage(type = GQL_START.type, id = null).toJson()
247-
val session: WebSocketSession = mockk()
247+
val mockSession: WebSocketSession = mockk { every { id } returns "123" }
248248
val subscriptionHandler: SubscriptionHandler = mockk()
249249

250250
val handler = ApolloSubscriptionProtocolHandler(config, subscriptionHandler, objectMapper)
251-
val flux = handler.handle(operationMessage, session)
251+
val flux = handler.handle(operationMessage, mockSession)
252252

253253
val message = flux.blockFirst(Duration.ofSeconds(2))
254254
assertNotNull(message)
@@ -316,7 +316,60 @@ class ApolloSubscriptionProtocolHandlerTest {
316316
val graphQLResponse: GraphQLResponse = objectMapper.convertValue(payload)
317317
assertEquals(expected = "myData", actual = graphQLResponse.data)
318318

319-
assertEquals(expected = 2, actual = flux.count().block())
319+
assertEquals(expected = 1, actual = flux.count().block())
320+
verify(exactly = 0) { session.close() }
321+
}
322+
323+
@Test
324+
fun `Return GQL_COMPLETE when sending GQL_STOP with GraphQLRequest having operation id of running operation`() {
325+
val config: GraphQLConfigurationProperties = mockk()
326+
val graphQLRequest = GraphQLRequest("{ message }")
327+
val startRequest = SubscriptionOperationMessage(type = GQL_START.type, id = "abc", payload = graphQLRequest).toJson()
328+
val stopRequest = SubscriptionOperationMessage(type = GQL_STOP.type, id = "abc").toJson()
329+
val session: WebSocketSession = mockk {
330+
every { close() } returns mockk()
331+
every { id } returns "123"
332+
}
333+
val subscriptionHandler: SubscriptionHandler = mockk {
334+
every { executeSubscription(eq(graphQLRequest)) } returns Flux.just(GraphQLResponse("myData"))
335+
}
336+
337+
val handler = ApolloSubscriptionProtocolHandler(config, subscriptionHandler, objectMapper)
338+
val startFlux = handler.handle(startRequest, session)
339+
startFlux.blockFirst(Duration.ofSeconds(2))
340+
val stopFlux = handler.handle(stopRequest, session)
341+
342+
StepVerifier.create(stopFlux)
343+
.expectSubscription()
344+
.expectNextMatches { it.type == "complete" }
345+
.thenCancel()
346+
.verify()
347+
348+
assertEquals(expected = 1, actual = startFlux.count().block())
349+
assertEquals(expected = 1, actual = stopFlux.count().block())
350+
verify(exactly = 0) { session.close() }
351+
}
352+
353+
@Test
354+
fun `Dont start second subscription when operation id is already in activeOperations`() {
355+
val config: GraphQLConfigurationProperties = mockk()
356+
val graphQLRequest = GraphQLRequest("{ message }")
357+
val operationMessage = SubscriptionOperationMessage(type = SubscriptionOperationMessage.ClientMessages.GQL_START.type, id = "abc", payload = graphQLRequest).toJson()
358+
val session: WebSocketSession = mockk {
359+
every { close() } returns mockk()
360+
every { id } returns "123"
361+
}
362+
val subscriptionHandler: SubscriptionHandler = mockk {
363+
every { executeSubscription(eq(graphQLRequest)) } returns Flux.just(GraphQLResponse("myData"))
364+
}
365+
366+
val handler = ApolloSubscriptionProtocolHandler(config, subscriptionHandler, objectMapper)
367+
val flux = handler.handle(operationMessage, session)
368+
flux.blockFirst(Duration.ofSeconds(2))
369+
val fluxTwo = handler.handle(operationMessage, session)
370+
371+
assertEquals(expected = 1, actual = flux.count().block())
372+
assertEquals(expected = 0, actual = fluxTwo.count().block())
320373
verify(exactly = 0) { session.close() }
321374
}
322375

@@ -337,7 +390,7 @@ class ApolloSubscriptionProtocolHandlerTest {
337390
val handler = ApolloSubscriptionProtocolHandler(config, subscriptionHandler, objectMapper)
338391
val flux = handler.handle(operationMessage, session)
339392

340-
assertEquals(expected = 2, actual = flux.count().block())
393+
assertEquals(expected = 1, actual = flux.count().block())
341394
val message = flux.blockFirst(Duration.ofSeconds(2))
342395
assertNotNull(message)
343396
assertEquals(expected = GQL_ERROR.type, actual = message.type)

0 commit comments

Comments
 (0)