Skip to content

[kubernetes] external resources support #2242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,11 @@ def _container_templates(self):
resources["disk"],
)

extended_resources = resources.get("extended_resources", {})

qos_requests = {**qos_requests, **extended_resources}
qos_limits = {**qos_limits, **extended_resources}

# Create a ContainerTemplate for this node. Ideally, we would have
# liked to inline this ContainerTemplate and avoid scanning the workflow
# twice, but due to issues with variable substitution, we will have to
Expand Down Expand Up @@ -1962,6 +1967,7 @@ def _container_templates(self):
shared_memory=shared_memory,
port=port,
qos=resources["qos"],
extended_resources=extended_resources,
)

for k, v in env.items():
Expand Down
4 changes: 4 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def create_jobset(
port=None,
num_parallel=None,
qos=None,
extended_resources=None,
):
name = "js-%s" % str(uuid4())[:6]
jobset = (
Expand Down Expand Up @@ -227,6 +228,7 @@ def create_jobset(
port=port,
num_parallel=num_parallel,
qos=qos,
extended_resources=extended_resources,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down Expand Up @@ -488,6 +490,7 @@ def create_job_object(
name_pattern=None,
qos=None,
annotations=None,
extended_resources=None,
):
if env is None:
env = {}
Expand Down Expand Up @@ -530,6 +533,7 @@ def create_job_object(
shared_memory=shared_memory,
port=port,
qos=qos,
extended_resources=extended_resources,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down
8 changes: 8 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ def kubernetes():
type=JSONTypeClass(),
multiple=False,
)
@click.option(
"--extended-resources",
default=None,
type=JSONTypeClass(),
multiple=False,
)
@click.pass_context
def step(
ctx,
Expand Down Expand Up @@ -176,6 +182,7 @@ def step(
qos=None,
labels=None,
annotations=None,
extended_resources=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None, **kwargs):
Expand Down Expand Up @@ -319,6 +326,7 @@ def _sync_metadata():
qos=qos,
labels=labels,
annotations=annotations,
extended_resources=extended_resources,
)
except Exception:
traceback.print_exc(chain=False)
Expand Down
5 changes: 5 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class KubernetesDecorator(StepDecorator):
Only applicable when @parallel is used.
qos: str, default: Burstable
Quality of Service class to assign to the pod. Supported values are: Guaranteed, Burstable, BestEffort
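extended_resources: Dict[str, str], default: {}
Extended resources to request for the pod, given as a mapping of resource name to quantity. Each entry is added to both the resource requests and the resource limits.
https://kubernetes.io/docs/tasks/administer-cluster/extended-resource-node/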
"""

name = "kubernetes"
Expand Down Expand Up @@ -151,6 +154,7 @@ class KubernetesDecorator(StepDecorator):
"executable": None,
"hostname_resolution_timeout": 10 * 60,
"qos": KUBERNETES_QOS,
"extended_resources": {},
}
package_url = None
package_sha = None
Expand Down Expand Up @@ -473,6 +477,7 @@ def runtime_step_cli(
"persistent_volume_claims",
"labels",
"annotations",
"extended_resources",
]:
cli_args.command_options[k] = json.dumps(v)
else:
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def create_job_spec(self):
self._kwargs["memory"],
self._kwargs["disk"],
)
extended_resources = self._kwargs.get("extended_resources", {})
qos_requests = {**qos_requests, **extended_resources}
qos_limits = {**qos_limits, **extended_resources}

return client.V1JobSpec(
# Retries are handled by Metaflow when it is responsible for
Expand Down
5 changes: 5 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_jobsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,11 @@ def dump(self):
self._kwargs["memory"],
self._kwargs["disk"],
)

extended_resources = self._kwargs.get("extended_resources", {})
qos_requests = {**qos_requests, **extended_resources}
qos_limits = {**qos_limits, **extended_resources}

return dict(
name=self.name,
template=client.api_client.ApiClient().sanitize_for_serialization(
Expand Down
Loading