|
1 | 1 | import functools
|
2 | 2 | import sys
|
3 |
| -from typing import Any, Iterable, List, Optional, Tuple |
| 3 | +from typing import Any, Dict, Iterable, List, Optional, Tuple |
4 | 4 |
|
5 | 5 | import pytest
|
6 | 6 |
|
@@ -334,3 +334,140 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non
|
334 | 334 | url, allow_netrc=False, allow_keyring=True
|
335 | 335 | ) == (None, None)
|
336 | 336 | assert keyring_broken._call_count == 1
|
| 337 | + |
| 338 | + |
| 339 | +class KeyringSubprocessResult(KeyringModuleV1): |
| 340 | + """Represents the subprocess call to keyring""" |
| 341 | + |
| 342 | + returncode = 0 # Default to zero retcode |
| 343 | + |
| 344 | + def __call__( |
| 345 | + self, |
| 346 | + cmd: List[str], |
| 347 | + *, |
| 348 | + env: Dict[str, str], |
| 349 | + stdin: Optional[Any] = None, |
| 350 | + capture_output: Optional[bool] = None, |
| 351 | + input: Optional[bytes] = None, |
| 352 | + ) -> Any: |
| 353 | + if cmd[1] == "get": |
| 354 | + assert stdin == -3 # subprocess.DEVNULL |
| 355 | + assert capture_output is True |
| 356 | + assert env["PYTHONIOENCODING"] == "utf-8" |
| 357 | + |
| 358 | + password = self.get_password(*cmd[2:]) |
| 359 | + if password is None: |
| 360 | + self.returncode = 1 |
| 361 | + else: |
| 362 | + self.stdout = password.encode("utf-8") + b"\n" |
| 363 | + |
| 364 | + if cmd[1] == "set": |
| 365 | + assert stdin is None |
| 366 | + assert capture_output is None |
| 367 | + assert env["PYTHONIOENCODING"] == "utf-8" |
| 368 | + assert input is not None |
| 369 | + |
| 370 | + self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip("\n")) |
| 371 | + |
| 372 | + return self |
| 373 | + |
| 374 | + def check_returncode(self) -> None: |
| 375 | + if self.returncode: |
| 376 | + raise Exception() |
| 377 | + |
| 378 | + |
| 379 | +@pytest.mark.parametrize( |
| 380 | + "url, expect", |
| 381 | + ( |
| 382 | + ("http://example.com/path1", (None, None)), |
| 383 | + # path1 URLs will be resolved by netloc |
| 384 | + ("http://[email protected]/path1", ("user", "user!netloc")), |
| 385 | + ("http://[email protected]/path1", ("user2", "user2!netloc")), |
| 386 | + # path2 URLs will be resolved by index URL |
| 387 | + ("http://example.com/path2/path3", (None, None)), |
| 388 | + ("http://[email protected]/path2/path3", ("foo", "foo!url")), |
| 389 | + ), |
| 390 | +) |
| 391 | +def test_keyring_cli_get_password( |
| 392 | + monkeypatch: pytest.MonkeyPatch, |
| 393 | + url: str, |
| 394 | + expect: Tuple[Optional[str], Optional[str]], |
| 395 | +) -> None: |
| 396 | + monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") |
| 397 | + monkeypatch.setattr( |
| 398 | + pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult() |
| 399 | + ) |
| 400 | + auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) |
| 401 | + |
| 402 | + actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) |
| 403 | + assert actual == expect |
| 404 | + |
| 405 | + |
| 406 | +@pytest.mark.parametrize( |
| 407 | + "response_status, creds, expect_save", |
| 408 | + ( |
| 409 | + (403, ("user", "pass", True), False), |
| 410 | + ( |
| 411 | + 200, |
| 412 | + ("user", "pass", True), |
| 413 | + True, |
| 414 | + ), |
| 415 | + ( |
| 416 | + 200, |
| 417 | + ("user", "pass", False), |
| 418 | + False, |
| 419 | + ), |
| 420 | + ), |
| 421 | +) |
| 422 | +def test_keyring_cli_set_password( |
| 423 | + monkeypatch: pytest.MonkeyPatch, |
| 424 | + response_status: int, |
| 425 | + creds: Tuple[str, str, bool], |
| 426 | + expect_save: bool, |
| 427 | +) -> None: |
| 428 | + monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") |
| 429 | + keyring = KeyringSubprocessResult() |
| 430 | + monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring) |
| 431 | + auth = MultiDomainBasicAuth(prompting=True) |
| 432 | + monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) |
| 433 | + monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) |
| 434 | + if creds[2]: |
| 435 | + # when _prompt_for_password indicates to save, we should save |
| 436 | + def should_save_password_to_keyring(*a: Any) -> bool: |
| 437 | + return True |
| 438 | + |
| 439 | + else: |
| 440 | + # when _prompt_for_password indicates not to save, we should |
| 441 | + # never call this function |
| 442 | + def should_save_password_to_keyring(*a: Any) -> bool: |
| 443 | + assert False, "_should_save_password_to_keyring should not be called" |
| 444 | + |
| 445 | + monkeypatch.setattr( |
| 446 | + auth, "_should_save_password_to_keyring", should_save_password_to_keyring |
| 447 | + ) |
| 448 | + |
| 449 | + req = MockRequest("https://example.com") |
| 450 | + resp = MockResponse(b"") |
| 451 | + resp.url = req.url |
| 452 | + connection = MockConnection() |
| 453 | + |
| 454 | + def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse: |
| 455 | + assert sent_req is req |
| 456 | + assert "Authorization" in sent_req.headers |
| 457 | + r = MockResponse(b"") |
| 458 | + r.status_code = response_status |
| 459 | + return r |
| 460 | + |
| 461 | + # https://github.com/python/mypy/issues/2427 |
| 462 | + connection._send = _send # type: ignore[assignment] |
| 463 | + |
| 464 | + resp.request = req |
| 465 | + resp.status_code = 401 |
| 466 | + resp.connection = connection |
| 467 | + |
| 468 | + auth.handle_401(resp) |
| 469 | + |
| 470 | + if expect_save: |
| 471 | + assert keyring.saved_passwords == [("example.com", creds[0], creds[1])] |
| 472 | + else: |
| 473 | + assert keyring.saved_passwords == [] |
0 commit comments