Skip to content

Commit 4c7139d

Browse files
authored
Merge 8d336e8 into fbc9d17
2 parents fbc9d17 + 8d336e8 commit 4c7139d

File tree

6 files changed

+492
-0
lines changed

6 files changed

+492
-0
lines changed

ydb/tests/fq/kikimr/conftest.py

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import pytest
5+
6+
from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
7+
from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support
8+
from ydb.tests.tools.fq_runner.kikimr_utils import ExtensionPoint
9+
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
10+
from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
11+
from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
12+
from ydb.tests.tools.fq_runner.kikimr_utils import StatsModeExtension
13+
from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr
14+
15+
16+
@pytest.fixture
def stats_mode():
    """Default stats mode; tests override this fixture to request a specific mode."""
    # Empty string means "no explicit stats mode configured".
    mode = ''
    return mode
19+
20+
21+
@pytest.fixture
def kikimr(request: pytest.FixtureRequest, yq_version: str, stats_mode: str):
    """Start a kikimr cluster configured for the requested YQ version and stats mode.

    The cluster is torn down by ``start_kikimr``'s context manager after the test.
    """
    extensions = [
        DefaultConfigExtension(""),
        YQv2Extension(yq_version),
        ComputeExtension(),
        StatsModeExtension(stats_mode),
    ]
    with start_kikimr(request, extensions) as cluster:
        yield cluster
29+
30+
31+
class ManyRetriesConfigExtension(ExtensionPoint):
    """Extension that maps a wide range of query status codes to a very
    permissive retry policy (10000 retries) in control plane storage."""

    def __init__(self):
        super().__init__()

    def is_applicable(self, request):
        # This extension is applied unconditionally.
        return True

    def apply_to_kikimr(self, request, kikimr):
        """Install the many-retries policy into the compute plane FQ config."""
        policy = {'retry_count': 10000}
        # Status codes 2..13 inclusive, same set as an explicit literal list.
        mapping = [{'status_code': list(range(2, 14)), 'policy': policy}]
        kikimr.compute_plane.fq_config['control_plane_storage']['retry_policy_mapping'] = mapping
47+
48+
49+
@pytest.fixture
def kikimr_many_retries(request: pytest.FixtureRequest, yq_version: str):
    """Start a kikimr cluster with the many-retries control plane policy applied."""
    extensions = [
        DefaultConfigExtension(""),
        ManyRetriesConfigExtension(),
        YQv2Extension(yq_version),
        ComputeExtension(),
    ]
    with start_kikimr(request, extensions) as cluster:
        yield cluster
57+
58+
59+
def create_client(kikimr, request):
    """Create a FederatedQueryClient bound to *kikimr*.

    The folder id comes from the pytest parametrization
    (``request.param["folder_id"]``) when present; otherwise the default
    "my_folder" is used.
    """
    # Fix: a non-parametrized FixtureRequest has no ``.param`` attribute, so the
    # original ``request is not None`` guard raised AttributeError for
    # non-parametrized tests. ``getattr`` handles both None and missing param.
    param = getattr(request, "param", None) if request is not None else None
    folder_id = param["folder_id"] if param is not None else "my_folder"
    return FederatedQueryClient(folder_id, streaming_over_kikimr=kikimr)
62+
63+
64+
@pytest.fixture
def client(kikimr, request=None):
    """FederatedQueryClient wired to the standard ``kikimr`` fixture."""
    fq_client = create_client(kikimr, request)
    return fq_client
67+
68+
69+
@pytest.fixture
def client_many_retries(kikimr_many_retries, request=None):
    """FederatedQueryClient wired to the ``kikimr_many_retries`` fixture."""
    fq_client = create_client(kikimr_many_retries, request)
    return fq_client

