Skip to content

Commit 5ff3c27

Browse files
thrauwhummer
authored andcommitted
try to replace http2 server
1 parent 095a74d commit 5ff3c27

File tree

3 files changed

+265
-24
lines changed

3 files changed

+265
-24
lines changed

aws-replicator/aws_replicator/client/auth_proxy.py

+232-16
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,28 @@
55
import subprocess
66
import sys
77
from functools import cache
8+
from io import BytesIO
89
from typing import Dict, Optional, Tuple
910
from urllib.parse import urlparse, urlunparse
1011

1112
import boto3
1213
import requests
13-
from botocore.awsrequest import AWSPreparedRequest
14+
from botocore.awsrequest import AWSPreparedRequest, AWSResponse
15+
from botocore.httpchecksum import resolve_checksum_context
1416
from botocore.model import OperationModel
1517
from localstack import config
1618
from localstack import config as localstack_config
19+
from localstack.aws.api import HttpRequest
20+
from localstack.aws.chain import HandlerChain
21+
from localstack.aws.chain import RequestContext as AwsRequestContext
22+
from localstack.aws.gateway import Gateway
1723
from localstack.aws.protocol.parser import create_parser
1824
from localstack.aws.spec import load_service
1925
from localstack.config import external_service_url
2026
from localstack.constants import AWS_REGION_US_EAST_1, DOCKER_IMAGE_NAME_PRO
2127
from localstack.http import Request
28+
from localstack.http import Response as HttpResponse
29+
from localstack.http.hypercorn import GatewayServer
2230
from localstack.utils.aws.aws_responses import requests_response
2331
from localstack.utils.bootstrap import setup_logging
2432
from localstack.utils.collections import select_attributes
@@ -57,6 +65,207 @@
5765
DEFAULT_BIND_HOST = "127.0.0.1"
5866

5967

