-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathtflocal
executable file
·507 lines (426 loc) · 17.2 KB
/
tflocal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
#!/usr/bin/env python
"""
Thin wrapper around the "terraform" command line interface (CLI) for use
with LocalStack.
The "tflocal" CLI allows you to easily interact with your local services
without having to specify the local endpoints in the "provider" section of
your TF config.
"""
import os
import sys
import glob
import subprocess
import json
import textwrap
from packaging import version
from urllib.parse import urlparse
from typing import Optional
# When running from a source checkout that carries its own virtualenv
# (a ".venv" folder next to this script's parent), prefer the checkout's
# packages by putting the parent folder first on sys.path.
PARENT_FOLDER = os.path.realpath(os.path.join(os.path.dirname(__file__), ".."))
if os.path.isdir(os.path.join(PARENT_FOLDER, ".venv")):
    sys.path.insert(0, PARENT_FOLDER)

# imported after the sys.path tweak on purpose (hence the noqa markers)
from localstack_client import config  # noqa: E402
import hcl2  # noqa: E402
# Only generate the override files; do not create AWS resources or run terraform
DRY_RUN = str(os.environ.get("DRY_RUN")).strip().lower() in ["1", "true"]
# Fallback region / access key used when nothing else is configured
DEFAULT_REGION = "us-east-1"
DEFAULT_ACCESS_KEY = "test"
# Explicit LocalStack endpoint URL; takes precedence over LOCALSTACK_HOSTNAME/EDGE_PORT below
AWS_ENDPOINT_URL = os.environ.get("AWS_ENDPOINT_URL")
# Derive the access key from the environment/provider config instead of using the default
CUSTOMIZE_ACCESS_KEY = str(os.environ.get("CUSTOMIZE_ACCESS_KEY")).strip().lower() in ["1", "true"]
LOCALHOST_HOSTNAME = "localhost.localstack.cloud"
# Hostname used for the S3 endpoint (defaults to the virtual-host style subdomain)
S3_HOSTNAME = os.environ.get("S3_HOSTNAME") or f"s3.{LOCALHOST_HOSTNAME}"
# Replace this process with terraform via os.exec instead of spawning a subprocess
USE_EXEC = str(os.environ.get("USE_EXEC")).strip().lower() in ["1", "true"]
# Name of the terraform-compatible binary to invoke
TF_CMD = os.environ.get("TF_CMD") or "terraform"
# Subcommands that do not need the provider override (comma-separated env override)
TF_UNPROXIED_CMDS = os.environ.get("TF_UNPROXIED_CMDS").split(sep=",") if os.environ.get("TF_UNPROXIED_CMDS") else ("fmt", "validate", "version")
# Name of the generated providers override file
LS_PROVIDERS_FILE = os.environ.get("LS_PROVIDERS_FILE") or "localstack_providers_override.tf"
# Target host/port for LocalStack; note urlparse(None) coerces to "" and yields no hostname/port,
# so the env-var fallbacks apply when AWS_ENDPOINT_URL is unset
LOCALSTACK_HOSTNAME = urlparse(AWS_ENDPOINT_URL).hostname or os.environ.get("LOCALSTACK_HOSTNAME") or "localhost"
EDGE_PORT = int(urlparse(AWS_ENDPOINT_URL).port or os.environ.get("EDGE_PORT") or 4566)
# Detected terraform version; populated by get_tf_version() at startup
TF_VERSION: Optional[version.Version] = None

# Template for each generated aws provider block; <...> placeholders are substituted
TF_PROVIDER_CONFIG = """
provider "aws" {
access_key = "<access_key>"
secret_key = "test"
skip_credentials_validation = true
skip_metadata_api_check = true
<configs>
endpoints {
<endpoints>
}
}
"""

# Template for the generated S3 backend block pointing at the local endpoints
TF_S3_BACKEND_CONFIG = """
terraform {
backend "s3" {
region = "<region>"
bucket = "<bucket>"
key = "<key>"
dynamodb_table = "<dynamodb_table>"
access_key = "test"
secret_key = "test"
<endpoints>
skip_credentials_validation = true
skip_metadata_api_check = true
}
}
"""

