This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 278
/
Copy pathconfig.py
127 lines (97 loc) · 4.28 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import re
import os
from typing import Any, Dict
import toml
# Run options that TOML decodes as lists but that are converted to tuples
# before the run arguments are handed on.
_ARRAY_FIELDS = ("key_columns", "columns")
class ConfigParseError(Exception):
    """Raised when the configuration has unknown, missing or unexpected entries."""
def is_uri(s: str) -> bool:
    """Return True when *s* contains the ``://`` scheme separator."""
    scheme_separator = "://"
    return scheme_separator in s
def _apply_config(config: Dict[str, Any], run_name: str, kw: Dict[str, Any]):
    """Combine a parsed configuration with keyword arguments for a single run.

    Args:
        config: Parsed configuration mapping. It is modified in place:
            ``${VAR}`` references are resolved and the ``database`` and
            ``run`` sections are popped from it.
        run_name: Name of the ``run`` section applied on top of
            ``run.default``. When empty, only ``run.default`` is used.
        kw: Keyword arguments (presumably from the command line -- verify
            against the caller). When ``database1`` is set, the
            ``database1``/``table1``/``database2``/``table2`` entries are
            popped from it and replace the sources of the run section.

    Returns:
        A new dict built from ``kw``, overridden by the run settings, and
        overridden again by the truthy values of ``kw``. The resolved run
        settings are also stored under the ``"__conf__"`` key.

    Raises:
        ConfigParseError: On unknown top-level options, an unknown run name,
            a missing or malformed source, or an unknown/invalid database.
        ValueError: When ``database1`` is given without the other three
            source arguments.
    """
    _resolve_env(config)

    # Load config. Whatever remains after popping the known sections is unknown.
    databases = config.pop("database", {})
    runs = config.pop("run", {})
    if config:
        raise ConfigParseError(f"Unknown option(s): {config}")

    # Init run_args
    run_args = runs.get("default") or {}
    if run_name:
        if run_name not in runs:
            raise ConfigParseError(f"Cannot find run '{run_name}' in configuration.")
        run_args.update(runs[run_name])
    else:
        run_name = "default"

    if kw.get("database1") is not None:
        for attr in ("table1", "database2", "table2"):
            # .get(): an absent key is treated like an explicit None instead
            # of escaping as a bare KeyError.
            if kw.get(attr) is None:
                raise ValueError(f"Specified database1 but not {attr}. Must specify all 4 arguments, or neither.")

        # The entries are popped so that the raw values do not overwrite the
        # resolved databases when kw is re-applied at the end.
        for index in "12":
            run_args[index] = {attr: kw.pop(f"{attr}{index}") for attr in ("database", "table")}

    # Make sure array fields are decoded as list, since array fields in toml are decoded as list,
    # but TableSegment object requires tuple type.
    for field in _ARRAY_FIELDS:
        if isinstance(run_args.get(field), list):
            run_args[field] = tuple(run_args[field])

    # Process databases + tables
    for index in "12":
        try:
            args = run_args.pop(index)
        except KeyError:
            # The KeyError carries no extra information; hide it from the traceback.
            raise ConfigParseError(
                f"Could not find source #{index}: Expecting a key of '{index}' containing '.database' and '.table'."
            ) from None

        for attr in ("database", "table"):
            if attr not in args:
                raise ConfigParseError(f"Running 'run.{run_name}': Connection #{index} is missing attribute '{attr}'.")

        database = args.pop("database")
        table = args.pop("table")
        threads = args.pop("threads", None)
        if args:
            raise ConfigParseError(f"Unexpected attributes for connection #{index}: {args}")

        if not is_uri(database):
            # Not a URI: treat the value as the name of a [database.<name>] section.
            db_name = database
            if db_name not in databases:
                raise ConfigParseError(
                    f"Database '{db_name}' not found in list of databases. Available: {list(databases)}."
                )
            db_settings = databases[db_name]
            if not isinstance(db_settings, dict):
                raise ConfigParseError(f"Database '{db_name}' must be a table of connection settings.")
            database = dict(db_settings)
            if "driver" not in database:
                # Report the name only; the settings dict may hold credentials.
                raise ConfigParseError(f"Database '{db_name}' did not specify a driver.")

        run_args[f"database{index}"] = database
        run_args[f"table{index}"] = table
        if threads is not None:
            run_args[f"threads{index}"] = int(threads)

    # Update keywords
    new_kw = dict(kw)  # Set defaults
    new_kw.update(run_args)  # Apply config
    new_kw.update({k: v for k, v in kw.items() if v})  # Apply non-empty defaults

    new_kw["__conf__"] = run_args

    return new_kw
# Regular expression matching ``${NAME}`` references; group 1 captures NAME.
# There are no strict requirements for the environment variable name format.
# But most shells only allow alphanumeric characters and underscores.
# https://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
# "Environment variable names (...) consist solely of uppercase letters, digits, and the '_' (underscore)"
# The pattern is more permissive than the quoted text: lowercase letters are accepted as well.
_ENV_VAR_PATTERN = r"\$\{([A-Za-z0-9_]+)\}"
def _resolve_env(config: Dict[str, Any]) -> None:
    """
    Substitute ``${ENV_VAR_NAME}`` references in the string values of *config*, in place.

    Nested dicts are processed recursively; values of any other type are left untouched.
    References to environment variables that are not set become an empty string.
    """
    for name in config:
        entry = config[name]
        if isinstance(entry, str):
            # Re-assigning an existing key keeps the dict size stable, so this is safe while iterating.
            config[name] = re.sub(_ENV_VAR_PATTERN, _replace_match, entry)
        elif isinstance(entry, dict):
            _resolve_env(entry)
def _replace_match(match: re.Match) -> str:
    """Return the value of the environment variable named by group 1 of *match*.

    An unset variable yields an empty string.
    """
    # group(0) would be the whole matched text; group(1) is the captured name.
    env_name = match.group(1)
    return os.getenv(env_name, "")
def apply_config_from_file(path: str, run_name: str, kw: Dict[str, Any]):
    """Load the TOML file at *path* and apply it via ``_apply_config``.

    Args:
        path: Location of the TOML configuration file.
        run_name: Name of the run to apply; passed through unchanged.
        kw: Keyword arguments to merge with the configuration; passed
            through unchanged.

    Returns:
        Whatever ``_apply_config`` returns for the parsed file.
    """
    # TOML documents are UTF-8 by specification, so the encoding is stated
    # explicitly instead of relying on the platform's default.
    with open(path, encoding="utf-8") as f:
        return _apply_config(toml.load(f), run_name, kw)
def apply_config_from_string(toml_config: str, run_name: str, kw: Dict[str, Any]):
    """Parse *toml_config* as TOML text and apply it via ``_apply_config``."""
    parsed_config = toml.loads(toml_config)
    return _apply_config(parsed_config, run_name, kw)