68+
class AwsProxyHandler:
69+
"""
70+
A handler for an AWS Handler chain that attempts to forward the request using a specific boto3 session.
71+
This can be used to proxy incoming requests to real AWS.
72+
"""
73+
74+
def __init__(self, session: boto3.Session = None):
75+
self.session = session or boto3.Session()
76+
77+
def __call__(self, chain: HandlerChain, context: AwsRequestContext, response: HttpResponse):
78+
# prepare the API invocation parameters
79+
LOG.info(
80+
"Received %s.%s = %s",
81+
context.service.service_name,
82+
context.operation.name,
83+
context.service_request,
84+
)
85+
86+
# make the actual API call against upstream AWS (will also calculate a new auth signature)
87+
try:
88+
aws_response = self._make_aws_api_call(context)
89+
except Exception:
90+
LOG.exception(
91+
"Exception while proxying %s.%s to AWS",
92+
context.service.service_name,
93+
context.operation.name,
94+
)
95+
raise
96+
97+
# tell the handler chain to respond
98+
LOG.info(
99+
"AWS Response %s.%s: url=%s status_code=%s, headers=%s, content=%s",
100+
context.service.service_name,
101+
context.operation.name,
102+
aws_response.url,
103+
aws_response.status_code,
104+
aws_response.headers,
105+
aws_response.content,
106+
)
107+
chain.respond(aws_response.status_code, aws_response.content, dict(aws_response.headers))
108+
109+
def _make_aws_api_call(self, context: AwsRequestContext) -> AWSResponse:
110+
# TODO: reconcile with AwsRequestProxy from localstack, and other forwarder tools
111+
# create a real AWS client
112+
client = self.session.client(context.service.service_name, region_name=context.region)
113+
operation_model = context.operation
114+
115+
# prepare API request parameters as expected by boto
116+
api_params = {k: v for k, v in context.service_request.items() if v is not None}
117+
118+
# this is a stripped down version of botocore's client._make_api_call to immediately get the HTTP
119+
# response instead of a parsed response.
120+
request_context = {
121+
"client_region": client.meta.region_name,
122+
"client_config": client.meta.config,
123+
"has_streaming_input": operation_model.has_streaming_input,
124+
"auth_type": operation_model.auth_type,
125+
}
126+
127+
(
128+
endpoint_url,
129+
additional_headers,
130+
properties,
131+
) = client._resolve_endpoint_ruleset(operation_model, api_params, request_context)
132+
if properties:
133+
# Pass arbitrary endpoint info with the Request
134+
# for use during construction.
135+
request_context["endpoint_properties"] = properties
136+
137+
request_dict = client._convert_to_request_dict(
138+
api_params=api_params,
139+
operation_model=operation_model,
140+
endpoint_url=endpoint_url,
141+
context=request_context,
142+
headers=additional_headers,
143+
)
144+
resolve_checksum_context(request_dict, operation_model, api_params)
145+
146+
if operation_model.has_streaming_input:
147+
request_dict["body"] = request_dict["body"].read()
148+
149+
self._adjust_request_dict(context.service.service_name, request_dict)
150+
151+
if operation_model.has_streaming_input:
152+
request_dict["body"] = BytesIO(request_dict["body"])
153+
154+
LOG.info("Making AWS request %s", request_dict)
155+
http, _ = client._endpoint.make_request(operation_model, request_dict)
156+
157+
http: AWSResponse
158+
159+
# for some elusive reasons, these header modifications are needed (were part of http2_server)
160+
http.headers.pop("Date", None)
161+
http.headers.pop("Server", None)
162+
if operation_model.has_streaming_output:
163+
http.headers.pop("Content-Length", None)
164+
165+
return http
166+
167+
def _adjust_request_dict(self, service_name: str, request_dict: Dict):
168+
"""Apply minor fixes to the request dict, which seem to be required in the current setup."""
169+
# TODO: replacing localstack-specific URLs, IDs, etc, should ideally be done in a more generalized
170+
# way.
171+
172+
req_body = request_dict.get("body")
173+
174+
# TODO: fix for switch between path/host addressing
175+
# Note: the behavior seems to be different across botocore versions. Seems to be working
176+
# with 1.29.97 (fix below not required) whereas newer versions like 1.29.151 require the fix.
177+
if service_name == "s3":
178+
body_str = run_safe(lambda: to_str(req_body)) or ""
179+
180+
request_url = request_dict["url"]
181+
url_parsed = list(urlparse(request_url))
182+
path_parts = url_parsed[2].strip("/").split("/")
183+
bucket_subdomain_prefix = f"://{path_parts[0]}.s3."
184+
if bucket_subdomain_prefix in request_url:
185+
prefix = f"/{path_parts[0]}"
186+
url_parsed[2] = url_parsed[2].removeprefix(prefix)
187+
request_dict["url_path"] = request_dict["url_path"].removeprefix(prefix)
188+
# replace empty path with "/" (seems required for signature calculation)
189+
request_dict["url_path"] = request_dict["url_path"] or "/"
190+
url_parsed[2] = url_parsed[2] or "/"
191+
# re-construct final URL
192+
request_dict["url"] = urlunparse(url_parsed)
193+
194+
# TODO: this custom fix should not be required - investigate and remove!
195+
if "<CreateBucketConfiguration" in body_str and "LocationConstraint" not in body_str:
196+
region = request_dict["context"]["client_region"]
197+
if region == AWS_REGION_US_EAST_1:
198+
request_dict["body"] = ""
199+
else:
200+
request_dict["body"] = (
201+
'<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
202+
f"<LocationConstraint>{region}</LocationConstraint></CreateBucketConfiguration>"
203+
)
204+
205+
if service_name == "sqs" and isinstance(req_body, dict):
206+
account_id = self._query_account_id_from_aws()
207+
if "QueueUrl" in req_body:
208+
queue_name = req_body["QueueUrl"].split("/")[-1]
209+
req_body["QueueUrl"] = f"https://queue.amazonaws.com/{account_id}/{queue_name}"
210+
if "QueueOwnerAWSAccountId" in req_body:
211+
req_body["QueueOwnerAWSAccountId"] = account_id
212+
if service_name == "sqs" and request_dict.get("url"):
213+
req_json = run_safe(lambda: json.loads(body_str)) or {}
214+
account_id = self._query_account_id_from_aws()
215+
queue_name = req_json.get("QueueName")
216+
if account_id and queue_name:
217+
request_dict["url"] = f"https://queue.amazonaws.com/{account_id}/{queue_name}"
218+
req_json["QueueOwnerAWSAccountId"] = account_id
219+
request_dict["body"] = to_bytes(json.dumps(req_json))
220+
221+
def _fix_headers(self, request: HttpRequest, service_name: str):
222+
if service_name == "s3":
223+
# fix the Host header, to avoid bucket addressing issues
224+
host = request.headers.get("Host") or ""
225+
regex = r"^(https?://)?([0-9.]+|localhost)(:[0-9]+)?"
226+
if re.match(regex, host):
227+
request.headers["Host"] = re.sub(regex, r"\1s3.localhost.localstack.cloud", host)
228+
request.headers.pop("Content-Length", None)
229+
request.headers.pop("x-localstack-request-url", None)
230+
request.headers.pop("X-Forwarded-For", None)
231+
request.headers.pop("X-Localstack-Tgt-Api", None)
232+
request.headers.pop("X-Moto-Account-Id", None)
233+
request.headers.pop("Remote-Addr", None)
234+
235+
@cache
236+
def _query_account_id_from_aws(self) -> str:
237+
sts_client = self.session.client("sts")
238+
result = sts_client.get_caller_identity()
239+
return result["Account"]
240+
241+
242+
class AwsProxyGateway(Gateway):
243+
"""
244+
A handler chain that receives AWS requests, and proxies them transparently to upstream AWS using real
245+
credentials. It de-constructs the incoming request, and creates a new request signed with the AWS
246+
credentials configured in the environment.
247+
"""
248+
249+
def __init__(self) -> None:
250+
from localstack.aws import handlers
251+
252+
super().__init__(
253+
request_handlers=[
254+
handlers.parse_service_name,
255+
handlers.content_decoder,
256+
handlers.add_region_from_header,
257+
handlers.add_account_id,
258+
handlers.parse_service_request,
259+
AwsProxyHandler(),
260+
],
261+
exception_handlers=[
262+
handlers.log_exception,
263+
handlers.handle_internal_failure,
264+
],
265+
context_class=AwsRequestContext,
266+
)
267+
268+
60269
class AuthProxyAWS(Server):
61270
def __init__(self, config: ProxyConfig, port: int = None):
62271
self.config = config
@@ -65,9 +274,13 @@ def __init__(self, config: ProxyConfig, port: int = None):
65274

