Skip to content

Commit 551be8e

Browse files
committed
added next test
1 parent 8d336e8 commit 551be8e

File tree

2 files changed

+392
-2
lines changed

2 files changed

+392
-2
lines changed

ydb/tests/fq/kikimr/test_recovery.py

+391
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,391 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import logging
5+
import time
6+
import multiprocessing
7+
import pytest
8+
import os
9+
import random
10+
11+
import yatest
12+
13+
from ydb.tests.library.harness import param_constants
14+
import ydb.tests.library.common.yatest_common as yatest_common
15+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
16+
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
17+
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
18+
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
19+
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
20+
21+
import ydb.public.api.protos.draft.fq_pb2 as fq
22+
23+
import library.python.retry as retry
24+
25+
26+
def run_with_sleep(args):
    """Repeatedly run a command with a random pause between invocations.

    *args* is a tuple ``(program_args, time_min, time_max, duration)``:
    the command is executed in a loop for ``duration`` seconds, sleeping
    a random interval in ``[time_min, time_max]`` after each run.
    (Packed into one tuple so it can be used with ``Pool.map_async``.)
    """
    program_args, time_min, time_max, duration = args
    stop_at = time.time() + duration
    while time.time() < stop_at:
        yatest.common.execute(program_args)
        pause = random.uniform(time_min, time_max)
        time.sleep(pause)
32+
33+
@pytest.fixture
def kikimr():
    """Bring up an 8-node StreamingOverKikimr cluster (cloud mode) for one test.

    The MVP mock server is started before the cluster and stopped after it,
    so teardown mirrors startup in reverse order.
    """
    conf = StreamingOverKikimrConfig(node_count=8, cloud_mode=True)
    cluster = StreamingOverKikimr(conf)
    cluster.start_mvp_mock_server()
    cluster.start()
    yield cluster
    cluster.stop()
    cluster.stop_mvp_mock_server()
42+
43+
class TestRecovery(TestYdsBase):
    @classmethod
    def setup_class(cls):
        """One-time class setup: the retry policy used by @retry.retry_intrusive.

        Retries for up to 30 seconds, polling every 100 ms.
        """
        cls.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)
