forked from kubernetes-client/python-base
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathincluster_config.py
111 lines (89 loc) · 4.37 KB
/
incluster_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import datetime
from kubernetes.client import Configuration
from .config_exception import ConfigException
# Names of the environment variables from which the API server host and
# port are read when the in-cluster configuration is loaded.
SERVICE_HOST_ENV_NAME = "KUBERNETES_SERVICE_HOST"
SERVICE_PORT_ENV_NAME = "KUBERNETES_SERVICE_PORT"
# Default paths of the service account token and CA certificate files that
# load_incluster_config() hands to InClusterConfigLoader.
SERVICE_TOKEN_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/token"
SERVICE_CERT_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
def _join_host_port(host, port):
"""Adapted golang's net.JoinHostPort"""
template = "%s:%s"
host_requires_bracketing = ':' in host or '%' in host
if host_requires_bracketing:
template = "[%s]:%s"
return template % (host, port)
class InClusterConfigLoader(object):
    """Build the default client ``Configuration`` from service account files.

    The loader reads the API server address from environment variables, the
    bearer token from ``token_filename`` and the CA bundle path from
    ``cert_filename``, then installs the result as the default
    ``Configuration``.

    After a successful ``_load_config()`` the instance exposes:

    * ``host`` -- ``https://<host>:<port>`` URL of the API server.
    * ``ssl_ca_cert`` -- path of the CA certificate file.
    * ``token`` -- raw content of the token file.
    * ``token_expires_at`` -- naive local ``datetime`` after which the token
      file is read again (one minute after the last read).
    """

    def __init__(self, token_filename,
                 cert_filename, environ=os.environ):
        """Remember where to find the token, the CA certificate and the env.

        ``environ`` is any mapping holding the service host/port variables;
        it defaults to the process environment.
        """
        self._token_filename = token_filename
        self._cert_filename = cert_filename
        self._environ = environ
        # How long a token read from disk is used before being re-read.
        self._token_refresh_period = datetime.timedelta(minutes=1)

    def load_and_set(self, refresh_token=True):
        """Load the in-cluster settings and make them the client default.

        When ``refresh_token`` is true, the token file is re-read
        periodically whenever the authorization key is looked up.

        Raises ConfigException if the environment or the files are missing
        or empty.
        """
        self._load_config()
        self._set_config(refresh_token=refresh_token)

    def _load_config(self):
        """Validate the environment and files; populate the loader fields.

        Raises ConfigException when the host/port variables are unset or
        empty, or when the token or certificate file is missing or empty.
        """
        env = self._environ
        if (SERVICE_HOST_ENV_NAME not in env or
                SERVICE_PORT_ENV_NAME not in env):
            raise ConfigException("Service host/port is not set.")

        service_host = env[SERVICE_HOST_ENV_NAME]
        service_port = env[SERVICE_PORT_ENV_NAME]
        if not service_host or not service_port:
            raise ConfigException("Service host/port is set but empty.")

        self.host = "https://" + _join_host_port(service_host, service_port)

        if not os.path.isfile(self._token_filename):
            raise ConfigException("Service token file does not exists.")
        self._read_token_file()

        if not os.path.isfile(self._cert_filename):
            raise ConfigException(
                "Service certification file does not exists.")
        with open(self._cert_filename) as f:
            if not f.read():
                raise ConfigException("Cert file exists but empty.")

        # Only the path is stored; the content was read just to reject an
        # empty file early.
        self.ssl_ca_cert = self._cert_filename

    def _set_config(self, refresh_token):
        """Install a ``Configuration`` built from the loaded values.

        When ``refresh_token`` is true the periodic token refresh hook is
        installed as well.
        """
        configuration = Configuration()
        configuration.host = self.host
        configuration.ssl_ca_cert = self.ssl_ca_cert
        configuration.api_key['authorization'] = "bearer " + self.token
        Configuration.set_default(configuration)
        if refresh_token:
            self._install_token_refresh()

    def _install_token_refresh(self):
        """Patch ``Configuration`` so an expired token is re-read from disk.

        NOTE: the patch is applied to the ``Configuration`` class, so it is
        consulted by every ``Configuration`` instance whose ``api_key``
        contains an ``'authorization'`` entry.
        """
        loader = self
        current = Configuration.get_api_key_with_prefix
        # If a previous load already installed a refresh wrapper, wrap the
        # function underneath it instead of stacking another layer.  Stacked
        # wrappers would keep older loaders alive and let them overwrite the
        # token set by the newest one.
        original = getattr(current, '_in_cluster_original', current)

        def wrapped(config, identifier):
            if (identifier == 'authorization' and
                    identifier in config.api_key and
                    loader.token_expires_at <= datetime.datetime.now()):
                loader._read_token_file()
                config.api_key[identifier] = "bearer " + loader.token
            return original(config, identifier)

        wrapped._in_cluster_original = original
        Configuration.get_api_key_with_prefix = wrapped

    def _read_token_file(self):
        """Read the token file and restart the refresh timer.

        Raises ConfigException if the file is empty.  In that case the
        previously loaded token and its expiry time are left untouched, so
        a failed refresh never replaces a usable token with an empty one.
        """
        with open(self._token_filename) as f:
            content = f.read()
        if not content:
            raise ConfigException("Token file exists but empty.")
        self.token = content
        self.token_expires_at = (
            datetime.datetime.now() + self._token_refresh_period)
def load_incluster_config(refresh_token=True):
    """Configure the default client from the pod's service account.

    Meant for clients that run inside a pod on a kubernetes cluster: the
    API server address is taken from the environment and the credentials
    from the service account files at their default paths.  An exception is
    raised when the process is not running in a kubernetes environment.

    ``refresh_token`` is forwarded to ``InClusterConfigLoader.load_and_set``.
    """
    loader = InClusterConfigLoader(
        token_filename=SERVICE_TOKEN_FILENAME,
        cert_filename=SERVICE_CERT_FILENAME)
    loader.load_and_set(refresh_token=refresh_token)