# Handle to the running terraform subprocess (used for signal forwarding)
PROCESS = None
# ---
# CONFIG GENERATION UTILS
# ---
def create_provider_config_file(provider_aliases=None):
    """Generate the providers override file (one `provider "aws"` block per
    configured alias, plus an optional S3 backend block) and return its path."""
    provider_aliases = provider_aliases or []

    # maps services to be replaced with alternative names
    # skip services which do not have equivalent endpoint overrides
    # see https://registry.terraform.io/providers/hashicorp/aws/latest/docs/guides/custom-service-endpoints
    service_replaces = {
        "apigatewaymanagementapi": "",
        "appconfigdata": "",
        "ce": "costexplorer",
        "dynamodbstreams": "",
        "edge": "",
        "emrserverless": "",
        "iotdata": "",
        "ioteventsdata": "",
        "iotjobsdata": "",
        "iotwireless": "",
        "logs": "cloudwatchlogs",
        "mediastoredata": "",
        "qldbsession": "",
        "rdsdata": "",
        "sagemakerruntime": "",
        "support": "",
        "timestream": "",
        "timestreamquery": "",
    }
    # service names to be excluded (not yet available in TF)
    service_excludes = ["meteringmarketplace"]

    # create list of service names
    services = list(config.get_service_ports())
    services = [srvc for srvc in services if srvc not in service_excludes]
    services = [s.replace("-", "") for s in services]
    for old, new in service_replaces.items():
        try:
            services.remove(old)
            if new:
                services.append(new)
        except ValueError:
            # service not present in the list - nothing to replace
            pass
    services = sorted(services)

    # add default (non-aliased) provider, if not defined yet
    default_provider = [p for p in provider_aliases if not p.get("alias")]
    if not default_provider:
        provider_aliases.append({"region": get_region()})

    # create provider configs
    provider_configs = []
    for provider in provider_aliases:
        provider_config = TF_PROVIDER_CONFIG.replace(
            "<access_key>",
            get_access_key(provider) if CUSTOMIZE_ACCESS_KEY else DEFAULT_ACCESS_KEY
        )
        endpoints = "\n".join([f' {s} = "{get_service_endpoint(s)}"' for s in services])
        provider_config = provider_config.replace("<endpoints>", endpoints)
        additional_configs = []
        if use_s3_path_style():
            additional_configs += [" s3_use_path_style = true"]
        alias = provider.get("alias")
        if alias:
            # hcl2 may wrap scalar values in single-element lists - unwrap
            if isinstance(alias, list):
                alias = alias[0]
            additional_configs += [f' alias = "{alias}"']
        region = provider.get("region") or get_region()
        if isinstance(region, list):
            region = region[0]
        additional_configs += [f'region = "{region}"']
        provider_config = provider_config.replace("<configs>", "\n".join(additional_configs))
        provider_configs.append(provider_config)

    # construct final config file content
    tf_config = "\n".join(provider_configs)

    # create s3 backend config
    tf_config += generate_s3_backend_config()

    # write temporary config file
    providers_file = get_providers_file_path()
    write_provider_config_file(providers_file, tf_config)
    return providers_file
def write_provider_config_file(providers_file, tf_config):
    """Persist the generated provider configuration to the given file path."""
    with open(providers_file, mode="w") as out_file:
        out_file.write(tf_config)
def get_providers_file_path() -> str:
    """Determine the path under which the providers override file should be stored.

    Honors terraform's `-chdir=<dir>` argument (first occurrence wins)."""
    base_dir = "."
    for arg in sys.argv:
        if arg.startswith("-chdir="):
            base_dir = arg.removeprefix("-chdir=")
            break
    return os.path.join(base_dir, LS_PROVIDERS_FILE)
def determine_provider_aliases() -> list:
    """Return a list of providers (and aliases) configured in the *.tf files (if any)"""
    result = []
    for filename, parsed in parse_tf_files().items():
        try:
            provider_blocks = ensure_list(parsed.get("provider", []))
            # keep only the aws provider entries; a file that fails to
            # parse cleanly is skipped with a warning
            result.extend([block["aws"] for block in provider_blocks if block.get("aws")])
        except Exception as e:
            print(f"Warning: Unable to extract providers from {filename}:", e)
    return result
