@@ -65,19 +65,30 @@ def create_namespace(self):
65
65
return RuntimeError (e )
66
66
67
67
68
- def create_new_resource_flavor (self ):
69
- self .resource_flavor = f"test-resource-flavor-{ random_choice ()} "
70
- create_resource_flavor (self , self .resource_flavor )
68
+ def create_new_resource_flavor (self , num_flavors = 2 ):
69
+ self .resource_flavors = []
70
+ for i in range (num_flavors ):
71
+ default = i < 1
72
+ resource_flavor = f"test-resource-flavor-{ random_choice ()} "
73
+ create_resource_flavor (self , resource_flavor , default )
74
+ self .resource_flavors .append (resource_flavor )
71
75
72
76
73
- def create_new_cluster_queue (self ):
74
- self .cluster_queue = f"test-cluster-queue-{ random_choice ()} "
75
- create_cluster_queue (self , self .cluster_queue , self .resource_flavor )
77
+ def create_new_cluster_queue (self , num_queues = 2 ):
78
+ self .cluster_queues = []
79
+ for i in range (num_queues ):
80
+ cluster_queue_name = f"test-cluster-queue-{ random_choice ()} "
81
+ create_cluster_queue (self , cluster_queue_name , self .resource_flavors [i ])
82
+ self .cluster_queues .append (cluster_queue_name )
76
83
77
84
78
- def create_new_local_queue (self ):
79
- self .local_queue = f"test-local-queue-{ random_choice ()} "
80
- create_local_queue (self , self .cluster_queue , self .local_queue )
85
+ def create_new_local_queue (self , num_queues = 2 ):
86
+ self .local_queues = []
87
+ for i in range (num_queues ):
88
+ is_default = i == 0
89
+ local_queue_name = f"test-local-queue-{ random_choice ()} "
90
+ create_local_queue (self , self .cluster_queues [i ], local_queue_name , is_default )
91
+ self .local_queues .append (local_queue_name )
81
92
82
93
83
94
def create_namespace_with_name (self , namespace_name ):
@@ -132,7 +143,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
132
143
{"name" : "memory" , "nominalQuota" : "36Gi" },
133
144
{"name" : "nvidia.com/gpu" , "nominalQuota" : 1 },
134
145
],
135
- }
146
+ },
136
147
],
137
148
}
138
149
],
@@ -161,11 +172,21 @@ def create_cluster_queue(self, cluster_queue, flavor):
161
172
self .cluster_queue = cluster_queue
162
173
163
174
164
- def create_resource_flavor (self , flavor ):
175
+ def create_resource_flavor (self , flavor , default = True ):
165
176
resource_flavor_json = {
166
177
"apiVersion" : "kueue.x-k8s.io/v1beta1" ,
167
178
"kind" : "ResourceFlavor" ,
168
179
"metadata" : {"name" : flavor },
180
+ "spec" : {
181
+ "nodeLabels" : {"worker-1" if default else "ingress-ready" : "true" },
182
+ "tolerations" : [
183
+ {
184
+ "key" : "node-role.kubernetes.io/control-plane" ,
185
+ "operator" : "Exists" ,
186
+ "effect" : "NoSchedule" ,
187
+ }
188
+ ],
189
+ },
169
190
}
170
191
171
192
try :
@@ -190,14 +211,14 @@ def create_resource_flavor(self, flavor):
190
211
self .resource_flavor = flavor
191
212
192
213
193
- def create_local_queue (self , cluster_queue , local_queue ):
214
+ def create_local_queue (self , cluster_queue , local_queue , is_default = True ):
194
215
local_queue_json = {
195
216
"apiVersion" : "kueue.x-k8s.io/v1beta1" ,
196
217
"kind" : "LocalQueue" ,
197
218
"metadata" : {
198
219
"namespace" : self .namespace ,
199
220
"name" : local_queue ,
200
- "annotations" : {"kueue.x-k8s.io/default-queue" : "true" },
221
+ "annotations" : {"kueue.x-k8s.io/default-queue" : str ( is_default ). lower () },
201
222
},
202
223
"spec" : {"clusterQueue" : cluster_queue },
203
224
}
@@ -235,25 +256,43 @@ def create_kueue_resources(self):
235
256
236
257
def delete_kueue_resources (self ):
237
258
# Delete if given cluster-queue exists
238
- try :
239
- self .custom_api .delete_cluster_custom_object (
240
- group = "kueue.x-k8s.io" ,
241
- plural = "clusterqueues" ,
242
- version = "v1beta1" ,
243
- name = self .cluster_queue ,
244
- )
245
- print (f"\n '{ self .cluster_queue } ' cluster-queue deleted" )
246
- except Exception as e :
247
- print (f"\n Error deleting cluster-queue '{ self .cluster_queue } ' : { e } " )
259
+ for cq in self .cluster_queues :
260
+ try :
261
+ self .custom_api .delete_cluster_custom_object (
262
+ group = "kueue.x-k8s.io" ,
263
+ plural = "clusterqueues" ,
264
+ version = "v1beta1" ,
265
+ name = cq ,
266
+ )
267
+ print (f"\n '{ cq } ' cluster-queue deleted" )
268
+ except Exception as e :
269
+ print (f"\n Error deleting cluster-queue '{ cq } ' : { e } " )
248
270
249
271
# Delete if given resource-flavor exists
272
+ for flavor in self .resource_flavors :
273
+ try :
274
+ self .custom_api .delete_cluster_custom_object (
275
+ group = "kueue.x-k8s.io" ,
276
+ plural = "resourceflavors" ,
277
+ version = "v1beta1" ,
278
+ name = flavor ,
279
+ )
280
+ print (f"'{ flavor } ' resource-flavor deleted" )
281
+ except Exception as e :
282
+ print (f"\n Error deleting resource-flavor '{ flavor } ': { e } " )
283
+
284
+
285
+ def get_pod_node (self , namespace , name ):
286
+ label_selector = f"ray.io/cluster={ name } "
250
287
try :
251
- self .custom_api .delete_cluster_custom_object (
252
- group = "kueue.x-k8s.io" ,
253
- plural = "resourceflavors" ,
254
- version = "v1beta1" ,
255
- name = self .resource_flavor ,
288
+ pods = self .api_instance .list_namespaced_pod (
289
+ namespace , label_selector = label_selector
256
290
)
257
- print (f"'{ self .resource_flavor } ' resource-flavor deleted" )
291
+ if not pods .items :
292
+ raise ValueError (
293
+ f"No pods found with label 'ray.io/cluster={ name } ' in namespace '{ namespace } '"
294
+ )
295
+ pod = pods .items [0 ]
296
+ return pod .spec .node_name
258
297
except Exception as e :
259
- print (f"\n Error deleting resource-flavor ' { self . resource_flavor } ' : { e } " )
298
+ print (f"\n Error retrieving pod node : { e } " )
0 commit comments