Skip to content

Commit 815d7b2

Browse files
committed
try to replace http2 server
1 parent 0cab1aa commit 815d7b2

File tree

3 files changed

+264
-25
lines changed

3 files changed

+264
-25
lines changed

aws-replicator/aws_replicator/client/auth_proxy.py

+231-17
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55
import subprocess
66
import sys
77
from functools import cache
8+
from io import BytesIO
89
from typing import Dict, Optional, Tuple
910
from urllib.parse import urlparse, urlunparse
1011

1112
import boto3
1213
import requests
13-
from botocore.awsrequest import AWSPreparedRequest
14+
from botocore.awsrequest import AWSPreparedRequest, AWSResponse
15+
from botocore.httpchecksum import resolve_checksum_context
1416
from botocore.model import OperationModel
1517
from localstack import config
1618
from localstack import config as localstack_config
1719
from localstack.aws.api import HttpRequest
20+
from localstack.aws.chain import HandlerChain
21+
from localstack.aws.chain import RequestContext as AwsRequestContext
22+
from localstack.aws.gateway import Gateway
1823
from localstack.aws.protocol.parser import create_parser
1924
from localstack.aws.spec import load_service
2025
from localstack.config import external_service_url
2126
from localstack.constants import AWS_REGION_US_EAST_1, DOCKER_IMAGE_NAME_PRO
2227
from localstack.http import Request
28+
from localstack.http import Response as HttpResponse
29+
from localstack.http.hypercorn import GatewayServer
2330
from localstack.utils.aws.aws_responses import requests_response
2431
from localstack.utils.bootstrap import setup_logging
2532
from localstack.utils.collections import select_attributes
@@ -28,7 +35,6 @@
2835
from localstack.utils.files import new_tmp_file, save_file
2936
from localstack.utils.functions import run_safe
3037
from localstack.utils.net import get_docker_host_from_container, get_free_tcp_port
31-
from localstack.utils.server.http2_server import run_server
3238
from localstack.utils.serving import Server
3339
from localstack.utils.strings import short_uid, to_bytes, to_str, truncate
3440
from localstack_ext.bootstrap.licensingv2 import ENV_LOCALSTACK_API_KEY, ENV_LOCALSTACK_AUTH_TOKEN
@@ -57,6 +63,207 @@
5763
DEFAULT_BIND_HOST = "127.0.0.1"
5864

5965

