Skip to content

Commit d9882fc

Browse files
ecthiender0x777
authored andcommitted
fix remote queries/mutations to work over websocket (fix hasura#1619) (hasura#1621)
1 parent da4ed1a commit d9882fc

File tree

2 files changed

+161
-32
lines changed

2 files changed

+161
-32
lines changed

server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs

+42-11
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import Control.Concurrent (threadDelay)
2626
import Data.ByteString (ByteString)
2727
import qualified Data.IORef as IORef
2828

29+
import Hasura.GraphQL.Context (GCtx)
2930
import Hasura.GraphQL.Resolve (resolveSelSet)
3031
import Hasura.GraphQL.Resolve.Context (LazyRespTx)
3132
import qualified Hasura.GraphQL.Resolve.LiveQuery as LQ
@@ -239,20 +240,13 @@ onStart serverEnv wsConn (StartMsg opId q) msgRaw = catchAndIgnore $ do
239240
either (\(QErr _ _ err _ _) -> withComplete $ sendConnErr err) return res'
240241

241242
case typeLocs of
242-
[] -> runHasuraQ userInfo gCtx queryParts
243-
243+
[] -> runHasuraQ userInfo gCtx queryParts
244244
(typeLoc:_) -> case typeLoc of
245-
VT.HasuraType ->
246-
runHasuraQ userInfo gCtx queryParts
247-
VT.RemoteType _ rsi -> do
248-
when (G._todType opDef == G.OperationTypeSubscription) $
249-
withComplete $ sendConnErr "subscription to remote server is not supported"
250-
resp <- runExceptT $ TH.runRemoteGQ httpMgr userInfo reqHdrs
251-
msgRaw rsi opDef
252-
either postExecErr sendSuccResp resp
253-
sendCompleted
245+
VT.HasuraType -> runHasuraQ userInfo gCtx queryParts
246+
VT.RemoteType _ rsi -> runRemoteQ userInfo reqHdrs opDef rsi
254247

255248
where
249+
runHasuraQ :: UserInfo -> GCtx -> QueryParts -> ExceptT () IO ()
256250
runHasuraQ userInfo gCtx queryParts = do
257251
(opTy, fields) <- either (withComplete . preExecErr) return $
258252
runReaderT (validateGQ queryParts) gCtx
@@ -270,8 +264,30 @@ onStart serverEnv wsConn (StartMsg opId q) msgRaw = catchAndIgnore $ do
270264
either postExecErr sendSuccResp resp
271265
sendCompleted
272266

267+
runRemoteQ :: UserInfo -> [H.Header]
268+
-> G.TypedOperationDefinition -> RemoteSchemaInfo
269+
-> ExceptT () IO ()
270+
runRemoteQ userInfo reqHdrs opDef rsi = do
271+
when (G._todType opDef == G.OperationTypeSubscription) $
272+
withComplete $ preExecErr $
273+
err400 NotSupported "subscription to remote server is not supported"
274+
275+
-- if it's not a subscription, use HTTP to execute the query on the remote
276+
-- server
277+
-- try to parse the (apollo protocol) websocket frame and get only the
278+
-- payload
279+
sockPayload <- onLeft (J.eitherDecode msgRaw) $
280+
const $ withComplete $ preExecErr $
281+
err500 Unexpected "invalid websocket payload"
282+
let payload = J.encode $ _wpPayload sockPayload
283+
resp <- runExceptT $ TH.runRemoteGQ httpMgr userInfo reqHdrs
284+
payload rsi opDef
285+
either postExecErr sendSuccResp resp
286+
sendCompleted
287+
273288

274289
WSServerEnv logger _ runTx lqMap gCtxMapRef httpMgr _ sqlGenCtx = serverEnv
290+
275291
wsId = WS.getWSId wsConn
276292
WSConnData userInfoR opMap = WS.getData wsConn
277293

@@ -429,3 +445,18 @@ createWSServerApp authMode serverEnv =
429445
(onConn (_wseLogger serverEnv) (_wseCorsPolicy serverEnv))
430446
(onMessage authMode serverEnv)
431447
(onClose (_wseLogger serverEnv) $ _wseLiveQMap serverEnv)
448+
449+
450+
-- | TODO:
451+
-- | The following ADT is required so that we can parse the incoming websocket
452+
-- | frame, and only pick the payload, for remote schema queries.
453+
-- | Ideally we should use `StartMsg` from Websocket.Protocol, but as
454+
-- | `GraphQLRequest` doesn't have a ToJSON instance we are using our own type to
455+
-- | get only the payload
456+
data WebsocketPayload
457+
= WebsocketPayload
458+
{ _wpId :: !Text
459+
, _wpType :: !Text
460+
, _wpPayload :: !J.Value
461+
} deriving (Show, Eq)
462+
$(J.deriveJSON (J.aesonDrop 3 J.snakeCase) ''WebsocketPayload)

server/tests-py/test_schema_stitching.py

+119-21
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
#!/usr/bin/env python3
22

3-
import pytest
3+
import string
4+
import random
45
import yaml
6+
import json
7+
import queue
58
import requests
69

10+
import pytest
11+
712
from validate import check_query_f, check_query
813

914

@@ -90,7 +95,8 @@ def test_add_remote_schema_with_interfaces(self, hge_ctx):
9095
st_code, resp = hge_ctx.v1q(q)
9196
assert st_code == 200, resp
9297
check_query_f(hge_ctx, self.dir + '/character_interface_query.yaml')
93-
hge_ctx.v1q({"type": "remove_remote_schema", "args": {"name": "my remote interface one"}})
98+
hge_ctx.v1q({"type": "remove_remote_schema",
99+
"args": {"name": "my remote interface one"}})
94100
assert st_code == 200, resp
95101

96102
def test_add_remote_schema_with_interface_err_empty_fields_list(self, hge_ctx):
@@ -102,23 +108,30 @@ def test_add_remote_schema_err_unknown_interface(self, hge_ctx):
102108
check_query_f(hge_ctx, self.dir + '/add_remote_schema_err_unknown_interface.yaml')
103109

104110
def test_add_remote_schema_with_interface_err_missing_field(self, hge_ctx):
105-
"""add a remote schema where an object implementing an interface does not have a field defined in the interface"""
111+
""" add a remote schema where an object implementing an interface does
112+
not have a field defined in the interface """
106113
check_query_f(hge_ctx, self.dir + '/add_remote_schema_err_missing_field.yaml')
107114

108115
def test_add_remote_schema_with_interface_err_wrong_field_type(self, hge_ctx):
109-
"""add a remote schema where an object implementing an interface have a field with the same name as in the interface, but of different type"""
116+
"""add a remote schema where an object implementing an interface have a
117+
field with the same name as in the interface, but of different type"""
110118
check_query_f(hge_ctx, self.dir + '/add_remote_schema_with_iface_err_wrong_field_type.yaml')
111119

112120
def test_add_remote_schema_with_interface_err_missing_arg(self, hge_ctx):
113-
"""add a remote schema where a field of an object implementing an interface does not have the argument defined in the same field of interface"""
121+
"""add a remote schema where a field of an object implementing an
122+
interface does not have the argument defined in the same field of
123+
interface"""
114124
check_query_f(hge_ctx, self.dir + '/add_remote_schema_err_missing_arg.yaml')
115125

116126
def test_add_remote_schema_with_interface_err_wrong_arg_type(self, hge_ctx):
117-
"""add a remote schema where the argument of a field of an object implementing the interface does not have the same type as the argument defined in the field of interface"""
127+
"""add a remote schema where the argument of a field of an object
128+
implementing the interface does not have the same type as the argument
129+
defined in the field of interface"""
118130
check_query_f(hge_ctx, self.dir + '/add_remote_schema_iface_err_wrong_arg_type.yaml')
119131

120132
def test_add_remote_schema_with_interface_err_extra_non_null_arg(self, hge_ctx):
121-
"""add a remote schema with a field of an object implementing interface having extra non_null argument"""
133+
"""add a remote schema with a field of an object implementing interface
134+
having extra non_null argument"""
122135
check_query_f(hge_ctx, self.dir + '/add_remote_schema_with_iface_err_extra_non_null_arg.yaml')
123136

124137
def test_add_remote_schema_with_union(self, hge_ctx):
@@ -226,6 +239,88 @@ def test_add_schema_same_type_containing_same_scalar(self, hge_ctx):
226239
assert st_code == 200, resp
227240

228241

242+
class TestRemoteSchemaQueriesOverWebsocket:
243+
dir = 'queries/remote_schemas'
244+
teardown = {"type": "clear_metadata", "args": {}}
245+
246+
@pytest.fixture(autouse=True)
247+
def transact(self, hge_ctx):
248+
st_code, resp = hge_ctx.v1q_f('queries/remote_schemas/tbls_setup.yaml')
249+
assert st_code == 200, resp
250+
yield
251+
# teardown
252+
st_code, resp = hge_ctx.v1q_f('queries/remote_schemas/tbls_teardown.yaml')
253+
assert st_code == 200, resp
254+
st_code, resp = hge_ctx.v1q(self.teardown)
255+
assert st_code == 200, resp
256+
257+
def _init(self, hge_ctx):
258+
payload = {'type': 'connection_init', 'payload': {}}
259+
if hge_ctx.hge_key:
260+
payload['payload']['headers'] = {
261+
'x-hasura-admin-secret': hge_ctx.hge_key
262+
}
263+
hge_ctx.ws.send(json.dumps(payload))
264+
ev = hge_ctx.get_ws_event(3)
265+
assert ev['type'] == 'connection_ack', ev
266+
267+
def _stop(self, hge_ctx, _id):
268+
data = {'id': _id, 'type': 'stop'}
269+
hge_ctx.ws.send(json.dumps(data))
270+
ev = hge_ctx.get_ws_event(3)
271+
assert ev['type'] == 'complete', ev
272+
273+
def _send_query(self, hge_ctx, query):
274+
self._init(hge_ctx)
275+
_id = gen_id()
276+
frame = {
277+
'id': _id,
278+
'type': 'start',
279+
'payload': {'query': query},
280+
}
281+
if hge_ctx.hge_key:
282+
frame['payload']['headers'] = {
283+
'x-hasura-admin-secret': hge_ctx.hge_key
284+
}
285+
hge_ctx.ws.send(json.dumps(frame))
286+
return _id
287+
288+
def test_remote_query(self, hge_ctx):
289+
self._init(hge_ctx)
290+
query = """
291+
query {
292+
user(id: 2) {
293+
id
294+
username
295+
}
296+
}
297+
"""
298+
_id = self._send_query(hge_ctx, query)
299+
ev = hge_ctx.get_ws_event(3)
300+
assert ev['type'] == 'data' and ev['id'] == _id, ev
301+
assert ev['payload']['data']['data']['user']['username'] == 'john'
302+
self._stop(hge_ctx, _id)
303+
304+
def test_remote_mutation(self, hge_ctx):
305+
self._init(hge_ctx)
306+
query = """
307+
mutation {
308+
createUser(id: 42, username: "foobar") {
309+
user {
310+
id
311+
username
312+
}
313+
}
314+
}
315+
"""
316+
_id = self._send_query(hge_ctx, query)
317+
ev = hge_ctx.get_ws_event(3)
318+
assert ev['type'] == 'data' and ev['id'] == _id, ev
319+
assert ev['payload']['data']['data']['createUser']['user']['id'] == 42
320+
assert ev['payload']['data']['data']['createUser']['user']['username'] == 'foobar'
321+
self._stop(hge_ctx, _id)
322+
323+
229324
class TestAddRemoteSchemaCompareRootQueryFields:
230325

231326
remote = 'http://localhost:5000/default-value-echo-graphql'
@@ -250,35 +345,35 @@ def test_schema_check_arg_default_values_and_field_and_arg_types(self, hge_ctx):
250345
introspect_remote = resp.json()
251346
assert resp.status_code == 200, introspect_remote
252347
remote_root_ty_info = get_query_root_info(introspect_remote)
253-
hasura_root_ty_Info = get_query_root_info(introspect_hasura)
254-
hasFld=dict()
348+
hasura_root_ty_info = get_query_root_info(introspect_hasura)
349+
has_fld = dict()
255350
for fr in remote_root_ty_info['fields']:
256-
hasFld[fr['name']] = False
257-
for fh in filter(lambda f: f['name'] == fr['name'], hasura_root_ty_Info['fields']):
258-
hasFld[fr['name']] = True
351+
has_fld[fr['name']] = False
352+
for fh in filter(lambda f: f['name'] == fr['name'], hasura_root_ty_info['fields']):
353+
has_fld[fr['name']] = True
259354
assert fr['type'] == fh['type'], yaml.dump({
260355
'error' : 'Types do not match for fld ' + fr['name'],
261356
'remote_type' : fr['type'],
262357
'hasura_type' : fh['type']
263358
})
264-
hasArg=dict()
359+
has_arg = dict()
265360
for ar in fr['args']:
266-
arPath = fr['name'] + '(' + ar['name'] + ':)'
267-
hasArg[arPath] = False
361+
arg_path = fr['name'] + '(' + ar['name'] + ':)'
362+
has_arg[arg_path] = False
268363
for ah in filter(lambda a: a['name'] == ar['name'], fh['args']):
269-
hasArg[arPath] = True
364+
has_arg[arg_path] = True
270365
assert ar['type'] == ah['type'], yaml.dump({
271-
'error' : 'Types do not match for arg ' + arPath,
366+
'error' : 'Types do not match for arg ' + arg_path,
272367
'remote_type' : ar['type'],
273368
'hasura_type' : ah['type']
274369
})
275370
assert ar['defaultValue'] == ah['defaultValue'], yaml.dump({
276-
'error' : 'Default values do not match for arg ' + arPath,
371+
'error' : 'Default values do not match for arg ' + arg_path,
277372
'remote_default_value' : ar['defaultValue'],
278373
'hasura_default_value' : ah['defaultValue']
279374
})
280-
assert hasArg[arPath], 'Argument ' + arPath + ' in the remote schema root query type not found in Hasura schema'
281-
assert hasFld[fr['name']], 'Field ' + fr['name'] + ' in the remote shema root query type not found in Hasura schema'
375+
assert has_arg[arg_path], 'Argument ' + arg_path + ' in the remote schema root query type not found in Hasura schema'
376+
assert has_fld[fr['name']], 'Field ' + fr['name'] + ' in the remote shema root query type not found in Hasura schema'
282377

283378

284379
# def test_remote_query_variables(self, hge_ctx):
@@ -297,7 +392,7 @@ def _filter(f, l):
297392

298393
def get_query_root_info(res):
299394
root_ty_name = res['data']['__schema']['queryType']['name']
300-
return list(filter(lambda ty: ty['name'] == root_ty_name, get_types(res) ))[0]
395+
return _filter(lambda ty: ty['name'] == root_ty_name, get_types(res))[0]
301396

302397
def get_types(res):
303398
return res['data']['__schema']['types']
@@ -322,3 +417,6 @@ def check_introspection_result(res, types, node_names):
322417
satisfy_node = False
323418

324419
return satisfy_node and satisfy_ty
420+
421+
def gen_id(size=6, chars=string.ascii_letters + string.digits):
422+
return ''.join(random.choice(chars) for _ in range(size))

0 commit comments

Comments
 (0)