
Commit 243eeb8

YQ-2549 Move yds integration tests to github (ydb-platform#1592)

* Move mr tests

1 parent dd39aa2

File tree

4 files changed: +751 -0 lines changed


ydb/tests/fq/yds/test_recovery.py

Lines changed: 393 additions & 0 deletions
@@ -0,0 +1,393 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import time
import multiprocessing
import pytest
import os
import random

import yatest

from ydb.tests.library.harness import param_constants
import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream

import ydb.public.api.protos.draft.fq_pb2 as fq

import library.python.retry as retry

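# Helper used by the interconnect-disconnection test: repeatedly executes the given
# command line, sleeping a random interval between runs, until the duration expires.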
def run_with_sleep(args):
    program_args, time_min, time_max, duration = args
    deadline = time.time() + duration
    while time.time() < deadline:
        yatest.common.execute(program_args)
        time.sleep(random.uniform(time_min, time_max))

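# Per-test fixture: starts an 8-node streaming-over-Kikimr cluster in cloud mode together
# with the MVP mock server, and tears both down once the test finishes.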
@pytest.fixture
def kikimr():
    kikimr_conf = StreamingOverKikimrConfig(node_count=8, cloud_mode=True)
    kikimr = StreamingOverKikimr(kikimr_conf)
    kikimr.start_mvp_mock_server()
    kikimr.start()
    yield kikimr
    kikimr.stop()
    kikimr.stop_mvp_mock_server()

class TestRecovery(TestYdsBase):
    @classmethod
    def setup_class(cls):
        # for retry
        cls.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)

    @retry.retry_intrusive
    def get_graph_master_node_id(self, query_id):
        for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
            if self.kikimr.control_plane.get_task_count(node_index, query_id) > 0:
                return node_index
        assert False, "No active graphs found"

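    # Number of DQ compute actors alive on the node, read from the "utils" sensor page
    # (0 if the sensor is not present).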
    def get_ca_count(self, node_index):
        result = self.kikimr.control_plane.get_sensors(node_index, "utils").find_sensor({"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"})
        return result if result is not None else 0

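    # Polls every node until the cluster-wide worker and compute-actor totals match the
    # expected values, logging the per-node breakdown; fails once wait_time is exceeded.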
    def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
        deadline = time.time() + wait_time
        while True:
            wcs = 0
            ccs = 0
            nodes = []
            for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
                wc = self.kikimr.control_plane.get_worker_count(node_index)
                cc = self.get_ca_count(node_index)
                wcs += wc
                ccs += cc
                nodes.append([node_index, wc, cc])
            if wcs == worker_count and ccs == ca_count:
                for [s, w, c] in nodes:
                    if w * 2 != c:
                        continue
                for [s, w, c] in nodes:
                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
                return
            if time.time() > deadline:
                for [s, w, c] in nodes:
                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
                assert False, "Workers={} and CAs={}, but {} and {} expected".format(wcs, ccs, worker_count, ca_count)

    @yq_v1
    def test_delete(self, client, kikimr):
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            kikimr.control_plane.wait_bootstrap(node_index)

        self.kikimr = kikimr
        self.init_topics("recovery", partitions_count=2)

        # Consumer and topics to create are written in ya.make file.
        sql = R'''
            PRAGMA dq.MaxTasksPerStage="2";

            INSERT INTO myyds.`{output_topic}`
            SELECT STREAM
                *
            FROM myyds.`{input_topic}`;'''\
            .format(
                input_topic=self.input_topic,
                output_topic=self.output_topic,
            )
        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        logging.debug("Uuid = {}".format(kikimr.uuid))

        self.dump_workers(2, 4)

        client.abort_query(query_id)
        client.wait_query(query_id)

        self.dump_workers(0, 0)

    @yq_v1
    def test_program_state_recovery(self, client, kikimr):
        # 100  105  110  115  120  125  130  135  140    (ms)
        # [ Bucket1 )                                     |(emitted)
        #           [ Bucket2 )                           |(emitted)
        #                .<------------------------------ restart
        #                     [ Bucket3 )                 |(emitted)
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            kikimr.control_plane.wait_bootstrap(node_index)
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            kikimr.control_plane.wait_discovery(node_index)

        self.kikimr = kikimr
        self.init_topics("program_state_recovery", partitions_count=1)

        # Consumer and topics to create are written in ya.make file.
        sql = f'''
            PRAGMA dq.MaxTasksPerStage="1";
            INSERT INTO myyds.`{self.output_topic}`
            SELECT STREAM
                Yson::SerializeText(Yson::From(TableRow()))
            FROM (
                SELECT STREAM
                    Sum(t) as sum
                FROM (
                    SELECT STREAM
                        Yson::LookupUint64(ys, "time") as t
                    FROM (
                        SELECT STREAM
                            Yson::Parse(Data) AS ys
                        FROM myyds.`{self.input_topic}`))
                GROUP BY
                    HOP(DateTime::FromMilliseconds(CAST(Unwrap(t) as Uint32)), "PT0.01S", "PT0.01S", "PT0.01S"));'''
        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

        query_id = client.create_query("test_program_state_recovery", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        logging.debug("Uuid = {}".format(kikimr.uuid))
        master_node_index = self.get_graph_master_node_id(query_id)
        logging.debug("Master node {}".format(master_node_index))
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        self.write_stream([f'{{"time" = {i};}}' for i in range(100, 115, 2)])

        kikimr.compute_plane.wait_completed_checkpoints(query_id, self.kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)

        # restart node with CA
        node_to_restart = None
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            wc = kikimr.control_plane.get_worker_count(node_index)
            if wc is not None:
                if wc > 0 and node_index != master_node_index and node_to_restart is None:
                    node_to_restart = node_index
        assert node_to_restart is not None, "Can't find any task on non master node"

        logging.debug("Restart non-master node {}".format(node_to_restart))

        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
        kikimr.control_plane.wait_bootstrap(node_to_restart)

        self.write_stream([f'{{"time" = {i};}}' for i in range(116, 144, 2)])

        # wait aggregated
        # 10 ms windows over the written "time" values: [100,110) sums to 520,
        # [110,120) to 570, [120,130) to 620.
        expected = [
            '{"sum" = 520u}',
            '{"sum" = 570u}',
            '{"sum" = 620u}',
        ]
        received = self.read_stream(3)
        assert received == expected

        client.abort_query(query_id)
        client.wait_query(query_id)

        self.dump_workers(0, 0)

    @yq_v1
    # @pytest.mark.parametrize(
    #     "restart_master",
    #     [False, True],
    #     ids=["not_master", "master"]
    # )
    def test_recovery(self, client, kikimr):
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            kikimr.control_plane.wait_bootstrap(node_index)
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            kikimr.control_plane.wait_discovery(node_index)

        self.init_topics("recovery", partitions_count=2)

        self.kikimr = kikimr

        # Consumer and topics to create are written in ya.make file.
        sql = R'''
            PRAGMA dq.MaxTasksPerStage="2";

            INSERT INTO myyds.`{output_topic}`
            SELECT STREAM
                *
            FROM myyds.`{input_topic}`;'''\
            .format(
                input_topic=self.input_topic,
                output_topic=self.output_topic,
            )
        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        logging.debug("Uuid = {}".format(kikimr.uuid))
        master_node_index = self.get_graph_master_node_id(query_id)
        logging.debug("Master node {}".format(master_node_index))

        self.write_stream([str(i) for i in range(1, 11)])

        d = {}

        read_data = self.read_stream(10)
        assert len(read_data) == 10
        for m in read_data:
            n = int(m)
            assert n >= 1 and n <= 10
            assert n not in d
            d[n] = 1

        self.dump_workers(2, 4)

        node_to_restart = None
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            wc = kikimr.control_plane.get_worker_count(node_index)
            if wc is not None:
                if wc > 0 and node_index != master_node_index and node_to_restart is None:
                    node_to_restart = node_index
        assert node_to_restart is not None, "Can't find any task on non master node"

        logging.debug("Restart non-master node {}".format(node_to_restart))

        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
        kikimr.control_plane.wait_bootstrap(node_to_restart)

        self.dump_workers(2, 4)

        self.write_stream([str(i) for i in range(11, 21)])

        read_data = self.read_stream(10)
        assert len(read_data) == 10
        for m in read_data:
            n = int(m)
            assert n >= 1 and n <= 20
            if n in d:
                d[n] = d[n] + 1
            else:
                d[n] = 1

        assert len(d) == 20

        logging.debug("Restart Master node {}".format(master_node_index))

        kikimr.control_plane.kikimr_cluster.nodes[master_node_index].stop()
        kikimr.control_plane.kikimr_cluster.nodes[master_node_index].start()
        kikimr.control_plane.wait_bootstrap(master_node_index)
        master_node_index = self.get_graph_master_node_id(query_id)

        logging.debug("New master node {}".format(master_node_index))

        self.dump_workers(2, 4)

        self.write_stream([str(i) for i in range(21, 31)])

        read_data = self.read_stream(10)
        assert len(read_data) == 10
        for m in read_data:
            n = int(m)
            assert n >= 1 and n <= 30
            if n in d:
                d[n] = d[n] + 1
            else:
                d[n] = 1
        assert len(d) == 30

        zero_checkpoints_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(query_id, "StartedFromEmptyCheckpoint")
        restored_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(query_id, "RestoredFromSavedCheckpoint")
        assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(restored_metric, zero_checkpoints_metric)

        client.abort_query(query_id)
        client.wait_query(query_id)

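    # Builds the `kikimr admin debug interconnect closeinputsession` command line,
    # addressed to node1's gRPC endpoint and targeting node2 by node id, so that node1
    # drops its incoming interconnect session from node2.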
    def close_ic_session_args(self, node1, node2):
        s1 = self.kikimr.control_plane.kikimr_cluster.nodes[node1]
        s2 = self.kikimr.control_plane.kikimr_cluster.nodes[node2]
        # action = "closepeersocket"
        # action = "poisonsession"
        action = "closeinputsession"
        return [param_constants.kikimr_driver_path(),
                "-s", "{}:{}".format(s1.host, s1.grpc_port),
                "admin", "debug", "interconnect", action,
                "--node", str(s2.node_id)]

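    # Builds the `kikimr admin debug interconnect slowpoke` command line for the node,
    # injecting actors that add random delays in pool 4 for 30 seconds.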
    def slowpoke_args(self, node):
        s = self.kikimr.control_plane.kikimr_cluster.nodes[node]
        return [param_constants.kikimr_driver_path(),
                "-s", "{}:{}".format(s.host, s.grpc_port),
                "admin", "debug", "interconnect", "slowpoke",
                "--pool-id", "4",
                "--duration", "30s",
                "--sleep-min", yatest_common.plain_or_under_sanitizer("10ms", "50ms"),
                "--sleep-max", yatest_common.plain_or_under_sanitizer("100ms", "500ms"),
                "--reschedule-min", "10ms", "--reschedule-max", "100ms",
                "--num-actors", "2"]

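    # Starts slowpoke actors on every node, then uses a multiprocessing pool to keep
    # closing interconnect sessions between every pair of nodes for about 30 seconds;
    # returns the async result so the caller can wait for completion.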
    def start_close_ic_sessions_processes(self):
        pool = multiprocessing.Pool()
        args = []

        for node1_index in self.kikimr.control_plane.kikimr_cluster.nodes:
            yatest.common.execute(self.slowpoke_args(node1_index))
            for node2_index in self.kikimr.control_plane.kikimr_cluster.nodes:
                if node2_index > node1_index:
                    args.append((self.close_ic_session_args(node1_index, node2_index), 0.1, 2, 30))
        return pool.map_async(run_with_sleep, args)

    @yq_v1
    @pytest.mark.skip(reason="Should be tuned")
    def test_ic_disconnection(self, client, kikimr):
        self.kikimr = kikimr
        for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
            self.kikimr.control_plane.wait_bootstrap(node_index)
        for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
            self.kikimr.control_plane.wait_discovery(node_index)

        self.init_topics("disconnection", partitions_count=2)
        input_topic_1 = "disconnection_i_1"
        input_topic_2 = "disconnection_i_2"
        create_stream(input_topic_1)
        create_stream(input_topic_2)

        # Consumer and topics to create are written in ya.make file.
        sql = R'''
            PRAGMA dq.MaxTasksPerStage="42";

            INSERT INTO myyds.`{output_topic}`
            SELECT (S1.Data || S2.Data) || ""
            FROM myyds.`{input_topic_1}` AS S1
            INNER JOIN (SELECT * FROM myyds.`{input_topic_2}`) AS S2
            ON S1.Data = S2.Data
            '''\
            .format(
                input_topic_1=input_topic_1,
                input_topic_2=input_topic_2,
                output_topic=self.output_topic,
            )

        close_ic_sessions_future = self.start_close_ic_sessions_processes()

        folder_id = "my_folder"
        # automatic query will not clean up metrics after failure
        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("disconnected", sql, type=fq.QueryContent.QueryType.STREAMING, automatic=True).result.query_id
        automatic_id = "automatic_" + folder_id

        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)

        # Checkpointing must be finished
        deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900)
        while True:
            status = client.describe_query(query_id).result.query.meta.status
            assert status == fq.QueryMeta.RUNNING, "Unexpected status " + fq.QueryMeta.ComputeStatus.Name(status)
            completed = self.kikimr.control_plane.get_completed_checkpoints(automatic_id, False)
            if completed >= 5:
                break
            assert time.time() < deadline, "Completed: {}".format(completed)
            time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))

        close_ic_sessions_future.wait()
