Skip to content

Commit bd4b9be

Browse files
committed
Patch kubernetes Api Client to work around lock contention issues
Unclear whether the benefits are worth the risk of breaking changes here, but this is a workaround for the signifcant lock contention issues with the k8s client surfaced in #23933 (reply in thread). This appears to be a known issue with the k8s python client - see kubernetes-client/python#2284. Test Plan: BK
1 parent 94e3ce0 commit bd4b9be

File tree

4 files changed

+92
-7
lines changed

4 files changed

+92
-7
lines changed

python_modules/libraries/dagster-k8s/dagster_k8s/client.py

+41-2
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66

77
import kubernetes.client
88
import kubernetes.client.rest
9+
import six
910
from dagster import (
1011
DagsterInstance,
1112
_check as check,
1213
)
1314
from dagster._core.storage.dagster_run import DagsterRunStatus
15+
from kubernetes.client.api_client import ApiClient
1416
from kubernetes.client.models import V1Job, V1JobStatus
1517

1618
try:
@@ -91,6 +93,39 @@ class DagsterK8sJobStatusException(Exception):
9193
]
9294

9395

96+
class PatchedApiClient(ApiClient):
97+
# Forked from ApiClient implementation to pass configuration object down into created model
98+
# objects, avoiding lock contention issues. See https://github.com/kubernetes-client/python/issues/2284
99+
def __deserialize_model(self, data, klass):
100+
"""Deserializes list or dict to model.
101+
102+
:param data: dict, list.
103+
:param klass: class literal.
104+
:return: model object.
105+
"""
106+
if not klass.openapi_types and not hasattr(klass, "get_real_child_model"):
107+
return data
108+
109+
# Below is the only change from the base ApiClient implementation - pass through the
110+
# Configuration object to each newly created model so that each one does not have to create
111+
# one and acquire a lock
112+
kwargs = {"local_vars_configuration": self.configuration}
113+
114+
if data is not None and klass.openapi_types is not None and isinstance(data, (list, dict)):
115+
for attr, attr_type in six.iteritems(klass.openapi_types):
116+
if klass.attribute_map[attr] in data:
117+
value = data[klass.attribute_map[attr]]
118+
kwargs[attr] = self.__deserialize(value, attr_type)
119+
120+
instance = klass(**kwargs)
121+
122+
if hasattr(instance, "get_real_child_model"):
123+
klass_name = instance.get_real_child_model(data)
124+
if klass_name:
125+
instance = self.__deserialize(data, klass_name)
126+
return instance
127+
128+
94129
def k8s_api_retry(
95130
fn: Callable[..., T],
96131
max_retries: int,
@@ -209,8 +244,12 @@ def __init__(self, batch_api, core_api, logger, sleeper, timer):
209244
@staticmethod
210245
def production_client(batch_api_override=None, core_api_override=None):
211246
return DagsterKubernetesClient(
212-
batch_api=batch_api_override or kubernetes.client.BatchV1Api(),
213-
core_api=core_api_override or kubernetes.client.CoreV1Api(),
247+
batch_api=(
248+
batch_api_override or kubernetes.client.BatchV1Api(api_client=PatchedApiClient())
249+
),
250+
core_api=(
251+
core_api_override or kubernetes.client.CoreV1Api(api_client=PatchedApiClient())
252+
),
214253
logger=logging.info,
215254
sleeper=time.sleep,
216255
timer=time.time,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
KUBERNETES_VERSION_UPPER_BOUND = "32"
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,42 @@
1+
import packaging.version
2+
import requests
13
from dagster_k8s.version import __version__
24

35

46
def test_version():
57
assert __version__
8+
9+
10+
from dagster_k8s.kubernetes_version import KUBERNETES_VERSION_UPPER_BOUND
11+
12+
13+
def parse_package_version(version_str: str) -> packaging.version.Version:
14+
parsed_version = packaging.version.parse(version_str)
15+
assert isinstance(
16+
parsed_version, packaging.version.Version
17+
), f"Found LegacyVersion: {version_str}"
18+
return parsed_version
19+
20+
21+
def _get_latest_published_k8s_version() -> packaging.version.Version:
22+
res = requests.get("https://pypi.org/pypi/kubernetes/json")
23+
module_json = res.json()
24+
releases = module_json["releases"]
25+
release_versions = [
26+
parse_package_version(version)
27+
for version, files in releases.items()
28+
if not any(file.get("yanked") for file in files)
29+
]
30+
for release_version in reversed(sorted(release_versions)):
31+
if not release_version.is_prerelease:
32+
return release_version
33+
34+
raise Exception("Could not find any latest published kubernetes version")
35+
36+
37+
def test_latest_version_pin():
38+
latest_version = _get_latest_published_k8s_version()
39+
assert latest_version.major < packaging.version.parse(KUBERNETES_VERSION_UPPER_BOUND).major, (
40+
f"A new version {latest_version} of kubernetes has been released to pypi that exceeds our pin. "
41+
"Increase the pinned version in kubernetes_version.py and verify that it still passes tests."
42+
)

python_modules/libraries/dagster-k8s/setup.py

+13-5
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
11
from pathlib import Path
2-
from typing import Dict
2+
from typing import Dict, Tuple
33

44
from setuptools import find_packages, setup
55

66

7-
def get_version() -> str:
7+
def get_version() -> Tuple[str, str]:
88
version: Dict[str, str] = {}
9+
kubernetes_version: Dict[str, str] = {}
10+
911
with open(Path(__file__).parent / "dagster_k8s/version.py", encoding="utf8") as fp:
1012
exec(fp.read(), version)
1113

12-
return version["__version__"]
14+
with open(Path(__file__).parent / "dagster_k8s/kubernetes_version.py", encoding="utf8") as fp:
15+
exec(fp.read(), kubernetes_version)
16+
17+
return version["__version__"], kubernetes_version["KUBERNETES_VERSION_UPPER_BOUND"]
1318

1419

15-
ver = get_version()
20+
(
21+
ver,
22+
KUBERNETES_VERSION_UPPER_BOUND,
23+
) = get_version()
1624
# dont pin dev installs to avoid pip dep resolver issues
1725
pin = "" if ver == "1!0+dev" else f"=={ver}"
1826
setup(
@@ -37,7 +45,7 @@ def get_version() -> str:
3745
python_requires=">=3.8,<3.13",
3846
install_requires=[
3947
f"dagster{pin}",
40-
"kubernetes",
48+
f"kubernetes<{KUBERNETES_VERSION_UPPER_BOUND}",
4149
# exclude a google-auth release that added an overly restrictive urllib3 pin that confuses dependency resolvers
4250
"google-auth!=2.23.1",
4351
],

0 commit comments

Comments
 (0)