5
5
import logging
6
6
import os
7
7
import tempfile
8
+ from dataclasses import dataclass
8
9
from functools import partial
9
10
10
11
from packageurl import PackageURL
28
29
logger : logging .Logger = logging .getLogger (__name__ )
29
30
30
31
32
+ @dataclass (frozen = True )
33
+ class ProvenanceAsset :
34
+ """This class exists to hold a provenance payload with the original asset's name and URL."""
35
+
36
+ payload : InTotoPayload
37
+ name : str
38
+ url : str
39
+
40
+
31
41
class ProvenanceFinder :
32
42
"""This class is used to find and retrieve provenance files from supported registries."""
33
43
@@ -42,7 +52,7 @@ def __init__(self) -> None:
42
52
elif isinstance (registry , JFrogMavenRegistry ):
43
53
self .jfrog_registry = registry
44
54
45
- def find_provenance (self , purl : PackageURL ) -> list [InTotoPayload ]:
55
+ def find_provenance (self , purl : PackageURL ) -> list [ProvenanceAsset ]:
46
56
"""Find the provenance file(s) of the passed PURL.
47
57
48
58
Parameters
@@ -52,8 +62,8 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
52
62
53
63
Returns
54
64
-------
55
- list[InTotoPayload ]
56
- The provenance payload , or an empty list if not found.
65
+ list[ProvenanceAsset ]
66
+ The provenance asset , or an empty list if not found.
57
67
"""
58
68
logger .debug ("Seeking provenance of: %s" , purl )
59
69
@@ -82,7 +92,7 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
82
92
logger .debug ("Provenance finding not supported for PURL type: %s" , purl .type )
83
93
return []
84
94
85
- def _find_provenance (self , discovery_functions : list [partial [list [InTotoPayload ]]]) -> list [InTotoPayload ]:
95
+ def _find_provenance (self , discovery_functions : list [partial [list [ProvenanceAsset ]]]) -> list [ProvenanceAsset ]:
86
96
"""Find the provenance file(s) using the passed discovery functions.
87
97
88
98
Parameters
@@ -93,7 +103,7 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
93
103
Returns
94
104
-------
95
105
list[InTotoPayload]
96
- The provenance payload (s) from the first successful function, or an empty list if none were.
106
+ The provenance asset (s) from the first successful function, or an empty list if none were.
97
107
"""
98
108
if not discovery_functions :
99
109
return []
@@ -108,7 +118,7 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
108
118
return []
109
119
110
120
111
- def find_npm_provenance (purl : PackageURL , registry : NPMRegistry ) -> list [InTotoPayload ]:
121
+ def find_npm_provenance (purl : PackageURL , registry : NPMRegistry ) -> list [ProvenanceAsset ]:
112
122
"""Find and download the NPM based provenance for the passed PURL.
113
123
114
124
Two kinds of attestation can be retrieved from npm: "Provenance" and "Publish". The "Provenance" attestation
@@ -125,8 +135,8 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP
125
135
126
136
Returns
127
137
-------
128
- list[InTotoPayload ]
129
- The provenance payload (s), or an empty list if not found.
138
+ list[ProvenanceAsset ]
139
+ The provenance asset (s), or an empty list if not found.
130
140
"""
131
141
if not registry .enabled :
132
142
logger .debug ("The npm registry is not enabled." )
@@ -172,16 +182,19 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP
172
182
publish_payload = load_provenance_payload (signed_download_path )
173
183
except LoadIntotoAttestationError as error :
174
184
logger .error ("Error while loading publish attestation: %s" , error )
175
- return [provenance_payload ]
185
+ return [ProvenanceAsset ( provenance_payload , npm_provenance_asset . name , npm_provenance_asset . url ) ]
176
186
177
- return [provenance_payload , publish_payload ]
187
+ return [
188
+ ProvenanceAsset (provenance_payload , npm_provenance_asset .name , npm_provenance_asset .url ),
189
+ ProvenanceAsset (publish_payload , npm_provenance_asset .name , npm_provenance_asset .url ),
190
+ ]
178
191
179
192
except OSError as error :
180
193
logger .error ("Error while storing provenance in the temporary directory: %s" , error )
181
194
return []
182
195
183
196
184
- def find_gav_provenance (purl : PackageURL , registry : JFrogMavenRegistry ) -> list [InTotoPayload ]:
197
+ def find_gav_provenance (purl : PackageURL , registry : JFrogMavenRegistry ) -> list [ProvenanceAsset ]:
185
198
"""Find and download the GAV based provenance for the passed PURL.
186
199
187
200
Parameters
@@ -193,8 +206,8 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
193
206
194
207
Returns
195
208
-------
196
- list[InTotoPayload ] | None
197
- The provenance payload if found, or an empty list otherwise.
209
+ list[ProvenanceAsset ] | None
210
+ The provenance asset if found, or an empty list otherwise.
198
211
199
212
Raises
200
213
------
@@ -263,7 +276,7 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
263
276
if not is_witness_provenance_payload (provenance_payload , witness_verifier_config .predicate_types ):
264
277
continue
265
278
266
- provenances .append (provenance_payload )
279
+ provenances .append (ProvenanceAsset ( provenance_payload , provenance_asset . name , provenance_asset . url ) )
267
280
except OSError as error :
268
281
logger .error ("Error while storing provenance in the temporary directory: %s" , error )
269
282
@@ -277,7 +290,7 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
277
290
278
291
def find_provenance_from_ci (
279
292
analyze_ctx : AnalyzeContext , git_obj : Git | None , download_path : str
280
- ) -> InTotoPayload | None :
293
+ ) -> ProvenanceAsset | None :
281
294
"""Try to find provenance from CI services of the repository.
282
295
283
296
Note that we stop going through the CI services once we encounter a CI service
@@ -372,7 +385,10 @@ def find_provenance_from_ci(
372
385
download_provenances_from_ci_service (ci_info , download_path )
373
386
374
387
# TODO consider how to handle multiple payloads here.
375
- return ci_info ["provenances" ][0 ].payload if ci_info ["provenances" ] else None
388
+ if ci_info ["provenances" ]:
389
+ provenance = ci_info ["provenances" ][0 ]
390
+ return ProvenanceAsset (provenance .payload , provenance .asset .name , provenance .asset .url )
391
+ return None
376
392
377
393
else :
378
394
logger .debug ("CI service not supported for provenance finding: %s" , ci_service .name )
0 commit comments