|
18 | 18 | '{{"type":"next","id":"{query_id}","payload":{{"data":{{"number":{number}}}}}}}'
|
19 | 19 | )
|
20 | 20 |
|
21 |
| -COUNTING_DELAY = 2 * MS |
22 |
| -PING_SENDING_DELAY = 5 * MS |
23 |
| -PONG_TIMEOUT = 2 * MS |
| 21 | +COUNTING_DELAY = 20 * MS |
| 22 | +PING_SENDING_DELAY = 50 * MS |
| 23 | +PONG_TIMEOUT = 100 * MS |
24 | 24 |
|
25 | 25 | # List which can used to store received messages by the server
|
26 | 26 | logged_messages: List[str] = []
|
@@ -48,100 +48,140 @@ async def server_countdown_template(ws, path):
|
48 | 48 |
|
49 | 49 | count_found = search("count: {:d}", query)
|
50 | 50 | count = count_found[0]
|
51 |
| - print(f"Countdown started from: {count}") |
| 51 | + print(f" Server: Countdown started from: {count}") |
52 | 52 |
|
53 | 53 | pong_received: asyncio.Event = asyncio.Event()
|
54 | 54 |
|
55 | 55 | async def counting_coro():
|
56 |
| - for number in range(count, -1, -1): |
57 |
| - await ws.send( |
58 |
| - countdown_server_answer.format(query_id=query_id, number=number) |
59 |
| - ) |
60 |
| - await asyncio.sleep(COUNTING_DELAY) |
| 56 | + print(" Server: counting task started") |
| 57 | + try: |
| 58 | + for number in range(count, -1, -1): |
| 59 | + await ws.send( |
| 60 | + countdown_server_answer.format( |
| 61 | + query_id=query_id, number=number |
| 62 | + ) |
| 63 | + ) |
| 64 | + await asyncio.sleep(COUNTING_DELAY) |
| 65 | + finally: |
| 66 | + print(" Server: counting task ended") |
61 | 67 |
|
| 68 | + print(" Server: starting counting task") |
62 | 69 | counting_task = asyncio.ensure_future(counting_coro())
|
63 | 70 |
|
64 | 71 | async def keepalive_coro():
|
65 |
| - while True: |
66 |
| - await asyncio.sleep(PING_SENDING_DELAY) |
67 |
| - try: |
68 |
| - # Send a ping |
69 |
| - await WebSocketServerHelper.send_ping( |
70 |
| - ws, payload="dummy_ping_payload" |
71 |
| - ) |
72 |
| - |
73 |
| - # Wait for a pong |
| 72 | + print(" Server: keepalive task started") |
| 73 | + try: |
| 74 | + while True: |
| 75 | + await asyncio.sleep(PING_SENDING_DELAY) |
74 | 76 | try:
|
75 |
| - await asyncio.wait_for(pong_received.wait(), PONG_TIMEOUT) |
76 |
| - except asyncio.TimeoutError: |
77 |
| - print("\nNo pong received in time!\n") |
| 77 | + # Send a ping |
| 78 | + await WebSocketServerHelper.send_ping( |
| 79 | + ws, payload="dummy_ping_payload" |
| 80 | + ) |
| 81 | + |
| 82 | + # Wait for a pong |
| 83 | + try: |
| 84 | + await asyncio.wait_for( |
| 85 | + pong_received.wait(), PONG_TIMEOUT |
| 86 | + ) |
| 87 | + except asyncio.TimeoutError: |
| 88 | + print( |
| 89 | + "\n Server: No pong received in time!\n" |
| 90 | + ) |
| 91 | + break |
| 92 | + |
| 93 | + pong_received.clear() |
| 94 | + |
| 95 | + except websockets.exceptions.ConnectionClosed: |
78 | 96 | break
|
79 |
| - |
80 |
| - pong_received.clear() |
81 |
| - |
82 |
| - except websockets.exceptions.ConnectionClosed: |
83 |
| - break |
| 97 | + finally: |
| 98 | + print(" Server: keepalive task ended") |
84 | 99 |
|
85 | 100 | if keepalive:
|
| 101 | + print(" Server: starting keepalive task") |
86 | 102 | keepalive_task = asyncio.ensure_future(keepalive_coro())
|
87 | 103 |
|
88 | 104 | async def receiving_coro():
|
89 |
| - nonlocal counting_task |
90 |
| - while True: |
91 |
| - |
92 |
| - try: |
93 |
| - result = await ws.recv() |
94 |
| - logged_messages.append(result) |
95 |
| - except websockets.exceptions.ConnectionClosed: |
96 |
| - break |
97 |
| - |
98 |
| - json_result = json.loads(result) |
99 |
| - |
100 |
| - answer_type = json_result["type"] |
101 |
| - |
102 |
| - if answer_type == "complete" and json_result["id"] == str(query_id): |
103 |
| - print("Cancelling counting task now") |
104 |
| - counting_task.cancel() |
105 |
| - if keepalive: |
106 |
| - print("Cancelling keep alive task now") |
107 |
| - keepalive_task.cancel() |
108 |
| - |
109 |
| - elif answer_type == "ping": |
110 |
| - if answer_pings: |
111 |
| - payload = json_result.get("payload", None) |
112 |
| - await WebSocketServerHelper.send_pong(ws, payload=payload) |
| 105 | + print(" Server: receiving task started") |
| 106 | + try: |
| 107 | + nonlocal counting_task |
| 108 | + while True: |
113 | 109 |
|
114 |
| - elif answer_type == "pong": |
115 |
| - pong_received.set() |
| 110 | + try: |
| 111 | + result = await ws.recv() |
| 112 | + logged_messages.append(result) |
| 113 | + except websockets.exceptions.ConnectionClosed: |
| 114 | + break |
116 | 115 |
|
| 116 | + json_result = json.loads(result) |
| 117 | + |
| 118 | + answer_type = json_result["type"] |
| 119 | + |
| 120 | + if answer_type == "complete" and json_result["id"] == str( |
| 121 | + query_id |
| 122 | + ): |
| 123 | + print("Cancelling counting task now") |
| 124 | + counting_task.cancel() |
| 125 | + if keepalive: |
| 126 | + print("Cancelling keep alive task now") |
| 127 | + keepalive_task.cancel() |
| 128 | + |
| 129 | + elif answer_type == "ping": |
| 130 | + if answer_pings: |
| 131 | + payload = json_result.get("payload", None) |
| 132 | + await WebSocketServerHelper.send_pong( |
| 133 | + ws, payload=payload |
| 134 | + ) |
| 135 | + |
| 136 | + elif answer_type == "pong": |
| 137 | + pong_received.set() |
| 138 | + finally: |
| 139 | + print(" Server: receiving task ended") |
| 140 | + if keepalive: |
| 141 | + keepalive_task.cancel() |
| 142 | + |
| 143 | + print(" Server: starting receiving task") |
117 | 144 | receiving_task = asyncio.ensure_future(receiving_coro())
|
118 | 145 |
|
119 | 146 | try:
|
| 147 | + print(" Server: waiting for counting task to complete") |
120 | 148 | await counting_task
|
121 | 149 | except asyncio.CancelledError:
|
122 |
| - print("Now counting task is cancelled") |
| 150 | + print(" Server: Now counting task is cancelled") |
123 | 151 |
|
124 |
| - receiving_task.cancel() |
125 |
| - |
126 |
| - try: |
127 |
| - await receiving_task |
128 |
| - except asyncio.CancelledError: |
129 |
| - print("Now receiving task is cancelled") |
| 152 | + print(" Server: sending complete message") |
| 153 | + await WebSocketServerHelper.send_complete(ws, query_id) |
130 | 154 |
|
131 | 155 | if keepalive:
|
| 156 | + print(" Server: cancelling keepalive task") |
132 | 157 | keepalive_task.cancel()
|
133 | 158 | try:
|
134 | 159 | await keepalive_task
|
135 | 160 | except asyncio.CancelledError:
|
136 |
| - print("Now keepalive task is cancelled") |
| 161 | + print(" Server: Now keepalive task is cancelled") |
| 162 | + |
| 163 | + print(" Server: waiting for client to close the connection") |
| 164 | + try: |
| 165 | + await asyncio.wait_for(receiving_task, 1000 * MS) |
| 166 | + except asyncio.TimeoutError: |
| 167 | + pass |
| 168 | + |
| 169 | + print(" Server: cancelling receiving task") |
| 170 | + receiving_task.cancel() |
| 171 | + |
| 172 | + try: |
| 173 | + await receiving_task |
| 174 | + except asyncio.CancelledError: |
| 175 | + print(" Server: Now receiving task is cancelled") |
137 | 176 |
|
138 |
| - await WebSocketServerHelper.send_complete(ws, query_id) |
139 | 177 | except websockets.exceptions.ConnectionClosedOK:
|
140 | 178 | pass
|
141 | 179 | except AssertionError as e:
|
142 |
| - print(f"\nAssertion failed: {e!s}\n") |
| 180 | + print(f"\n Server: Assertion failed: {e!s}\n") |
143 | 181 | finally:
|
| 182 | + print(" Server: waiting for websocket connection to close") |
144 | 183 | await ws.wait_closed()
|
| 184 | + print(" Server: connection closed") |
145 | 185 |
|
146 | 186 | return server_countdown_template
|
147 | 187 |
|
@@ -406,6 +446,7 @@ async def test_graphqlws_subscription_with_keepalive(
|
406 | 446 | count -= 1
|
407 | 447 |
|
408 | 448 | assert count == -1
|
| 449 | + assert "ping" in session.transport.payloads |
409 | 450 | assert session.transport.payloads["ping"] == "dummy_ping_payload"
|
410 | 451 | assert (
|
411 | 452 | session.transport.payloads["connection_ack"] == "dummy_connection_ack_payload"
|
@@ -570,18 +611,19 @@ async def test_graphqlws_subscription_manual_pings_with_payload(
|
570 | 611 | number = result["number"]
|
571 | 612 | print(f"Number received: {number}")
|
572 | 613 |
|
573 |
| - assert number == count |
574 |
| - count -= 1 |
575 |
| - |
576 | 614 | payload = {"count_received": count}
|
577 | 615 |
|
578 | 616 | await transport.send_ping(payload=payload)
|
579 | 617 |
|
580 |
| - await transport.pong_received.wait() |
| 618 | + await asyncio.wait_for(transport.pong_received.wait(), 10000 * MS) |
| 619 | + |
581 | 620 | transport.pong_received.clear()
|
582 | 621 |
|
583 | 622 | assert transport.payloads["pong"] == payload
|
584 | 623 |
|
| 624 | + assert number == count |
| 625 | + count -= 1 |
| 626 | + |
585 | 627 | assert count == -1
|
586 | 628 |
|
587 | 629 |
|
|
0 commit comments