Skip to content

Commit 818a8a0

Browse files
authored
feat: actualize ydb_configure for k8s + add missing features (#13536)
1 parent 1a6a6bc commit 818a8a0

File tree

11 files changed

+255
-237
lines changed

11 files changed

+255
-237
lines changed

ydb/tools/cfg/base.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,11 @@ def merge_with_default(dft, override):
6060

6161

6262
class KiKiMRDrive(object):
63-
def __init__(self, type, path, shared_with_os=False, expected_slot_count=None, kind=None):
63+
def __init__(self, type, path, shared_with_os=False, expected_slot_count=None, kind=None, pdisk_config=None):
6464
self.type = type
6565
self.path = path
6666
self.shared_with_os = shared_with_os
67+
self.pdisk_config = pdisk_config
6768
self.expected_slot_count = expected_slot_count
6869
self.kind = kind
6970

@@ -74,10 +75,11 @@ def __eq__(self, other):
7475
and self.shared_with_os == other.shared_with_os
7576
and self.expected_slot_count == other.expected_slot_count
7677
and self.kind == other.kind
78+
and self.pdisk_config == other.pdisk_config
7779
)
7880

7981
def __hash__(self):
80-
return hash("\0".join(map(str, (self.type, self.path, self.shared_with_os, self.expected_slot_count, self.kind))))
82+
return hash("\0".join(map(str, (self.type, self.path, self.shared_with_os, self.expected_slot_count, self.kind, self.pdisk_config))))
8183

8284

