Skip to content

Commit 3ab680b

Browse files
authored
Merge 551be8e into a9e6ce0
2 parents a9e6ce0 + 551be8e commit 3ab680b

File tree

7 files changed

+882
-0
lines changed

7 files changed

+882
-0
lines changed

ydb/tests/fq/kikimr/conftest.py

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import pytest
5+
6+
from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
7+
from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support
8+
from ydb.tests.tools.fq_runner.kikimr_utils import ExtensionPoint
9+
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
10+
from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
11+
from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
12+
from ydb.tests.tools.fq_runner.kikimr_utils import StatsModeExtension
13+
from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr
14+
15+
16+
@pytest.fixture
17+
def stats_mode():
18+
return ''
19+
20+
21+
@pytest.fixture
22+
def kikimr(request: pytest.FixtureRequest, yq_version: str, stats_mode: str):
23+
kikimr_extensions = [DefaultConfigExtension(""),
24+
YQv2Extension(yq_version),
25+
ComputeExtension(),
26+
StatsModeExtension(stats_mode)]
27+
with start_kikimr(request, kikimr_extensions) as kikimr:
28+
yield kikimr
29+
30+
31+
class ManyRetriesConfigExtension(ExtensionPoint):
32+
def __init__(self):
33+
super().__init__()
34+
35+
def is_applicable(self, request):
36+
return True
37+
38+
def apply_to_kikimr(self, request, kikimr):
39+
kikimr.compute_plane.fq_config['control_plane_storage']['retry_policy_mapping'] = [
40+
{
41+
'status_code': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
42+
'policy': {
43+
'retry_count': 10000
44+
}
45+
}
46+
]
47+
48+
49+
@pytest.fixture
50+
def kikimr_many_retries(request: pytest.FixtureRequest, yq_version: str):
51+
kikimr_extensions = [DefaultConfigExtension(""),
52+
ManyRetriesConfigExtension(),
53+
YQv2Extension(yq_version),
54+
ComputeExtension()]
55+
with start_kikimr(request, kikimr_extensions) as kikimr:
56+
yield kikimr
57+
58+
59+
def create_client(kikimr, request):
60+
return FederatedQueryClient(request.param["folder_id"] if request is not None else "my_folder",
61+
streaming_over_kikimr=kikimr)
62+
63+
64+
@pytest.fixture
65+
def client(kikimr, request=None):
66+
return create_client(kikimr, request)
67+
68+
69+
@pytest.fixture
70+
def client_many_retries(kikimr_many_retries, request=None):
71+
return create_client(kikimr_many_retries, request)

ydb/tests/fq/kikimr/test_base.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
5+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
6+
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
7+
8+
9+
class TestBaseWithAbortingConfigParams(TestYdsBase):
10+
11+
@classmethod
12+
def setup_class(cls):
13+
kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
14+
cls.streaming_over_kikimr = StreamingOverKikimr(kikimr_conf)
15+
cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "2s"
16+
cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {}
17+
cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_count'] = 1
18+
cls.streaming_over_kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s"
19+
cls.streaming_over_kikimr.start_mvp_mock_server()
20+
cls.streaming_over_kikimr.start()
21+
22+
@classmethod
23+
def teardown_class(cls):
24+
if hasattr(cls, "streaming_over_kikimr"):
25+
cls.streaming_over_kikimr.stop_mvp_mock_server()
26+
cls.streaming_over_kikimr.stop()

0 commit comments

Comments
 (0)