-
-
Notifications
You must be signed in to change notification settings - Fork 486
/
Copy pathapp.py
198 lines (166 loc) · 6.01 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.
import json
import logging
# system imports
import os
import secrets
import shutil
# web imports
from pathlib import Path
from flask import Flask
from flask_executor import Executor
from flask_executor.futures import Future
from flask_shell2http import Shell2HTTP
from werkzeug.utils import safe_join
# get flask-shell2http logger instance
logger = logging.getLogger("flask_shell2http")
# logger config
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log_level = os.getenv("LOG_LEVEL", logging.INFO)
log_path = os.getenv("LOG_PATH", "/var/log/intel_owl/malware_tools_analyzers")
# create new file handlers, files are created if doesn't already exists
fh = logging.FileHandler(f"{log_path}/malware_tools_analyzers.log")
fh.setFormatter(formatter)
fh.setLevel(log_level)
fh_err = logging.FileHandler(f"{log_path}/malware_tools_analyzers_errors.log")
fh_err.setFormatter(formatter)
fh_err.setLevel(logging.ERROR)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(fh_err)
logger.setLevel(log_level)
# Globals
app = Flask(__name__)
app.config["SECRET_KEY"] = secrets.token_hex(16)
executor = Executor(app)
shell2http = Shell2HTTP(app, executor)
# we are changeing the directory for execution of
# artifacts script as it requires us to be in the
# same directory
os.chdir("/opt/deploy/artifacts/artifacts")
# Box-JS report
def read_files_and_make_report(dir_loc):
report = {}
files_to_read = [
"IOC.json",
"snippets.json",
"resources.json",
"analysis.log",
"urls.json",
"active_urls.json",
]
# Read output from files one by one
for fname in files_to_read:
try:
with open(safe_join(dir_loc, fname), encoding="utf-8") as fp:
try:
report[fname] = json.load(fp)
except json.JSONDecodeError:
report[fname] = fp.readlines()
except FileNotFoundError:
report[fname] = f"FileNotFoundError: {fname}"
return report
def intercept_box_js_result(context, future: Future) -> None:
"""
Box-JS doesn't output result to standard output but to a file,
using this callback function,
we intercept the future object and update its result attribute
by reading the final analysis result from the saved result file
before it is ready to be consumed.
"""
# get current result
res = future.result()
fname = context.get("read_result_from", "")
dir_loc = safe_join("/tmp/boxjs", fname + ".results")
if not fname:
if res.get("returncode", -1) == 0:
res["returncode"] = -1
raise Exception("No file specified to read result from")
try:
res["report"] = read_files_and_make_report(dir_loc)
except Exception as e:
res["error"] += str(e)
if not Path(dir_loc).exists():
res["report"]["error"] += f"Path {dir_loc} does not exist"
# set final result
future._result = res # skipcq PYL-W0212
# Remove the directory
shutil.rmtree(dir_loc, ignore_errors=True)
def intercept_droidlysis_result(context, future: Future) -> None:
logger.info("Intercepting droidlysis result")
res = future.result()
dir_loc = "/opt/deploy/droidlysis/out/"
subdir = os.listdir(dir_loc)[0]
f_loc = safe_join(dir_loc, subdir, "report.json")
logger.info(f"Reading result from {f_loc}")
if not os.path.exists(f_loc):
res["error"] += f", result file {f_loc} does not exists."
if res.get("returncode", -1) == 0:
res["returncode"] = -1
else:
with open(f_loc, "r", encoding="utf-8") as fp:
try:
res["report"] = json.load(fp)
except json.JSONDecodeError:
res["report"] = fp.read()
# 4. set final result after modifications
future._result = res # skipcq PYL-W0212
# 5. directory can be removed now
if dir_loc:
shutil.rmtree(dir_loc, ignore_errors=True)
# with this, we can make http calls to the endpoint: /capa
shell2http.register_command(endpoint="capa", command_name="/usr/local/bin/capa -q -j")
# with this, we can make http calls to the endpoint: /floss
shell2http.register_command(
endpoint="floss",
command_name="/usr/local/bin/floss -q -j",
)
# with this, we can make http calls to the endpoint: /peframe
shell2http.register_command(
endpoint="peframe", command_name="/opt/deploy/peframe/venv/bin/peframe"
)
# with this, we can make http calls to the endpoint: /stringsifter
shell2http.register_command(
endpoint="stringsifter",
command_name="/opt/deploy/stringsifter/wrapper.py",
)
# with this, we can make http calls to the endpoint: /clamav
shell2http.register_command(
endpoint="clamav", command_name="/usr/bin/clamdscan --allmatch --infected"
)
# with this, we can make http calls to the endpoint: /boxjs
shell2http.register_command(
endpoint="boxjs",
command_name="/usr/local/bin/box-js",
callback_fn=intercept_box_js_result,
)
# with this, we can make http calls to the endpoint: /apkid
shell2http.register_command(
endpoint="apkid", command_name="/opt/deploy/apkid/venv/bin/apkid"
)
shell2http.register_command(
endpoint="qiling",
command_name="/opt/deploy/qiling/venv/bin/python3 /opt/deploy/qiling/analyze.py",
)
# mobsfscan is the command for DroidLysis
shell2http.register_command(
endpoint="mobsf",
command_name="/opt/deploy/mobsf/venv/bin/mobsfscan",
)
# droidlysis is the command for DroidLysis
shell2http.register_command(
endpoint="droidlysis",
command_name="/opt/deploy/droidlysis/venv/bin/droidlysis",
callback_fn=intercept_droidlysis_result,
)
# flake8: noqa
shell2http.register_command(
endpoint="artifacts",
command_name="/opt/deploy/artifacts/venv/bin/python3 /opt/deploy/artifacts/artifacts/artifacts.py",
)
# goresym is the command for GoReSym
shell2http.register_command(
endpoint="goresym",
command_name="/usr/local/bin/goresym",
)