8385
Domain = collections.namedtuple(
@@ -264,7 +266,7 @@ def normalize_domain(domain_name):
264266

265267

266268
class ClusterDetailsProvider(object):
267-
def __init__(self, template, walle_provider, validator=None, database=None, use_new_style_cfg=False):
269+
def __init__(self, template, host_info_provider, validator=None, database=None, use_new_style_cfg=False):
268270
if not validator:
269271
validator = validation.default_validator()
270272

@@ -277,10 +279,14 @@ def __init__(self, template, walle_provider, validator=None, database=None, use_
277279
if database is not None:
278280
self.__cluster_description = self.get_subjective_description(self.__cluster_description, database, self.__validator)
279281

280-
self._use_walle = self.__cluster_description.get("use_walle", True)
281-
if not walle_provider:
282-
walle_provider = walle.NopHostsInformationProvider()
283-
self._walle = walle_provider
282+
self._use_walle = self.__cluster_description.get("use_walle", False)
283+
self._k8s_settings = self.__cluster_description.get("k8s_settings", {"use": False})
284+
285+
if host_info_provider is not None:
286+
self._host_info_provider = host_info_provider
287+
else:
288+
self._host_info_provider = walle.NopHostsInformationProvider
289+
284290
self.__translated_storage_pools_deprecated = None
285291
self.__translated_hosts = None
286292
self.__racks = {}
@@ -299,6 +305,7 @@ def __init__(self, template, walle_provider, validator=None, database=None, use_
299305
self.blob_storage_config = self.__cluster_description.get("blob_storage_config")
300306
self.memory_controller_config = self.__cluster_description.get("memory_controller_config", {})
301307
self.channel_profile_config = self.__cluster_description.get("channel_profile_config")
308+
self.immediate_controls_config = self.__cluster_description.get("immediate_controls_config")
302309
self.pdisk_key_config = self.__cluster_description.get("pdisk_key_config", {})
303310
if not self.need_txt_files and not self.use_new_style_kikimr_cfg:
304311
assert "cannot remove txt files without new style kikimr cfg!"
@@ -344,6 +351,18 @@ def storage_config_generation(self):
344351
def use_walle(self):
345352
return self._use_walle
346353

354+
@property
355+
def use_k8s_api(self):
356+
return self._k8s_settings.get("use")
357+
358+
@property
359+
def k8s_rack_label(self):
360+
return self._k8s_settings.get("k8s_rack_label")
361+
362+
@property
363+
def k8s_dc_label(self):
364+
return self._k8s_settings.get("k8s_dc_label")
365+
347366
@property
348367
def security_settings(self):
349368
return self.__cluster_description.get("security_settings", {})
@@ -358,23 +377,29 @@ def _get_datacenter(self, host_description):
358377
dc = host_description.get("location", {}).get("data_center", None)
359378
if dc:
360379
return str(dc)
361-
return str(self._walle.get_datacenter(host_description.get("name", host_description.get("host"))))
380+
return str(self._host_info_provider.get_datacenter(host_description.get("name", host_description.get("host"))))
362381

363382
def _get_rack(self, host_description):
364383
if host_description.get("rack") is not None:
365384
return str(host_description.get("rack"))
366385
rack = host_description.get("location", {}).get("rack", None)
367386
if rack:
368387
return str(rack)
369-
return str(self._walle.get_rack(host_description.get("name", host_description.get("host"))))
388+
389+
hostname = host_description.get("name", host_description.get("host"))
390+
391+
if isinstance(self._host_info_provider, walle.NopHostsInformationProvider):
392+
raise RuntimeError(f"there is no 'rack' specified for host {hostname} in template, and no host info provider has been specified")
393+
394+
return str(self._host_info_provider.get_rack(hostname))
370395

371396
def _get_body(self, host_description):
372397
if host_description.get("body") is not None:
373398
return str(host_description.get("body"))
374399
body = host_description.get("location", {}).get("body", None)
375400
if body:
376401
return str(body)
377-
return str(self._walle.get_body(host_description.get("name", host_description.get("host"))))
402+
return str(self._host_info_provider.get_body(host_description.get("name", host_description.get("host"))))
378403

379404
def _collect_drives_info(self, host_description):
380405
host_config_id = host_description.get("host_config_id", None)
@@ -670,6 +695,10 @@ def grpc_config(self):
670695
def dynamicnameservice_config(self):
671696
return merge_with_default(DYNAMIC_NAME_SERVICE, self.__cluster_description.get("dynamicnameservice", {}))
672697

698+
@property
699+
def nameservice_config(self):
700+
return self.__cluster_description.get("nameservice_config")
701+
673702
@property
674703
def grpc_port(self):
675704
return self.grpc_config.get("port")

ydb/tools/cfg/bin/__main__.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ydb.tools.cfg.static import StaticConfigGenerator
1313
from ydb.tools.cfg.utils import write_to_file
1414
from ydb.tools.cfg.walle import NopHostsInformationProvider, WalleHostsInformationProvider
15+
from ydb.tools.cfg.k8s_api import K8sApiHostsInformationProvider
1516

1617
logging_config.dictConfig(
1718
{
@@ -48,19 +49,36 @@ def cfg_generate(args):
4849
with open(args.cluster_description, "r") as yaml_template:
4950
cluster_template = yaml.safe_load(yaml_template)
5051

51-
hosts_provider = NopHostsInformationProvider()
52+
host_info_provider = NopHostsInformationProvider()
53+
54+
k8s_enabled = cluster_template.get("k8s_settings", {}).get("use", False)
55+
walle_enabled = cluster_template.get("use_walle", False)
56+
5257
if args.hosts_provider_url:
53-
hosts_provider = WalleHostsInformationProvider(args.hosts_provider_url)
58+
if not walle_enabled:
59+
raise RuntimeError("you specified --hosts-provider-url, but `use_walle` is false in template.\nSpecify `use_walle: True` to continue")
60+
host_info_provider = WalleHostsInformationProvider(args.hosts_provider_url)
61+
elif k8s_enabled:
62+
host_info_provider = K8sApiHostsInformationProvider(args.kubeconfig)
63+
64+
if walle_enabled and not isinstance(host_info_provider, WalleHostsInformationProvider):
65+
raise RuntimeError("you specified 'use_walle: True', but didn't specify --hosts-provider-url to initialize walle")
66+
67+
if walle_enabled and k8s_enabled:
68+
raise RuntimeError("you specified 'use_walle: True' and 'k8s_settings.use: True', please select a single host info provider")
69+
70+
if not walle_enabled and not k8s_enabled:
71+
logger.warning("you didn't specify any host info provider (neither walle nor k8s). Make sure you know what you are doing")
5472

55-
generator = cfg_cls(cluster_template, args.binary_path, args.output_dir, walle_provider=hosts_provider, **kwargs)
73+
generator = cfg_cls(cluster_template, args.binary_path, args.output_dir, host_info_provider=host_info_provider, **kwargs)
5674

5775
all_configs = generator.get_all_configs()
5876
for cfg_name, cfg_value in all_configs.items():
5977
write_to_file(os.path.join(args.output_dir, cfg_name), cfg_value)
6078

6179

6280
def main():
63-
parser = get_parser(cfg_generate, [{"name": "--hosts-provider-url", "help": "URL from which information about hosts can be obtained."}])
81+
parser = get_parser(cfg_generate)
6482
args = parser.parse_args()
6583
args.func(args)
6684

ydb/tools/cfg/configurator_setup.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import argparse
22
import random
33

4+
from pathlib import Path
5+
46

57
def parse_optional_arguments(args):
68
kwargs = {}
@@ -73,6 +75,19 @@ def get_parser(generate_func, extra_cfg_arguments=[]):
7375
help=v['help'],
7476
)
7577

78+
parser_cfg.add_argument(
79+
"--hosts-provider-url",
80+
type=str,
81+
help="""URL from which information about hosts can be obtained.
82+
Mutually exclusive with --hosts-provider-k8s""")
83+
84+
home_directory = str(Path.home())
85+
defaultKubeconfigLocation = "{0}/.kube/config".format(home_directory)
86+
parser_cfg.add_argument("--kubeconfig",
87+
type=str,
88+
help="path to the kubeconfig file. Default `$HOME/.kube/config`, also see --hosts-provider-k8s",
89+
default=defaultKubeconfigLocation)
90+
7691
argument_group = parser_cfg.add_mutually_exclusive_group()
7792

7893
argument_group.add_argument(

ydb/tools/cfg/dynamic.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@ def __init__(
2020
output_dir,
2121
grpc_endpoint=None,
2222
local_binary_path=None,
23-
walle_provider=None,
23+
host_info_provider=None,
2424
**kwargs
2525
):
2626
self._template = template
2727
self._binary_path = binary_path
2828
self._local_binary_path = local_binary_path or binary_path
2929
self._output_dir = output_dir
30-
self._walle_provider = walle_provider
31-
self._cluster_details = base.ClusterDetailsProvider(template, walle_provider=self._walle_provider)
30+
self._host_info_provider = host_info_provider
31+
self._cluster_details = base.ClusterDetailsProvider(template, host_info_provider=self._host_info_provider)
3232
self._grpc_endpoint = grpc_endpoint
3333
self.__configure_request = None
3434
self.__static_config = static.StaticConfigGenerator(
35-
template, binary_path, output_dir, walle_provider=walle_provider, local_binary_path=local_binary_path
35+
template, binary_path, output_dir, host_info_provider=host_info_provider, local_binary_path=local_binary_path
3636
)
3737

3838
@property

ydb/tools/cfg/k8s_api/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
from .k8s_api import K8sApiHostsInformationProvider # noqa
4+
5+
__all__ = ("k8s_api",)

ydb/tools/cfg/k8s_api/k8s_api.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import logging
2+
import threading
3+
import hashlib
4+
5+
from kubernetes import client, config
6+
7+
from ydb.tools.cfg.walle import HostsInformationProvider
8+
9+
logger = logging.getLogger()
10+
11+
12+
class K8sApiHostsInformationProvider(HostsInformationProvider):
13+
def __init__(self, kubeconfig):
14+
self._kubeconfig = kubeconfig
15+
self._cache = {}
16+
self._timeout_seconds = 5
17+
self._retry_count = 10
18+
self._lock = threading.Lock()
19+
self._k8s_rack_label = None
20+
self._k8s_dc_label = None
21+
22+
def _init_k8s_labels(self, rack_label, dc_label):
23+
self._k8s_rack_label = rack_label
24+
self._k8s_dc_label = dc_label
25+
logger.info(f"initialized rack with {rack_label}, dc with {dc_label}")
26+
self._populate_cache()
27+
28+
def _populate_cache(self):
29+
try:
30+
config.load_kube_config(config_file=self._kubeconfig)
31+
with client.ApiClient() as api_client:
32+
v1 = client.CoreV1Api(api_client)
33+
nodes = v1.list_node().items
34+
35+
with self._lock:
36+
for node in nodes:
37+
hostname = node.metadata.name
38+
self._cache[hostname] = node.metadata.labels
39+
except client.exceptions.ApiException as e:
40+
print(f"Failed to fetch node labels: {e}")
41+
42+
def get_rack(self, hostname):
43+
if self._k8s_rack_label is None:
44+
return "defaultRack"
45+
46+
labels = self._cache.get(hostname)
47+
if labels and self._k8s_rack_label in labels:
48+
logging.info(f"get_rack invoked on {hostname}, value {labels[self._k8s_rack_label]}")
49+
return labels[self._k8s_rack_label]
50+
logging.info(f"rack not found for hostname {hostname}")
51+
return ""
52+
53+
def get_datacenter(self, hostname):
54+
logging.info(f"get_datacenter invoked on {hostname}")
55+
56+
if self._k8s_dc_label is None:
57+
return "defaultDC"
58+
59+
labels = self._cache.get(hostname)
60+
if labels and self._k8s_dc_label in labels:
61+
return labels[self._k8s_dc_label]
62+
return ""
63+
64+
def get_body(self, hostname):
65+
# Just something for now, please present better ideas
66+
hex_digest = hashlib.md5(hostname.encode()).hexdigest()
67+
decimal_value = int(hex_digest, 16) % (1 << 31)
68+
return decimal_value

ydb/tools/cfg/k8s_api/ya.make

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
PY3_LIBRARY()
2+
3+
PY_SRCS(
4+
__init__.py
5+
k8s_api.py
6+
)
7+
8+
PEERDIR(
9+
contrib/python/kubernetes
10+
)
11+
12+
END()

0 commit comments

Comments
 (0)