ydb/tests/fq/kikimr/test_base.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
5+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
6+
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
7+
8+
9+
class TestBaseWithAbortingConfigParams(TestYdsBase):
    """YDS test base whose cluster is configured with a short task lease (2s),
    a single lease retry, and a fast pinger (1s) so lease-abort paths trigger
    quickly during tests."""

    @classmethod
    def setup_class(cls):
        conf = StreamingOverKikimrConfig(cloud_mode=True)
        cls.streaming_over_kikimr = StreamingOverKikimr(conf)
        # Tighten control plane lease handling: 2s TTL, single retry.
        cps = cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']
        cps['task_lease_ttl'] = "2s"
        cps['task_lease_retry_policy'] = {'retry_count': 1}
        # Ping frequently so lease expiry is observed fast.
        cls.streaming_over_kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s"
        cls.streaming_over_kikimr.start_mvp_mock_server()
        cls.streaming_over_kikimr.start()

    @classmethod
    def teardown_class(cls):
        # Guard: setup_class may have failed before the attribute was set.
        if hasattr(cls, "streaming_over_kikimr"):
            cls.streaming_over_kikimr.stop_mvp_mock_server()
            cls.streaming_over_kikimr.stop()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import pytest
5+
import logging
6+
import os
7+
import time
8+
9+
import ydb.tests.library.common.yatest_common as yatest_common
10+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
11+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
12+
import library.python.retry as retry
13+
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
14+
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
15+
import ydb.public.api.protos.draft.fq_pb2 as fq
16+
17+
18+
@pytest.fixture
def kikimr(request):
    """Two-node kikimr streaming cluster plus MVP mock server for one test."""
    conf = StreamingOverKikimrConfig(cloud_mode=True, node_count=2)
    cluster = StreamingOverKikimr(conf)
    cluster.start_mvp_mock_server()
    cluster.start()
    yield cluster
    # Teardown: stop mock server first, then the cluster itself.
    cluster.stop_mvp_mock_server()
    cluster.stop()
