Skip to content

Commit b825613

Browse files
Rexbeast2terriko
andauthored
feat: adding EPSS data (intel#3130)
Co-authored-by: Terri Oda <[email protected]>
1 parent 59e522a commit b825613

File tree

2 files changed

+98
-6
lines changed

2 files changed

+98
-6
lines changed

cve_bin_tool/cvedb.py

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@
2222
from rich.progress import track
2323

2424
from cve_bin_tool.async_utils import run_coroutine
25-
from cve_bin_tool.data_sources import curl_source, gad_source, nvd_source, osv_source
25+
from cve_bin_tool.data_sources import (
26+
curl_source,
27+
epss_source,
28+
gad_source,
29+
nvd_source,
30+
osv_source,
31+
)
2632
from cve_bin_tool.error_handler import ERROR_CODES, CVEDBError, ErrorMode, SigningError
2733
from cve_bin_tool.fetch_json_db import Fetch_JSON_DB
2834
from cve_bin_tool.log import LOGGER
@@ -88,6 +94,8 @@ def __init__(
8894
self.cve_count = -1
8995
self.all_cve_entries: list[dict[str, Any]] | None = None
9096

97+
self.epss_data = None
98+
9199
self.exploits_list: list[Any] = []
92100
self.exploit_count = 0
93101

@@ -126,6 +134,8 @@ async def refresh(self) -> None:
126134
if self.version_check:
127135
check_latest_version()
128136

137+
epss = epss_source.Epss_Source()
138+
self.epss_data = await epss.update_epss()
129139
await self.get_data()
130140

131141
def refresh_cache_and_update_db(self) -> None:
@@ -342,7 +352,29 @@ def insert_queries(self):
342352
)
343353
VALUES (?,?,?)
344354
"""
345-
return insert_severity, insert_cve_range, insert_exploit
355+
insert_cve_metrics = """
356+
INSERT or REPLACE INTO cve_metrics (
357+
cve_number,
358+
metric_id,
359+
metric_score,
360+
metric_field
361+
)
362+
VALUES (?, ?, ?, ?)
363+
"""
364+
insert_metrics = """
365+
INSERT or REPLACE INTO metrics (
366+
metrics_id,
367+
metrics_name
368+
)
369+
VALUES (?, ?)
370+
"""
371+
return (
372+
insert_severity,
373+
insert_cve_range,
374+
insert_exploit,
375+
insert_cve_metrics,
376+
insert_metrics,
377+
)
346378

347379
def init_database(self) -> None:
348380
"""Initialize db tables used for storing cve/version data"""
@@ -429,6 +461,9 @@ def populate_db(self) -> None:
429461
we'll need a better parser to match those together.
430462
"""
431463

464+
self.store_epss_data()
465+
self.populate_metrics()
466+
432467
for idx, data in enumerate(self.data):
433468
_, source_name = data
434469

@@ -457,7 +492,7 @@ def populate_db(self) -> None:
457492
self.db_close()
458493

459494
def populate_severity(self, severity_data, cursor, data_source):
460-
(insert_severity, _, _) = self.insert_queries()
495+
(insert_severity, _, _, _, _) = self.insert_queries()
461496
del_cve_range = "DELETE from cve_range where CVE_number=?"
462497

463498
for cve in severity_data:
@@ -500,7 +535,7 @@ def populate_severity(self, severity_data, cursor, data_source):
500535
cursor.executemany(del_cve_range, [(cve["ID"],) for cve in severity_data])
501536

502537
def populate_affected(self, affected_data, cursor, data_source):
503-
(_, insert_cve_range, _) = self.insert_queries()
538+
(_, insert_cve_range, _, _, _) = self.insert_queries()
504539
try:
505540
cursor.executemany(
506541
insert_cve_range,
@@ -522,6 +557,21 @@ def populate_affected(self, affected_data, cursor, data_source):
522557
except Exception as e:
523558
LOGGER.info(f"Unable to insert data for {data_source} - {e}")
524559

560+
def populate_metrics(self):
561+
cursor = self.db_open_and_get_cursor()
562+
# Insert a row without specifying cve_metrics_id
563+
(_, _, _, _, insert_metrics) = self.insert_queries()
564+
data = [
565+
(1, "EPSS"),
566+
(2, "CVSS-2"),
567+
(3, "CVSS-3"),
568+
]
569+
# Execute the insert query for each row
570+
for row in data:
571+
cursor.execute(insert_metrics, row)
572+
self.connection.commit()
573+
self.db_close()
574+
525575
def clear_cached_data(self) -> None:
526576
self.create_cache_backup()
527577
if self.cachedir.exists():
@@ -712,12 +762,19 @@ def create_exploit_db(self):
712762
self.db_close()
713763

714764
def populate_exploit_db(self, exploits):
715-
(_, _, insert_exploit) = self.insert_queries()
765+
(_, _, insert_exploit, _, _) = self.insert_queries()
716766
cursor = self.db_open_and_get_cursor()
717767
cursor.executemany(insert_exploit, exploits)
718768
self.connection.commit()
719769
self.db_close()
720770

771+
def store_epss_data(self):
772+
(_, _, _, insert_cve_metrics, _) = self.insert_queries()
773+
cursor = self.db_open_and_get_cursor()
774+
cursor.executemany(insert_cve_metrics, self.epss_data)
775+
self.connection.commit()
776+
self.db_close()
777+
721778
def dict_factory(self, cursor, row):
722779
d = {}
723780
for idx, col in enumerate(cursor.description):
@@ -866,7 +923,13 @@ def db_to_json(self, path, private_key, passphrase):
866923
shutil.rmtree(temp_gnupg_home)
867924

868925
def json_to_db(self, cursor, db_column, json_data):
869-
(insert_severity, insert_cve_range, insert_exploit) = self.insert_queries()
926+
(
927+
insert_severity,
928+
insert_cve_range,
929+
insert_exploit,
930+
insert_cve_metrics,
931+
insert_metrics,
932+
) = self.insert_queries()
870933
columns = []
871934
for data in json_data:
872935
column = list(data.keys())
@@ -887,6 +950,10 @@ def json_to_db(self, cursor, db_column, json_data):
887950
cursor.executemany(insert_cve_range, values)
888951
elif db_column == "cve_severity":
889952
cursor.executemany(insert_severity, values)
953+
elif db_column == "cve_metrics":
954+
cursor.executemany(insert_cve_metrics, values)
955+
elif db_column == "metrics":
956+
cursor.executemany(insert_metrics, values)
890957

891958
def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error):
892959
try:

test/test_cvedb.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,28 @@ def test_import_export_json(self):
6666
self.cvedb.db_close()
6767

6868
assert cve_entries_before == cve_entries_after
69+
70+
def test_new_database_schema(self):
71+
# Check if the new schema is created in the database
72+
self.cvedb.init_database()
73+
cursor = self.cvedb.db_open_and_get_cursor()
74+
75+
tables_to_check = ["cve_metrics", "metrics"]
76+
required_columns = {
77+
"cve_metrics": ["cve_number", "metric_id", "metric_score", "metric_field"],
78+
"metrics": ["metrics_id", "metrics_name"],
79+
}
80+
81+
for table in tables_to_check:
82+
cursor.execute(
83+
f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'"
84+
)
85+
result = cursor.fetchone()
86+
assert result is not None # Assert that the table exists
87+
88+
cursor.execute(f"PRAGMA table_info({table})")
89+
columns = cursor.fetchall()
90+
column_names = [column[1] for column in columns]
91+
assert all(column in column_names for column in required_columns[table])
92+
93+
self.cvedb.db_close()

0 commit comments

Comments
 (0)