diff --git a/config/kube_config.py b/config/kube_config.py index 8d36197e..386b82c1 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -221,13 +221,21 @@ def _load_auth_provider_token(self): if provider['name'] == 'oidc': return self._load_oid_token(provider) + def _azure_is_expired(self, provider): + expires_on = provider['config']['expires-on'] + if expires_on.isdigit(): + return int(expires_on) < time.time() + else: + exp_time = time.strptime(expires_on, '%Y-%m-%d %H:%M:%S.%f') + return exp_time < time.gmtime() + def _load_azure_token(self, provider): if 'config' not in provider: return if 'access-token' not in provider['config']: return if 'expires-on' in provider['config']: - if int(provider['config']['expires-on']) < time.gmtime(): + if self._azure_is_expired(provider): self._refresh_azure_token(provider['config']) self.token = 'Bearer %s' % provider['config']['access-token'] return self.token diff --git a/config/kube_config_test.py b/config/kube_config_test.py index d89a2a50..04f6b11e 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -130,6 +130,10 @@ def _raise_exception(st): TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH) +TEST_AZURE_LOGIN = TEST_OIDC_LOGIN +TEST_AZURE_TOKEN = "test-azure-token" +TEST_AZURE_TOKEN_FULL = "Bearer " + TEST_AZURE_TOKEN + class BaseTestCase(unittest.TestCase): @@ -420,6 +424,41 @@ class TestKubeConfigLoader(BaseTestCase): "user": "oidc" } }, + { + "name": "azure", + "context": { + "cluster": "default", + "user": "azure" + } + }, + { + "name": "azure_num", + "context": { + "cluster": "default", + "user": "azure_num" + } + }, + { + "name": "azure_str", + "context": { + "cluster": "default", + "user": "azure_str" + } + }, + { + "name": "azure_num_error", + "context": { + "cluster": "default", + "user": "azure_str_error" + } + }, + { + "name": "azure_str_error", + "context": { + "cluster": "default", + "user": "azure_str_error" + } + }, { "name": "expired_oidc", "context": { @@ -603,6 +642,89 @@ class TestKubeConfigLoader(BaseTestCase): } } }, + { + "name": "azure", + "user": { + "auth-provider": { + "config": { + "access-token": TEST_AZURE_TOKEN, + "apiserver-id": "ApiserverId", + "environment": "AzurePublicCloud", + "refresh-token": "refreshToken", + "tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433" + }, + "name": "azure" + } + } + }, + { + "name": "azure_num", + "user": { + "auth-provider": { + "config": { + "access-token": TEST_AZURE_TOKEN, + "apiserver-id": "ApiserverId", + "environment": "AzurePublicCloud", + "expires-in": "0", + "expires-on": "156207275", + "refresh-token": "refreshToken", + "tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433" + }, + "name": "azure" + } + } + }, + { + "name": "azure_str", + "user": { + "auth-provider": { + "config": { + "access-token": TEST_AZURE_TOKEN, + "apiserver-id": "ApiserverId", + "environment": "AzurePublicCloud", + "expires-in": "0", + "expires-on": "2018-10-18 00:52:29.044727", + "refresh-token": "refreshToken", + "tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433" + }, + "name": "azure" + } + } + }, + { + "name": "azure_str_error", + "user": { + "auth-provider": { + "config": { + "access-token": TEST_AZURE_TOKEN, + "apiserver-id": "ApiserverId", + "environment": "AzurePublicCloud", + "expires-in": "0", + "expires-on": "2018-10-18 00:52", + "refresh-token": "refreshToken", + "tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433" + }, + "name": "azure" + } + } + }, + { + "name": "azure_num_error", + "user": { + "auth-provider": { + "config": { + "access-token": TEST_AZURE_TOKEN, + "apiserver-id": "ApiserverId", + "environment": "AzurePublicCloud", + "expires-in": "0", + "expires-on": "-1", + "refresh-token": "refreshToken", + "tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433" + }, + "name": "azure" + } + } + }, { "name": "expired_oidc", "user": { @@ -886,6 +1008,46 @@ def test_oidc_fails_if_invalid_padding_length(self): None, ) + def test_azure_no_refresh(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="azure", + ) + self.assertTrue(loader._load_auth_provider_token()) + self.assertEqual(TEST_AZURE_TOKEN_FULL, loader.token) + + def test_azure_with_expired_num(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="azure_num", + ) + provider = loader._user['auth-provider'] + self.assertTrue(loader._azure_is_expired(provider)) + + def test_azure_with_expired_str(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="azure_str", + ) + provider = loader._user['auth-provider'] + self.assertTrue(loader._azure_is_expired(provider)) + + def test_azure_with_expired_str_error(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="azure_str_error", + ) + provider = loader._user['auth-provider'] + self.assertRaises(ValueError, loader._azure_is_expired, provider) + + def test_azure_with_expired_int_error(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="azure_num_error", + ) + provider = loader._user['auth-provider'] + self.assertRaises(ValueError, loader._azure_is_expired, provider) + def test_user_pass(self): expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) actual = FakeConfig()