From fa0e0d36f9b970acfb551891ce091d8c2f79da2d Mon Sep 17 00:00:00 2001 From: Eoin Gallinagh Date: Thu, 4 Apr 2024 14:15:17 +0100 Subject: [PATCH 1/2] remove: DDPJobDefinition and update tests --- src/codeflare_sdk/__init__.py | 2 +- src/codeflare_sdk/job/__init__.py | 2 - src/codeflare_sdk/job/jobs.py | 207 ----------- tests/e2e/mnist_raycluster_sdk_oauth_test.py | 42 ++- tests/e2e/mnist_raycluster_sdk_test.py | 40 ++- tests/e2e/mnist_rayjob.py | 33 +- tests/unit_test.py | 351 ------------------- tests/unit_test_support.py | 34 -- 8 files changed, 67 insertions(+), 644 deletions(-) delete mode 100644 src/codeflare_sdk/job/jobs.py diff --git a/src/codeflare_sdk/__init__.py b/src/codeflare_sdk/__init__.py index 86b6da880..28a9c4db4 100644 --- a/src/codeflare_sdk/__init__.py +++ b/src/codeflare_sdk/__init__.py @@ -16,6 +16,6 @@ list_all_clusters, ) -from .job import JobDefinition, Job, DDPJobDefinition, DDPJob, RayJobClient +from .job import RayJobClient from .utils import generate_cert diff --git a/src/codeflare_sdk/job/__init__.py b/src/codeflare_sdk/job/__init__.py index c5b5819a8..f230eb778 100644 --- a/src/codeflare_sdk/job/__init__.py +++ b/src/codeflare_sdk/job/__init__.py @@ -1,3 +1 @@ -from .jobs import JobDefinition, Job, DDPJobDefinition, DDPJob - from .ray_jobs import RayJobClient diff --git a/src/codeflare_sdk/job/jobs.py b/src/codeflare_sdk/job/jobs.py deleted file mode 100644 index 655107df4..000000000 --- a/src/codeflare_sdk/job/jobs.py +++ /dev/null @@ -1,207 +0,0 @@ -# Copyright 2023 IBM, Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -The jobs sub-module contains methods needed to submit Distributed Data Parallel(DDP) jobs to Ray Clusters created by the CodeFlare SDK. -""" - -import abc -from typing import TYPE_CHECKING, Optional, Dict, List -from pathlib import Path - -from torchx.components.dist import ddp -from torchx.runner import get_runner, Runner -from torchx.schedulers.ray_scheduler import RayScheduler -from torchx.specs import AppHandle, parse_app_handle, AppDryRunInfo - - -if TYPE_CHECKING: - from ..cluster.cluster import Cluster -from ..cluster.cluster import get_current_namespace - -all_jobs: List["Job"] = [] - - -class JobDefinition(metaclass=abc.ABCMeta): - def _dry_run(self, cluster: "Cluster"): - pass - - def submit(self, cluster: "Cluster"): - pass - - -class Job(metaclass=abc.ABCMeta): - def status(self): - pass - - def logs(self): - pass - - -class DDPJobDefinition(JobDefinition): - def __init__( - self, - script: Optional[str] = None, - m: Optional[str] = None, - script_args: Optional[List[str]] = None, - name: Optional[str] = None, - cpu: Optional[int] = None, - gpu: Optional[int] = None, - memMB: Optional[int] = None, - h: Optional[str] = None, - j: Optional[str] = None, - env: Optional[Dict[str, str]] = None, - max_retries: int = 0, - mounts: Optional[List[str]] = None, - rdzv_port: int = 29500, - rdzv_backend: str = None, - scheduler_args: Optional[Dict[str, str]] = None, - image: Optional[str] = None, - workspace: Optional[str] = f"file://{Path.cwd()}", - ): - if bool(script) == bool(m): # logical XOR - raise ValueError( - "Exactly one of the following arguments must be defined: [script, m]." - ) - self.script = script - self.m = m - self.script_args: List[str] = script_args if script_args is not None else [] - self.name = name - self.cpu = cpu - self.gpu = gpu - self.memMB = memMB - self.h = h - self.j = j - self.env: Dict[str, str] = env if env is not None else dict() - self.max_retries = max_retries - self.mounts: List[str] = mounts if mounts is not None else [] - self.rdzv_port = rdzv_port - self.rdzv_backend = rdzv_backend - self.scheduler_args: Dict[str, str] = ( - scheduler_args if scheduler_args is not None else dict() - ) - self.image = image - self.workspace = workspace - - def _dry_run(self, cluster: "Cluster"): - j = f"{cluster.config.num_workers}x{max(cluster.config.num_gpus, 1)}" # # of proc. = # of gpus - runner = get_runner(ray_client=cluster.job_client) - runner._scheduler_instances["ray"] = RayScheduler( - session_name=runner._name, ray_client=cluster.job_client - ) - return ( - runner.dryrun( - app=ddp( - *self.script_args, - script=self.script, - m=self.m, - name=self.name, - h=self.h, - cpu=self.cpu if self.cpu is not None else cluster.config.max_cpus, - gpu=self.gpu if self.gpu is not None else cluster.config.num_gpus, - memMB=self.memMB - if self.memMB is not None - else cluster.config.max_memory * 1024, - j=self.j if self.j is not None else j, - env=self.env, - max_retries=self.max_retries, - rdzv_port=self.rdzv_port, - rdzv_backend=self.rdzv_backend - if self.rdzv_backend is not None - else "static", - mounts=self.mounts, - ), - scheduler=cluster.torchx_scheduler, - cfg=cluster.torchx_config(**self.scheduler_args), - workspace=self.workspace, - ), - runner, - ) - - def _missing_spec(self, spec: str): - raise ValueError(f"Job definition missing arg: {spec}") - - def _dry_run_no_cluster(self): - if self.scheduler_args is not None: - if self.scheduler_args.get("namespace") is None: - self.scheduler_args["namespace"] = get_current_namespace() - runner = get_runner() - return ( - runner.dryrun( - app=ddp( - *self.script_args, - script=self.script, - m=self.m, - name=self.name - if self.name is not None - else self._missing_spec("name"), - h=self.h, - cpu=self.cpu - if self.cpu is not None - else self._missing_spec("cpu (# cpus per worker)"), - gpu=self.gpu - if self.gpu is not None - else self._missing_spec("gpu (# gpus per worker)"), - memMB=self.memMB - if self.memMB is not None - else self._missing_spec("memMB (memory in MB)"), - j=self.j - if self.j is not None - else self._missing_spec( - "j (`workers`x`procs`)" - ), # # of proc. = # of gpus, - env=self.env, # should this still exist? - max_retries=self.max_retries, - rdzv_port=self.rdzv_port, # should this still exist? - rdzv_backend=self.rdzv_backend - if self.rdzv_backend is not None - else "c10d", - mounts=self.mounts, - image=self.image - if self.image is not None - else self._missing_spec("image"), - ), - scheduler="kubernetes_mcad", - cfg=self.scheduler_args, - workspace="", - ), - runner, - ) - - def submit(self, cluster: "Cluster" = None) -> "Job": - return DDPJob(self, cluster) - - -class DDPJob(Job): - def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster" = None): - self.job_definition = job_definition - self.cluster = cluster - if self.cluster: - definition, runner = job_definition._dry_run(cluster) - self._app_handle = runner.schedule(definition) - self._runner = runner - else: - definition, runner = job_definition._dry_run_no_cluster() - self._app_handle = runner.schedule(definition) - self._runner = runner - all_jobs.append(self) - - def status(self) -> str: - return self._runner.status(self._app_handle) - - def logs(self) -> str: - return "".join(self._runner.log_lines(self._app_handle, None)) - - def cancel(self): - self._runner.cancel(self._app_handle) diff --git a/tests/e2e/mnist_raycluster_sdk_oauth_test.py b/tests/e2e/mnist_raycluster_sdk_oauth_test.py index 90bec08df..1ffff8ec4 100644 --- a/tests/e2e/mnist_raycluster_sdk_oauth_test.py +++ b/tests/e2e/mnist_raycluster_sdk_oauth_test.py @@ -2,10 +2,8 @@ from time import sleep -from torchx.specs.api import AppState, is_terminal - from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication -from codeflare_sdk.job.jobs import DDPJobDefinition +from codeflare_sdk.job import RayJobClient import pytest @@ -79,7 +77,7 @@ def assert_jobsubmit_withoutLogin(self, cluster): "entrypoint": "python mnist.py", "runtime_env": { "working_dir": "./tests/e2e/", - "pip": "mnist_pip_requirements.txt", + "pip": "./tests/e2e/mnist_pip_requirements.txt", }, } try: @@ -98,19 +96,26 @@ def assert_jobsubmit_withoutLogin(self, cluster): def assert_jobsubmit_withlogin(self, cluster): self.assert_appwrapper_exists() - jobdef = DDPJobDefinition( - name="mnist", - script="./tests/e2e/mnist.py", - scheduler_args={"requirements": "./tests/e2e/mnist_pip_requirements.txt"}, + auth_token = run_oc_command(["whoami", "--show-token=true"]) + ray_dashboard = cluster.cluster_dashboard_uri() + header = {"Authorization": f"Bearer {auth_token}"} + client = RayJobClient(address=ray_dashboard, headers=header, verify=True) + + # Submit the job + submission_id = client.submit_job( + entrypoint="python mnist.py", + runtime_env={ + "working_dir": "./tests/e2e/", + "pip": "mnist_pip_requirements.txt", + }, ) - job = jobdef.submit(cluster) - + print(f"Submitted job with ID: {submission_id}") done = False time = 0 timeout = 900 while not done: - status = job.status() - if is_terminal(status.state): + status = client.get_job_status(submission_id) + if status.is_terminal(): break if not done: print(status) @@ -119,11 +124,12 @@ def assert_jobsubmit_withlogin(self, cluster): sleep(5) time += 5 - print(job.status()) - self.assert_job_completion(status) + logs = client.get_job_logs(submission_id) + print(logs) - print(job.logs()) + self.assert_job_completion(status) + client.delete_job(submission_id) cluster.down() def assert_appwrapper_exists(self): @@ -144,9 +150,9 @@ def assert_appwrapper_exists(self): assert False def assert_job_completion(self, status): - if status.state == AppState.SUCCEEDED: - print(f"Job has completed: '{status.state}'") + if status == "SUCCEEDED": + print(f"Job has completed: '{status}'") assert True else: - print(f"Job has completed: '{status.state}'") + print(f"Job has completed: '{status}'") assert False diff --git a/tests/e2e/mnist_raycluster_sdk_test.py b/tests/e2e/mnist_raycluster_sdk_test.py index a38cb48df..348a02cc2 100644 --- a/tests/e2e/mnist_raycluster_sdk_test.py +++ b/tests/e2e/mnist_raycluster_sdk_test.py @@ -7,10 +7,8 @@ import ray -from torchx.specs.api import AppState, is_terminal - from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration -from codeflare_sdk.job.jobs import DDPJobDefinition +from codeflare_sdk.job import RayJobClient import pytest @@ -68,19 +66,26 @@ def run_mnist_raycluster_sdk(self): cluster.details() - jobdef = DDPJobDefinition( - name="mnist", - script="./tests/e2e/mnist.py", - scheduler_args={"requirements": "./tests/e2e/mnist_pip_requirements.txt"}, + auth_token = run_oc_command(["whoami", "--show-token=true"]) + ray_dashboard = cluster.cluster_dashboard_uri() + header = {"Authorization": f"Bearer {auth_token}"} + client = RayJobClient(address=ray_dashboard, headers=header, verify=True) + + # Submit the job + submission_id = client.submit_job( + entrypoint="python mnist.py", + runtime_env={ + "working_dir": "./tests/e2e/", + "pip": "./tests/e2e/mnist_pip_requirements.txt", + }, ) - job = jobdef.submit(cluster) - + print(f"Submitted job with ID: {submission_id}") done = False time = 0 timeout = 900 while not done: - status = job.status() - if is_terminal(status.state): + status = client.get_job_status(submission_id) + if status.is_terminal(): break if not done: print(status) @@ -89,11 +94,12 @@ def run_mnist_raycluster_sdk(self): sleep(5) time += 5 - print(job.status()) - self.assert_job_completion(status) + logs = client.get_job_logs(submission_id) + print(logs) - print(job.logs()) + self.assert_job_completion(status) + client.delete_job(submission_id) cluster.down() # Assertions @@ -128,9 +134,9 @@ def assert_raycluster_exists(self): assert False def assert_job_completion(self, status): - if status.state == AppState.SUCCEEDED: - print(f"Job has completed: '{status.state}'") + if status == "SUCCEEDED": + print(f"Job has completed: '{status}'") assert True else: - print(f"Job has completed: '{status.state}'") + print(f"Job has completed: '{status}'") assert False diff --git a/tests/e2e/mnist_rayjob.py b/tests/e2e/mnist_rayjob.py index 8557a55c1..c9306da69 100644 --- a/tests/e2e/mnist_rayjob.py +++ b/tests/e2e/mnist_rayjob.py @@ -2,10 +2,10 @@ from time import sleep -from torchx.specs.api import AppState, is_terminal +from support import * from codeflare_sdk.cluster.cluster import get_cluster -from codeflare_sdk.job.jobs import DDPJobDefinition +from codeflare_sdk.job import RayJobClient namespace = sys.argv[1] @@ -13,19 +13,23 @@ cluster.details() -jobdef = DDPJobDefinition( - name="mnist", - script="mnist.py", - scheduler_args={"requirements": "requirements.txt"}, -) -job = jobdef.submit(cluster) +auth_token = run_oc_command(["whoami", "--show-token=true"]) +ray_dashboard = cluster.cluster_dashboard_uri() +header = {"Authorization": f"Bearer {auth_token}"} +client = RayJobClient(address=ray_dashboard, headers=header, verify=True) +# Submit the job +submission_id = client.submit_job( + entrypoint="python mnist.py", + runtime_env={"working_dir": "/", "pip": "requirements.txt"}, +) +print(f"Submitted job with ID: {submission_id}") done = False time = 0 timeout = 900 while not done: - status = job.status() - if is_terminal(status.state): + status = client.get_job_status(submission_id) + if status.is_terminal(): break if not done: print(status) @@ -34,13 +38,14 @@ sleep(5) time += 5 -print(f"Job has completed: {status.state}") - -print(job.logs()) +logs = client.get_job_logs(submission_id) +print(logs) +client.delete_job(submission_id) cluster.down() -if not status.state == AppState.SUCCEEDED: + +if not status == "SUCCEEDED": exit(1) else: exit(0) diff --git a/tests/unit_test.py b/tests/unit_test.py index b25d3dd0c..ce6f28573 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -59,12 +59,6 @@ RayClusterStatus, CodeFlareClusterStatus, ) -from codeflare_sdk.job.jobs import ( - JobDefinition, - Job, - DDPJobDefinition, - DDPJob, -) from codeflare_sdk.utils.generate_cert import ( generate_ca_cert, generate_tls_cert, @@ -73,10 +67,7 @@ from unit_test_support import ( createClusterWithConfig, - createTestDDP, - createDDPJob_no_cluster, createClusterConfig, - createDDPJob_with_cluster, ) import codeflare_sdk.utils.kube_api_helpers @@ -2863,352 +2854,10 @@ def test_wait_ready(mocker, capsys): ) -def test_jobdefinition_coverage(mocker): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", - return_value={"spec": {"domain": ""}}, - ) - abstract = JobDefinition() - cluster = createClusterWithConfig(mocker) - abstract._dry_run(cluster) - abstract.submit(cluster) - - -def test_job_coverage(): - abstract = Job() - abstract.status() - abstract.logs() - - -def test_DDPJobDefinition_creation(mocker): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - ddp = createTestDDP() - assert ddp.script == "test.py" - assert ddp.m == None - assert ddp.script_args == ["test"] - assert ddp.name == "test" - assert ddp.cpu == 1 - assert ddp.gpu == 0 - assert ddp.memMB == 1024 - assert ddp.h == None - assert ddp.j == "2x1" - assert ddp.env == {"test": "test"} - assert ddp.max_retries == 0 - assert ddp.mounts == [] - assert ddp.rdzv_port == 29500 - assert ddp.scheduler_args == {"requirements": "test"} - - -def test_DDPJobDefinition_dry_run(mocker: MockerFixture): - """ - Test that the dry run method returns the correct type: AppDryRunInfo, - that the attributes of the returned object are of the correct type, - and that the values from cluster and job definition are correctly passed. - """ - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri", - return_value="", - ) - mocker.patch.object(Cluster, "job_client") - ddp = createTestDDP() - cluster = createClusterWithConfig(mocker) - ddp_job, _ = ddp._dry_run(cluster) - assert type(ddp_job) == AppDryRunInfo - assert ddp_job._fmt is not None - assert type(ddp_job.request) == RayJob - assert type(ddp_job._app) == AppDef - assert type(ddp_job._cfg) == type(dict()) - assert type(ddp_job._scheduler) == type(str()) - - assert ddp_job.request.app_id.startswith("test") - assert ddp_job.request.cluster_name == "unit-test-cluster" - assert ddp_job.request.requirements == "test" - - assert ddp_job._app.roles[0].resource.cpu == 1 - assert ddp_job._app.roles[0].resource.gpu == 0 - assert ddp_job._app.roles[0].resource.memMB == 1024 - - assert ddp_job._cfg["cluster_name"] == "unit-test-cluster" - assert ddp_job._cfg["requirements"] == "test" - - assert ddp_job._scheduler == "ray" - - -def test_DDPJobDefinition_dry_run_no_cluster(mocker): - """ - Test that the dry run method returns the correct type: AppDryRunInfo, - that the attributes of the returned object are of the correct type, - and that the values from cluster and job definition are correctly passed. - """ - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch( - "codeflare_sdk.job.jobs.get_current_namespace", - return_value="opendatahub", - ) - - ddp = createTestDDP() - ddp.image = "fake-image" - ddp_job, _ = ddp._dry_run_no_cluster() - assert type(ddp_job) == AppDryRunInfo - assert ddp_job._fmt is not None - assert type(ddp_job.request) == KubernetesMCADJob - assert type(ddp_job._app) == AppDef - assert type(ddp_job._cfg) == type(dict()) - assert type(ddp_job._scheduler) == type(str()) - - assert ( - ddp_job.request.resource["spec"]["resources"]["GenericItems"][0][ - "generictemplate" - ] - .spec.containers[0] - .image - == "fake-image" - ) - - assert ddp_job._app.roles[0].resource.cpu == 1 - assert ddp_job._app.roles[0].resource.gpu == 0 - assert ddp_job._app.roles[0].resource.memMB == 1024 - - assert ddp_job._cfg["requirements"] == "test" - - assert ddp_job._scheduler == "kubernetes_mcad" - - -def test_DDPJobDefinition_dry_run_no_resource_args(mocker): - """ - Test that the dry run correctly gets resources from the cluster object - when the job definition does not specify resources. - """ - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch.object(Cluster, "job_client") - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", - return_value={"spec": {"domain": ""}}, - ) - mocker.patch( - "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri", - return_value="", - ) - cluster = createClusterWithConfig(mocker) - ddp = DDPJobDefinition( - script="test.py", - m=None, - script_args=["test"], - name="test", - h=None, - env={"test": "test"}, - max_retries=0, - mounts=[], - rdzv_port=29500, - scheduler_args={"requirements": "test"}, - ) - ddp_job, _ = ddp._dry_run(cluster) - - assert ddp_job._app.roles[0].resource.cpu == cluster.config.max_cpus - assert ddp_job._app.roles[0].resource.gpu == cluster.config.num_gpus - assert ddp_job._app.roles[0].resource.memMB == cluster.config.max_memory * 1024 - assert ( - parse_j(ddp_job._app.roles[0].args[1]) - == f"{cluster.config.num_workers}x{cluster.config.num_gpus}" - ) - - -def test_DDPJobDefinition_dry_run_no_cluster_no_resource_args(mocker): - """ - Test that the dry run method returns the correct type: AppDryRunInfo, - that the attributes of the returned object are of the correct type, - and that the values from cluster and job definition are correctly passed. - """ - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - - mocker.patch( - "codeflare_sdk.job.jobs.get_current_namespace", - return_value="opendatahub", - ) - - ddp = createTestDDP() - try: - ddp._dry_run_no_cluster() - assert 0 == 1 - except ValueError as e: - assert str(e) == "Job definition missing arg: image" - ddp.image = "fake-image" - ddp.name = None - try: - ddp._dry_run_no_cluster() - assert 0 == 1 - except ValueError as e: - assert str(e) == "Job definition missing arg: name" - ddp.name = "fake" - ddp.cpu = None - try: - ddp._dry_run_no_cluster() - assert 0 == 1 - except ValueError as e: - assert str(e) == "Job definition missing arg: cpu (# cpus per worker)" - ddp.cpu = 1 - ddp.gpu = None - try: - ddp._dry_run_no_cluster() - assert 0 == 1 - except ValueError as e: - assert str(e) == "Job definition missing arg: gpu (# gpus per worker)" - ddp.gpu = 1 - ddp.memMB = None - try: - ddp._dry_run_no_cluster() - assert 0 == 1 - except ValueError as e: - assert str(e) == "Job definition missing arg: memMB (memory in MB)" - ddp.memMB = 1 - ddp.j = None - try: - ddp._dry_run_no_cluster() - assert 0 == 1 - except ValueError as e: - assert str(e) == "Job definition missing arg: j (`workers`x`procs`)" - - -def test_DDPJobDefinition_submit(mocker: MockerFixture): - """ - Tests that the submit method returns the correct type: DDPJob - And that the attributes of the returned object are of the correct type - """ - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mock_schedule = MagicMock() - mocker.patch.object(Runner, "schedule", mock_schedule) - mock_schedule.return_value = "fake-dashboard-url" - mocker.patch.object(Cluster, "job_client") - ddp_def = createTestDDP() - cluster = createClusterWithConfig(mocker) - mocker.patch( - "codeflare_sdk.job.jobs.get_current_namespace", - side_effect="opendatahub", - ) - mocker.patch.object( - Cluster, "cluster_dashboard_uri", return_value="fake-dashboard-url" - ) - ddp_job = ddp_def.submit(cluster) - assert type(ddp_job) == DDPJob - assert type(ddp_job.job_definition) == DDPJobDefinition - assert type(ddp_job.cluster) == Cluster - assert type(ddp_job._app_handle) == str - assert ddp_job._app_handle == "fake-dashboard-url" - - ddp_def.image = "fake-image" - ddp_job = ddp_def.submit() - assert type(ddp_job) == DDPJob - assert type(ddp_job.job_definition) == DDPJobDefinition - assert ddp_job.cluster == None - assert type(ddp_job._app_handle) == str - assert ddp_job._app_handle == "fake-dashboard-url" - - -def test_DDPJob_creation(mocker: MockerFixture): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch.object(Cluster, "job_client") - mock_schedule = MagicMock() - mocker.patch.object(Runner, "schedule", mock_schedule) - mocker.patch.object( - Cluster, "cluster_dashboard_uri", return_value="fake-dashboard-url" - ) - ddp_def = createTestDDP() - cluster = createClusterWithConfig(mocker) - mock_schedule.return_value = "fake-dashboard-url" - ddp_job = createDDPJob_with_cluster(mocker, ddp_def, cluster) - assert type(ddp_job) == DDPJob - assert type(ddp_job.job_definition) == DDPJobDefinition - assert type(ddp_job.cluster) == Cluster - assert type(ddp_job._app_handle) == str - assert ddp_job._app_handle == "fake-dashboard-url" - _, args, kwargs = mock_schedule.mock_calls[0] - assert type(args[0]) == AppDryRunInfo - job_info = args[0] - assert type(job_info.request) == RayJob - assert type(job_info._app) == AppDef - assert type(job_info._cfg) == type(dict()) - assert type(job_info._scheduler) == type(str()) - - -def test_DDPJob_creation_no_cluster(mocker: MockerFixture): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - ddp_def = createTestDDP() - ddp_def.image = "fake-image" - mocker.patch( - "codeflare_sdk.job.jobs.get_current_namespace", - side_effect="opendatahub", - ) - mock_schedule = MagicMock() - mocker.patch.object(Runner, "schedule", mock_schedule) - mock_schedule.return_value = "fake-app-handle" - ddp_job = createDDPJob_no_cluster(ddp_def, None) - assert type(ddp_job) == DDPJob - assert type(ddp_job.job_definition) == DDPJobDefinition - assert ddp_job.cluster == None - assert type(ddp_job._app_handle) == str - assert ddp_job._app_handle == "fake-app-handle" - _, args, kwargs = mock_schedule.mock_calls[0] - assert type(args[0]) == AppDryRunInfo - job_info = args[0] - assert type(job_info.request) == KubernetesMCADJob - assert type(job_info._app) == AppDef - assert type(job_info._cfg) == type(dict()) - assert type(job_info._scheduler) == type(str()) - - -def test_DDPJob_status(mocker: MockerFixture): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - # Setup the neccesary mock patches - mock_status = MagicMock() - mocker.patch.object(Runner, "status", mock_status) - test_DDPJob_creation(mocker) - ddp_def = createTestDDP() - cluster = createClusterWithConfig(mocker) - ddp_job = createDDPJob_with_cluster(mocker, ddp_def, cluster) - mock_status.return_value = "fake-status" - assert ddp_job.status() == "fake-status" - _, args, kwargs = mock_status.mock_calls[0] - assert args[0] == "fake-dashboard-url" - - -def test_DDPJob_logs(mocker: MockerFixture): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mock_log = MagicMock() - mocker.patch.object(Runner, "log_lines", mock_log) - # Setup the neccesary mock patches - test_DDPJob_creation(mocker) - ddp_def = createTestDDP() - cluster = createClusterWithConfig(mocker) - ddp_job = createDDPJob_with_cluster(mocker, ddp_def, cluster) - mock_log.return_value = "fake-logs" - assert ddp_job.logs() == "fake-logs" - _, args, kwargs = mock_log.mock_calls[0] - assert args[0] == "fake-dashboard-url" - - def arg_check_side_effect(*args): assert args[0] == "fake-app-handle" -def test_DDPJob_cancel(mocker: MockerFixture): - mock_cancel = MagicMock() - mocker.patch.object(Runner, "cancel", mock_cancel) - # Setup the neccesary mock patches - test_DDPJob_creation_no_cluster(mocker) - ddp_def = createTestDDP() - ddp_def.image = "fake-image" - ddp_job = createDDPJob_no_cluster(ddp_def, None) - mocker.patch( - "openshift.get_project_name", - return_value="opendatahub", - ) - mock_cancel.side_effect = arg_check_side_effect - ddp_job.cancel() - - def parse_j(cmd): pattern = r"--nnodes\s+\d+\s+--nproc_per_node\s+\d+" match = re.search(pattern, cmd) diff --git a/tests/unit_test_support.py b/tests/unit_test_support.py index 190c4f1a9..329df45ed 100644 --- a/tests/unit_test_support.py +++ b/tests/unit_test_support.py @@ -1,38 +1,9 @@ -from codeflare_sdk.job.jobs import ( - DDPJobDefinition, - DDPJob, -) - from codeflare_sdk.cluster.cluster import ( Cluster, ClusterConfiguration, ) -def createTestDDP(): - ddp = DDPJobDefinition( - script="test.py", - m=None, - script_args=["test"], - name="test", - cpu=1, - gpu=0, - memMB=1024, - h=None, - j="2x1", - env={"test": "test"}, - max_retries=0, - mounts=[], - rdzv_port=29500, - scheduler_args={"requirements": "test"}, - ) - return ddp - - -def createDDPJob_no_cluster(ddp_def, cluster): - return DDPJob(ddp_def, cluster) - - def createClusterConfig(): config = ClusterConfiguration( name="unit-test-cluster", @@ -61,8 +32,3 @@ def createClusterWithConfig(mocker): ) cluster = Cluster(createClusterConfig()) return cluster - - -def createDDPJob_with_cluster(mocker, ddp_def, cluster=None): - cluster = createClusterWithConfig(mocker) - return DDPJob(ddp_def, cluster) From e5b5a32f28b75e106d2da08bd5710a0d9388e88f Mon Sep 17 00:00:00 2001 From: Eoin Gallinagh Date: Fri, 5 Apr 2024 17:26:04 +0100 Subject: [PATCH 2/2] add: address comments --- src/codeflare_sdk/cluster/cluster.py | 18 ------------------ tests/unit_test.py | 5 ----- 2 files changed, 23 deletions(-) diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index be59c5c6c..81f7a7116 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -21,10 +21,8 @@ from time import sleep from typing import List, Optional, Tuple, Dict -import openshift as oc from kubernetes import config from ray.job_submission import JobSubmissionClient -import urllib3 from .auth import config_check, api_config_handler from ..utils import pretty_print @@ -58,8 +56,6 @@ class Cluster: Note that currently, the underlying implementation is a Ray cluster. """ - torchx_scheduler = "ray" - def __init__(self, config: ClusterConfiguration): """ Create the resource cluster object by passing in a ClusterConfiguration @@ -477,20 +473,6 @@ def job_logs(self, job_id: str) -> str: """ return self.job_client.get_job_logs(job_id) - def torchx_config( - self, working_dir: str = None, requirements: str = None - ) -> Dict[str, str]: - dashboard_address = urllib3.util.parse_url(self.cluster_dashboard_uri()).host - to_return = { - "cluster_name": self.config.name, - "dashboard_address": dashboard_address, - } - if working_dir: - to_return["working_dir"] = working_dir - if requirements: - to_return["requirements"] = requirements - return to_return - def from_k8_cluster_object( rc, mcad=True, diff --git a/tests/unit_test.py b/tests/unit_test.py index ce6f28573..935cdd100 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO: replace all instances of torchx_runner from pathlib import Path import sys @@ -81,10 +80,6 @@ import openshift from openshift.selector import Selector import ray -from torchx.specs import AppDryRunInfo, AppDef -from torchx.runner import get_runner, Runner -from torchx.schedulers.ray_scheduler import RayJob -from torchx.schedulers.kubernetes_mcad_scheduler import KubernetesMCADJob import pytest import yaml from unittest.mock import MagicMock