-
Notifications
You must be signed in to change notification settings - Fork 497
/
Copy pathaws.py
200 lines (167 loc) · 5.46 KB
/
aws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""
This plugin searches for AWS key IDs
"""
import hashlib
import hmac
import re
import string
import textwrap
from datetime import datetime
from typing import cast
from typing import List
from typing import Union
import requests
from ..constants import VerifiedResult
from ..util.code_snippet import CodeSnippet
from .base import RegexBasedDetector
class AWSKeyDetector(RegexBasedDetector):
    """Detector for AWS access key IDs and AWS secret tokens."""

    secret_type = 'AWS Access Key'

    denylist = (
        # Access key IDs: the literal prefix `AKIA` followed by sixteen
        # characters from [0-9A-Z]. This pattern is deliberately listed first:
        # key IDs can be verified, so they are preferred over secret tokens.
        re.compile(r'AKIA[0-9A-Z]{16}'),

        # Secret tokens, recognised through the variable name: `aws`, at most
        # twenty arbitrary characters, then a quoted 40-character value.
        re.compile(r'aws.{0,20}?[\'\"]([0-9a-zA-Z/+]{40})[\'\"]'),
    )

    def verify(  # type: ignore[override]  # noqa: F821
        self,
        secret: str,
        context: CodeSnippet,
    ) -> VerifiedResult:
        """
        Try to confirm `secret` as a live access key ID.

        Verification needs two factors: `secret` is assumed to be the key ID,
        and the matching secret access key is searched for in `context`.

        :param secret: the detected value.
        :param context: lines surrounding the detection, scanned for
            candidate secret access keys.
        :returns: UNVERIFIED when `secret` is not an `AKIA` key ID or when no
            candidate is found in `context`; VERIFIED_TRUE as soon as one
            candidate is accepted; VERIFIED_FALSE when every candidate is
            rejected.
        """
        # Bail out early when the assumption "this is a key ID" does not hold.
        is_key_id = self.denylist[0].match(secret)
        if not is_key_id:
            return VerifiedResult.UNVERIFIED

        candidates = get_secret_access_keys(context)
        if not candidates:
            return VerifiedResult.UNVERIFIED

        # any() stops at the first accepted candidate, so no further
        # verification requests are made once a pair is confirmed.
        accepted = any(
            verify_aws_secret_access_key(secret, candidate)
            for candidate in candidates
        )
        if accepted:
            return VerifiedResult.VERIFIED_TRUE
        return VerifiedResult.VERIFIED_FALSE
def get_secret_access_keys(content: CodeSnippet) -> List[str]:
    """
    Collect every string in `content` that could be an AWS secret access key.

    A candidate is a run of exactly 40 characters drawn from ASCII letters,
    digits, `+`, `/` and `=`, that directly follows `=`, `,` or `(` (plus
    optional spaces). It may be wrapped in a pair of matching quotes.

    e.g. some_function('AKIA...', '[secret key]')
    e.g. secret_access_key = '[secret key]'

    :param content: iterable of lines to scan.
    :returns: the candidates, in the order they appear.
    """
    # AWS secret access keys are 40 characters long.
    key_alphabet = re.escape(string.ascii_letters + string.digits + '+/=')
    candidate_pattern = re.compile(
        r'(=|,|\() *([\'"]?)([%s]{40})(\2)(\))?' % (key_alphabet,),
    )

    candidates: List[str] = []
    for line in content:
        # Group 3 is the 40-character value itself, without delimiter/quotes.
        candidates.extend(
            hit.group(3) for hit in candidate_pattern.finditer(line)
        )
    return candidates
def verify_aws_secret_access_key(key: str, secret: str) -> bool:  # pragma: no cover
    """
    Check an access key ID / secret access key pair against AWS STS.

    Sends a Signature Version 4 signed `GetCallerIdentity` request to
    https://sts.amazonaws.com and inspects the HTTP status code.

    Using requests, because we don't want to require boto3 for this one
    optional verification step.

    Loosely based off:
    https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html

    :param key: candidate access key ID; goes into the `Credential` field of
        the `Authorization` header.
    :param secret: candidate secret access key; used to derive the signing key.
    :returns: False when STS answers with HTTP 403, True for any other status.
    :raises requests.exceptions.RequestException: on network failure, or when
        STS does not answer within the timeout.
    """
    # Function-scope import keeps the module-level import block untouched.
    from datetime import timezone

    region = 'us-east-1'
    # Upper bound (seconds) on connect/read, so a stalled connection cannot
    # hang the scan indefinitely.
    request_timeout = 10

    # Timezone-aware replacement for the deprecated `datetime.utcnow()`;
    # the formatted stamps are the same.
    now = datetime.now(timezone.utc)
    amazon_datetime = now.strftime('%Y%m%dT%H%M%SZ')
    request_date = now.strftime('%Y%m%d')

    headers = {
        # This is a required header for the signing process
        'Host': 'sts.amazonaws.com',
        'X-Amz-Date': amazon_datetime,
    }
    body = {
        'Action': 'GetCallerIdentity',
        'Version': '2011-06-15',
    }

    # Step #1: Canonical Request
    # SigV4 wants header names lower-cased and sorted by name.
    canonical_headers = sorted(
        (name.lower(), value) for name, value in headers.items()
    )
    signed_headers = ';'.join(name for name, _ in canonical_headers)

    # Poor man's form encoding, but works for this use case: none of the
    # body values need percent-escaping.
    payload = '&'.join(
        '{}={}'.format(name, value) for name, value in body.items()
    )

    # Built line by line instead of from a dedented triple-quoted template,
    # so the result cannot be altered by source indentation. The two empty
    # entries are required by SigV4: the (empty) canonical query string, and
    # the blank line that terminates the canonical header block.
    canonical_request = '\n'.join([
        'POST',
        '/',
        '',
        *('{}:{}'.format(name, value) for name, value in canonical_headers),
        '',
        signed_headers,
        hashlib.sha256(payload.encode('utf-8')).hexdigest(),
    ])

    # Step #2: String to Sign
    # STS is a global service; the region is just for latency control.
    scope = '{request_date}/{region}/sts/aws4_request'.format(
        request_date=request_date,
        region=region,
    )
    string_to_sign = '\n'.join([
        'AWS4-HMAC-SHA256',
        amazon_datetime,
        scope,
        hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
    ])

    # Step #3: Calculate signature
    # The signing key is an HMAC chain: each scope component is signed with
    # the digest produced by the previous link.
    signing_key = 'AWS4{}'.format(secret).encode('utf-8')
    for scope_part in (request_date, region, 'sts', 'aws4_request'):
        signing_key = cast(bytes, _sign(signing_key, scope_part))

    signature = cast(str, _sign(signing_key, string_to_sign, hex=True))

    # Step #4: Add to request headers
    headers['Authorization'] = (
        'AWS4-HMAC-SHA256 '
        f'Credential={key}/{scope}, '
        f'SignedHeaders={signed_headers}, '
        f'Signature={signature}'
    )

    # Step #5: Finally send the request
    response = requests.post(
        'https://sts.amazonaws.com',
        headers=headers,
        data=body,
        timeout=request_timeout,
    )

    # Only an explicit 403 counts as a rejection; every other status code
    # is reported as verified.
    return response.status_code != 403
def _sign(key: bytes, message: str, hex: bool = False) -> Union[str, bytes]:  # pragma: no cover
    """
    Compute the HMAC-SHA256 of `message` under `key`.

    :param key: raw HMAC key.
    :param message: text to authenticate; encoded as UTF-8 before signing.
    :param hex: when true, return the lowercase hexadecimal digest as `str`;
        otherwise return the raw digest as `bytes`.
    :returns: the digest, in the representation selected by `hex`.
    """
    mac = hmac.new(key, message.encode('utf-8'), hashlib.sha256)
    return mac.hexdigest() if hex else mac.digest()