def generate_s3_backend_config() -> str:
    """Generate an S3 `backend {..}` block with local endpoints, if configured.

    Returns the rendered backend block as a string, or "" when the user's
    *.tf files define no S3 backend."""
    # Terraform < 1.6 uses individual `*_endpoint` attributes; >= 1.6 uses
    # the consolidated `endpoints {}` map ("legacy" handling below)
    is_tf_legacy = TF_VERSION < version.Version("1.6")
    backend_config = None
    tf_files = parse_tf_files()
    for filename, obj in tf_files.items():
        if LS_PROVIDERS_FILE == filename:
            # skip the override file generated by this script itself
            continue
        tf_configs = ensure_list(obj.get("terraform", []))
        for tf_config in tf_configs:
            backend_config = ensure_list(tf_config.get("backend"))
            if backend_config:
                backend_config = backend_config[0]
                # NOTE: this only exits the inner loop - a later file could
                # still overwrite backend_config
                break

    backend_config = backend_config and backend_config.get("s3")
    if not backend_config:
        return ""

    # mapping of TF < 1.6 endpoint attribute -> key in the endpoints{} map
    legacy_endpoint_mappings = {
        "endpoint": "s3",
        "iam_endpoint": "iam",
        "sts_endpoint": "sts",
        "dynamodb_endpoint": "dynamodb",
    }
    configs = {
        # note: default values, updated by `backend_config` further below...
        "bucket": "tf-test-state",
        "key": "terraform.tfstate",
        "dynamodb_table": "tf-test-state",
        "region": get_region(),
        "endpoints": {
            "s3": get_service_endpoint("s3"),
            "iam": get_service_endpoint("iam"),
            "sso": get_service_endpoint("sso"),
            "sts": get_service_endpoint("sts"),
            "dynamodb": get_service_endpoint("dynamodb"),
        },
    }

    # Merge in legacy endpoint configs if not existing already
    if is_tf_legacy and backend_config.get("endpoints"):
        print("Warning: Unsupported backend option(s) detected (`endpoints`). Please make sure you always use the corresponding options to your Terraform version.")
        exit(1)
    for legacy_endpoint, endpoint in legacy_endpoint_mappings.items():
        if legacy_endpoint in backend_config and (not backend_config.get("endpoints") or endpoint not in backend_config["endpoints"]):
            if not backend_config.get("endpoints"):
                backend_config["endpoints"] = {}
            backend_config["endpoints"].update({endpoint: backend_config[legacy_endpoint]})

    # Add any missing default endpoints
    if backend_config.get("endpoints"):
        backend_config["endpoints"] = {
            k: backend_config["endpoints"].get(k) or v
            for k, v in configs["endpoints"].items()}
    configs.update(backend_config)

    if not DRY_RUN:
        # ensure the state bucket and the lock table exist in LocalStack
        get_or_create_bucket(configs["bucket"])
        get_or_create_ddb_table(configs["dynamodb_table"], region=configs["region"])

    # substitute the collected values into the backend template
    result = TF_S3_BACKEND_CONFIG
    for key, value in configs.items():
        if isinstance(value, bool):
            value = str(value).lower()
        elif isinstance(value, dict):
            if key == "endpoints" and is_tf_legacy:
                # render individual *_endpoint attributes for TF < 1.6
                value = textwrap.indent(
                    text=textwrap.dedent(f"""\
                    endpoint = "{value["s3"]}"
                    iam_endpoint = "{value["iam"]}"
                    sts_endpoint = "{value["sts"]}"
                    dynamodb_endpoint = "{value["dynamodb"]}"
                    """),
                    prefix=" " * 4)
            else:
                # render a nested block, e.g. `endpoints { ... }`
                value = textwrap.indent(
                    text=f"{key} = {{\n" + "\n".join([f' {k} = "{v}"' for k, v in value.items()]) + "\n}",
                    prefix=" " * 4)
        else:
            value = str(value)
        result = result.replace(f"<{key}>", value)
    return result
def check_override_file(providers_file: str) -> None:
    """Check whether the providers override file already exists.

    Exits with an error if it does; in dry-run mode the user may confirm
    with "yes" to allow the file to be overwritten."""
    if os.path.exists(providers_file):
        msg = f"Providers override file {providers_file} already exists"
        err_msg = msg + " - please delete it first, exiting..."
        if DRY_RUN:
            msg += ". File will be overwritten."
            print(msg)
            print("\tOnly 'yes' will be accepted to approve.")
            if input("\tEnter a value: ") == "yes":
                return
        print(err_msg)
        exit(1)
