AWS Appsync support? #125
Comments
It is not currently supported. Adding the |
There seems to be a tutorial for testing this implementation using a boilerplate React app: https://aws.amazon.com/blogs/mobile/appsync-realtime/ and https://docs.amplify.aws/lib/graphqlapi/getting-started/q/platform/js#mocking-and-local-testing But I couldn't find the protocol in their repository: https://github.com/aws/aws-appsync-community Besides the link shared by the issue author, only this blog post explains it to some extent: https://aws.amazon.com/blogs/mobile/appsync-realtime/ |
I have this implemented in very alpha form. I have tested it with OIDC credentials and it works. I need to test it with API Keys and IAM (sigv4) credentials and then I'll submit a pull request. While I was doing it (I needed it for a project I have), I noticed that extending the WebsocketsTransport is a bit clunky: it requires a bit of copy-paste to insert new protocol stuff into the execution path. NBD, but it would be cleaner if WebsocketsTransport was refactored to allow easier sub-class access to the execution path... |
To further elucidate: WebSockets in AppSync don't actually communicate directly with AppSync. They communicate with API Gateway, which then does an HTTPS redirect to AppSync in the backend. This makes setting up the socket a little confusing. Also, the start-ack protocol requires waiting on its reception before starting the listening loop, which causes a bit of a change to the base class flow. I'll try to document the PR well enough to explain when I send it. I guess you guys will be the judge of my success... |
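The start-ack wait described above can be sketched as a small pure function. This is only an illustration, not the transport code: `wait_for_start_ack` is a hypothetical name, the list of raw strings stands in for a real websocket receive loop, and the `start_ack` and `ka` (keep-alive) message types are taken from the code posted later in this thread.

```python
import json
from typing import Iterable


def wait_for_start_ack(raw_messages: Iterable[str]) -> int:
    """Consume messages until a start_ack arrives; return how many were read.

    Keep-alive ("ka") messages are skipped. Any other message type seen
    before the ack is treated as a protocol error.
    """
    for count, raw in enumerate(raw_messages, start=1):
        message_type = json.loads(raw).get("type")
        if message_type == "start_ack":
            return count
        if message_type != "ka":
            raise RuntimeError(
                f"unexpected message before start_ack: {message_type!r}"
            )
    raise RuntimeError("stream ended before start_ack")
```

Only once this returns would the listening loop for subscription data be started, which is the change to the base class flow mentioned above.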
TODO: dive into the Appsync protocol and create another transport AWSAppsyncTransport which inherits WebsocketsTransport. Going to start looking into this. Would love to see your preliminary pull request @joseph-wortmann |
Ideally it would be possible to use the AWS AppSync SDK directly to facilitate this, but that might be too heavy. There's lots of tooling built directly into amplify for starting mock servers and we can explore that: https://docs.amplify.aws/cli/usage/mock/#function-mock-environment-variables An example of the websocket messages that transfer (for later reference): To experiment with this:
The above is a note for myself or anyone else who picks this up before I get back to it. Right now I'm setting up my local environment and re-establishing my AWS account so I can poke around here a little bit. Ideally I'll get my mock server setup during my next pomodoro and be able to report back on how the mocking stuff goes. |
Note that I've tried the above. It looks like Note this is just the websocket channel. Auth against AWS Amplify is not covered here. For the above to work, it looks like I'm already authenticated against AWS: Request URL: wss://wewzcj762zgwrfrfumqdgx4xru.appsync-realtime-api.us-east-1.amazonaws.com/graphql?header=&payload=e30= My decrypted JWT looks like:
So somehow to make this work we'd need the JWT for the request URL's GET param, and we'd need to handle the keep-alive (
The importance of Action Items:
|
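For reference, the shape of the request URL observed above (`?header=...&payload=e30=`) can be reproduced in a few lines. This is a sketch under assumptions, not a tested client: `build_realtime_url`, the example host and the API key are hypothetical, and the header fields follow the API key case from the code posted later in this thread. `e30=` is simply the base64 encoding of an empty JSON object, `{}`.

```python
import json
from base64 import b64encode


def build_realtime_url(api_url: str, header: dict) -> str:
    """Derive the realtime websocket URL from the HTTPS GraphQL endpoint."""
    realtime_url = api_url.replace("https://", "wss://", 1).replace(
        "appsync-api", "appsync-realtime-api", 1
    )
    # The auth header travels as base64-encoded compact JSON in the query string
    encoded_header = b64encode(
        json.dumps(header, separators=(",", ":")).encode()
    ).decode()
    # The payload is an empty JSON object: base64("{}") == "e30="
    encoded_payload = b64encode(b"{}").decode()
    return f"{realtime_url}?header={encoded_header}&payload={encoded_payload}"


# Hypothetical endpoint and key, for illustration only
example_host = "example.appsync-api.us-east-1.amazonaws.com"
example_url = build_realtime_url(
    f"https://{example_host}/graphql",
    {"host": example_host, "x-api-key": "da2-hypothetical"},
)
```

Note that the `host` inside the header stays the `appsync-api` host; only the URL itself is switched to `appsync-realtime-api`.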
Note that I will be able to commit about an hour or two a week to this. Have to balance this against my other existing commitments. I'll be back next weekend, I expect. |
@chadfurman I'm sorry that I haven't had time to come back to this. It's been a while since I touched the code that I was working on, but you are welcome to it as a starting point. I'd love to help, but I'm swamped with other stuff at the moment. If you're still working on it when I get back to the thing that forced me into this interesting corner, I'll be glad to lend a hand. One thing to note is that AppSync actually goes through ApiGateway for all of its access, leading to some pretty strange enveloping of the AppSync access credentials as they are proxied through ApiGateway. My code (no quality guarantees) is below. As I recall, I tested the API Key and OIDC auths (but it has been a minute and my memory isn't what it once was). Also, as I recall, there was some stuff in the underlying
import hmac
import json
from abc import ABC, abstractmethod
from asyncio import CancelledError, wait_for, TimeoutError, ensure_future
from base64 import b64encode
from datetime import datetime
from hashlib import sha256
from logging import getLogger
from ssl import SSLContext
from typing import Any, AsyncGenerator, Dict, Optional, Union, cast
from boto3.session import Session
from gql.transport.websockets import ListenerQueue, WebsocketsTransport
from graphql import DocumentNode, ExecutionResult, print_ast
from websockets import connect as wsconnect
from websockets.typing import Subprotocol
from websockets.exceptions import ConnectionClosed
from gql.transport.exceptions import (
    TransportAlreadyConnected,
    TransportProtocolError,
)

_LOG = getLogger(__name__)
class AppSyncAuthorization(ABC):
    def on_connect(self) -> str:
        return b64encode(
            json.dumps(self.on_subscribe(), separators=(",", ":")).encode()
        ).decode()

    @abstractmethod
    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        raise NotImplementedError()

    def _encodeHeader(self, header: Dict) -> str:
        return b64encode(json.dumps(header, separators=(",", ":")).encode()).decode()
class AppSyncApiKeyAuthorization(AppSyncAuthorization):
    def __init__(self, host: str, api_key: str) -> None:
        self.host = host
        self.api_key = api_key

    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        return {"host": self.host, "x-api-key": self.api_key}


class AppSyncOIDCAuthorization(AppSyncAuthorization):
    def __init__(self, host: str, token: str) -> None:
        self.host = host
        self.token = token

    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        return {"host": self.host, "Authorization": self.token}
class AppSyncIAMAuthorization(AppSyncAuthorization):
    def __init__(self, host: str) -> None:
        self.host = host
        self.session = Session()
        # Host is expected to look like <id>.appsync-api.<region>.amazonaws.com
        self.region = self.host.split(".")[2]
        self.signed_headers = (
            "accept;content-encoding;content-type;host;x-amz-date;x-amz-security-token"
        )

    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        utc_now = datetime.utcnow()
        amz_date = utc_now.strftime("%Y%m%dT%H%M%SZ")
        date_stamp = utc_now.strftime("%Y%m%d")
        credentials = self.session.get_credentials()
        return {
            "accept": "application/json, text/javascript",
            "content-encoding": "amz-1.0",
            "content-type": "application/json; charset=UTF-8",
            "host": self.host,
            "x-amz-date": amz_date,
            "X-Amz-Security-Token": credentials.token,
            "Authorization": self._sigv4(amz_date, date_stamp, credentials, data),
        }

    def _sigv4(
        self, amz_date, date_stamp, credentials, data: Optional[str] = None
    ) -> str:
        def getSignatureKey(key, date_stamp, regionName, serviceName):
            def sign(key, msg):
                return hmac.new(key, msg.encode("utf-8"), sha256).digest()

            # SigV4 prefixes the secret key with "AWS4", with no separator
            kDate = sign(f"AWS4{key}".encode("utf-8"), date_stamp)
            kRegion = sign(kDate, regionName)
            kService = sign(kRegion, serviceName)
            kSigning = sign(kService, "aws4_request")
            return kSigning

        # Build the credential scope used in the string to sign and the header
        credentials_scope = f"{date_stamp}/{self.region}/appsync/aws4_request"
        canonical_request = "\n".join(
            [
                "POST",
                # AWS's real-time WebSocket docs sign /graphql/connect (empty
                # JSON body) for the connection handshake and /graphql (query
                # JSON) when registering a subscription
                f"/graphql{'' if data else '/connect'}",
                "",  # empty canonical query string
                "accept:application/json, text/javascript",
                "content-encoding:amz-1.0",
                "content-type:application/json; charset=UTF-8",
                f"host:{self.host}",
                f"x-amz-date:{amz_date}",
                f"x-amz-security-token:{credentials.token}",
                "",  # blank line terminates the canonical headers
                self.signed_headers,
                sha256((data or "{}").encode("utf-8")).hexdigest(),
            ]
        )
        string_to_sign = "\n".join(
            [
                "AWS4-HMAC-SHA256",
                amz_date,
                credentials_scope,
                sha256(canonical_request.encode("utf-8")).hexdigest(),
            ]
        )
        signature = hmac.new(
            getSignatureKey(
                credentials.secret_key,
                date_stamp,
                self.region,
                "appsync",
            ),
            string_to_sign.encode("utf-8"),
            sha256,
        ).hexdigest()
        return f"AWS4-HMAC-SHA256 Credential={credentials.access_key}/{credentials_scope},SignedHeaders={self.signed_headers},Signature={signature}"
class AppSyncWebsocketsTransport(WebsocketsTransport):
    def __init__(
        self,
        url: str,
        authorization: AppSyncAuthorization,
        ssl: Union[SSLContext, bool] = False,
        connect_timeout: int = 10,
        close_timeout: int = 10,
        ack_timeout: int = 10,
        connect_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.authorization = authorization
        super().__init__(
            url,
            ssl=ssl,
            connect_timeout=connect_timeout,
            close_timeout=close_timeout,
            ack_timeout=ack_timeout,
            # Avoid sharing one mutable default dict between instances
            connect_args=connect_args or {},
        )
    async def _wait_start_ack(self) -> None:
        """Wait for the start_ack message. Keep alive messages are ignored"""
        while True:
            answer_type = str(json.loads(await self._receive()).get("type"))
            if answer_type == "start_ack":
                return
            if answer_type != "ka":
                raise TransportProtocolError(
                    "AppSync server did not return a start ack"
                )
    async def _send_start_and_wait_ack(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, str]] = None,
    ) -> int:
        query_id = self.next_query_id
        self.next_query_id += 1
        data = {"query": print_ast(document)}
        if variable_values:
            data["variables"] = variable_values
        # AppSync expects the query as a JSON string, not a nested object
        data = json.dumps(data, separators=(",", ":"))
        await self._send(
            json.dumps(
                {
                    "id": str(query_id),
                    "type": "start",
                    "payload": {
                        "data": data,
                        "extensions": {
                            "authorization": self.authorization.on_subscribe(data)
                        },
                    },
                },
                separators=(",", ":"),
            )
        )
        # Wait for the start_ack message or raise a TimeoutError
        await wait_for(self._wait_start_ack(), self.ack_timeout)
        # Create a task to listen to the incoming websocket messages
        self.receive_data_task = ensure_future(self._receive_data_loop())
        return query_id
    async def connect(self) -> None:
        """Coroutine which will:

        - connect to the websocket address
        - send the init message
        - wait for the connection acknowledge from the server
        - create an asyncio task which will be used to receive
          and parse the websocket answers

        Should be cleaned with a call to the close coroutine
        """
        GRAPHQLWS_SUBPROTOCOL: Subprotocol = cast(Subprotocol, "graphql-ws")
        _LOG.debug("connect: starting")
        if self.websocket is None and not self._connecting:
            # Set connecting to True to avoid a race condition if user is trying
            # to connect twice using the same client at the same time
            self._connecting = True
            # If the ssl parameter is not provided,
            # generate the ssl value depending on the url
            ssl: Optional[Union[SSLContext, bool]]
            if self.ssl:
                ssl = self.ssl
            else:
                ssl = True if self.url.startswith("wss") else None
            # Set default arguments used in the websockets.connect call
            connect_args: Dict[str, Any] = {
                "ssl": ssl,
                "extra_headers": self.headers,
                "subprotocols": [GRAPHQLWS_SUBPROTOCOL],
            }
            # Adding custom parameters passed from init
            connect_args.update(self.connect_args)
            # Connection to the specified url
            # Generate a TimeoutError if taking more than connect_timeout seconds
            # Set the _connecting flag to False after in all cases
            try:
                self.websocket = await wait_for(
                    wsconnect(
                        f'{self.url.replace("https","wss").replace("appsync-api","appsync-realtime-api")}?header={self.authorization.on_connect()}&payload=e30=',
                        **connect_args,
                    ),
                    self.connect_timeout,
                )
            finally:
                self._connecting = False
            self.next_query_id = 1
            self.close_exception = None
            self._wait_closed.clear()
            # Send the init message and wait for the ack from the server
            # Note: This will generate a TimeoutError
            # if no ACKs are received within the ack_timeout
            try:
                await self._send_init_message_and_wait_ack()
            except ConnectionClosed as e:
                raise e
            except (TransportProtocolError, TimeoutError) as e:
                await self._fail(e, clean_close=False)
                raise e
        else:
            raise TransportAlreadyConnected("Transport is already connected")
        _LOG.debug("connect: done")
    async def subscribe(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, str]] = None,
        send_stop: Optional[bool] = None,
    ) -> AsyncGenerator[ExecutionResult, None]:
        # Send the query and receive the id
        query_id: int = await self._send_start_and_wait_ack(document, variable_values)
        # Create a queue to receive the answers for this query_id
        listener = ListenerQueue(query_id, send_stop=(send_stop is True))
        self.listeners[query_id] = listener
        # We will need to wait at close for this query to clean properly
        self._no_more_listeners.clear()
        try:
            # Loop over the received answers
            while True:
                # Wait for the answer from the queue of this query_id
                # This can raise a TransportError or ConnectionClosed exception.
                answer_type, execution_result = await listener.get()
                # If the received answer contains data,
                # Then we will yield the results back as an ExecutionResult object
                if execution_result is not None:
                    yield execution_result
                # If we receive a 'complete' answer from the server,
                # Then we will end this async generator output without errors
                elif answer_type == "complete":
                    _LOG.debug(
                        f"Complete received for query {query_id} --> exit without error"
                    )
                    break
        except (CancelledError, GeneratorExit) as e:
            _LOG.debug("Exception in subscribe: " + repr(e))
            if listener.send_stop:
                await self._send_stop_message(query_id)
                listener.send_stop = False
        finally:
            del self.listeners[query_id]
            if len(self.listeners) == 0:
                self._no_more_listeners.set()
|
Noted with thanks. I will review and clean up a bit next weekend. Are you sure there's no way to just piggy-back the auth layer or import it from AWS SDKs? |
@chadfurman Unfortunately, I don't think so. As I said, AppSync is proxied through ApiGateway. That is also true for the WebSockets portion. As you can see in the code, you have to wrap the AppSync credentials into a SigV4 envelope for IAM to send to the "hidden" ApiGateway. It's kind of a PITA, and not very well documented in the AWS docs (or at least it wasn't when I wrote this). |
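To make the "SigV4 envelope" a bit more concrete, here is a minimal sketch of just the signing-key derivation step, using only the standard library. The function name `derive_signing_key` and the example values are hypothetical; the chain of HMACs follows the AWS Signature Version 4 scheme, in which the secret key is prefixed with `AWS4` before the first HMAC.

```python
import hmac
from hashlib import sha256


def _sign(key: bytes, msg: str) -> bytes:
    """One HMAC-SHA256 step of the SigV4 key derivation chain."""
    return hmac.new(key, msg.encode("utf-8"), sha256).digest()


def derive_signing_key(
    secret_key: str, date_stamp: str, region: str, service: str
) -> bytes:
    """Derive the SigV4 signing key: date -> region -> service -> aws4_request."""
    k_date = _sign(f"AWS4{secret_key}".encode("utf-8"), date_stamp)
    k_region = _sign(k_date, region)
    k_service = _sign(k_region, service)
    return _sign(k_service, "aws4_request")


# Hypothetical inputs, for illustration only
example_key = derive_signing_key(
    "not-a-real-secret", "20210101", "us-east-1", "appsync"
)
```

The resulting key is then used to HMAC the string to sign. botocore also ships a SigV4 signer (`botocore.auth.SigV4Auth`), which might be an alternative to hand-rolling this step, though whether it fits the AppSync websocket handshake would need to be tested.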
goals for this weekend:
|
Re: Above Code:
2. How is Authentication in the above code handled?
3. How may the provided code be improved/extended now or in the future?
|
Re: AppSync Code:
2. Where are the calls necessary to get the token/signature keys?
3. What are the pieces of the authentication system that we'd like to replicate?
|
Re: Existing code
2. Where/How would we add
3. Where / how might we handle the various message types and error states?
|
Action Items for next weekend:
From there, I will respond to comments and address any outstanding issues and requests in an effort to get this merged. |
@joseph-wortmann Thanks for your example code, it is really helpful.
I think you mean add an auth property to the new AWSAppsyncTransport no? There is no point to add it to the current WebsocketsTransport class if it is not used there. We should try to reuse the
Note: if you need to add dependencies, you should keep them separate in a new "aws" extra dependency (put it in a
For http requests, we should indicate to users that they could use the default AIOHTTPTransport, with custom headers if necessary. I think we should rename the |
Possibly, yes. My main concern is not having to copy/paste lots of code to add an Auth hook. If we added an optional auth hook to WebsocketTransport that would make it easier to build custom protocols around it, that could be helpful. Not 100% sure on this, though, and definitely open to suggestions and guidance.
My concern here is that Auth for AppSync is more than just a URL. They're adding the "extensions" field to the messages with signed data including timestamps etc which I think have to be updated/maintained as messages fly back and forth. I need to look into this more, but something in WebsocketTransport that would allow us to hook the URL for sure but also the messages themselves could be helpful. If not an "auth" property (because it's somewhat specific), then a more generic hook or set of hooks which would allow for less copy/paste of connect/subscribe stuff.
Noted. Makes sense! Thank you :)
Aye, when/if doing segmentation then I should really try to re-use the existing transports. I'm thinking that I'll let people pass in the other transports they want to use, as I expect that I will have to wrap the AIOHTTPTransport for ApiGateway auth shenanigans as well.
Could you elaborate on this? I don't quite follow. I'm open to this change, for sure. I think this might go back to the conversation above re: hooks in WebsocketTransport w.r.t. adding auth? We may also want to make sure any hooks we add are consistent in a way that can also be implemented in the AIOHTTPTransport w.r.t. adding auth... TBD |
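As a rough illustration of the kind of message-level hook discussed above, the sketch below builds an AppSync `start` message with the auth data injected through a callable. Everything here is hypothetical (`build_start_message`, the `authorize` callable, the example values); it only mirrors the message shape from the code posted earlier in this thread and is not an existing gql API.

```python
import json
from typing import Any, Callable, Dict, Optional


def build_start_message(
    query_id: int,
    query: str,
    variables: Optional[Dict[str, Any]],
    authorize: Callable[[str], Dict[str, str]],
) -> str:
    """Build a 'start' message whose extensions carry the authorization data.

    `authorize` receives the serialized query so that signing schemes
    (for example SigV4) can sign the exact bytes that are sent.
    """
    data: Dict[str, Any] = {"query": query}
    if variables:
        data["variables"] = variables
    # The query travels as a JSON string inside the payload
    data_json = json.dumps(data, separators=(",", ":"))
    message = {
        "id": str(query_id),
        "type": "start",
        "payload": {
            "data": data_json,
            "extensions": {"authorization": authorize(data_json)},
        },
    }
    return json.dumps(message, separators=(",", ":"))


# Hypothetical API key authorizer, for illustration only
example_message = build_start_message(
    1,
    "subscription { onEvent { id } }",
    None,
    lambda _data: {"host": "example.appsync-api.us-east-1.amazonaws.com", "x-api-key": "da2-hypothetical"},
)
```

With a hook of this shape, a transport would only need to swap the `authorize` callable per auth mode instead of copying the connect/subscribe code.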
I think that can be done by simply overloading the
I noted that the authorization methods of @joseph-wortmann only return a dictionary of elements which look like HTTP headers, which are then used:
I don't really know how different the authorization code is between websockets and HTTP with AWS AppSync; the doc is not clear. But if we could reuse the AppSyncAuthorization subclasses, that would be great.
|
Thank you for clarifying. I'll follow up on this next weekend and try to get a draft PR open with your suggestions. |
Following up on stated goals from last weekend:
Re: #1:
Re: #2:
Goals for today
|
Leaving this here for reference: https://aws.amazon.com/blogs/mobile/appsync-websockets-python/ |
Did not get to #3 above due to complexities in Next Steps:
|
Added the PR: #239 I'll check back in throughout the week for any discussion and will pick away at it again next weekend. Note that we should probably keep discussion on the PR thread, now, for code changes. |
Note that the PR is just about ready. I've added what I think are all the necessary code changes to get it to work. Currently, I have not tested it (manually or automatically), and there are no instructions for it. I will be following up next weekend to both test it and improve the documentation. Anything I find in that process will flow over into the following week and from there hopefully we'll be able to wrap this up. If I get really ambitious I'll try and carve out some time throughout the week to pick away. I'm tracking my efforts in the PR thread, if you're curious, including when I start and stop each session (for no other reason than my own edification, measurements, and improvements). |
This is now available in version 3.0.0rc0 |
AWS Appsync seems to require an extensions field on the payload. Does the WebsocketsTransport support this? https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html