Skip to content

YQ-2549 Move yds integration tests to github #1592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions ydb/tests/fq/kikimr/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pytest

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support
from ydb.tests.tools.fq_runner.kikimr_utils import ExtensionPoint
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
from ydb.tests.tools.fq_runner.kikimr_utils import StatsModeExtension
from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr


@pytest.fixture
def stats_mode():
    """Default statistics mode (empty string); tests override this fixture
    to run kikimr with a specific stats mode via StatsModeExtension."""
    return ''


@pytest.fixture
def kikimr(request: pytest.FixtureRequest, yq_version: str, stats_mode: str):
    """Kikimr instance configured with the standard extension set for this suite."""
    extensions = [
        DefaultConfigExtension(""),
        YQv2Extension(yq_version),
        ComputeExtension(),
        StatsModeExtension(stats_mode),
    ]
    with start_kikimr(request, extensions) as instance:
        yield instance


class ManyRetriesConfigExtension(ExtensionPoint):
    """Extension that makes the control plane retry failed tasks a great many times.

    Maps a range of query status codes to a retry policy with a very
    large retry count, so tests exercising recovery are not aborted by
    the control plane giving up on the query.
    """

    # NOTE: the redundant ``__init__`` that only called ``super().__init__()``
    # was removed; the inherited constructor is invoked automatically.

    def is_applicable(self, request):
        # Applies unconditionally to every test that installs it.
        return True

    def apply_to_kikimr(self, request, kikimr):
        kikimr.compute_plane.fq_config['control_plane_storage']['retry_policy_mapping'] = [
            {
                # Presumably the set of retryable query status codes — confirm
                # against the control_plane_storage config schema.
                'status_code': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
                'policy': {
                    'retry_count': 10000
                }
            }
        ]


@pytest.fixture
def kikimr_many_retries(request: pytest.FixtureRequest, yq_version: str):
    """Kikimr instance whose control plane retries failed tasks many times."""
    extensions = [
        DefaultConfigExtension(""),
        ManyRetriesConfigExtension(),
        YQv2Extension(yq_version),
        ComputeExtension(),
    ]
    with start_kikimr(request, extensions) as instance:
        yield instance


def create_client(kikimr, request):
    """Create a FederatedQueryClient bound to the given kikimr.

    The folder id is read from ``request.param`` when a parametrized
    request is supplied; otherwise the default "my_folder" is used.
    """
    if request is None:
        folder_id = "my_folder"
    else:
        folder_id = request.param["folder_id"]
    return FederatedQueryClient(folder_id, streaming_over_kikimr=kikimr)


@pytest.fixture
def client(kikimr, request=None):
    """Federated Query client over the standard ``kikimr`` fixture."""
    fq_client = create_client(kikimr, request)
    return fq_client


@pytest.fixture
def client_many_retries(kikimr_many_retries, request=None):
    """Federated Query client over the ``kikimr_many_retries`` fixture."""
    fq_client = create_client(kikimr_many_retries, request)
    return fq_client
26 changes: 26 additions & 0 deletions ydb/tests/fq/kikimr/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase


class TestBaseWithAbortingConfigParams(TestYdsBase):
    """Yds test base with shortened control-plane lease settings.

    Configures a 2s task lease TTL with a single lease retry and a 1s
    pinger period before starting the cluster and the MVP mock server.
    """

    @classmethod
    def setup_class(cls):
        conf = StreamingOverKikimrConfig(cloud_mode=True)
        cls.streaming_over_kikimr = StreamingOverKikimr(conf)
        # Hoist the repeated config-dict path into a local for readability.
        storage_cfg = cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']
        storage_cfg['task_lease_ttl'] = "2s"
        storage_cfg['task_lease_retry_policy'] = {}
        storage_cfg['task_lease_retry_policy']['retry_count'] = 1
        cls.streaming_over_kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s"
        cls.streaming_over_kikimr.start_mvp_mock_server()
        cls.streaming_over_kikimr.start()

    @classmethod
    def teardown_class(cls):
        # Guard against setup_class having failed before the attribute was set.
        if hasattr(cls, "streaming_over_kikimr"):
            cls.streaming_over_kikimr.stop_mvp_mock_server()
            cls.streaming_over_kikimr.stop()
160 changes: 160 additions & 0 deletions ydb/tests/fq/kikimr/test_recovery_match_recognize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#!/usr/bin/env python
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# -*- coding: utf-8 -*-

import pytest
import logging
import os
import time

import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
import library.python.retry as retry
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
import ydb.public.api.protos.draft.fq_pb2 as fq


@pytest.fixture
def kikimr(request):
    """Two-node streaming-over-kikimr cluster with the MVP mock server running."""
    conf = StreamingOverKikimrConfig(cloud_mode=True, node_count=2)
    cluster = StreamingOverKikimr(conf)
    cluster.start_mvp_mock_server()
    cluster.start()
    yield cluster
    cluster.stop_mvp_mock_server()
    cluster.stop()