66+
class AwsProxyHandler:
    """
    A handler for an AWS Handler chain that attempts to forward the request using a specific boto3 session.
    This can be used to proxy incoming requests to real AWS.
    """

    def __init__(self, session: boto3.Session = None):
        """
        :param session: boto3 session used to create the upstream AWS clients; a default
            ``boto3.Session()`` is created when none is given.
        """
        self.session = session or boto3.Session()

    def __call__(self, chain: HandlerChain, context: AwsRequestContext, response: HttpResponse):
        """
        Forward the parsed request in ``context`` to upstream AWS and respond with the upstream result.

        Any exception raised while proxying is logged and re-raised. The ``response`` argument is not
        used directly; the result is handed to the chain via ``chain.respond(...)``.
        """
        # prepare the API invocation parameters
        LOG.info(
            "Received %s.%s = %s",
            context.service.service_name,
            context.operation.name,
            context.service_request,
        )

        # make the actual API call against upstream AWS (will also calculate a new auth signature)
        try:
            aws_response = self._make_aws_api_call(context)
        except Exception:
            LOG.exception(
                "Exception while proxying %s.%s to AWS",
                context.service.service_name,
                context.operation.name,
            )
            raise

        # tell the handler chain to respond
        LOG.info(
            "AWS Response %s.%s: url=%s status_code=%s, headers=%s, content=%s",
            context.service.service_name,
            context.operation.name,
            aws_response.url,
            aws_response.status_code,
            aws_response.headers,
            aws_response.content,
        )
        chain.respond(aws_response.status_code, aws_response.content, dict(aws_response.headers))

    def _make_aws_api_call(self, context: AwsRequestContext) -> AWSResponse:
        """
        Build and send the botocore request for the operation described by ``context``.

        :param context: request context providing the service, operation, region and parsed request
        :return: the raw HTTP response returned by the botocore endpoint, with the ``Date`` and
            ``Server`` headers removed (and ``Content-Length`` for streaming outputs)
        """
        # TODO: reconcile with AwsRequestProxy from localstack, and other forwarder tools
        # create a real AWS client
        client = self.session.client(context.service.service_name, region_name=context.region)
        operation_model = context.operation

        # prepare API request parameters as expected by boto
        api_params = {k: v for k, v in context.service_request.items() if v is not None}

        # this is a stripped down version of botocore's client._make_api_call to immediately get the HTTP
        # response instead of a parsed response.
        request_context = {
            "client_region": client.meta.region_name,
            "client_config": client.meta.config,
            "has_streaming_input": operation_model.has_streaming_input,
            "auth_type": operation_model.auth_type,
        }

        (
            endpoint_url,
            additional_headers,
            properties,
        ) = client._resolve_endpoint_ruleset(operation_model, api_params, request_context)
        if properties:
            # Pass arbitrary endpoint info with the Request
            # for use during construction.
            request_context["endpoint_properties"] = properties

        request_dict = client._convert_to_request_dict(
            api_params=api_params,
            operation_model=operation_model,
            endpoint_url=endpoint_url,
            context=request_context,
            headers=additional_headers,
        )
        resolve_checksum_context(request_dict, operation_model, api_params)

        # streaming bodies are read fully so that _adjust_request_dict sees the payload itself ...
        if operation_model.has_streaming_input:
            request_dict["body"] = request_dict["body"].read()

        self._adjust_request_dict(context.service.service_name, request_dict)

        # ... and are wrapped into a file-like object again before the request is sent
        if operation_model.has_streaming_input:
            request_dict["body"] = BytesIO(request_dict["body"])

        LOG.info("Making AWS request %s", request_dict)
        http, _ = client._endpoint.make_request(operation_model, request_dict)

        http: AWSResponse

        # for some elusive reasons, these header modifications are needed (were part of http2_server)
        http.headers.pop("Date", None)
        http.headers.pop("Server", None)
        if operation_model.has_streaming_output:
            http.headers.pop("Content-Length", None)

        return http

    def _adjust_request_dict(self, service_name: str, request_dict: Dict):
        """
        Apply minor fixes to the request dict, which seem to be required in the current setup.

        The dict is modified in place (``url``, ``url_path`` and ``body`` may be rewritten).

        :param service_name: name of the target AWS service (only ``s3`` and ``sqs`` are adjusted)
        :param request_dict: botocore request dict about to be sent upstream
        """
        # TODO: replacing localstack-specific URLs, IDs, etc, should ideally be done in a more generalized
        # way.

        req_body = request_dict.get("body")

        # The decoded body is needed by both the S3 and the SQS fixes below, so it must be computed
        # here rather than inside the S3 branch: otherwise the SQS branch references an unassigned
        # name, json parsing silently fails and the SQS URL rewrite is never applied.
        body_str = ""
        if service_name in ("s3", "sqs"):
            body_str = run_safe(lambda: to_str(req_body)) or ""

        # TODO: fix for switch between path/host addressing
        # Note: the behavior seems to be different across botocore versions. Seems to be working
        # with 1.29.97 (fix below not required) whereas newer versions like 1.29.151 require the fix.
        if service_name == "s3":
            request_url = request_dict["url"]
            url_parsed = list(urlparse(request_url))
            path_parts = url_parsed[2].strip("/").split("/")
            bucket_subdomain_prefix = f"://{path_parts[0]}.s3."
            if bucket_subdomain_prefix in request_url:
                # bucket is already part of the host name -> drop the duplicate from the path
                prefix = f"/{path_parts[0]}"
                url_parsed[2] = url_parsed[2].removeprefix(prefix)
                request_dict["url_path"] = request_dict["url_path"].removeprefix(prefix)
                # replace empty path with "/" (seems required for signature calculation)
                request_dict["url_path"] = request_dict["url_path"] or "/"
                url_parsed[2] = url_parsed[2] or "/"
                # re-construct final URL
                request_dict["url"] = urlunparse(url_parsed)

            # TODO: this custom fix should not be required - investigate and remove!
            if "<CreateBucketConfiguration" in body_str and "LocationConstraint" not in body_str:
                region = request_dict["context"]["client_region"]
                if region == AWS_REGION_US_EAST_1:
                    request_dict["body"] = ""
                else:
                    request_dict["body"] = (
                        '<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
                        f"<LocationConstraint>{region}</LocationConstraint></CreateBucketConfiguration>"
                    )

        if service_name == "sqs" and isinstance(req_body, dict):
            account_id = self._query_account_id_from_aws()
            if "QueueUrl" in req_body:
                queue_name = req_body["QueueUrl"].split("/")[-1]
                req_body["QueueUrl"] = f"https://queue.amazonaws.com/{account_id}/{queue_name}"
            if "QueueOwnerAWSAccountId" in req_body:
                req_body["QueueOwnerAWSAccountId"] = account_id
        if service_name == "sqs" and request_dict.get("url"):
            # bodies that are not valid JSON simply yield an empty dict here
            req_json = run_safe(lambda: json.loads(body_str)) or {}
            account_id = self._query_account_id_from_aws()
            queue_name = req_json.get("QueueName")
            if account_id and queue_name:
                request_dict["url"] = f"https://queue.amazonaws.com/{account_id}/{queue_name}"
                req_json["QueueOwnerAWSAccountId"] = account_id
                request_dict["body"] = to_bytes(json.dumps(req_json))

    def _fix_headers(self, request: HttpRequest, service_name: str):
        """
        Normalize the headers of an incoming request in place.

        For S3, a ``Host`` header that starts with an IP address or ``localhost`` is rewritten to
        ``s3.localhost.localstack.cloud``. A fixed set of transport/LocalStack-specific headers is
        removed for all services. Not invoked by the other methods of this class.
        """
        if service_name == "s3":
            # fix the Host header, to avoid bucket addressing issues
            host = request.headers.get("Host") or ""
            regex = r"^(https?://)?([0-9.]+|localhost)(:[0-9]+)?"
            if re.match(regex, host):
                request.headers["Host"] = re.sub(regex, r"\1s3.localhost.localstack.cloud", host)
        request.headers.pop("Content-Length", None)
        request.headers.pop("x-localstack-request-url", None)
        request.headers.pop("X-Forwarded-For", None)
        request.headers.pop("X-Localstack-Tgt-Api", None)
        request.headers.pop("X-Moto-Account-Id", None)
        request.headers.pop("Remote-Addr", None)

    # NOTE: functools.cache keys on ``self``, i.e. the result is cached once per handler instance
    @cache
    def _query_account_id_from_aws(self) -> str:
        """Return the AWS account ID of the session's credentials, as reported by STS."""
        sts_client = self.session.client("sts")
        result = sts_client.get_caller_identity()
        return result["Account"]
