forked from project-codeflare/codeflare-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmnist_raycluster_sdk_kind_test.py
116 lines (91 loc) · 3.45 KB
/
mnist_raycluster_sdk_kind_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import requests
from time import sleep
from codeflare_sdk import Cluster, ClusterConfiguration
from codeflare_sdk.ray.client import RayJobClient
import pytest
from support import *
# This test creates a Ray Cluster and covers the Ray Job submission functionality on a Kind cluster.
@pytest.mark.kind
class TestRayClusterSDKKind:
    """End-to-end checks for Ray cluster creation and job submission on Kind.

    Each test creates a namespace and Kueue resources, brings up a one-worker
    Ray cluster named ``mnist`` through the CodeFlare SDK, and submits the
    ``mnist.py`` entrypoint to it through the Ray dashboard.
    """

    def setup_method(self):
        """Delegate to the ``initialize_kubernetes_client`` support helper."""
        initialize_kubernetes_client(self)

    def teardown_method(self):
        """Delete the test namespace and the Kueue resources.

        The Kueue cleanup runs in a ``finally`` clause so that an error raised
        while deleting the namespace does not skip it; the error still
        propagates afterwards.
        """
        try:
            delete_namespace(self)
        finally:
            delete_kueue_resources(self)

    def test_mnist_ray_cluster_sdk_kind(self):
        """Run the MNIST scenario with ``accelerator="cpu"`` and no GPUs."""
        # NOTE(review): pytest invokes setup_method before every test, so this
        # explicit call looks redundant -- confirm before removing it.
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_mnist_raycluster_sdk_kind(accelerator="cpu")

    @pytest.mark.nvidia_gpu
    def test_mnist_ray_cluster_sdk_kind_nvidia_gpu(self):
        """Run the MNIST scenario with ``accelerator="gpu"`` and one GPU."""
        # NOTE(review): pytest invokes setup_method before every test, so this
        # explicit call looks redundant -- confirm before removing it.
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_mnist_raycluster_sdk_kind(accelerator="gpu", number_of_gpus=1)

    def run_mnist_raycluster_sdk_kind(
        self, accelerator, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
    ):
        """Bring up the ``mnist`` Ray cluster and run both job-submission checks.

        Args:
            accelerator: Value forwarded as ``ACCELERATOR`` to the job's
                environment setup (the tests pass ``"cpu"`` or ``"gpu"``).
            gpu_resource_name: Extended resource name requested for workers.
            number_of_gpus: Amount of ``gpu_resource_name`` requested per
                worker and as ``entrypoint_num_gpus`` for the submitted job.
        """
        cluster = Cluster(
            ClusterConfiguration(
                name="mnist",
                namespace=self.namespace,
                num_workers=1,
                head_cpu_requests="500m",
                head_cpu_limits="500m",
                worker_cpu_requests="500m",
                worker_cpu_limits=1,
                worker_memory_requests=1,
                worker_memory_limits=4,
                worker_extended_resource_requests={gpu_resource_name: number_of_gpus},
                write_to_file=True,
                verify_tls=False,
            )
        )

        cluster.up()
        cluster.status()
        cluster.wait_ready()
        cluster.status()
        cluster.details()

        self.assert_jobsubmit_withoutlogin_kind(cluster, accelerator, number_of_gpus)

        # Forward the caller's accelerator settings instead of a hard-coded
        # "gpu"/1, so the CPU test does not request a GPU from a cluster that
        # was configured with zero GPUs.
        assert_get_cluster_and_jobsubmit(
            self, "mnist", accelerator=accelerator, number_of_gpus=number_of_gpus
        )

    # Assertions

    def assert_jobsubmit_withoutlogin_kind(self, cluster, accelerator, number_of_gpus):
        """Submit ``mnist.py`` via the dashboard URI and wait for a terminal status.

        Args:
            cluster: Cluster object whose ``cluster_dashboard_uri()`` is used
                as the job client address.
            accelerator: Value passed as ``ACCELERATOR`` to
                ``get_setup_env_variables`` for the job's environment.
            number_of_gpus: Value passed as ``entrypoint_num_gpus``.

        Raises:
            TimeoutError: If the job is still not terminal after 900 seconds
                of accumulated sleep time.
            AssertionError: If the terminal status is not ``"SUCCEEDED"``.
        """
        ray_dashboard = cluster.cluster_dashboard_uri()
        client = RayJobClient(address=ray_dashboard, verify=False)

        submission_id = client.submit_job(
            entrypoint="python mnist.py",
            runtime_env={
                "working_dir": "./tests/e2e/",
                "pip": "./tests/e2e/mnist_pip_requirements.txt",
                "env_vars": get_setup_env_variables(ACCELERATOR=accelerator),
            },
            entrypoint_num_gpus=number_of_gpus,
        )
        print(f"Submitted job with ID: {submission_id}")

        poll_interval = 5  # seconds slept between status checks
        timeout = 900  # seconds of accumulated sleeping before giving up
        elapsed = 0  # counts sleep time only, not time spent in client calls
        while True:
            status = client.get_job_status(submission_id)
            if status.is_terminal():
                break
            print(status)
            if elapsed >= timeout:
                raise TimeoutError(f"job has timed out after waiting {timeout}s")
            sleep(poll_interval)
            elapsed += poll_interval

        # Logs are printed before the assertion so they appear in the test
        # output even when the job did not succeed.
        logs = client.get_job_logs(submission_id)
        print(logs)

        self.assert_job_completion(status)

        client.delete_job(submission_id)

    def assert_job_completion(self, status):
        """Print the final job status and assert that it equals ``"SUCCEEDED"``.

        Args:
            status: Terminal status returned by the job client.

        Raises:
            AssertionError: If ``status`` is not equal to ``"SUCCEEDED"``.
        """
        print(f"Job has completed: '{status}'")
        assert status == "SUCCEEDED", f"job finished with status '{status}'"