-
Notifications
You must be signed in to change notification settings - Fork 496
/
Copy pathaudit.py
357 lines (287 loc) · 10 KB
/
audit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
from __future__ import print_function
import json
import os
import subprocess
import sys
import textwrap
from builtins import input
from collections import defaultdict
from ..plugins.core import initialize
from ..plugins.high_entropy_strings import HighEntropyStringsPlugin
from .baseline import merge_results
from .bidirectional_iterator import BidirectionalIterator
from .color import BashColor
from .color import Color
from .potential_secret import PotentialSecret
class SecretNotFoundOnSpecifiedLineError(Exception):
pass
def audit_baseline(baseline_filename):
original_baseline = _get_baseline_from_file(baseline_filename)
if not original_baseline:
return
files_removed = _remove_nonexistent_files_from_baseline(original_baseline)
current_secret_index = 0
all_secrets = list(_secret_generator(original_baseline))
secrets_with_choices = [
(filename, secret) for filename, secret in all_secrets
if 'is_secret' not in secret
]
total_choices = len(secrets_with_choices)
secret_iterator = BidirectionalIterator(secrets_with_choices)
for filename, secret in secret_iterator:
_clear_screen()
current_secret_index += 1
try:
_print_context(
filename,
secret,
current_secret_index,
total_choices,
original_baseline['plugins_used'],
)
decision = _get_user_decision(can_step_back=secret_iterator.can_step_back())
except SecretNotFoundOnSpecifiedLineError:
decision = _get_user_decision(prompt_secret_decision=False)
if decision == 'q':
print('Quitting...')
break
if decision == 'b':
current_secret_index -= 2
secret_iterator.step_back_on_next_iteration()
_handle_user_decision(decision, secret)
if current_secret_index == 0 and not files_removed:
print('Nothing to audit!')
return
print('Saving progress...')
results = defaultdict(list)
for filename, secret in all_secrets:
results[filename].append(secret)
original_baseline['results'] = merge_results(
original_baseline['results'],
dict(results),
)
_save_baseline_to_file(baseline_filename, original_baseline)
def _get_baseline_from_file(filename): # pragma: no cover
try:
with open(filename) as f:
return json.loads(f.read())
except (IOError, json.decoder.JSONDecodeError):
print('Not a valid baseline file!', file=sys.stderr)
return
def _remove_nonexistent_files_from_baseline(baseline):
files_removed = False
for filename in baseline['results'].copy():
if not os.path.exists(filename):
del baseline['results'][filename]
files_removed = True
return files_removed
def _secret_generator(baseline):
"""Generates secrets to audit, from the baseline"""
for filename, secrets in baseline['results'].items():
for secret in secrets:
yield filename, secret
def _clear_screen(): # pragma: no cover
subprocess.call(['clear'])
def _print_context(filename, secret, count, total, plugin_settings): # pragma: no cover
"""
:type filename: str
:param filename: the file currently scanned.
:type secret: dict, in PotentialSecret.json() format
:param secret: the secret, represented in the baseline file.
:type count: int
:param count: current count of secrets scanned so far
:type total: int
:param total: total number of secrets in baseline
:type plugin_settings: list
:param plugin_settings: plugins used to create baseline.
:raises: SecretNotFoundOnSpecifiedLineError
"""
secrets_left = '{}/{}'.format(
count,
total,
)
print('{} {}\n{} {}'.format(
BashColor.color(
'Secrets Left:',
Color.BOLD,
),
BashColor.color(
secrets_left,
Color.PURPLE,
),
BashColor.color(
'Filename: ',
Color.BOLD,
),
BashColor.color(
filename,
Color.PURPLE,
),
))
print('-' * 10)
error_obj = None
try:
secret_with_context = _get_secret_with_context(
filename,
secret,
plugin_settings,
)
print(secret_with_context)
except SecretNotFoundOnSpecifiedLineError as e:
error_obj = e
print(e)
print('-' * 10)
if error_obj:
raise error_obj
def _get_user_decision(prompt_secret_decision=True, can_step_back=False):
"""
:type prompt_secret_decision: bool
:param prompt_secret_decision: if False, won't ask to label secret.
"""
allowable_user_input = ['s', 'q']
if prompt_secret_decision:
allowable_user_input.extend(['y', 'n'])
if can_step_back:
allowable_user_input.append('b')
user_input = None
while user_input not in allowable_user_input:
if user_input:
print('Invalid input.')
if 'y' in allowable_user_input:
user_input_string = 'Is this a valid secret? (y)es, (n)o, '
else:
user_input_string = 'What would you like to do? '
if 'b' in allowable_user_input:
user_input_string += '(b)ack, '
user_input_string += '(s)kip, (q)uit: '
user_input = input(user_input_string)
if user_input:
user_input = user_input[0].lower()
return user_input
def _handle_user_decision(decision, secret):
if decision == 'y':
secret['is_secret'] = True
elif decision == 'n':
secret['is_secret'] = False
elif decision == 's' and 'is_secret' in secret:
del secret['is_secret']
def _save_baseline_to_file(filename, data): # pragma: no cover
with open(filename, 'w') as f:
f.write(json.dumps(
data,
indent=2,
sort_keys=True,
))
def _get_secret_with_context(
filename,
secret,
plugin_settings,
lines_of_context=5,
):
"""
Displays the secret, with surrounding lines of code for better context.
:type filename: str
:param filename: filename where secret resides in
:type secret: dict, PotentialSecret.json() format
:param secret: the secret listed in baseline
:type plugin_settings: list
:param plugin_settings: plugins used to create baseline.
:type lines_of_context: int
:param lines_of_context: number of lines displayed before and after
secret.
:raises: SecretNotFoundOnSpecifiedLineError
"""
secret_lineno = secret['line_number']
start_line = 1 if secret_lineno <= lines_of_context \
else secret_lineno - lines_of_context
end_line = secret_lineno + lines_of_context
output = subprocess.check_output([
'sed',
'-n', '{},{}p'.format(start_line, end_line),
filename,
]).decode('utf-8').splitlines()
trailing_lines_of_context = lines_of_context
if len(output) < end_line - start_line + 1:
# This handles the case of a short file.
num_lines_in_file = int(subprocess.check_output([
'wc',
'-l',
filename,
]).decode('utf-8').split()[0])
trailing_lines_of_context = lines_of_context - \
(end_line - num_lines_in_file)
# -1, because that's where the secret actually is (without it,
# it would just be the start of the context block).
# NOTE: index_of_secret_in_output should *always* be negative.
index_of_secret_in_output = -trailing_lines_of_context - 1
output[index_of_secret_in_output] = _highlight_secret(
output[index_of_secret_in_output],
secret,
filename,
plugin_settings,
)
# Adding line numbers
return '\n'.join(
map(
lambda x: '{}:{}'.format(
BashColor.color(
str(int(x[0]) + start_line),
Color.LIGHT_GREEN,
),
x[1],
),
enumerate(output),
),
)
def _highlight_secret(secret_line, secret, filename, plugin_settings):
"""
:type secret_line: str
:param secret_line: the line on whcih the secret is found
:type secret: dict
:param secret: see caller's docstring
:type filename: str
:param filename: this is needed, because PotentialSecret uses this
as a means of comparing whether two secrets are equal.
:type plugin_settings: list
:param plugin_settings: see caller's docstring
:rtype: str
:returns: secret_line, but with the actual secret highlighted.
"""
plugin = initialize.from_secret_type(
secret['type'],
plugin_settings,
)
for raw_secret in _raw_secret_generator(plugin, secret_line):
secret_obj = PotentialSecret(
plugin.secret_type,
filename,
secret=raw_secret,
)
# There could be more than two secrets on the same line.
# We only want to highlight the right one.
if secret_obj.secret_hash == secret['hashed_secret']:
break
else:
raise SecretNotFoundOnSpecifiedLineError(
textwrap.dedent("""
ERROR: Secret not found on specified line number!
Try recreating your baseline to fix this issue.
""")[1:-1],
)
index_of_secret = secret_line.index(raw_secret)
return '{}{}{}'.format(
secret_line[:index_of_secret],
BashColor.color(
raw_secret,
Color.RED,
),
secret_line[index_of_secret + len(raw_secret):],
)
def _raw_secret_generator(plugin, secret_line):
"""Generates raw secrets by re-scanning the line, with the specified plugin"""
for raw_secret in plugin.secret_generator(secret_line):
yield raw_secret
if issubclass(plugin.__class__, HighEntropyStringsPlugin):
with plugin.non_quoted_string_regex(strict=False):
for raw_secret in plugin.secret_generator(secret_line):
yield raw_secret