238+
239+
240+
class AwsProxyGateway(Gateway):
    """
    Gateway that accepts AWS requests and transparently proxies them to upstream AWS with real
    credentials: the incoming request is parsed into its service/operation form, then re-issued as a
    new request signed with the AWS credentials configured in the environment.
    """

    def __init__(self) -> None:
        # imported lazily, as in the rest of this constructor's original form
        from localstack.aws import handlers

        # request pipeline: parse the incoming call step by step, then hand it to the proxy handler
        request_pipeline = [
            handlers.parse_service_name,
            handlers.content_decoder,
            handlers.add_region_from_header,
            handlers.add_account_id,
            handlers.parse_service_request,
            AwsProxyHandler(),
        ]
        # handlers invoked when one of the request handlers raises
        failure_pipeline = [
            handlers.log_exception,
            handlers.handle_internal_failure,
        ]

        super().__init__(
            request_handlers=request_pipeline,
            exception_handlers=failure_pipeline,
            context_class=AwsRequestContext,
        )
265+
266+
60267
class AuthProxyAWS(Server):
61268
def __init__(self, config: ProxyConfig, port: int = None):
62269
self.config = config
@@ -65,9 +272,13 @@ def __init__(self, config: ProxyConfig, port: int = None):
65272

66273
def do_run(self):
67274
self.register_in_instance()
275+
68276
bind_host = self.config.get("bind_host") or DEFAULT_BIND_HOST
69-
proxy = run_server(port=self.port, bind_addresses=[bind_host], handler=self.proxy_request)
70-
proxy.join()
277+
srv = GatewayServer(AwsProxyGateway(), localstack_config.HostAndPort(bind_host, self.port))
278+
srv.start()
279+
srv.join()
280+
# proxy = run_server(port=self.port, bind_addresses=[bind_host], handler=self.proxy_request)
281+
# proxy.join()
71282

