Skip to content

Commit 2342b89

Browse files
committed
Migrate ubuntu usn importer
Signed-off-by: Tushar Goel <[email protected]>
1 parent fe574fc commit 2342b89

File tree

5 files changed

+65
-95
lines changed

5 files changed

+65
-95
lines changed

vulnerabilities/importers/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from vulnerabilities.importers import redhat
2424
from vulnerabilities.importers import retiredotnet
2525
from vulnerabilities.importers import ubuntu
26+
from vulnerabilities.importers import ubuntu_usn
2627

2728
IMPORTERS_REGISTRY = [
2829
nginx.NginxImporter,
@@ -41,6 +42,7 @@
4142
debian_oval.DebianOvalImporter,
4243
npm.NpmImporter,
4344
retiredotnet.RetireDotnetImporter,
45+
ubuntu_usn.UbuntuUSNImporter,
4446
]
4547

4648
IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}

vulnerabilities/importers/ubuntu_usn.py

+17-30
Original file line numberDiff line numberDiff line change
@@ -15,55 +15,42 @@
1515
from vulnerabilities.importer import AdvisoryData
1616
from vulnerabilities.importer import Importer
1717
from vulnerabilities.importer import Reference
18-
from vulnerabilities.utils import create_etag
1918
from vulnerabilities.utils import is_cve
2019

2120

2221
class UbuntuUSNImporter(Importer):
23-
def updated_advisories(self):
24-
advisories = []
25-
if create_etag(data_src=self, url=self.config.db_url, etag_key="etag"):
26-
advisories.extend(self.to_advisories(fetch(self.config.db_url)))
22+
db_url = "https://usn.ubuntu.com/usn-db/database-all.json.bz2"
23+
spdx_license_expression = "GPL"
2724

28-
return self.batch_advisories(advisories)
29-
30-
def create_etag(self, url):
31-
etag = requests.head(url).headers.get("etag")
32-
if not etag:
33-
return True
34-
35-
elif url in self.config.etags:
36-
if self.config.etags[url] == etag:
37-
return False
38-
39-
self.config.etags[url] = etag
40-
return True
25+
def advisory_data(self):
26+
usn_db = fetch(self.db_url)
27+
yield from self.to_advisories(usn_db=usn_db)
4128

4229
@staticmethod
4330
def to_advisories(usn_db):
44-
advisories = []
4531
for usn in usn_db:
46-
reference = get_usn_references(usn_db[usn]["id"])
47-
for cve in usn_db[usn].get("cves", [""]):
32+
usn_data = usn_db[usn]
33+
references = get_usn_references(usn_data.get("id"))
34+
for cve in usn_data.get("cves", []):
4835
# The db sometimes contains entries like
4936
# {'cves': ['python-pgsql vulnerabilities', 'CVE-2006-2313', 'CVE-2006-2314']}
5037
# This `if` filters entries like 'python-pgsql vulnerabilities'
5138
if not is_cve(cve):
5239
cve = ""
5340