27+
28+
29+
class TestRecoveryMatchRecognize(TestYdsBase):
    """Checks that a streaming MATCH_RECOGNIZE query keeps producing correct,
    ordered results after a non-master compute node is restarted mid-query."""

    @classmethod
    def setup_class(cls):
        # for retry: poll up to 30s in 0.1s steps (consumed by @retry.retry_intrusive below)
        cls.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)

    @retry.retry_intrusive
    def get_graph_master_node_id(self, kikimr, query_id):
        """Return the index of the node that currently runs tasks for *query_id*.

        Retried via ``cls.retry_conf`` because the graph may not be scheduled yet.
        Asserts if no node reports active tasks after all retries.
        """
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            if kikimr.control_plane.get_task_count(node_index, query_id) > 0:
                return node_index
        assert False, "No active graphs found"

    def get_ca_count(self, kikimr, node_index):
        """Return the count of alive DQ compute actors on *node_index*, or 0 if
        the sensor is not present."""
        result = kikimr.control_plane.get_sensors(node_index, "utils").find_sensor(
            {"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"}
        )
        return result if result is not None else 0

    def dump_workers(self, kikimr, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
        """Busy-poll (no sleep between rounds) until cluster-wide worker and
        compute-actor totals equal the expected values; logs per-node counts and
        asserts once *wait_time* seconds have elapsed without a match."""
        deadline = time.time() + wait_time
        while True:
            wcs = 0
            ccs = 0
            # NOTE(review): shadows the builtin `list`; holds [node, workers, cas] triples.
            list = []
            for node_index in kikimr.control_plane.kikimr_cluster.nodes:
                wc = kikimr.control_plane.get_worker_count(node_index)
                cc = self.get_ca_count(kikimr, node_index)
                wcs += wc
                ccs += cc
                list.append([node_index, wc, cc])
            if wcs == worker_count and ccs == ca_count:
                # NOTE(review): this loop has no effect (`continue` is its only body) —
                # presumably it was meant to re-check nodes where workers*2 != CAs;
                # confirm the intended behavior with the author.
                for [s, w, c] in list:
                    if w * 2 != c:
                        continue
                for [s, w, c] in list:
                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
                return
            if time.time() > deadline:
                for [s, w, c] in list:
                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
                assert False, "Workers={} and CAs={}, but {} and {} expected".format(wcs, ccs, worker_count, ca_count)

    @yq_v1
    @pytest.mark.parametrize("kikimr", [(None, None, None)], indirect=["kikimr"])
    def test_program_state_recovery(self, kikimr, client, yq_version):
        """End-to-end recovery test: run a MATCH_RECOGNIZE query over YDS topics,
        checkpoint it, restart a non-master node, then verify the output stream
        still contains the expected ordered records."""

        self.init_topics(f"pq_kikimr_streaming_{yq_version}")

        # Streaming query: read JSON rows from the input topic, time-order them by
        # `dt` via MATCH_RECOGNIZE, and write matches to the output topic.
        sql = R'''
            PRAGMA dq.MaxTasksPerStage="2";

            pragma FeatureR010="prototype";
            pragma config.flags("TimeOrderRecoverDelay", "-1000000");
            pragma config.flags("TimeOrderRecoverAhead", "1000000");

            INSERT INTO myyds.`{output_topic}`
            SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow()))))
            FROM (SELECT * FROM myyds.`{input_topic}`
                WITH (
                    format=json_each_row,
                    SCHEMA
                    (
                        dt UINT64
                    )))
            MATCH_RECOGNIZE(
                ORDER BY CAST(dt as Timestamp)
                MEASURES
                    LAST(ALL_TRUE.dt) as dt
                ONE ROW PER MATCH
                PATTERN ( ALL_TRUE )
                DEFINE
                    ALL_TRUE as True)''' \
            .format(
                input_topic=self.input_topic,
                output_topic=self.output_topic,
            )

        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        # Wait until the first (zero) checkpoint is taken so there is state to recover.
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        master_node_index = self.get_graph_master_node_id(kikimr, query_id)
        logging.debug("Master node {}".format(master_node_index))

        # First batch is deliberately out of time order (dt values swapped).
        messages1 = ['{"dt": 1696849942400002}', '{"dt": 1696849942000001}']
        self.write_stream(messages1)

        logging.debug("get_completed_checkpoints {}".format(kikimr.compute_plane.get_completed_checkpoints(query_id)))
        # Wait for one more completed checkpoint so the first batch is persisted.
        kikimr.compute_plane.wait_completed_checkpoints(
            query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
        )

        # restart node with CA: pick the first node that has workers and is not
        # the master, so the graph must recover that node's state.
        node_to_restart = None
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            wc = kikimr.control_plane.get_worker_count(node_index)
            if wc is not None:
                if wc > 0 and node_index != master_node_index and node_to_restart is None:
                    node_to_restart = node_index
        assert node_to_restart is not None, "Can't find any task on non master node"

        logging.debug("Restart non-master node {}".format(node_to_restart))

        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
        kikimr.control_plane.wait_bootstrap(node_to_restart)

        # Second batch, written after the restart.
        messages2 = [
            '{"dt": 1696849942800000}',
            '{"dt": 1696849943200003}',
            '{"dt": 1696849943300003}',
            '{"dt": 1696849943600003}',
            '{"dt": 1696849943900003}'
        ]
        self.write_stream(messages2)

        # The query must have survived the node restart.
        assert client.get_query_status(query_id) == fq.QueryMeta.RUNNING

        # Output must be re-ordered by dt despite the out-of-order input batch.
        expected = ['{"dt":1696849942000001}', '{"dt":1696849942400002}', '{"dt":1696849942800000}']

        read_data = self.read_stream(len(expected))
        logging.info("Data was read: {}".format(read_data))

        assert read_data == expected

        client.abort_query(query_id)
        client.wait_query(query_id)

        # After abort, no workers or compute actors should remain anywhere.
        self.dump_workers(kikimr, 0, 0)

0 commit comments

Comments
 (0)