# ---
# AWS CLIENT UTILS
# ---
def use_s3_path_style() -> bool:
    """
    Whether to use S3 path addressing (depending on the configured S3 endpoint)
    If the endpoint starts with the `s3.` prefix, LocalStack will recognize virtual host addressing. If the endpoint
    does not start with it, use path style. This also allows overriding the endpoint to always use path style in case of
    inter container communications in Docker.
    """
    try:
        host = urlparse(get_service_endpoint("s3")).hostname
    except ValueError:
        host = ""
    # urlparse().hostname can be None (e.g. an endpoint without a netloc);
    # treat that like an unparsable endpoint and fall back to path style,
    # instead of raising AttributeError on None.startswith
    return not (host or "").startswith("s3.")
def get_region() -> str:
    """Resolve the AWS region: env var first, then boto3 credentials, then default."""
    env_region = str(os.environ.get("AWS_DEFAULT_REGION") or "").strip()
    if env_region:
        return env_region
    region = None
    try:
        # boto3 is optional (kept out of the dependencies to stay lightweight);
        # if installed, pick up the region from the local credentials/config
        import boto3
        region = boto3.session.Session().region_name
    except Exception:
        pass
    # fall back to default region
    return region or DEFAULT_REGION
def get_access_key(provider: dict) -> str:
    """Determine the access key to use, deactivating any live AWS key ID."""
    configured = str(os.environ.get("AWS_ACCESS_KEY_ID") or provider.get("access_key", "")).strip()
    if configured and configured != DEFAULT_ACCESS_KEY:
        # Change live access key to mocked one
        return deactivate_access_key(configured)
    access_key = ""
    try:
        # boto3 is optional (kept out of the dependencies to stay lightweight);
        # if installed, pick up the access key from the local credentials
        import boto3
        access_key = boto3.session.Session().get_credentials().access_key
    except Exception:
        pass
    # fall back to the default access key
    return deactivate_access_key(access_key or DEFAULT_ACCESS_KEY)
def deactivate_access_key(access_key: str) -> str:
    """Safe guarding user from accidental live credential usage by deactivating access key IDs.

    Live AWS access key IDs start with "A" (e.g. "AKIA..."); replacing the
    first character renders them invalid while keeping them recognizable.
    See more: https://docs.localstack.cloud/references/credentials/"""
    # str.startswith is safe for the empty string, unlike access_key[0]
    # which raised IndexError on ""
    return "L" + access_key[1:] if access_key.startswith("A") else access_key
def get_service_endpoint(service: str) -> str:
    """Get the service endpoint URL for the given service name"""
    # an environment override like `S3_ENDPOINT` always takes precedence
    env_name = f"{service.replace('-', '_').upper().strip()}_ENDPOINT"
    override = os.environ.get(env_name, "").strip()
    if override:
        return override if "://" in override else f"http://{override}"
    # some services need specific hostnames
    if service == "s3":
        hostname = S3_HOSTNAME
    elif service == "mwaa":
        hostname = f"mwaa.{LOCALHOST_HOSTNAME}"
    else:
        hostname = LOCALSTACK_HOSTNAME
    return f"http://{hostname}:{EDGE_PORT}"
def connect_to_service(service: str, region: str = None):
    """Create a boto3 client for *service*, pointed at the local endpoint."""
    import boto3
    return boto3.client(
        service,
        endpoint_url=get_service_endpoint(service),
        region_name=region or get_region(),
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
def get_or_create_bucket(bucket_name: str):
    """Get or create a bucket in the current region."""
    s3_client = connect_to_service("s3")
    try:
        return s3_client.head_bucket(Bucket=bucket_name)
    except Exception:
        # bucket does not exist (or is inaccessible) - create it;
        # us-east-1 must not receive a LocationConstraint
        region = s3_client.meta.region_name
        if region == "us-east-1":
            return s3_client.create_bucket(Bucket=bucket_name)
        return s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )
def get_or_create_ddb_table(table_name: str, region: str = None):
    """Get or create a DynamoDB table with the given name."""
    ddb_client = connect_to_service("dynamodb", region=region)
    try:
        return ddb_client.describe_table(TableName=table_name)
    except Exception:
        # table is missing - create it with the "LockID" hash key that the
        # Terraform S3 backend expects for state locking
        return ddb_client.create_table(
            TableName=table_name,
            BillingMode="PAY_PER_REQUEST",
            KeySchema=[{"AttributeName": "LockID", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "LockID", "AttributeType": "S"}],
        )