54-
advisories.append(
55-
AdvisoryData(
56-
vulnerability_id=cve,
57-
summary="",
58-
references=[reference],
59-
)
41+
if not cve:
42+
continue
43+
yield AdvisoryData(
44+
aliases=[cve],
45+
summary="",
46+
references=references,
6047
)
6148

62-
return advisories
63-
6449

6550
def get_usn_references(usn_id):
66-
return Reference(reference_id="USN-" + usn_id, url="https://usn.ubuntu.com/{}/".format(usn_id))
51+
if not usn_id:
52+
return []
53+
return [Reference(reference_id=f"USN-{usn_id}", url=f"https://usn.ubuntu.com/{usn_id}/")]
6754

6855

6956
def fetch(url):

vulnerabilities/tests/conftest.py

-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,5 @@ def no_rmtree(monkeypatch):
4242
"test_suse_backports.py",
4343
"test_suse.py",
4444
"test_suse_scores.py",
45-
"test_ubuntu_usn.py",
4645
"test_upstream.py",
4746
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[
2+
{
3+
"aliases": [
4+
"CVE-2009-0698"
5+
],
6+
"summary": "",
7+
"affected_packages": [],
8+
"references": [
9+
{
10+
"reference_id": "USN-763-1",
11+
"url": "https://usn.ubuntu.com/763-1/",
12+
"severities": []
13+
}
14+
],
15+
"date_published": null
16+
},
17+
{
18+
"aliases": [
19+
"CVE-2009-1274"
20+
],
21+
"summary": "",
22+
"affected_packages": [],
23+
"references": [
24+
{
25+
"reference_id": "USN-763-1",
26+
"url": "https://usn.ubuntu.com/763-1/",
27+
"severities": []
28+
}
29+
],
30+
"date_published": null
31+
}
32+
]

vulnerabilities/tests/test_ubuntu_usn.py

+14-64
Original file line numberDiff line numberDiff line change
@@ -17,71 +17,21 @@
1717

1818
from packageurl import PackageURL
1919

20-
import vulnerabilities.importers.ubuntu_usn as ubuntu_usn
2120
from vulnerabilities.importer import AdvisoryData
2221
from vulnerabilities.importer import Reference
22+
from vulnerabilities.importers.ubuntu_usn import UbuntuUSNImporter
23+
from vulnerabilities.tests import util_tests
2324

2425
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
25-
TEST_DATA = os.path.join(BASE_DIR, "test_data/", "ubuntu_usn_db", "database-all.json.bz2")
26-
27-
28-
class TestUbuntuUSNImporter(TestCase):
29-
@classmethod
30-
def setUpClass(cls):
31-
data_src_cfg = {"etags": {}, "db_url": "http://exampledb.com"}
32-
cls.data_src = ubuntu_usn.UbuntuUSNImporter(batch_size=1, config=data_src_cfg)
33-
with open(TEST_DATA, "rb") as f:
34-
cls.raw_data = f.read()
35-
cls.db = json.loads(bz2.decompress(cls.raw_data))
36-
37-
def test_get_usn_references(self):
38-
39-
eg_usn = "435-1"
40-
expected_references = Reference(
41-
reference_id="USN-435-1", url="https://usn.ubuntu.com/435-1/"
42-
)
43-
44-
found_references = ubuntu_usn.get_usn_references(eg_usn)
45-
assert found_references == expected_references
46-
47-
def test_fetch(self):
48-
49-
mock_response = MagicMock()
50-
mock_response.content = self.raw_data
51-
with patch("vulnerabilities.importers.ubuntu_usn.requests.get", return_value=mock_response):
52-
assert ubuntu_usn.fetch("www.db.com") == self.db
53-
54-
def test_to_advisories(self):
55-
56-
expected_advisories = [
57-
Advisory(
58-
summary="",
59-
references=[
60-
Reference(url="https://usn.ubuntu.com/763-1/", reference_id="USN-763-1")
61-
],
62-
vulnerability_id="CVE-2009-0698",
63-
),
64-
Advisory(
65-
summary="",
66-
references=[
67-
Reference(url="https://usn.ubuntu.com/763-1/", reference_id="USN-763-1")
68-
],
69-
vulnerability_id="CVE-2009-1274",
70-
),
71-
]
72-
found_advisories = self.data_src.to_advisories(self.db)
73-
74-
found_advisories = list(map(Advisory.normalized, found_advisories))
75-
expected_advisories = list(map(Advisory.normalized, expected_advisories))
76-
assert sorted(found_advisories) == sorted(expected_advisories)
77-
78-
def test_create_etag(self):
79-
assert self.data_src.config.etags == {}
80-
81-
mock_response = MagicMock()
82-
mock_response.headers = {"etag": "2131151243&2191"}
83-
84-
with patch("vulnerabilities.importers.ubuntu.requests.head", return_value=mock_response):
85-
assert self.data_src.create_etag("https://example.org")
86-
assert self.data_src.config.etags == {"https://example.org": "2131151243&2191"}
87-
assert not self.data_src.create_etag("https://example.org")
26+
TEST_DIR = os.path.join(BASE_DIR, "test_data", "ubuntu_usn_db")
27+
28+
29+
def test_ubuntu_usn():
30+
database = os.path.join(TEST_DIR, "database-all.json.bz2")
31+
with open(database, "rb") as f:
32+
raw_data = f.read()
33+
db = json.loads(bz2.decompress(raw_data))
34+
advisories = UbuntuUSNImporter().to_advisories(db)
35+
expected_file = os.path.join(TEST_DIR, f"ubuntu-usn-expected.json")
36+
result = [data.to_dict() for data in list(advisories)]
37+
util_tests.check_results_against_json(result, expected_file)

0 commit comments

Comments
 (0)