72283
def proxy_request(self, request: Request, data: bytes) -> Response:
73284
parsed = self._extract_region_and_service(request.headers)
@@ -214,20 +425,23 @@ def _parse_aws_request(
214425

215426
def _adjust_request_dict(self, service_name: str, request_dict: Dict):
216427
"""Apply minor fixes to the request dict, which seem to be required in the current setup."""
217-
428+
# TODO: replacing localstack-specific URLs, IDs, etc, should ideally be done in a more generalized
429+
# way.
218430
req_body = request_dict.get("body")
219-
body_str = run_safe(lambda: to_str(req_body)) or ""
220-
221-
# TODO: this custom fix should not be required - investigate and remove!
222-
if "<CreateBucketConfiguration" in body_str and "LocationConstraint" not in body_str:
223-
region = request_dict["context"]["client_region"]
224-
if region == AWS_REGION_US_EAST_1:
225-
request_dict["body"] = ""
226-
else:
227-
request_dict["body"] = (
228-
'<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
229-
f"<LocationConstraint>{region}</LocationConstraint></CreateBucketConfiguration>"
230-
)
431+
432+
if service_name == "s3":
433+
body_str = run_safe(lambda: to_str(req_body)) or ""
434+
435+
# TODO: this custom fix should not be required - investigate and remove!
436+
if "<CreateBucketConfiguration" in body_str and "LocationConstraint" not in body_str:
437+
region = request_dict["context"]["client_region"]
438+
if region == AWS_REGION_US_EAST_1:
439+
request_dict["body"] = ""
440+
else:
441+
request_dict["body"] = (
442+
'<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
443+
f"<LocationConstraint>{region}</LocationConstraint></CreateBucketConfiguration>"
444+
)
231445

232446
if service_name == "sqs" and isinstance(req_body, dict):
233447
account_id = self._query_account_id_from_aws()

aws-replicator/aws_replicator/server/aws_request_forwarder.py

+29-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from localstack.utils.net import get_addressable_container_host
1717
from localstack.utils.strings import to_str, truncate
1818
from requests.structures import CaseInsensitiveDict
19+
from rolo.proxy import forward
1920

2021
try:
2122
from localstack.testing.config import TEST_AWS_ACCESS_KEY_ID
@@ -37,17 +38,21 @@ def __call__(self, chain: HandlerChain, context: RequestContext, response: Respo
3738
return
3839

3940
# forward request to proxy
40-
response = self.forward_request(context, proxy)
41+
response_ = self.forward_request(context, proxy)
4142

42-
if response is None:
43+
if response_ is None:
4344
return
4445

45-
# set response details, then stop handler chain to return response
46-
chain.response.data = response.raw_content
47-
chain.response.status_code = response.status_code
48-
chain.response.headers.update(dict(response.headers))
46+
response.update_from(response_)
4947
chain.stop()
5048

49+
# set response details, then stop handler chain to return response
50+
# chain.response.data = response.raw_content
51+
# chain.response.status_code = response.status_code
52+
# chain.response.headers.update(dict(response.headers))
53+
# chain.stop()
54+
# chain.respond(response.status_code, response.raw_content, dict(response.headers))
55+
5156
def select_proxy(self, context: RequestContext) -> Optional[ProxyInstance]:
5257
"""select a proxy responsible to forward a request to real AWS"""
5358
if not context.service:
@@ -126,6 +131,22 @@ def forward_request(self, context: RequestContext, proxy: ProxyInstance) -> requ
126131
port = proxy["port"]
127132
request = context.request
128133
target_host = get_addressable_container_host(default_local_hostname=LOCALHOST)
134+
135+
try:
136+
LOG.info("Forwarding request: %s", context)
137+
response = forward(request, f"http://{target_host}:{port}")
138+
LOG.info(
139+
"Received response: status=%s headers=%s body=%s",
140+
response.status_code,
141+
response.headers,
142+
response.data,
143+
)
144+
except Exception as e:
145+
LOG.exception("Exception while forwarding request")
146+
raise
147+
148+
return response
149+
129150
url = f"http://{target_host}:{port}{request.path}?{to_str(request.query_string)}"
130151

131152
# inject Auth header, to ensure we're passing the right region to the proxy (e.g., for Cognito InitiateAuth)
@@ -156,9 +177,9 @@ def forward_request(self, context: RequestContext, proxy: ProxyInstance) -> requ
156177
dict(result.headers),
157178
truncate(result.raw_content, max_length=500),
158179
)
159-
except requests.exceptions.ConnectionError:
180+
except requests.exceptions.ConnectionError as e:
160181
# remove unreachable proxy
161-
LOG.info("Removing unreachable AWS forward proxy due to connection issue: %s", url)
182+
LOG.exception("Removing unreachable AWS forward proxy due to connection issue: %s", url)
162183
self.PROXY_INSTANCES.pop(port, None)
163184
return result
164185

aws-replicator/aws_replicator/server/extension.py

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ class AwsReplicatorExtension(Extension):
1212
name = "aws-replicator"
1313

1414
def on_extension_load(self):
15+
logging.getLogger("aws_replicator").setLevel(
16+
logging.DEBUG if config.DEBUG else logging.INFO
17+
)
18+
1519
if config.GATEWAY_SERVER == "twisted":
1620
LOG.warning(
1721
"AWS resource replicator: The aws-replicator extension currently requires hypercorn as "

0 commit comments

Comments
 (0)