|
6 | 6 | import time
|
7 | 7 |
|
8 | 8 | import azure.core
|
9 |
| -from azure.core.credentials import AccessToken |
| 9 | +from azure.core.credentials import AccessToken, AzureKeyCredential |
10 | 10 | from azure.core.exceptions import ServiceRequestError
|
11 | 11 | from azure.core.pipeline import Pipeline
|
12 |
| -from azure.core.pipeline.policies import BearerTokenCredentialPolicy, SansIOHTTPPolicy |
| 12 | +from azure.core.pipeline.policies import BearerTokenCredentialPolicy, SansIOHTTPPolicy, AzureKeyCredentialPolicy |
13 | 13 | from azure.core.pipeline.transport import HttpRequest
|
14 | 14 |
|
15 | 15 | import pytest
|
@@ -146,3 +146,39 @@ def test_key_vault_regression():
|
146 | 146 | policy._token = AccessToken(token, time.time() + 3600)
|
147 | 147 | assert not policy._need_new_token
|
148 | 148 | assert policy._token.token == token
|
| 149 | + |
| 150 | +def test_azure_key_credential_policy(): |
| 151 | + """Tests to see if we can create an AzureKeyCredentialPolicy""" |
| 152 | + def verify_authorization_header(request): |
| 153 | + assert request.http_request.headers["api_key"] == "test_key" |
| 154 | + api_key = "test_key" |
| 155 | + key_header = "api_key" |
| 156 | + |
| 157 | + transport=Mock(send=verify_authorization_header) |
| 158 | + credential = AzureKeyCredential(api_key) |
| 159 | + credential_policy = AzureKeyCredentialPolicy(credential=credential, name=key_header) |
| 160 | + pipeline = Pipeline(transport=Mock(), policies=[credential_policy]) |
| 161 | + |
| 162 | + pipeline.run(HttpRequest("GET", "https://test_key_credential")) |
| 163 | + |
| 164 | +def test_azure_key_credential_policy_raises(): |
| 165 | + """Tests AzureKeyCredential and AzureKeyCredentialPolicy raises with non-string input parameters.""" |
| 166 | + api_key = 1234 |
| 167 | + key_header = 5678 |
| 168 | + with pytest.raises(TypeError): |
| 169 | + credential = AzureKeyCredential(api_key) |
| 170 | + |
| 171 | + credential = AzureKeyCredential(str(api_key)) |
| 172 | + with pytest.raises(TypeError): |
| 173 | + credential_policy = AzureKeyCredentialPolicy(credential=credential, name=key_header) |
| 174 | + |
| 175 | +def test_azure_key_credential_updates(): |
| 176 | + """Tests AzureKeyCredential updates""" |
| 177 | + api_key = "original" |
| 178 | + |
| 179 | + credential = AzureKeyCredential(api_key) |
| 180 | + assert credential.key == "original" |
| 181 | + |
| 182 | + api_key = "new" |
| 183 | + credential.update(api_key) |
| 184 | + assert credential.key == "new" |
0 commit comments