66275
def do_run(self):
67276
self.register_in_instance()
277+
68278
bind_host = self.config.get("bind_host") or DEFAULT_BIND_HOST
69-
proxy = run_server(port=self.port, bind_addresses=[bind_host], handler=self.proxy_request)
70-
proxy.join()
279+
srv = GatewayServer(AwsProxyGateway(), localstack_config.HostAndPort(bind_host, self.port))
280+
srv.start()
281+
srv.join()
282+
# proxy = run_server(port=self.port, bind_addresses=[bind_host], handler=self.proxy_request)
283+
# proxy.join()
71284

72285
def proxy_request(self, request: Request, data: bytes) -> Response:
73286
parsed = self._extract_region_and_service(request.headers)
@@ -214,20 +427,23 @@ def _parse_aws_request(
214427

215428
def _adjust_request_dict(self, service_name: str, request_dict: Dict):
216429
"""Apply minor fixes to the request dict, which seem to be required in the current setup."""
217-
430+
# TODO: replacing localstack-specific URLs, IDs, etc, should ideally be done in a more generalized
431+
# way.
218432
req_body = request_dict.get("body")
219-
body_str = run_safe(lambda: to_str(req_body)) or ""
220-
221-
# TODO: this custom fix should not be required - investigate and remove!
222-
if "<CreateBucketConfiguration" in body_str and "LocationConstraint" not in body_str:
223-
region = request_dict["context"]["client_region"]
224-
if region == AWS_REGION_US_EAST_1:
225-
request_dict["body"] = ""
226-
else:
227-
request_dict["body"] = (
228-
'<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
229-
f"<LocationConstraint>{region}</LocationConstraint></CreateBucketConfiguration>"
230-
)
433+
434+
if service_name == "s3":
435+
body_str = run_safe(lambda: to_str(req_body)) or ""
436+
437+
# TODO: this custom fix should not be required - investigate and remove!
438+
if "<CreateBucketConfiguration" in body_str and "LocationConstraint" not in body_str:
439+
region = request_dict["context"]["client_region"]
440+
if region == AWS_REGION_US_EAST_1:
441+
request_dict["body"] = ""
442+
else:
443+
request_dict["body"] = (
444+
'<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
445+
f"<LocationConstraint>{region}</LocationConstraint></CreateBucketConfiguration>"
446+
)
231447

232448
if service_name == "sqs" and isinstance(req_body, dict):
233449
account_id = self._query_account_id_from_aws()

aws-replicator/aws_replicator/server/aws_request_forwarder.py

+29-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from localstack.utils.net import get_addressable_container_host
1717
from localstack.utils.strings import to_str, truncate
1818
from requests.structures import CaseInsensitiveDict
19+
from rolo.proxy import forward
1920

2021
try:
2122
from localstack.testing.config import TEST_AWS_ACCESS_KEY_ID
@@ -37,17 +38,21 @@ def __call__(self, chain: HandlerChain, context: RequestContext, response: Respo
3738
return
3839

3940
# forward request to proxy
40-
response = self.forward_request(context, proxy)
41+
response_ = self.forward_request(context, proxy)
4142

42-
if response is None:
43+
if response_ is None:
4344
return
4445

45-
# set response details, then stop handler chain to return response
46-
chain.response.data = response.raw_content
47-
chain.response.status_code = response.status_code
48-
chain.response.headers.update(dict(response.headers))
46+
response.update_from(response_)
4947
chain.stop()
5048

49+
# set response details, then stop handler chain to return response
50+
# chain.response.data = response.raw_content
51+
# chain.response.status_code = response.status_code
52+
# chain.response.headers.update(dict(response.headers))
53+
# chain.stop()
54+
# chain.respond(response.status_code, response.raw_content, dict(response.headers))
55+
5156
def select_proxy(self, context: RequestContext) -> Optional[ProxyInstance]:
5257
"""select a proxy responsible to forward a request to real AWS"""
5358
if not context.service:
@@ -126,6 +131,22 @@ def forward_request(self, context: RequestContext, proxy: ProxyInstance) -> requ
126131
port = proxy["port"]
127132
request = context.request
128133
target_host = get_addressable_container_host(default_local_hostname=LOCALHOST)
134+
135+
try:
136+
LOG.info("Forwarding request: %s", context)
137+
response = forward(request, f"http://{target_host}:{port}")
138+
LOG.info(
139+
"Received response: status=%s headers=%s body=%s",
140+
response.status_code,
141+
response.headers,
142+
response.data,
143+
)
144+
except Exception as e:
145+
LOG.exception("Exception while forwarding request")
146+
raise
147+
148+
return response
149+
129150
url = f"http://{target_host}:{port}{request.path}?{to_str(request.query_string)}"
130151

131152
# inject Auth header, to ensure we're passing the right region to the proxy (e.g., for Cognito InitiateAuth)
@@ -156,9 +177,9 @@ def forward_request(self, context: RequestContext, proxy: ProxyInstance) -> requ
156177
dict(result.headers),
157178
truncate(result.raw_content, max_length=500),
158179
)
159-
except requests.exceptions.ConnectionError:
180+
except requests.exceptions.ConnectionError as e:
160181
# remove unreachable proxy
161-
LOG.info("Removing unreachable AWS forward proxy due to connection issue: %s", url)
182+
LOG.exception("Removing unreachable AWS forward proxy due to connection issue: %s", url)
162183
self.PROXY_INSTANCES.pop(port, None)
163184
return result
164185

aws-replicator/aws_replicator/server/extension.py

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ class AwsReplicatorExtension(Extension):
1212
name = "aws-replicator"
1313

1414
def on_extension_load(self):
15+
logging.getLogger("aws_replicator").setLevel(
16+
logging.DEBUG if config.DEBUG else logging.INFO
17+
)
18+
1519
if config.GATEWAY_SERVER == "twisted":
1620
LOG.warning(
1721
"AWS resource replicator: The aws-replicator extension currently requires hypercorn as "

0 commit comments

Comments
 (0)