Skip to content

Commit 8d336e8

Browse files
committed
Add second test
1 parent 8fc6024 commit 8d336e8

File tree

2 files changed

+197
-0
lines changed

2 files changed

+197
-0
lines changed
+196
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import logging
5+
import time
6+
import pytest
7+
import random
8+
import os
9+
import yatest
10+
11+
import ydb.tests.library.common.yatest_common as yatest_common
12+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
13+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
14+
from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig
15+
from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
16+
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
17+
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
18+
19+
import library.python.retry as retry
20+
import ydb.public.api.protos.draft.fq_pb2 as fq
21+
22+
23+
@pytest.fixture
def kikimr():
    """Start a cloud-mode streaming-over-Kikimr cluster for one test.

    Topology: one control-plane node ("/cp") and eight compute nodes
    ("/compute").  Control-plane storage is configured with a short task
    lease (3s, 5 retries every 30s) and the compute pinger with a 1s ping
    period, so that lease expiry / recovery paths trigger quickly in tests.
    The cluster (plus the MVP mock server) is torn down after the test.
    """
    cluster_config = StreamingOverKikimrConfig(
        cloud_mode=True,
        node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(8)},
    )
    kikimr = StreamingOverKikimr(cluster_config)

    # Control-plane storage settings.
    cps_config = kikimr.control_plane.fq_config['control_plane_storage']
    cps_config['mapping'] = {"common_tenant_name": ["/compute"]}
    cps_config['task_lease_retry_policy'] = {}
    cps_config['task_lease_retry_policy']['retry_count'] = 5
    cps_config['task_lease_retry_policy']['retry_period'] = "30s"
    cps_config['task_lease_ttl'] = "3s"

    # Compute-plane settings.
    kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s"

    kikimr.start_mvp_mock_server()
    kikimr.start()
    yield kikimr
    kikimr.stop()
    kikimr.stop_mvp_mock_server()
43+
44+
45+
def run_with_sleep(args):
    """Repeatedly execute a program until a time budget is exhausted.

    ``args`` is a 4-tuple ``(program_args, time_min, time_max, duration)``:
    the command is run via ``yatest.common.execute`` in a loop for up to
    ``duration`` seconds, sleeping a random interval drawn uniformly from
    ``[time_min, time_max]`` between consecutive runs.
    """
    program_args, time_min, time_max, duration = args
    stop_at = time.time() + duration
    while time.time() < stop_at:
        yatest.common.execute(program_args)
        pause = random.uniform(time_min, time_max)
        time.sleep(pause)