class TestRecoveryMatchRecognize(TestYdsBase):
    """Verifies that a streaming MATCH_RECOGNIZE query recovers after a node restart."""

    @classmethod
    def setup_class(cls):
        # Retry configuration used by @retry.retry_intrusive methods below.
        cls.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)

    @retry.retry_intrusive
    def get_graph_master_node_id(self, kikimr, query_id):
        """Return the index of a node that currently runs tasks of the query.

        Retried via ``retry_conf`` because task placement happens
        asynchronously after the query starts.
        """
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            if kikimr.control_plane.get_task_count(node_index, query_id) > 0:
                return node_index
        assert False, "No active graphs found"

    def get_ca_count(self, kikimr, node_index):
        """Alive DQ compute-actor count on the node; 0 when the sensor is absent."""
        result = kikimr.control_plane.get_sensors(node_index, "utils").find_sensor(
            {"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"}
        )
        return result if result is not None else 0

    def dump_workers(self, kikimr, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
        """Poll until cluster-wide worker/compute-actor totals match the expectation.

        Logs a per-node breakdown on success; asserts (with the same
        breakdown) when the totals do not converge within ``wait_time``
        seconds.

        Fixes over the original: the local shadowing builtin ``list`` is
        renamed; a dead inner loop (``if w * 2 != c: continue`` with no
        further body — a no-op) is removed; a short sleep between polls
        replaces the previous busy wait.
        """
        deadline = time.time() + wait_time
        while True:
            total_workers = 0
            total_cas = 0
            per_node = []
            for node_index in kikimr.control_plane.kikimr_cluster.nodes:
                workers = kikimr.control_plane.get_worker_count(node_index)
                cas = self.get_ca_count(kikimr, node_index)
                total_workers += workers
                total_cas += cas
                per_node.append((node_index, workers, cas))
            if total_workers == worker_count and total_cas == ca_count:
                for node, workers, cas in per_node:
                    logging.debug("Node {}, workers {}, ca {}".format(node, workers, cas))
                return
            if time.time() > deadline:
                for node, workers, cas in per_node:
                    logging.debug("Node {}, workers {}, ca {}".format(node, workers, cas))
                assert False, "Workers={} and CAs={}, but {} and {} expected".format(
                    total_workers, total_cas, worker_count, ca_count)
            # Avoid hammering the sensors endpoint between polls.
            time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))

    @yq_v1
    @pytest.mark.parametrize("kikimr", [(None, None, None)], indirect=["kikimr"])
    def test_program_state_recovery(self, kikimr, client, yq_version):
        """Restart a non-master node mid-query; MATCH_RECOGNIZE output must be intact."""
        self.init_topics(f"pq_kikimr_streaming_{yq_version}")

        sql = R'''
            PRAGMA dq.MaxTasksPerStage="2";

            pragma FeatureR010="prototype";
            pragma config.flags("TimeOrderRecoverDelay", "-1000000");
            pragma config.flags("TimeOrderRecoverAhead", "1000000");

            INSERT INTO myyds.`{output_topic}`
            SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow()))))
            FROM (SELECT * FROM myyds.`{input_topic}`
                WITH (
                    format=json_each_row,
                    SCHEMA
                    (
                        dt UINT64
                    )))
            MATCH_RECOGNIZE(
                ORDER BY CAST(dt as Timestamp)
                MEASURES
                    LAST(ALL_TRUE.dt) as dt
                ONE ROW PER MATCH
                PATTERN ( ALL_TRUE )
                DEFINE
                    ALL_TRUE as True)'''.format(
            input_topic=self.input_topic,
            output_topic=self.output_topic,
        )

        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        master_node_index = self.get_graph_master_node_id(kikimr, query_id)
        logging.debug("Master node {}".format(master_node_index))

        # Feed two out-of-order rows, then wait for one more completed checkpoint
        # so operator state covering them is persisted before the restart.
        messages1 = ['{"dt": 1696849942400002}', '{"dt": 1696849942000001}']
        self.write_stream(messages1)

        logging.debug("get_completed_checkpoints {}".format(kikimr.compute_plane.get_completed_checkpoints(query_id)))
        kikimr.compute_plane.wait_completed_checkpoints(
            query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
        )

        # Pick some node that hosts a compute actor but is not the master.
        node_to_restart = None
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            wc = kikimr.control_plane.get_worker_count(node_index)
            if wc is not None:
                if wc > 0 and node_index != master_node_index and node_to_restart is None:
                    node_to_restart = node_index
        assert node_to_restart is not None, "Can't find any task on non master node"

        logging.debug("Restart non-master node {}".format(node_to_restart))

        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
        kikimr.control_plane.wait_bootstrap(node_to_restart)

        messages2 = [
            '{"dt": 1696849942800000}',
            '{"dt": 1696849943200003}',
            '{"dt": 1696849943300003}',
            '{"dt": 1696849943600003}',
            '{"dt": 1696849943900003}'
        ]
        self.write_stream(messages2)

        assert client.get_query_status(query_id) == fq.QueryMeta.RUNNING

        # NOTE(review): expected rows presumably reflect the TimeOrderRecover
        # delay/ahead pragmas above (only sufficiently old rows are released,
        # in timestamp order) — confirm against MATCH_RECOGNIZE semantics.
        expected = ['{"dt":1696849942000001}', '{"dt":1696849942400002}', '{"dt":1696849942800000}']

        read_data = self.read_stream(len(expected))
        logging.info("Data was read: {}".format(read_data))

        assert read_data == expected

        client.abort_query(query_id)
        client.wait_query(query_id)

        # After abort, no workers or compute actors should remain.
        self.dump_workers(kikimr, 0, 0)
Loading