|
| 1 | +#!/usr/bin/env python |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | + |
| 4 | +import os |
| 5 | +import pytest |
| 6 | +import time |
| 7 | + |
| 8 | +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase |
| 9 | +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr |
| 10 | +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig |
| 11 | +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient |
| 12 | + |
| 13 | +import ydb.public.api.protos.draft.fq_pb2 as fq |
| 14 | + |
| 15 | + |
| 16 | +class Param(object): |
| 17 | + def __init__( |
| 18 | + self, |
| 19 | + retry_limit=2, |
| 20 | + retry_period=20, |
| 21 | + task_lease_ttl=4, |
| 22 | + ping_period=2, |
| 23 | + ): |
| 24 | + self.retry_limit = retry_limit |
| 25 | + self.retry_period = retry_period |
| 26 | + self.task_lease_ttl = task_lease_ttl |
| 27 | + self.ping_period = ping_period |
| 28 | + |
| 29 | + |
| 30 | +@pytest.fixture |
| 31 | +def kikimr(request): |
| 32 | + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True) |
| 33 | + kikimr = StreamingOverKikimr(kikimr_conf) |
| 34 | + # control |
| 35 | + kikimr.control_plane.fq_config['control_plane_storage']['mapping'] = {"common_tenant_name": ["/compute"]} |
| 36 | + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {} |
| 37 | + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'][ |
| 38 | + 'retry_count' |
| 39 | + ] = request.param.retry_limit |
| 40 | + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_period'] = "{}s".format( |
| 41 | + request.param.retry_period |
| 42 | + ) |
| 43 | + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "{}s".format( |
| 44 | + request.param.task_lease_ttl |
| 45 | + ) |
| 46 | + # compute |
| 47 | + kikimr.compute_plane.fq_config['pinger']['ping_period'] = "{}s".format(request.param.ping_period) |
| 48 | + kikimr.start_mvp_mock_server() |
| 49 | + kikimr.start() |
| 50 | + yield kikimr |
| 51 | + kikimr.stop() |
| 52 | + kikimr.stop_mvp_mock_server() |
| 53 | + |
| 54 | + |
| 55 | +class TestRetry(TestYdsBase): |
| 56 | + # TODO: fix this place. We need to speed up the drain and use retry_limit=3, retry_period=20 |
| 57 | + @pytest.mark.parametrize( |
| 58 | + "kikimr", [Param(retry_limit=2, retry_period=600, task_lease_ttl=1, ping_period=0.5)], indirect=["kikimr"] |
| 59 | + ) |
| 60 | + def test_high_rate(self, kikimr): |
| 61 | + topic_name = "high_rate" |
| 62 | + connection = "high_rate" |
| 63 | + self.init_topics(topic_name) |
| 64 | + sql = R'''SELECT * FROM {connection}.`{input_topic}`;'''.format( |
| 65 | + input_topic=self.input_topic, connection=connection |
| 66 | + ) |
| 67 | + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) |
| 68 | + client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) |
| 69 | + query_id = client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id |
| 70 | + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) |
| 71 | + for _ in range(10): |
| 72 | + deadline = time.time() + 1 |
| 73 | + kikimr.compute_plane.stop() |
| 74 | + kikimr.compute_plane.start() |
| 75 | + kikimr.compute_plane.wait_bootstrap() |
| 76 | + if client.describe_query(query_id).result.query.meta.status == fq.QueryMeta.ABORTED_BY_SYSTEM: |
| 77 | + break |
| 78 | + delta = deadline - time.time() |
| 79 | + if delta > 0: |
| 80 | + time.sleep(delta) |
| 81 | + else: |
| 82 | + assert False, "Query was NOT aborted" |
0 commit comments