# ---
# TF UTILS
# ---
def is_override_needed(args) -> bool:
    """Return True if the given CLI args require the provider override file,
    i.e. none of the unproxied commands (e.g. fmt/validate/version) is present."""
    return not any(cmd in args for cmd in TF_UNPROXIED_CMDS)
def parse_tf_files() -> dict:
    """Parse the local *.tf files and return a dict of <filename> -> <resource_dict>"""
    parsed = {}
    for path in glob.glob("*.tf"):
        try:
            with open(path, "r") as handle:
                parsed[path] = hcl2.load(handle)
        except Exception as e:
            # unparsable files are reported and skipped
            print(f'Unable to parse "{path}" as HCL file: {e}')
    return parsed
def get_tf_version(env):
    """Detect the terraform binary's version and store it in the global TF_VERSION."""
    global TF_VERSION
    proc = subprocess.run(
        [TF_CMD, "version", "-json"], env=env, check=True, capture_output=True)
    version_info = json.loads(proc.stdout.decode("utf-8"))
    TF_VERSION = version.parse(version_info["terraform_version"])
def run_tf_exec(cmd, env):
    """Run terraform using os.exec - can be useful as it does not require any I/O
    handling for stdin/out/err. Does *not* allow us to perform any cleanup logic."""
    # execvpe replaces the current process image; nothing after this line runs
    os.execvpe(cmd[0], cmd, env=env)
def run_tf_subprocess(cmd, env):
    """Run terraform in a subprocess - useful to perform cleanup logic at the end."""
    global PROCESS

    # register signal handlers, so Ctrl-C is forwarded to the child process
    import signal
    signal.signal(signal.SIGINT, signal_handler)

    # note: stderr is deliberately redirected into stdout here
    PROCESS = subprocess.Popen(
        cmd, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stdout)
    PROCESS.communicate()
    # propagate terraform's exit code to the caller
    sys.exit(PROCESS.returncode)
# ---
# UTIL FUNCTIONS
# ---
def signal_handler(sig, frame):
    """Forward a received signal (e.g. SIGINT) to the terraform subprocess."""
    PROCESS.send_signal(sig)
def ensure_list(obj) -> list:
    """Return *obj* itself if it is already a list, otherwise wrap it in one."""
    if isinstance(obj, list):
        return obj
    return [obj]
def to_bytes(obj) -> bytes:
    """Encode *obj* to UTF-8 bytes if it is a str; return it unchanged otherwise."""
    if isinstance(obj, str):
        return obj.encode("UTF-8")
    return obj
def to_str(obj) -> str:
    """Decode *obj* from UTF-8 bytes to str; return non-bytes objects unchanged.

    Note: the return annotation previously claimed `bytes`, which contradicted
    the implementation - fixed to `str`."""
    return obj.decode("UTF-8") if isinstance(obj, bytes) else obj
# ---
# MAIN ENTRYPOINT
# ---
def main():
    """CLI entrypoint: generate the provider override file (unless the command
    is unproxied or the file exists), run terraform, then clean up the file."""
    env = dict(os.environ)
    cmd = [TF_CMD] + sys.argv[1:]

    # determine the terraform version first; abort if the binary is missing
    # or its version cannot be parsed
    try:
        get_tf_version(env)
        if not TF_VERSION:
            raise ValueError
    except (FileNotFoundError, ValueError) as e:
        print(f"Unable to determine version. See error message for details: {e}")
        exit(1)

    if is_override_needed(sys.argv[1:]):
        check_override_file(get_providers_file_path())

        # create TF provider config file
        providers = determine_provider_aliases()
        config_file = create_provider_config_file(providers)
    else:
        config_file = None

    # call terraform command if not dry-run or any of the commands
    if not DRY_RUN or not is_override_needed(sys.argv[1:]):
        try:
            if USE_EXEC:
                run_tf_exec(cmd, env)
            else:
                run_tf_subprocess(cmd, env)
        finally:
            # fall through if haven't set during dry-run
            if config_file:
                os.remove(config_file)


if __name__ == "__main__":
    main()