|
13 | 13 | from cryptography.hazmat.backends import default_backend
|
14 | 14 | from cryptography.hazmat.primitives import hashes
|
15 | 15 | from cryptography.hazmat.primitives.asymmetric import padding
|
| 16 | +from msal import TokenCache |
16 | 17 | import pytest
|
17 | 18 | from six.moves.urllib_parse import urlparse
|
18 | 19 |
|
@@ -135,9 +136,107 @@ def validate_jwt(request, client_id, pem_bytes):
|
135 | 136 | deserialized_header = json.loads(header.decode("utf-8"))
|
136 | 137 | assert deserialized_header["alg"] == "RS256"
|
137 | 138 | assert deserialized_header["typ"] == "JWT"
|
138 |
| - assert urlsafeb64_decode(deserialized_header["x5t"]) == cert.fingerprint(hashes.SHA1()) #nosec |
| 139 | + assert urlsafeb64_decode(deserialized_header["x5t"]) == cert.fingerprint(hashes.SHA1()) # nosec |
139 | 140 |
|
140 | 141 | assert claims["aud"] == request.url
|
141 | 142 | assert claims["iss"] == claims["sub"] == client_id
|
142 | 143 |
|
143 | 144 | cert.public_key().verify(signature, signed_part.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256())
|
| 145 | + |
| 146 | + |
| 147 | +@pytest.mark.parametrize("cert_path,cert_password", BOTH_CERTS) |
| 148 | +def test_enable_persistent_cache(cert_path, cert_password): |
| 149 | + """the credential should use the persistent cache only when given enable_persistent_cache=True""" |
| 150 | + |
| 151 | + persistent_cache = "azure.identity._internal.persistent_cache" |
| 152 | + required_arguments = ("tenant-id", "client-id", cert_path) |
| 153 | + |
| 154 | + # credential should default to an in memory cache |
| 155 | + raise_when_called = Mock(side_effect=Exception("credential shouldn't attempt to load a persistent cache")) |
| 156 | + with patch(persistent_cache + "._load_persistent_cache", raise_when_called): |
| 157 | + CertificateCredential(*required_arguments, password=cert_password) |
| 158 | + |
| 159 | + # allowing an unencrypted cache doesn't count as opting in to the persistent cache |
| 160 | + CertificateCredential(*required_arguments, password=cert_password, allow_unencrypted_cache=True) |
| 161 | + |
| 162 | + # keyword argument opts in to persistent cache |
| 163 | + with patch(persistent_cache + ".msal_extensions") as mock_extensions: |
| 164 | + CertificateCredential(*required_arguments, password=cert_password, enable_persistent_cache=True) |
| 165 | + assert mock_extensions.PersistedTokenCache.call_count == 1 |
| 166 | + |
| 167 | + # opting in on an unsupported platform raises an exception |
| 168 | + with patch(persistent_cache + ".sys.platform", "commodore64"): |
| 169 | + with pytest.raises(NotImplementedError): |
| 170 | + CertificateCredential(*required_arguments, password=cert_password, enable_persistent_cache=True) |
| 171 | + with pytest.raises(NotImplementedError): |
| 172 | + CertificateCredential( |
| 173 | + *required_arguments, password=cert_password, enable_persistent_cache=True, allow_unencrypted_cache=True |
| 174 | + ) |
| 175 | + |
| 176 | + |
| 177 | +@patch("azure.identity._internal.persistent_cache.sys.platform", "linux2") |
| 178 | +@patch("azure.identity._internal.persistent_cache.msal_extensions") |
| 179 | +@pytest.mark.parametrize("cert_path,cert_password", BOTH_CERTS) |
| 180 | +def test_persistent_cache_linux(mock_extensions, cert_path, cert_password): |
| 181 | + """The credential should use an unencrypted cache when encryption is unavailable and the user explicitly opts in. |
| 182 | +
|
| 183 | + This test was written when Linux was the only platform on which encryption may not be available. |
| 184 | + """ |
| 185 | + |
| 186 | + required_arguments = ("tenant-id", "client-id", cert_path) |
| 187 | + |
| 188 | + # the credential should prefer an encrypted cache even when the user allows an unencrypted one |
| 189 | + CertificateCredential( |
| 190 | + *required_arguments, password=cert_password, enable_persistent_cache=True, allow_unencrypted_cache=True |
| 191 | + ) |
| 192 | + assert mock_extensions.PersistedTokenCache.called_with(mock_extensions.LibsecretPersistence) |
| 193 | + mock_extensions.PersistedTokenCache.reset_mock() |
| 194 | + |
| 195 | + # (when LibsecretPersistence's dependencies aren't available, constructing it raises ImportError) |
| 196 | + mock_extensions.LibsecretPersistence = Mock(side_effect=ImportError) |
| 197 | + |
| 198 | + # encryption unavailable, no opt in to unencrypted cache -> credential should raise |
| 199 | + with pytest.raises(ValueError): |
| 200 | + CertificateCredential(*required_arguments, password=cert_password, enable_persistent_cache=True) |
| 201 | + |
| 202 | + CertificateCredential( |
| 203 | + *required_arguments, password=cert_password, enable_persistent_cache=True, allow_unencrypted_cache=True |
| 204 | + ) |
| 205 | + assert mock_extensions.PersistedTokenCache.called_with(mock_extensions.FilePersistence) |
| 206 | + |
| 207 | + |
| 208 | +@pytest.mark.parametrize("cert_path,cert_password", BOTH_CERTS) |
| 209 | +def test_persistent_cache_multiple_clients(cert_path, cert_password): |
| 210 | + """the credential shouldn't use tokens issued to other service principals""" |
| 211 | + |
| 212 | + access_token_a = "token a" |
| 213 | + access_token_b = "not " + access_token_a |
| 214 | + transport_a = validating_transport( |
| 215 | + requests=[Request()], responses=[mock_response(json_payload=build_aad_response(access_token=access_token_a))] |
| 216 | + ) |
| 217 | + transport_b = validating_transport( |
| 218 | + requests=[Request()], responses=[mock_response(json_payload=build_aad_response(access_token=access_token_b))] |
| 219 | + ) |
| 220 | + |
| 221 | + cache = TokenCache() |
| 222 | + with patch("azure.identity._internal.persistent_cache._load_persistent_cache") as mock_cache_loader: |
| 223 | + mock_cache_loader.return_value = Mock(wraps=cache) |
| 224 | + credential_a = CertificateCredential( |
| 225 | + "tenant", "client-a", cert_path, password=cert_password, enable_persistent_cache=True, transport=transport_a |
| 226 | + ) |
| 227 | + assert mock_cache_loader.call_count == 1, "credential should load the persistent cache" |
| 228 | + credential_b = CertificateCredential( |
| 229 | + "tenant", "client-b", cert_path, password=cert_password, enable_persistent_cache=True, transport=transport_b |
| 230 | + ) |
| 231 | + assert mock_cache_loader.call_count == 2, "credential should load the persistent cache" |
| 232 | + |
| 233 | + # A caches a token |
| 234 | + scope = "scope" |
| 235 | + token_a = credential_a.get_token(scope) |
| 236 | + assert token_a.token == access_token_a |
| 237 | + assert transport_a.send.call_count == 1 |
| 238 | + |
| 239 | + # B should get a different token for the same scope |
| 240 | + token_b = credential_b.get_token(scope) |
| 241 | + assert token_b.token == access_token_b |
| 242 | + assert transport_b.send.call_count == 1 |
0 commit comments