21
21
parent = Path (__file__ ).resolve ().parents [1 ]
22
22
sys .path .append (str (parent ) + "/src" )
23
23
24
- from kubernetes import client
24
+ from kubernetes import client , config
25
25
from codeflare_sdk .cluster .awload import AWManager
26
26
from codeflare_sdk .cluster .cluster import (
27
27
Cluster ,
35
35
)
36
36
from codeflare_sdk .cluster .auth import (
37
37
TokenAuthentication ,
38
- PasswordUserAuthentication ,
39
38
Authentication ,
39
+ KubeConfigFileAuthentication
40
40
)
41
41
from codeflare_sdk .utils .pretty_print import (
42
42
print_no_resources_found ,
65
65
)
66
66
67
67
import openshift
68
- from openshift import OpenShiftPythonException
69
68
from openshift .selector import Selector
70
69
import ray
71
70
from torchx .specs import AppDryRunInfo , AppDef
@@ -89,27 +88,8 @@ def att_side_effect(self):
89
88
return self .high_level_operation
90
89
91
90
92
- def att_side_effect_tls (self ):
93
- if "--insecure-skip-tls-verify" in self .high_level_operation [1 ]:
94
- return self .high_level_operation
95
- else :
96
- raise OpenShiftPythonException (
97
- "The server uses a certificate signed by unknown authority"
98
- )
99
-
100
-
101
91
def test_token_auth_creation ():
102
92
try :
103
- token_auth = TokenAuthentication ()
104
- assert token_auth .token == None
105
- assert token_auth .server == None
106
- assert token_auth .skip_tls == False
107
-
108
- token_auth = TokenAuthentication ("token" )
109
- assert token_auth .token == "token"
110
- assert token_auth .server == None
111
- assert token_auth .skip_tls == False
112
-
113
93
token_auth = TokenAuthentication ("token" , "server" )
114
94
assert token_auth .token == "token"
115
95
assert token_auth .server == "server"
@@ -130,79 +110,62 @@ def test_token_auth_creation():
130
110
assert token_auth .server == "server"
131
111
assert token_auth .skip_tls == True
132
112
113
+ token_auth = TokenAuthentication (token = "token" , server = "server" , skip_tls = False )
114
+ assert token_auth .token == "token"
115
+ assert token_auth .server == "server"
116
+ assert token_auth .skip_tls == False
117
+
118
+ token_auth = TokenAuthentication (token = "token" , server = "server" , skip_tls = False , ca_cert_path = "path/to/cert" )
119
+ assert token_auth .token == "token"
120
+ assert token_auth .server == "server"
121
+ assert token_auth .skip_tls == False
122
+ assert token_auth .ca_cert_path == "path/to/cert"
123
+
133
124
except Exception :
134
125
assert 0 == 1
135
126
136
127
137
128
def test_token_auth_login_logout (mocker ):
138
- mocker .patch ("openshift.invoke" , side_effect = arg_side_effect )
139
- mock_res = mocker .patch .object (openshift .Result , "out" )
140
- mock_res .side_effect = lambda : att_side_effect (fake_res )
129
+ mocker .patch .object (client , 'ApiClient' )
141
130
142
131
token_auth = TokenAuthentication (token = "testtoken" , server = "testserver:6443" )
143
132
assert token_auth .login () == (
144
- "login" ,
145
- ["--token=testtoken" , "--server=testserver:6443" ],
133
+ "Logged into testserver:6443"
146
134
)
147
135
assert token_auth .logout () == (
148
- "logout" ,
149
- ["--token=testtoken" , "--server=testserver:6443" ],
136
+ "Successfully logged out of testserver:6443"
150
137
)
151
138
152
139
153
140
def test_token_auth_login_tls (mocker ):
154
- mocker .patch ("openshift.invoke" , side_effect = arg_side_effect )
155
- mock_res = mocker .patch .object (openshift .Result , "out" )
156
- mock_res .side_effect = lambda : att_side_effect_tls (fake_res )
157
-
158
- # FIXME - Pytest mocker not allowing caught exception
159
- # token_auth = TokenAuthentication(token="testtoken", server="testserver")
160
- # assert token_auth.login() == "Error: certificate auth failure, please set `skip_tls=True` in TokenAuthentication"
141
+ mocker .patch .object (client , 'ApiClient' )
161
142
162
143
token_auth = TokenAuthentication (
163
144
token = "testtoken" , server = "testserver:6443" , skip_tls = True
164
145
)
165
146
assert token_auth .login () == (
166
- "login" ,
167
- ["--token=testtoken" , "--server=testserver:6443" , "--insecure-skip-tls-verify" ],
147
+ "Logged into testserver:6443"
148
+ )
149
+ token_auth = TokenAuthentication (
150
+ token = "testtoken" , server = "testserver:6443" , skip_tls = False
151
+ )
152
+ assert token_auth .login () == (
153
+ "Logged into testserver:6443"
154
+ )
155
+ token_auth = TokenAuthentication (
156
+ token = "testtoken" , server = "testserver:6443" , skip_tls = False , ca_cert_path = "path/to/cert"
157
+ )
158
+ assert token_auth .login () == (
159
+ "Logged into testserver:6443"
168
160
)
169
161
162
+ def test_load_kube_config (mocker ):
163
+ kube_config_auth = KubeConfigFileAuthentication ()
164
+ kube_config_auth .kube_config_path = '/path/to/your/config'
165
+ mocker .patch .object (config , 'load_kube_config' )
170
166
171
- def test_passwd_auth_creation ():
172
- try :
173
- passwd_auth = PasswordUserAuthentication ()
174
- assert passwd_auth .username == None
175
- assert passwd_auth .password == None
176
-
177
- passwd_auth = PasswordUserAuthentication ("user" )
178
- assert passwd_auth .username == "user"
179
- assert passwd_auth .password == None
180
-
181
- passwd_auth = PasswordUserAuthentication ("user" , "passwd" )
182
- assert passwd_auth .username == "user"
183
- assert passwd_auth .password == "passwd"
184
-
185
- passwd_auth = PasswordUserAuthentication ("user" , password = "passwd" )
186
- assert passwd_auth .username == "user"
187
- assert passwd_auth .password == "passwd"
188
-
189
- passwd_auth = PasswordUserAuthentication (username = "user" , password = "passwd" )
190
- assert passwd_auth .username == "user"
191
- assert passwd_auth .password == "passwd"
192
-
193
- except Exception :
194
- assert 0 == 1
195
-
196
-
197
- def test_passwd_auth_login_logout (mocker ):
198
- mocker .patch ("openshift.invoke" , side_effect = arg_side_effect )
199
- mocker .patch ("openshift.login" , side_effect = arg_side_effect )
200
- mock_res = mocker .patch .object (openshift .Result , "out" )
201
- mock_res .side_effect = lambda : att_side_effect (fake_res )
202
-
203
- token_auth = PasswordUserAuthentication (username = "user" , password = "passwd" )
204
- assert token_auth .login () == ("user" , "passwd" )
205
- assert token_auth .logout () == ("logout" ,)
167
+ response = kube_config_auth .load_kube_config ()
168
+ assert response == "Loaded user config file at path %s" % kube_config_auth .kube_config_path
206
169
207
170
208
171
def test_auth_coverage ():
0 commit comments