-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhash_scanner.py
177 lines (151 loc) · 6.31 KB
/
hash_scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import hashlib
import sys
import argparse
import requests
def parse_arguments():
parser = argparse.ArgumentParser()
# Required Arguments
parser.add_argument("-f", "--file", dest="file", required=True,
help="Specify a file that should be scanned")
parser.add_argument("-k", "--key", dest="key", required=True,
help="Unique API token to give rights to use endpoint")
parser.add_argument("-hash", "--hash", dest="hash", required=False, default="sha256",
help="Specify the hash function (type) to be used for the given file; default md5")
# Optional Arguments
parser.add_argument("-m", "--meta", dest="metadata", required=False, default=None,
help="Specify file metadata, 0 (don't add) or 1 (add)")
# file_name
parser.add_argument("-n", "--name", dest="preserve", action="store_true", required=False, default=None,
help="flag to preserve file name in scan")
# archivepwd
parser.add_argument("-p", "--password", dest="pwd", required=False, default=None,
help="password if submitted file is password protected")
#samplesharing (0 or 1)
parser.add_argument("-s", "--share", dest="share", action="store_true", default=None, required=False,
help="allows file scans to be shared or not (only working for paid users): allowed values 0/1")
# downloadfrom
parser.add_argument("-u", "--url", dest="link", default=None,
help="link to download file")
parser.add_argument("-w", "--workflow", dest="workflow", default=None,
help="active workflows, allowed values: mcl-metadefender-rest-sanitize-disabled-unarchive")
args = parser.parse_args()
if args.preserve:
args.preserve = args.file
validate(args)
return args
def validate(args):
workflow_values = ['mcl', 'metadefender',
'rest', 'sanitize', 'disabled', 'unarchive']
if args.workflow and args.workflow not in workflow_values:
print("Invalid workflow variable given, allowed values: mcl-metadefender-rest-sanitize-disabled-unarchive")
sys.exit(0)
def calculate_hash_file(filename, hash_type):
try:
if hash_type == "md5":
hash = hashlib.md5()
elif hash_type == "sha1":
hash = hashlib.sha1()
elif hash_type == "sha256":
hash = hashlib.sha256()
with open(filename, 'rb') as f:
for chunk in iter(lambda: f.read(65536), b""):
hash.update(chunk)
except:
print("Unable to hash file")
sys.exit(0)
return hash.hexdigest()
def retrieve_scan_file(url, api_key):
headers = {'apikey': api_key}
try:
response = requests.get(url=url, headers=headers)
output_data = response.json()
except requests.exceptions.RequestException as e:
print(e)
sys.exit(0)
except requests.exceptions.ResponseException as e:
print(e)
sys.exit(0)
except requests.exceptions.ConnectionError as e:
print(e)
sys.exit(0)
except requests.exceptions.HTTPError as e:
print(e)
sys.exit(0)
except requests.exceptions.URLRequired as e:
print(e.reason)
sys.exit(0)
return output_data
def upload_file_to_metadefender(api_key, file):
base_url = "https://api.metadefender.com/v4/file"
headers = {'apikey': api_key, 'content-type': "application/octet-stream"}
try:
response = requests.post(base_url, headers=headers, data=file)
output_data = response.json()
except requests.exceptions.RequestException as e:
print(e)
sys.exit(0)
except requests.exceptions.HTTPError as e:
print(e)
sys.exit(0)
except requests.exceptions.ConnectionError as error:
print("Connection Error:", error)
sys.exit(0)
except requests.exceptions.Timeout as error:
print("Timeout:", error)
sys.exit(0)
except:
print("Unable to scan file")
sys.exit(0)
return output_data['data_id']
def file_analysis_result(api_key, data_id):
base_url = "https://api.metadefender.com/v4/file/{data_id}".format(
data_id=data_id)
headers = {'apikey': api_key, 'x-file-metadata': '0'}
try:
response = requests.get(base_url, headers=headers)
output_data = response.json()
except requests.exceptions.RequestException as e:
print(e)
sys.exit(0)
except requests.exceptions.HTTPError as e:
print(e)
sys.exit(0)
except requests.exceptions.ConnectionError as e:
print(e)
sys.exit(0)
except requests.exceptions.Timeout as e:
print(e)
sys.exit(0)
except:
print("Unable to retrieve file analysis result")
sys.exit(0)
return output_data
def show_results(results):
print("filename: {filename}".format(
filename=results['file_info']['display_name']))
print("overall_status: {status}".format(
status=results['scan_results']['scan_all_result_a']))
for i, j in results['scan_results']['scan_details'].items():
print("engine: {engine}".format(engine=i))
print("threat_found: {threat}".format(
threat=j['threat_found'] if j['threat_found'] else 'Clean'))
print("scan_result: {scanresult}".format(
scanresult=j['scan_result_i']))
print("def_time: {time}".format(time=j['def_time']))
if __name__ == '__main__':
args = parse_arguments()
hashed_file_code = calculate_hash_file(args.file, args.hash)
base_url = "https://api.metadefender.com/v4/hash/{}".format(
hashed_file_code)
retrieve_result = retrieve_scan_file(base_url, args.key)
try:
if retrieve_result['error']['code'] == 404003:
data_id = upload_file_to_metadefender(args.key, args.file)
file_result = file_analysis_result(args.key, data_id)
while file_result['scan_results']['scan_all_result_a'] == 'In Progress':
file_result = file_analysis_result(args.key, data_id)
print(show_results(file_result))
except:
while retrieve_result['scan_results']['progress_percentage'] != 100:
retrieve_result = retrieve_scan_file(base_url, args.key)
print(show_results(retrieve_result))