|
10 | 10 | # See the License for the specific language governing permissions and
|
11 | 11 | # limitations under the License.
|
12 | 12 | import json
|
| 13 | +import re |
13 | 14 |
|
14 | 15 | import pretend
|
15 | 16 | import pytest
|
@@ -132,28 +133,74 @@ def test_parse_attestations_fails_malformed_attestation(self, metrics, db_reques
|
132 | 133 | in metrics.increment.calls
|
133 | 134 | )
|
134 | 135 |
|
135 |
| - def test_parse_attestations_fails_multiple_attestations( |
| 136 | + def test_parse_attestations_fails_multiple_attestations_exceeds_limit( |
136 | 137 | self, metrics, db_request, dummy_attestation
|
137 | 138 | ):
|
138 | 139 | integrity_service = services.IntegrityService(
|
139 | 140 | metrics=metrics,
|
140 | 141 | session=db_request.db,
|
141 | 142 | )
|
142 | 143 |
|
| 144 | + max_attestations = len(services.SUPPORTED_ATTESTATION_TYPES) |
| 145 | + |
143 | 146 | db_request.oidc_publisher = pretend.stub(attestation_identity=pretend.stub())
|
144 | 147 | db_request.POST["attestations"] = TypeAdapter(list[Attestation]).dump_json(
|
145 |
| - [dummy_attestation, dummy_attestation] |
| 148 | + [dummy_attestation] * (max_attestations + 1) |
146 | 149 | )
|
147 | 150 | with pytest.raises(
|
148 |
| - AttestationUploadError, match="Only a single attestation per file" |
| 151 | + AttestationUploadError, |
| 152 | + match=f"A maximum of {max_attestations} attestations per file are " |
| 153 | + f"supported", |
149 | 154 | ):
|
150 | 155 | integrity_service.parse_attestations(
|
151 | 156 | db_request,
|
152 | 157 | pretend.stub(),
|
153 | 158 | )
|
154 | 159 |
|
155 | 160 | assert (
|
156 |
| - pretend.call("warehouse.upload.attestations.failed_multiple_attestations") |
| 161 | + pretend.call( |
| 162 | + "warehouse.upload.attestations.failed_limit_multiple_attestations" |
| 163 | + ) |
| 164 | + in metrics.increment.calls |
| 165 | + ) |
| 166 | + |
| 167 | + def test_parse_attestations_fails_multiple_attestations_same_predicate( |
| 168 | + self, metrics, monkeypatch, db_request, dummy_attestation |
| 169 | + ): |
| 170 | + integrity_service = services.IntegrityService( |
| 171 | + metrics=metrics, |
| 172 | + session=db_request.db, |
| 173 | + ) |
| 174 | + max_attestations = len(services.SUPPORTED_ATTESTATION_TYPES) |
| 175 | + db_request.oidc_publisher = pretend.stub( |
| 176 | + attestation_identity=pretend.stub(), |
| 177 | + ) |
| 178 | + db_request.oidc_claims = {"sha": "somesha"} |
| 179 | + db_request.POST["attestations"] = TypeAdapter(list[Attestation]).dump_json( |
| 180 | + [dummy_attestation] * max_attestations |
| 181 | + ) |
| 182 | + |
| 183 | + monkeypatch.setattr(Verifier, "production", lambda: pretend.stub()) |
| 184 | + monkeypatch.setattr( |
| 185 | + Attestation, "verify", lambda *args: (AttestationType.PYPI_PUBLISH_V1, {}) |
| 186 | + ) |
| 187 | + |
| 188 | + with pytest.raises( |
| 189 | + AttestationUploadError, |
| 190 | + match=re.escape( |
| 191 | + "Multiple attestations for the same file with the same predicate " |
| 192 | + "type (https://docs.pypi.org/attestations/publish/v1) are not supported" |
| 193 | + ), |
| 194 | + ): |
| 195 | + integrity_service.parse_attestations( |
| 196 | + db_request, |
| 197 | + pretend.stub(), |
| 198 | + ) |
| 199 | + |
| 200 | + assert ( |
| 201 | + pretend.call( |
| 202 | + "warehouse.upload.attestations.failed_duplicate_predicate_type" |
| 203 | + ) |
157 | 204 | in metrics.increment.calls
|
158 | 205 | )
|
159 | 206 |
|
|
0 commit comments