9
9
10
10
import json
11
11
import re
12
+ from pathlib import Path
13
+ from typing import Iterable
12
14
from typing import List
13
- from typing import Set
14
15
15
16
from packageurl import PackageURL
17
+ from univers .version_range import NugetVersionRange
18
+ from univers .versions import NugetVersion
16
19
17
20
from vulnerabilities .importer import AdvisoryData
18
- from vulnerabilities .importer import GitImporter
21
+ from vulnerabilities .importer import AffectedPackage
22
+ from vulnerabilities .importer import Importer
19
23
from vulnerabilities .importer import Reference
20
- from vulnerabilities .utils import AffectedPackage
21
24
22
25
23
- class RetireDotnetImporter (GitImporter ):
24
- def __enter__ (self ):
25
- super (RetireDotnetImporter , self ).__enter__ ()
26
+ class RetireDotnetImporter (Importer ):
27
+ license_url = "https://github.com/RetireNet/Packages/blob/master/LICENSE"
28
+ spdx_license_expression = "MIT"
29
+ repo_url = "git+https://github.com/RetireNet/Packages/"
26
30
27
- if not getattr (self , "_added_files" , None ) :
28
- self . _added_files , self . _updated_files = self . file_changes (
29
- recursive = True , file_ext = "json" , subdir = "./Content"
30
- )
31
+ def advisory_data (self ) -> Iterable [ AdvisoryData ] :
32
+ try :
33
+ self . clone ( self . repo_url )
34
+ path = Path ( self . vcs_response . dest_dir )
31
35
32
- def updated_advisories ( self ) -> Set [ AdvisoryData ]:
33
- files = self . _updated_files . union ( self . _added_files )
34
- advisories = []
35
- for f in files :
36
- processed_data = self . process_file ( f )
37
- if processed_data :
38
- advisories . append ( processed_data )
39
- return self .batch_advisories ( advisories )
36
+ vuln = path / "Content"
37
+ for file in vuln . glob ( "*.json" ):
38
+ advisory = self . process_file ( file )
39
+ if advisory :
40
+ yield advisory
41
+ finally :
42
+ if self . vcs_response :
43
+ self .vcs_response . delete ( )
40
44
41
45
@staticmethod
42
46
def vuln_id_from_desc (desc ):
@@ -50,33 +54,40 @@ def vuln_id_from_desc(desc):
50
54
def process_file (self , path ) -> List [AdvisoryData ]:
51
55
with open (path ) as f :
52
56
json_doc = json .load (f )
53
- if self .vuln_id_from_desc (json_doc ["description" ]):
54
- vuln_id = self .vuln_id_from_desc (json_doc ["description" ])
55
- else :
56
- return
57
-
57
+ description = json_doc .get ("description" ) or ""
58
+ alias = self .vuln_id_from_desc (description )
58
59
affected_packages = []
59
- for pkg in json_doc ["packages" ]:
60
+ for pkg in json_doc .get ("packages" ) or []:
61
+ name = pkg .get ("id" )
62
+ if not name :
63
+ continue
64
+ affected_version_range = None
65
+ fixed_version = None
66
+ if pkg .get ("affected" ):
67
+ affected_version_range = NugetVersionRange .from_versions ([pkg ["affected" ]])
68
+ if pkg .get ("fix" ):
69
+ fixed_version = NugetVersion (pkg ["fix" ])
70
+ if not affected_version_range and not fixed_version :
71
+ continue
60
72
affected_packages .append (
61
73
AffectedPackage (
62
- vulnerable_package = PackageURL (
63
- name = pkg ["id" ], version = pkg ["affected" ], type = "nuget"
64
- ),
65
- patched_package = PackageURL (
66
- name = pkg ["id" ], version = pkg ["fix" ], type = "nuget"
67
- ),
74
+ package = PackageURL (name = name , type = "nuget" ),
75
+ affected_version_range = affected_version_range ,
76
+ fixed_version = fixed_version ,
68
77
)
69
78
)
70
79
71
- vuln_reference = [
72
- Reference (
73
- url = json_doc ["link" ],
80
+ link = json_doc .get ("link" )
81
+ if link :
82
+ vuln_reference = [
83
+ Reference (
84
+ url = link ,
85
+ )
86
+ ]
87
+ if alias :
88
+ return AdvisoryData (
89
+ aliases = [alias ],
90
+ summary = description ,
91
+ affected_packages = affected_packages ,
92
+ references = vuln_reference ,
74
93
)
75
- ]
76
-
77
- return AdvisoryData (
78
- vulnerability_id = vuln_id ,
79
- summary = json_doc ["description" ],
80
- affected_packages = affected_packages ,
81
- references = vuln_reference ,
82
- )
0 commit comments