51+
52+
53+
class TestRecovery(TestYdsBase):
    """Streaming-query recovery test.

    Runs a YDS -> YDS copy query, then restarts first a non-master compute
    node and then the master node, verifying after each restart that the
    worker/compute-actor layout is restored and that data keeps flowing,
    and finally that the query was restored from a saved checkpoint.
    """

    @retry.retry_intrusive
    def get_graph_master_node_id(self, query_id):
        """Return the index of the node currently hosting tasks of *query_id*.

        Retried (via ``self.retry_conf``) because tasks may not be scheduled
        yet right after the query starts; asserts if no node hosts the graph.
        """
        for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
            if self.kikimr.compute_plane.get_task_count(node_index, query_id) > 0:
                return node_index
        assert False, "No active graphs found"

    def get_ca_count(self, node_index):
        """Return the number of alive DQ compute actors on *node_index* (0 if the sensor is absent)."""
        result = self.kikimr.compute_plane.get_sensors(node_index, "utils").find_sensor(
            {"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"})
        return result if result is not None else 0

    def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
        """Poll until the cluster reaches the expected worker/CA layout.

        Succeeds when the cluster-wide totals equal *worker_count* and
        *ca_count* AND every node holds exactly two compute actors per
        worker; logs the per-node distribution on success.  Asserts with
        the observed totals if *wait_time* seconds pass first.
        """
        deadline = time.time() + wait_time
        while True:
            total_workers = 0
            total_cas = 0
            node_stats = []  # renamed from `list` — don't shadow the builtin
            for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
                wc = self.kikimr.compute_plane.get_worker_count(node_index)
                cc = self.get_ca_count(node_index)
                total_workers += wc
                total_cas += cc
                node_stats.append((node_index, wc, cc))
            if total_workers == worker_count and total_cas == ca_count:
                # Bug fix: the original per-node loop (`if w * 2 != c:
                # continue`) was a no-op — `continue` only skipped to the
                # next node, so the function returned even when the 2-CAs-
                # per-worker invariant did not hold.  Enforce it for real:
                # keep polling until every node satisfies it.
                if all(2 * wc == cc for _, wc, cc in node_stats):
                    for node, wc, cc in node_stats:
                        logging.debug("Node {}, workers {}, ca {}".format(node, wc, cc))
                    return
            if time.time() > deadline:
                for node, wc, cc in node_stats:
                    logging.debug("Node {}, workers {}, ca {}".format(node, wc, cc))
                assert False, "Workers={} and CAs={}, but {} and {} expected".format(
                    total_workers, total_cas, worker_count, ca_count)
            # Original busy-spun; a short sleep avoids hammering the sensor endpoint.
            time.sleep(0.5)

    def _restart_node(self, node_index):
        """Stop and start compute node *node_index*, then wait for it to bootstrap."""
        node = self.kikimr.compute_plane.kikimr_cluster.nodes[node_index]
        node.stop()
        node.start()
        self.kikimr.compute_plane.wait_bootstrap(node_index)

    def _read_and_count(self, seen, upper_bound):
        """Read 10 messages, assert each value is in [1, upper_bound], and accumulate per-value counts in *seen*."""
        read_data = self.read_stream(10)
        assert len(read_data) == 10
        for m in read_data:
            n = int(m)
            assert 1 <= n <= upper_bound
            seen[n] = seen.get(n, 0) + 1

    @yq_v1
    def test_recovery(self, kikimr, client, yq_version):
        """End-to-end recovery scenario.

        1. Start a streaming YDS->YDS copy query (2 tasks per stage).
        2. Write/read 10 messages and check exactly-once delivery.
        3. Restart a non-master node, then the master node; after each
           restart verify the worker layout and that data keeps flowing.
        4. Check the query restored from a saved checkpoint at least once.
        """
        self.init_topics(f"pq_kikimr_streaming_{yq_version}", partitions_count=2)

        self.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)
        self.kikimr = kikimr
        kikimr.compute_plane.wait_bootstrap()
        kikimr.compute_plane.wait_discovery()

        # Consumer and topics to create are written in ya.make file.
        sql = R'''
PRAGMA dq.MaxTasksPerStage="2";

INSERT INTO myyds.`{output_topic}`
SELECT STREAM
    *
FROM myyds.`{input_topic}`;'''\
            .format(
                input_topic=self.input_topic,
                output_topic=self.output_topic,
            )
        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        self.kikimr.compute_plane.wait_zero_checkpoint(query_id)

        logging.debug("Uuid = {}".format(kikimr.uuid))
        master_node_index = self.get_graph_master_node_id(query_id)
        logging.debug("Master node {}".format(master_node_index))

        self.write_stream([str(i) for i in range(1, 11)])

        read_data = self.read_stream(10)
        for message in read_data:
            logging.info("Received message: {}".format(message))
        assert len(read_data) == 10

        # Per-value receive counts; the first batch must be duplicate-free.
        seen = {}
        for m in read_data:
            n = int(m)
            assert 1 <= n <= 10
            assert n not in seen
            seen[n] = 1

        self.dump_workers(2, 4)

        # Pick any node that runs tasks but is not the master.
        node_to_restart = None
        for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
            wc = kikimr.compute_plane.get_worker_count(node_index)
            if wc is not None and wc > 0 and node_index != master_node_index:
                node_to_restart = node_index
                break
        assert node_to_restart is not None, "Can't find any task on non master node"

        logging.debug("Restart non-master node {}".format(node_to_restart))
        self._restart_node(node_to_restart)

        self.dump_workers(2, 4)

        self.write_stream([str(i) for i in range(11, 21)])
        self._read_and_count(seen, 20)

        logging.debug("Restart Master node {}".format(master_node_index))
        self._restart_node(master_node_index)
        master_node_index = self.get_graph_master_node_id(query_id)
        logging.debug("New master node {}".format(master_node_index))

        self.dump_workers(2, 4)

        self.write_stream([str(i) for i in range(21, 31)])
        self._read_and_count(seen, 30)

        zero_checkpoints_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, "StartedFromEmptyCheckpoint")
        restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, "RestoredFromSavedCheckpoint")
        assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(
            restored_metric, zero_checkpoints_metric)

        client.abort_query(query_id)
        client.wait_query(query_id)

ydb/tests/fq/kikimr/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ PY_SRCS(
2121

2222
TEST_SRCS(
2323
test_recovery_match_recognize.py
24+
test_recovery_mz.py
2425
)
2526

2627
IF (SANITIZER_TYPE == "thread")

0 commit comments

Comments
 (0)