48+
49+
@retry.retry_intrusive
50+
def get_graph_master_node_id(self, query_id):
51+
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
52+
if self.kikimr.control_plane.get_task_count(node_index, query_id) > 0:
53+
return node_index
54+
assert False, "No active graphs found"
55+
56+
def get_ca_count(self, node_index):
57+
result = self.kikimr.control_plane.get_sensors(node_index, "utils").find_sensor({"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"})
58+
return result if result is not None else 0
59+
60+
def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
61+
deadline = time.time() + wait_time
62+
while True:
63+
wcs = 0
64+
ccs = 0
65+
list = []
66+
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
67+
wc = self.kikimr.control_plane.get_worker_count(node_index)
68+
cc = self.get_ca_count(node_index)
69+
wcs += wc
70+
ccs += cc
71+
list.append([node_index, wc, cc])
72+
if wcs == worker_count and ccs == ca_count:
73+
for [s, w, c] in list:
74+
if w * 2 != c:
75+
continue
76+
for [s, w, c] in list:
77+
logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
78+
return
79+
if time.time() > deadline:
80+
for [s, w, c] in list:
81+
logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
82+
assert False, "Workers={} and CAs={}, but {} and {} expected".format(wcs, ccs, worker_count, ca_count)
83+
84+
@yq_v1
85+
def test_delete(self, client, kikimr):
86+
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
87+
kikimr.control_plane.wait_bootstrap(node_index)
88+
89+
self.kikimr = kikimr
90+
self.init_topics("recovery", partitions_count=2)
91+
92+
# Consumer and topics to create are written in ya.make file.
93+
sql = R'''
94+
PRAGMA dq.MaxTasksPerStage="2";
95+
96+
INSERT INTO myyds.`{output_topic}`
97+
SELECT STREAM
98+
*
99+
FROM myyds.`{input_topic}`;'''\
100+
.format(
101+
input_topic=self.input_topic,
102+
output_topic=self.output_topic,
103+
)
104+
client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
105+
106+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
107+
logging.debug("Uuid = {}".format(kikimr.uuid))
108+
109+
self.dump_workers(2, 4)
110+
111+
client.abort_query(query_id)
112+
client.wait_query(query_id)
113+
114+
self.dump_workers(0, 0)
115+
116+
@yq_v1
117+
def test_program_state_recovery(self, client, kikimr):
118+
# 100 105 110 115 120 125 130 135 140 (ms)
119+
# [ Bucket1 ) |(emited)
120+
# [ Bucket2 ) |(emited)
121+
# .<------------------------------------- restart
122+
# [ Bucket3 ) |(emited)
123+
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
124+
kikimr.control_plane.wait_bootstrap(node_index)
125+
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
126+
kikimr.control_plane.wait_discovery(node_index)
127+
128+
self.kikimr = kikimr
129+
self.init_topics("program_state_recovery", partitions_count=1)
130+
131+
# Consumer and topics to create are written in ya.make file.
132+
sql = f'''
133+
PRAGMA dq.MaxTasksPerStage="1";
134+
INSERT INTO myyds.`{self.output_topic}`
135+
SELECT STREAM
136+
Yson::SerializeText(Yson::From(TableRow()))
137+
FROM (
138+
SELECT STREAM
139+
Sum(t) as sum
140+
FROM (
141+
SELECT STREAM
142+
Yson::LookupUint64(ys, "time") as t
143+
FROM (
144+
SELECT STREAM
145+
Yson::Parse(Data) AS ys
146+
FROM myyds.`{self.input_topic}`))
147+
GROUP BY
148+
HOP(DateTime::FromMilliseconds(CAST(Unwrap(t) as Uint32)), "PT0.01S", "PT0.01S", "PT0.01S"));'''
149+
client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
150+
151+
query_id = client.create_query("test_program_state_recovery", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
152+
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
153+
logging.debug("Uuid = {}".format(kikimr.uuid))
154+
master_node_index = self.get_graph_master_node_id(query_id)
155+
logging.debug("Master node {}".format(master_node_index))
156+
kikimr.compute_plane.wait_zero_checkpoint(query_id)
157+
158+
self.write_stream([f'{{"time" = {i};}}' for i in range(100, 115, 2)])
159+
160+
kikimr.compute_plane.wait_completed_checkpoints(query_id, self.kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)
161+
162+
# restart node with CA
163+
node_to_restart = None
164+
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
165+
wc = kikimr.control_plane.get_worker_count(node_index)
166+
if wc is not None:
167+
if wc > 0 and node_index != master_node_index and node_to_restart is None:
168+
node_to_restart = node_index
169+
assert node_to_restart is not None, "Can't find any task on non master node"
170+
171+
logging.debug("Restart non-master node {}".format(node_to_restart))
172+
173+
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
174+
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
175+
kikimr.control_plane.wait_bootstrap(node_to_restart)
176+
177+
self.write_stream([f'{{"time" = {i};}}' for i in range(116, 144, 2)])
178+
179+
# wait aggregated
180+
expected = [
181+
'{"sum" = 520u}',
182+
'{"sum" = 570u}',
183+
'{"sum" = 620u}',
184+
]
185+
received = self.read_stream(3)
186+
assert received == expected
187+
188+
client.abort_query(query_id)
189+
client.wait_query(query_id)
190+
191+
self.dump_workers(0, 0)
192+
193+
@yq_v1
194+
# @pytest.mark.parametrize(
195+
# "restart_master",
196+
# [False, True],
197+
# ids=["not_master", "master"]
198+
# )
199+
def test_recovery(self, client, kikimr):
200+
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
201+
kikimr.control_plane.wait_bootstrap(node_index)
202+
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
203+
kikimr.control_plane.wait_discovery(node_index)
204+
205+
self.init_topics("recovery", partitions_count=2)
206+
207+
self.kikimr = kikimr
208+
209+
# Consumer and topics to create are written in ya.make file.
210+
sql = R'''
211+
PRAGMA dq.MaxTasksPerStage="2";
212+
213+
INSERT INTO myyds.`{output_topic}`
214+
SELECT STREAM
215+
*
216+
FROM myyds.`{input_topic}`;'''\
217+
.format(
218+
input_topic=self.input_topic,
219+
output_topic=self.output_topic,
220+
)
221+
client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
222+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
223+
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
224+
kikimr.compute_plane.wait_zero_checkpoint(query_id)
225+
226+
logging.debug("Uuid = {}".format(kikimr.uuid))
227+
master_node_index = self.get_graph_master_node_id(query_id)
228+
logging.debug("Master node {}".format(master_node_index))
229+
230+
self.write_stream([str(i) for i in range(1, 11)])
231+
232+
d = {}
233+
234+
read_data = self.read_stream(10)
235+
assert len(read_data) == 10
236+
for m in read_data:
237+
n = int(m)
238+
assert n >= 1 and n <= 10
239+
assert n not in d
240+
d[n] = 1
241+
242+
self.dump_workers(2, 4)
243+
244+
node_to_restart = None
245+
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
246+
wc = kikimr.control_plane.get_worker_count(node_index)
247+
if wc is not None:
248+
if wc > 0 and node_index != master_node_index and node_to_restart is None:
249+
node_to_restart = node_index
250+
assert node_to_restart is not None, "Can't find any task on non master node"
251+
252+
logging.debug("Restart non-master node {}".format(node_to_restart))
253+
254+
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
255+
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
256+
kikimr.control_plane.wait_bootstrap(node_to_restart)
257+
258+
self.dump_workers(2, 4)
259+
260+
self.write_stream([str(i) for i in range(11, 21)])
261+
262+
read_data = self.read_stream(10)
263+
assert len(read_data) == 10
264+
for m in read_data:
265+
n = int(m)
266+
assert n >= 1 and n <= 20
267+
if n in d:
268+
d[n] = d[n] + 1
269+
else:
270+
d[n] = 1
271+
272+
assert len(d) == 20
273+
274+
logging.debug("Restart Master node {}".format(master_node_index))
275+
276+
kikimr.control_plane.kikimr_cluster.nodes[master_node_index].stop()
277+
kikimr.control_plane.kikimr_cluster.nodes[master_node_index].start()
278+
kikimr.control_plane.wait_bootstrap(master_node_index)
279+
master_node_index = self.get_graph_master_node_id(query_id)
280+
281+
logging.debug("New master node {}".format(master_node_index))
282+
283+
self.dump_workers(2, 4)
284+
285+
self.write_stream([str(i) for i in range(21, 31)])
286+
287+
read_data = self.read_stream(10)
288+
assert len(read_data) == 10
289+
for m in read_data:
290+
n = int(m)
291+
assert n >= 1 and n <= 30
292+
if n in d:
293+
d[n] = d[n] + 1
294+
else:
295+
d[n] = 1
296+
assert len(d) == 30
297+
298+
zero_checkpoints_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(query_id, "StartedFromEmptyCheckpoint")
299+
restored_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(query_id, "RestoredFromSavedCheckpoint")
300+
assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(restored_metric, zero_checkpoints_metric)
301+
302+
client.abort_query(query_id)
303+
client.wait_query(query_id)
304+
305+
def close_ic_session_args(self, node1, node2):
306+
s1 = self.kikimr.control_plane.kikimr_cluster.nodes[node1]
307+
s2 = self.kikimr.control_plane.kikimr_cluster.nodes[node2]
308+
# action = "closepeersocket"
309+
# action = "poisonsession"
310+
action = "closeinputsession"
311+
return [param_constants.kikimr_driver_path(),
312+
"-s", "{}:{}".format(s1.host, s1.grpc_port),
313+
"admin", "debug", "interconnect", action,
314+
"--node", str(s2.node_id)]
315+
316+
def slowpoke_args(self, node):
317+
s = self.kikimr.control_plane.kikimr_cluster.nodes[node]
318+
return [param_constants.kikimr_driver_path(),
319+
"-s", "{}:{}".format(s.host, s.grpc_port),
320+
"admin", "debug", "interconnect", "slowpoke",
321+
"--pool-id", "4",
322+
"--duration", "30s",
323+
"--sleep-min", yatest_common.plain_or_under_sanitizer("10ms", "50ms"),
324+
"--sleep-max", yatest_common.plain_or_under_sanitizer("100ms", "500ms"),
325+
"--reschedule-min", "10ms", "--reschedule-max", "100ms",
326+
"--num-actors", "2"]
327+
328+
def start_close_ic_sessions_processes(self):
329+
pool = multiprocessing.Pool()
330+
args = []
331+
332+
for node1_index in self.kikimr.control_plane.kikimr_cluster.nodes:
333+
yatest.common.execute(self.slowpoke_args(node1_index))
334+
for node2_index in self.kikimr.control_plane.kikimr_cluster.nodes:
335+
if node2_index > node1_index:
336+
args.append((self.close_ic_session_args(node1_index, node2_index), 0.1, 2, 30))
337+
return pool.map_async(run_with_sleep, args)
338+
339+
@yq_v1
340+
@pytest.mark.skip(reason="Should be tuned")
341+
def test_ic_disconnection(self, client):
342+
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
343+
self.kikimr.control_plane.wait_bootstrap(node_index)
344+
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
345+
self.kikimr.control_plane.wait_discovery(node_index)
346+
347+
self.kikimr = kikimr
348+
self.init_topics("disconnection", partitions_count=2)
349+
input_topic_1 = "disconnection_i_1"
350+
input_topic_2 = "disconnection_i_2"
351+
create_stream(input_topic_1)
352+
create_stream(input_topic_2)
353+
354+
# Consumer and topics to create are written in ya.make file.
355+
sql = R'''
356+
PRAGMA dq.MaxTasksPerStage="42";
357+
358+
INSERT INTO myyds.`{output_topic}`
359+
SELECT (S1.Data || S2.Data) || ""
360+
FROM myyds.`{input_topic_1}` AS S1
361+
INNER JOIN (SELECT * FROM myyds.`{input_topic_2}`) AS S2
362+
ON S1.Data = S2.Data
363+
'''\
364+
.format(
365+
input_topic_1=input_topic_1,
366+
input_topic_2=input_topic_2,
367+
output_topic=self.output_topic,
368+
)
369+
370+
close_ic_sessions_future = self.start_close_ic_sessions_processes()
371+
372+
folder_id = "my_folder"
373+
# automatic query will not clean up metrics after failure
374+
client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
375+
query_id = client.create_query("disconnected", sql, type=fq.QueryContent.QueryType.STREAMING, automatic=True).result.query_id
376+
automatic_id = "automatic_" + folder_id
377+
378+
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
379+
380+
# Checkpointing must be finished
381+
deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900)
382+
while True:
383+
status = client.describe_query(query_id).result.query.meta.status
384+
assert status == fq.QueryMeta.RUNNING, "Unexpected status " + fq.QueryMeta.ComputeStatus.Name(status)
385+
completed = self.kikimr.control_plane.get_completed_checkpoints(automatic_id, False)
386+
if completed >= 5:
387+
break
388+
assert time.time() < deadline, "Completed: {}".format(completed)
389+
time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))
390+
391+
close_ic_sessions_future.wait()

0 commit comments

Comments
 (0)