diff --git a/ydb/tests/fq/s3/conftest.py b/ydb/tests/fq/s3/conftest.py
index 948a237d2707..786ca79343aa 100644
--- a/ydb/tests/fq/s3/conftest.py
+++ b/ydb/tests/fq/s3/conftest.py
@@ -27,6 +27,19 @@
 S3_PID_FILE = "s3.pid"
 
 
+class TestCounter:
+    def __init__(self, tests_count_limit, error_string):
+        self.tests_count_limit = tests_count_limit
+        self.error_string = error_string
+        self.number_tests = 0
+
+    def on_test_start(self):
+        self.number_tests += 1
+        assert self.number_tests <= self.tests_count_limit, \
+            f"{self.error_string} exceeded limit {self.number_tests} vs {self.tests_count_limit}, " \
+            "this may lead to timeouts on CI, please split this file"
+
+
 @pytest.fixture(scope="module")
 def mvp_external_ydb_endpoint(request) -> str:
     return request.param["endpoint"] if request is not None and hasattr(request, 'param') else None
@@ -86,14 +99,21 @@ def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external
 
 
 @pytest.fixture(scope="module")
-def kikimr_yqv1(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint):
+def kikimr_starts_counter():
+    return TestCounter(10, "Number of kikimr restarts in one module")
+
+
+@pytest.fixture(scope="module")
+def kikimr_yqv1(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
+    kikimr_starts_counter.on_test_start()
     kikimr_extensions = get_kikimr_extensions(s3, YQV1_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
     with start_kikimr(kikimr_params, kikimr_extensions) as kikimr:
         yield kikimr
 
 
 @pytest.fixture(scope="module")
-def kikimr_yqv2(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint):
+def kikimr_yqv2(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
+    kikimr_starts_counter.on_test_start()
     kikimr_extensions = get_kikimr_extensions(s3, YQV2_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
     with start_kikimr(kikimr_params, kikimr_extensions) as kikimr:
         yield kikimr
@@ -115,8 +135,14 @@ def kikimr(yq_version: str, kikimr_yqv1, kikimr_yqv2):
     return kikimr
 
 
+@pytest.fixture(scope="module")
+def tests_counter():
+    return TestCounter(200, "Number of tests in one module")
+
+
 @pytest.fixture
-def client(kikimr, request=None):
+def client(kikimr, tests_counter, request=None):
+    tests_counter.on_test_start()
     client = FederatedQueryClient(
         request.param["folder_id"] if request is not None else "my_folder", streaming_over_kikimr=kikimr
     )
@@ -128,8 +154,5 @@
 
 @pytest.fixture
 def unique_prefix(request: pytest.FixtureRequest):
-    name_hash = hash(request.node.name)
-    if name_hash >= 0:
-        return f"p{name_hash}_"
-    else:
-        return f"n{-name_hash}_"
+    name_hash = abs(hash(request.node.name))
+    return f"h{name_hash}_{request.function.__name__}"
diff --git a/ydb/tests/fq/s3/test_bindings.py b/ydb/tests/fq/s3/test_bindings_0.py
similarity index 55%
rename from ydb/tests/fq/s3/test_bindings.py
rename to ydb/tests/fq/s3/test_bindings_0.py
index 9021153cb37c..3503c1facb8c 100644
--- a/ydb/tests/fq/s3/test_bindings.py
+++ b/ydb/tests/fq/s3/test_bindings_0.py
@@ -2,7 +2,6 @@
 # -*- coding: utf-8 -*-
 
 import boto3
-import logging
 import pytest
 
 import ydb.public.api.protos.ydb_value_pb2 as ydb
@@ -346,270 +345,3 @@ def test_name_uniqueness_constraint(self, kikimr, client: FederatedQueryClient,
             == "Connection with the same name already exists. 
Please choose another name" ) assert modify_binding_result.issues[0].severity == 1 - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("kikimr_settings", [{"bindings_mode": "BM_DROP_WITH_WARNING"}], indirect=True) - def test_s3_insert(self, kikimr, s3, client, yq_version, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("bindbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - kikimr.control_plane.wait_bootstrap(1) - connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id - - fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32)) - barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8)) - storage_binding_name = unique_prefix + "s3binding" - client.create_object_storage_binding( - name=storage_binding_name, - path="path1/", - format="csv_with_names", - connection_id=connection_id, - columns=[fooType, barType], - ) - - sql = fR''' - insert into bindings.`{storage_binding_name}` - select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]); - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - if yq_version == "v2": - issues = str(client.describe_query(query_id).result.query.issue) - assert ( - "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon" - in issues - ) - assert "severity: 2" in issues - - sql = fR''' - select foo, bar from bindings.`{storage_binding_name}`; - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - if yq_version == "v2": - issues = str(client.describe_query(query_id).result.query.issue) - assert ( - "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon" - in issues - ) - assert "severity: 2" in issues - - data = client.get_result_data(query_id) - result_set = data.result.result_set - assert len(result_set.columns) == 2 - assert result_set.columns[0].name == "foo" - assert result_set.columns[0].type.type_id == ydb.Type.INT32 - assert result_set.columns[1].name == "bar" - assert result_set.columns[1].type.type_id == ydb.Type.UTF8 - assert len(result_set.rows) == 2 - assert result_set.rows[0].items[0].int32_value == 123 - assert result_set.rows[0].items[1].text_value == 'xxx' - assert result_set.rows[1].items[0].int32_value == 456 - assert result_set.rows[1].items[1].text_value == 'yyy' - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("bindbucket") - bucket.create(ACL='public-read') - - kikimr.control_plane.wait_bootstrap(1) - connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id - - fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8)) - barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32)) - 
storage_binding_name = unique_prefix + "s3binding" - client.create_object_storage_binding( - name=storage_binding_name, - path="path2/", - format="csv_with_names", - connection_id=connection_id, - columns=[fooType, barType], - ) - - sql = fR''' - insert into bindings.`{storage_binding_name}` - select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]); - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED]) - - describe_result = client.describe_query(query_id).result - describe_string = "{}".format(describe_result) - assert "Type mismatch between schema type" in describe_string, describe_string - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_pg_binding(self, kikimr, s3, client, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("fbucket") - bucket.create(ACL='public-read') - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - fruits = R'''Fruit,Price -Banana,3 -Apple,2 -Pear,15''' - s3_client.put_object(Body=fruits, Bucket='fbucket', Key='a/fruits.csv', ContentType='text/plain') - - kikimr.control_plane.wait_bootstrap(1) - connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket") - - fruitType = ydb.Column(name="Fruit", type=ydb.Type(pg_type=ydb.PgType(oid=25))) - priceType = ydb.Column(name="Price", type=ydb.Type(pg_type=ydb.PgType(oid=23))) - storage_binding_name = unique_prefix + "my_binding" - client.create_object_storage_binding( - name=storage_binding_name, - path="a/", - format="csv_with_names", - connection_id=connection_response.result.connection_id, - columns=[fruitType, priceType], - format_setting={"file_pattern": "*.csv"}, - ) - - sql = fR''' - SELECT * - FROM bindings.{storage_binding_name}; - ''' - - query_id = client.create_query( - "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=True - ).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 2 - assert result_set.columns[0].name == "Fruit" - assert result_set.columns[0].type.pg_type.oid == 25 - assert result_set.columns[1].name == "Price" - assert result_set.columns[1].type.pg_type.oid == 23 - assert len(result_set.rows) == 3 - assert result_set.rows[0].items[0].text_value == "Banana" - assert result_set.rows[0].items[1].text_value == "3" - assert result_set.rows[1].items[0].text_value == "Apple" - assert result_set.rows[1].items[1].text_value == "2" - assert result_set.rows[2].items[0].text_value == "Pear" - assert result_set.rows[2].items[1].text_value == "15" - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("pg_syntax", [False, True], ids=["yql_syntax", "pg_syntax"]) - def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("count_for_pg_binding") - bucket.create(ACL='public-read') - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, 
aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - row = R'''{"a": 42, "b": 3.14, "c": "text"}''' - s3_client.put_object(Body=row, Bucket='count_for_pg_binding', Key='abc.json', ContentType='text/json') - - kikimr.control_plane.wait_bootstrap(1) - connection_response = client.create_storage_connection(unique_prefix + "abc", "count_for_pg_binding") - - aType = ydb.Column(name="a", type=ydb.Type(pg_type=ydb.PgType(oid=23))) - bType = ydb.Column(name="b", type=ydb.Type(pg_type=ydb.PgType(oid=701))) - cType = ydb.Column(name="c", type=ydb.Type(pg_type=ydb.PgType(oid=25))) - storage_binding_name = unique_prefix + "binding_for_count" - client.create_object_storage_binding( - name=storage_binding_name, - path="abc.json", - format="json_each_row", - connection_id=connection_response.result.connection_id, - columns=[aType, bType, cType], - ) - - sql = fR''' - SELECT COUNT(*) - FROM bindings.{storage_binding_name}; - ''' - - query_id = client.create_query( - "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=pg_syntax - ).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 1 - assert len(result_set.rows) == 1 - if pg_syntax: - assert result_set.columns[0].type.pg_type.oid == 20 - assert result_set.rows[0].items[0].text_value == "1" - else: - assert result_set.columns[0].type.type_id == ydb.Type.UINT64 - assert result_set.rows[0].items[0].uint64_value == 1 - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_ast_in_failed_query_compilation(self, kikimr, s3, client, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("bindbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - kikimr.control_plane.wait_bootstrap(1) - connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id - - data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) - storage_binding_name = unique_prefix + "s3binding" - client.create_object_storage_binding( - name=storage_binding_name, path="/", format="raw", connection_id=connection_id, columns=[data_column] - ) - - sql = fR''' - SELECT some_unknown_column FROM bindings.`{storage_binding_name}`; - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.FAILED) - - ast = client.describe_query(query_id).result.query.ast.data - assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast" - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix): - kikimr.control_plane.wait_bootstrap(1) - connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket") - binding_response = client.create_object_storage_binding( - name=unique_prefix + "my_binding", - path="fruits.csv", - format="raw", - connection_id=connection_response.result.connection_id, - columns=[], - check_issues=False, - ) - assert "Only one column in schema supported in raw format" in str(binding_response.issues), str( - binding_response.issues - ) diff --git 
a/ydb/tests/fq/s3/test_bindings_1.py b/ydb/tests/fq/s3/test_bindings_1.py new file mode 100644 index 000000000000..cb4cbb8aaf5f --- /dev/null +++ b/ydb/tests/fq/s3/test_bindings_1.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import boto3 +import logging +import pytest + +import ydb.public.api.protos.ydb_value_pb2 as ydb +import ydb.public.api.protos.draft.fq_pb2 as fq +from ydb.tests.tools.fq_runner.kikimr_utils import yq_all + + +class TestBindings: + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("kikimr_settings", [{"bindings_mode": "BM_DROP_WITH_WARNING"}], indirect=True) + def test_s3_insert(self, kikimr, s3, client, yq_version, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("bindbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + kikimr.control_plane.wait_bootstrap(1) + connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id + + fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32)) + barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8)) + storage_binding_name = unique_prefix + "s3binding" + client.create_object_storage_binding( + name=storage_binding_name, + path="path1/", + format="csv_with_names", + connection_id=connection_id, + columns=[fooType, barType], + ) + + sql = fR''' + insert into bindings.`{storage_binding_name}` + select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + if yq_version == "v2": + issues = str(client.describe_query(query_id).result.query.issue) + assert ( + "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon" + in issues + ) + assert "severity: 2" in issues + + sql = fR''' + select foo, bar from bindings.`{storage_binding_name}`; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + if yq_version == "v2": + issues = str(client.describe_query(query_id).result.query.issue) + assert ( + "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon" + in issues + ) + assert "severity: 2" in issues + + data = client.get_result_data(query_id) + result_set = data.result.result_set + assert len(result_set.columns) == 2 + assert result_set.columns[0].name == "foo" + assert result_set.columns[0].type.type_id == ydb.Type.INT32 + assert result_set.columns[1].name == "bar" + assert result_set.columns[1].type.type_id == ydb.Type.UTF8 + assert len(result_set.rows) == 2 + assert result_set.rows[0].items[0].int32_value == 123 + assert result_set.rows[0].items[1].text_value == 'xxx' + assert result_set.rows[1].items[0].int32_value == 456 + assert result_set.rows[1].items[1].text_value == 'yyy' + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = 
resource.Bucket("bindbucket") + bucket.create(ACL='public-read') + + kikimr.control_plane.wait_bootstrap(1) + connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id + + fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8)) + barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32)) + storage_binding_name = unique_prefix + "s3binding" + client.create_object_storage_binding( + name=storage_binding_name, + path="path2/", + format="csv_with_names", + connection_id=connection_id, + columns=[fooType, barType], + ) + + sql = fR''' + insert into bindings.`{storage_binding_name}` + select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED]) + + describe_result = client.describe_query(query_id).result + describe_string = "{}".format(describe_result) + assert "Type mismatch between schema type" in describe_string, describe_string + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_pg_binding(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = R'''Fruit,Price +Banana,3 +Apple,2 +Pear,15''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='a/fruits.csv', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket") + + fruitType = ydb.Column(name="Fruit", type=ydb.Type(pg_type=ydb.PgType(oid=25))) + priceType = ydb.Column(name="Price", type=ydb.Type(pg_type=ydb.PgType(oid=23))) + storage_binding_name = unique_prefix + "my_binding" + client.create_object_storage_binding( + name=storage_binding_name, + path="a/", + format="csv_with_names", + connection_id=connection_response.result.connection_id, + columns=[fruitType, priceType], + format_setting={"file_pattern": "*.csv"}, + ) + + sql = fR''' + SELECT * + FROM bindings.{storage_binding_name}; + ''' + + query_id = client.create_query( + "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=True + ).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 2 + assert result_set.columns[0].name == "Fruit" + assert result_set.columns[0].type.pg_type.oid == 25 + assert result_set.columns[1].name == "Price" + assert result_set.columns[1].type.pg_type.oid == 23 + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].text_value == "Banana" + assert result_set.rows[0].items[1].text_value == "3" + assert result_set.rows[1].items[0].text_value == "Apple" + assert result_set.rows[1].items[1].text_value == "2" + assert result_set.rows[2].items[0].text_value == "Pear" + assert result_set.rows[2].items[1].text_value == "15" + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("pg_syntax", [False, True], 
ids=["yql_syntax", "pg_syntax"]) + def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("count_for_pg_binding") + bucket.create(ACL='public-read') + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + row = R'''{"a": 42, "b": 3.14, "c": "text"}''' + s3_client.put_object(Body=row, Bucket='count_for_pg_binding', Key='abc.json', ContentType='text/json') + + kikimr.control_plane.wait_bootstrap(1) + connection_response = client.create_storage_connection(unique_prefix + "abc", "count_for_pg_binding") + + aType = ydb.Column(name="a", type=ydb.Type(pg_type=ydb.PgType(oid=23))) + bType = ydb.Column(name="b", type=ydb.Type(pg_type=ydb.PgType(oid=701))) + cType = ydb.Column(name="c", type=ydb.Type(pg_type=ydb.PgType(oid=25))) + storage_binding_name = unique_prefix + "binding_for_count" + client.create_object_storage_binding( + name=storage_binding_name, + path="abc.json", + format="json_each_row", + connection_id=connection_response.result.connection_id, + columns=[aType, bType, cType], + ) + + sql = fR''' + SELECT COUNT(*) + FROM bindings.{storage_binding_name}; + ''' + + query_id = client.create_query( + "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=pg_syntax + ).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert len(result_set.rows) == 1 + if pg_syntax: + assert result_set.columns[0].type.pg_type.oid == 20 + assert result_set.rows[0].items[0].text_value == "1" + else: + assert result_set.columns[0].type.type_id == ydb.Type.UINT64 + assert result_set.rows[0].items[0].uint64_value == 1 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_ast_in_failed_query_compilation(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("bindbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + kikimr.control_plane.wait_bootstrap(1) + connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id + + data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) + storage_binding_name = unique_prefix + "s3binding" + client.create_object_storage_binding( + name=storage_binding_name, path="/", format="raw", connection_id=connection_id, columns=[data_column] + ) + + sql = fR''' + SELECT some_unknown_column FROM bindings.`{storage_binding_name}`; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + + ast = client.describe_query(query_id).result.query.ast.data + assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast" + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix): + kikimr.control_plane.wait_bootstrap(1) + connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket") + 
binding_response = client.create_object_storage_binding( + name=unique_prefix + "my_binding", + path="fruits.csv", + format="raw", + connection_id=connection_response.result.connection_id, + columns=[], + check_issues=False, + ) + assert "Only one column in schema supported in raw format" in str(binding_response.issues), str( + binding_response.issues + ) diff --git a/ydb/tests/fq/s3/test_explicit_partitioning.py b/ydb/tests/fq/s3/test_explicit_partitioning_0.py similarity index 60% rename from ydb/tests/fq/s3/test_explicit_partitioning.py rename to ydb/tests/fq/s3/test_explicit_partitioning_0.py index 14759fc51d41..57ed5f7b2d22 100644 --- a/ydb/tests/fq/s3/test_explicit_partitioning.py +++ b/ydb/tests/fq/s3/test_explicit_partitioning_0.py @@ -865,564 +865,3 @@ def test_projection_enum_type_invalid_validation( assert "Projection column \\\"year\\\" has invalid type" in str(describe_result.query.issue), str( describe_result.query.issue ) - - @yq_all - @pytest.mark.parametrize( - "client, column_type, is_correct", - [ - ({"folder_id": "my_folder1"}, "year Int32", False), - ({"folder_id": "my_folder2"}, "year Int32 NOT NULL", False), - ({"folder_id": "my_folder3"}, "year Uint32", False), - ({"folder_id": "my_folder4"}, "year Uint32 NOT NULL", True), - ({"folder_id": "my_folder5"}, "year Int64", False), - ({"folder_id": "my_folder6"}, "year Int64 NOT NULL", False), - ({"folder_id": "my_folder7"}, "year Uint64", False), - ({"folder_id": "my_folder8"}, "year Uint64 NOT NULL", False), - ({"folder_id": "my_folder9"}, "year String NOT NULL", True), - ({"folder_id": "my_folder10"}, "year String", False), - ({"folder_id": "my_folder11"}, "year Utf8", False), - ({"folder_id": "my_folder12"}, "year Utf8 NOT NULL", True), - ({"folder_id": "my_folder13"}, "year Date", False), - ({"folder_id": "my_folder14"}, "year Date NOT NULL", True), - ({"folder_id": "my_folder15"}, "year Datetime", False), - ({"folder_id": "my_folder16"}, "year Datetime NOT NULL", True), - ], - indirect=["client"], - ) - @pytest.mark.parametrize("runtime_listing", ["false", "true"]) - def test_projection_date_type_validation( - self, kikimr, s3, client, column_type, is_correct, runtime_listing, unique_prefix - ): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("test_projection_date_type_invalid_validation") - bucket.create(ACL='public-read') - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - fruits = R'''Fruit,Price,Duration -Banana,3,100''' - s3_client.put_object( - Body=fruits, - Bucket='test_projection_date_type_invalid_validation', - Key='2022-03-05/fruits.csv', - ContentType='text/plain', - ) - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "fruitbucket" - client.create_storage_connection(storage_connection_name, "test_projection_date_type_invalid_validation") - - sql = ( - f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - ''' - + R''' - $projection = - @@ - { - "projection.enabled" : true, - - "projection.year.type" : "date", - "projection.year.min" : "2022-01-01", - "projection.year.max" : "2022-01-01", - "projection.year.format" : "%Y", - "projection.year.interval" : "1", - "projection.year.unit" : "DAYS", - - "storage.location.template" : "${year}-03-05" - } - @@; - ''' - + f''' - SELECT * - FROM `{storage_connection_name}`.`/` WITH - ( - format=csv_with_names, - schema= - ( - Fruit String NOT NULL, 
- {column_type} - ), - partitioned_by=(year), - projection=$projection - ) - ''' - ) - - query_id = client.create_query("simple", sql).result.query_id - if is_correct: - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 2 - assert result_set.columns[0].name == "Fruit" - assert result_set.columns[0].type.type_id == ydb.Type.STRING - assert len(result_set.rows) == 1 - assert result_set.rows[0].items[0].bytes_value == b"Banana" - else: - client.wait_query_status(query_id, fq.QueryMeta.FAILED) - describe_result = client.describe_query(query_id).result - logging.info("AST: {}".format(describe_result.query.ast.data)) - assert "Projection column \\\"year\\\" has invalid type" in str(describe_result.query.issue), str( - describe_result.query.issue - ) - - @yq_all - @pytest.mark.parametrize( - "client, column_type, is_correct", - [ - ({"folder_id": "my_folder1"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32), True), - ({"folder_id": "my_folder2"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32), True), - ({"folder_id": "my_folder3"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64), True), - ({"folder_id": "my_folder4"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE), False), - ({"folder_id": "my_folder5"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64), True), - ({"folder_id": "my_folder6"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING), True), - ({"folder_id": "my_folder7"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8), True), - ( - {"folder_id": "my_folder8"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))), - False, - ), - ( - {"folder_id": "my_folder9"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32))), - False, - ), - ( - {"folder_id": "my_folder10"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64))), - False, - ), - ( - {"folder_id": "my_folder11"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE))), - False, - ), - ( - {"folder_id": "my_folder12"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64))), - False, - ), - ( - {"folder_id": "my_folder13"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))), - False, - ), - ( - {"folder_id": "my_folder14"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))), - False, - ), - ], - indirect=["client"], - ) - def test_binding_projection_integer_type_validation( - self, kikimr, s3, client, column_type, is_correct, unique_prefix - ): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("test_binding_projection_integer_type_validation") - bucket.create(ACL='public-read') - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - fruits = R'''Fruit,Price,Duration -Banana,3,100''' - s3_client.put_object( - Body=fruits, - Bucket='test_binding_projection_integer_type_validation', - Key='2022-03-05/fruits.csv', - ContentType='text/plain', - ) - kikimr.control_plane.wait_bootstrap(1) - connection_response = client.create_storage_connection( - unique_prefix + "fruitbucket", 
"test_binding_projection_integer_type_validation" - ) - - year = ydb.Column(name="year", type=column_type) - fruitType = ydb.Column(name="Fruit", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) - binding_response = client.create_object_storage_binding( - name=unique_prefix + "my_binding", - path="/", - format="csv_with_names", - connection_id=connection_response.result.connection_id, - columns=[year, fruitType], - projection={ - "projection.enabled": "true", - "projection.year.type": "integer", - "projection.year.min": "2022", - "projection.year.max": "2022", - "storage.location.template": "${year}-03-05", - }, - partitioned_by=["year"], - check_issues=is_correct, - ) - if not is_correct: - assert "Column \\\"year\\\" from projection does not support" in str(binding_response.issues) - - @yq_all - @pytest.mark.parametrize( - "client, column_type, is_correct", - [ - ({"folder_id": "my_folder1"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32), False), - ({"folder_id": "my_folder2"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32), False), - ({"folder_id": "my_folder3"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64), False), - ({"folder_id": "my_folder4"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE), False), - ({"folder_id": "my_folder5"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64), False), - ({"folder_id": "my_folder6"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING), True), - ({"folder_id": "my_folder7"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8), False), - ( - {"folder_id": "my_folder8"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))), - False, - ), - ( - {"folder_id": "my_folder9"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32))), - False, - ), - ( - {"folder_id": "my_folder10"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64))), - False, - ), - ( - {"folder_id": "my_folder11"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE))), - False, - ), - ( - {"folder_id": "my_folder12"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64))), - False, - ), - ( - {"folder_id": "my_folder13"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))), - False, - ), - ( - {"folder_id": "my_folder14"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))), - False, - ), - ], - indirect=["client"], - ) - def test_binding_projection_enum_type_validation(self, kikimr, s3, client, column_type, is_correct, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("test_binding_projection_enum_type_validation") - bucket.create(ACL='public-read') - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - fruits = R'''Fruit,Price,Duration -Banana,3,100''' - s3_client.put_object( - Body=fruits, - Bucket='test_binding_projection_enum_type_validation', - Key='2022-03-05/fruits.csv', - ContentType='text/plain', - ) - kikimr.control_plane.wait_bootstrap(1) - connection_response = client.create_storage_connection( - unique_prefix + "fruitbucket", "test_binding_projection_enum_type_validation" - ) - - year = ydb.Column(name="year", type=column_type) - fruitType = 
ydb.Column(name="Fruit", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) - binding_response = client.create_object_storage_binding( - name=unique_prefix + "my_binding", - path="/", - format="csv_with_names", - connection_id=connection_response.result.connection_id, - columns=[year, fruitType], - projection={ - "projection.enabled": "true", - "projection.year.type": "enum", - "projection.year.values": "2022", - "storage.location.template": "${year}-03-05", - }, - partitioned_by=["year"], - check_issues=is_correct, - ) - if not is_correct: - assert "Column \\\"year\\\" from projection does not support" in str(binding_response.issues) - - @yq_all - @pytest.mark.parametrize( - "client, column_type, is_correct", - [ - ({"folder_id": "my_folder1"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32), False), - ({"folder_id": "my_folder2"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32), True), - ({"folder_id": "my_folder3"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64), False), - ({"folder_id": "my_folder4"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE), True), - ({"folder_id": "my_folder5"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATETIME), True), - ({"folder_id": "my_folder6"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64), False), - ({"folder_id": "my_folder7"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING), True), - ({"folder_id": "my_folder8"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8), True), - ( - {"folder_id": "my_folder9"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))), - False, - ), - ( - {"folder_id": "my_folder10"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32))), - False, - ), - ( - {"folder_id": "my_folder11"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64))), - False, - ), - ( - {"folder_id": "my_folder12"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE))), - False, - ), - ( - {"folder_id": "my_folder13"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATETIME))), - False, - ), - ( - {"folder_id": "my_folder14"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64))), - False, - ), - ( - {"folder_id": "my_folder15"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))), - False, - ), - ( - {"folder_id": "my_folder16"}, - ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))), - False, - ), - ], - indirect=["client"], - ) - def test_binding_projection_date_type_validation(self, kikimr, s3, client, column_type, is_correct, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("test_binding_projection_date_type_validation") - bucket.create(ACL='public-read') - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - fruits = R'''Fruit,Price,Duration -Banana,3,100''' - s3_client.put_object( - Body=fruits, - Bucket='test_binding_projection_date_type_validation', - Key='2022-03-05/fruits.csv', - ContentType='text/plain', - ) - kikimr.control_plane.wait_bootstrap(1) - connection_response = client.create_storage_connection( - unique_prefix + "fruitbucket", "test_binding_projection_date_type_validation" - ) - 
- year = ydb.Column(name="year", type=column_type) - fruitType = ydb.Column(name="Fruit", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) - binding_response = client.create_object_storage_binding( - name=unique_prefix + "my_binding", - path="/", - format="csv_with_names", - connection_id=connection_response.result.connection_id, - columns=[year, fruitType], - projection={ - "projection.enabled": "true", - "projection.year.type": "date", - "projection.year.min": "2022-01-01", - "projection.year.max": "2022-01-01", - "projection.year.format": "%Y", - "projection.year.interval": "1", - "projection.year.unit": "DAYS", - "storage.location.template": "${year}-03-05", - }, - partitioned_by=["year"], - check_issues=is_correct, - ) - if not is_correct: - assert "Column \\\"year\\\" from projection does not support" in str(binding_response.issues) - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("runtime_listing", ["false", "true"]) - def test_raw_format(self, kikimr, s3, client, runtime_listing, yq_version, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("raw_bucket") - bucket.create(ACL='public-read') - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - s3_client.put_object( - Body='text', - Bucket='raw_bucket', - Key='raw_format/year=2023/month=01/day=14/file1.txt', - ContentType='text/plain', - ) - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "rawbucket" - client.create_storage_connection(storage_connection_name, "raw_bucket") - - sql = ( - f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - ''' - + R''' - $projection = @@ { - "projection.enabled" : "true", - "storage.location.template" : "/${timestamp}", - "projection.timestamp.type" : "date", - "projection.timestamp.min" : "2023-01-14", - "projection.timestamp.max" : "2023-01-14", - "projection.timestamp.interval" : "1", - "projection.timestamp.format" : "year=%Y/month=%m/day=%d", - "projection.timestamp.unit" : "DAYS" - } - @@; - ''' - + fR''' - SELECT - data, - timestamp - FROM - `{storage_connection_name}`.`raw_format` - WITH - ( - format="raw", - schema=( - `data` String NOT NULL, - `timestamp` Date NOT NULL - ), - partitioned_by=(`timestamp`), - projection=$projection - ) - ''' - ) - - # temporary fix for dynamic listing - if yq_version == "v1": - sql = 'pragma dq.MaxTasksPerStage="10"; ' + sql - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - describe_result = client.describe_query(query_id).result - logging.info("AST: {}".format(describe_result.query.ast.data)) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - - assert len(result_set.columns) == 2 - assert result_set.columns[0].name == "data" - assert result_set.columns[0].type.type_id == ydb.Type.STRING - assert result_set.columns[1].name == "timestamp" - assert result_set.columns[1].type.type_id == ydb.Type.DATE - assert len(result_set.rows) == 1 - assert result_set.rows[0].items[0].bytes_value == b"text" - assert result_set.rows[0].items[1].uint32_value == 19371 - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - 
@pytest.mark.parametrize("runtime_listing", ["false", "true"]) - def test_parquet(self, kikimr, s3, client, runtime_listing, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("parquets") - bucket.create(ACL='public-read-write') - bucket.objects.all().delete() - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "pb" - client.create_storage_connection(storage_connection_name, "parquets") - - sql = fR''' - insert into `{storage_connection_name}`.`part/x=1/` with (format="parquet") - select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]); - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - sql = fR''' - insert into `{storage_connection_name}`.`part/x=2/` with (format="parquet") - select * from AS_TABLE([<|foo:234, bar:"zzz"u|>,<|foo:567, bar:"ttt"u|>]); - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - sql = f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - - SELECT foo, bar, x FROM `{storage_connection_name}`.`part/` - WITH - ( - format="parquet", - schema=( - foo Int NOT NULL, - bar String NOT NULL, - x Int NOT NULL, - ), - partitioned_by=(`x`) - ) - ORDER BY foo - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 3 - assert result_set.columns[0].name == "foo" - print(str(result_set.columns[0])) - assert result_set.columns[0].type.type_id == ydb.Type.INT32 - assert result_set.columns[1].name == "bar" - assert result_set.columns[1].type.type_id == ydb.Type.STRING - assert result_set.columns[2].name == "x" - assert result_set.columns[2].type.type_id == ydb.Type.INT32 - assert len(result_set.rows) == 4 - assert result_set.rows[0].items[0].int32_value == 123 - assert result_set.rows[1].items[0].int32_value == 234 - assert result_set.rows[2].items[0].int32_value == 456 - assert result_set.rows[3].items[0].int32_value == 567 - assert result_set.rows[0].items[1].bytes_value == b"xxx" - assert result_set.rows[1].items[1].bytes_value == b"zzz" - assert result_set.rows[2].items[1].bytes_value == b"yyy" - assert result_set.rows[3].items[1].bytes_value == b"ttt" - assert result_set.rows[0].items[2].int32_value == 1 - assert result_set.rows[1].items[2].int32_value == 2 - assert result_set.rows[2].items[2].int32_value == 1 - assert result_set.rows[3].items[2].int32_value == 2 diff --git a/ydb/tests/fq/s3/test_explicit_partitioning_1.py b/ydb/tests/fq/s3/test_explicit_partitioning_1.py new file mode 100644 index 000000000000..ac320bbf34ee --- /dev/null +++ b/ydb/tests/fq/s3/test_explicit_partitioning_1.py @@ -0,0 +1,576 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import boto3 +import logging + +import pytest + +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + +import ydb.public.api.protos.ydb_value_pb2 as ydb +import ydb.public.api.protos.draft.fq_pb2 as fq +from ydb.tests.tools.fq_runner.kikimr_utils import yq_all + + +class TestS3(TestYdsBase): + @yq_all + 
@pytest.mark.parametrize( + "client, column_type, is_correct", + [ + ({"folder_id": "my_folder1"}, "year Int32", False), + ({"folder_id": "my_folder2"}, "year Int32 NOT NULL", False), + ({"folder_id": "my_folder3"}, "year Uint32", False), + ({"folder_id": "my_folder4"}, "year Uint32 NOT NULL", True), + ({"folder_id": "my_folder5"}, "year Int64", False), + ({"folder_id": "my_folder6"}, "year Int64 NOT NULL", False), + ({"folder_id": "my_folder7"}, "year Uint64", False), + ({"folder_id": "my_folder8"}, "year Uint64 NOT NULL", False), + ({"folder_id": "my_folder9"}, "year String NOT NULL", True), + ({"folder_id": "my_folder10"}, "year String", False), + ({"folder_id": "my_folder11"}, "year Utf8", False), + ({"folder_id": "my_folder12"}, "year Utf8 NOT NULL", True), + ({"folder_id": "my_folder13"}, "year Date", False), + ({"folder_id": "my_folder14"}, "year Date NOT NULL", True), + ({"folder_id": "my_folder15"}, "year Datetime", False), + ({"folder_id": "my_folder16"}, "year Datetime NOT NULL", True), + ], + indirect=["client"], + ) + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) + def test_projection_date_type_validation( + self, kikimr, s3, client, column_type, is_correct, runtime_listing, unique_prefix + ): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("test_projection_date_type_invalid_validation") + bucket.create(ACL='public-read') + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = R'''Fruit,Price,Duration +Banana,3,100''' + s3_client.put_object( + Body=fruits, + Bucket='test_projection_date_type_invalid_validation', + Key='2022-03-05/fruits.csv', + ContentType='text/plain', + ) + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "test_projection_date_type_invalid_validation") + + sql = ( + f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + ''' + + R''' + $projection = + @@ + { + "projection.enabled" : true, + + "projection.year.type" : "date", + "projection.year.min" : "2022-01-01", + "projection.year.max" : "2022-01-01", + "projection.year.format" : "%Y", + "projection.year.interval" : "1", + "projection.year.unit" : "DAYS", + + "storage.location.template" : "${year}-03-05" + } + @@; + ''' + + f''' + SELECT * + FROM `{storage_connection_name}`.`/` WITH + ( + format=csv_with_names, + schema= + ( + Fruit String NOT NULL, + {column_type} + ), + partitioned_by=(year), + projection=$projection + ) + ''' + ) + + query_id = client.create_query("simple", sql).result.query_id + if is_correct: + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 2 + assert result_set.columns[0].name == "Fruit" + assert result_set.columns[0].type.type_id == ydb.Type.STRING + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].bytes_value == b"Banana" + else: + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + describe_result = client.describe_query(query_id).result + logging.info("AST: {}".format(describe_result.query.ast.data)) + assert "Projection column \\\"year\\\" has invalid type" in str(describe_result.query.issue), str( + describe_result.query.issue + ) + + @yq_all + @pytest.mark.parametrize( + "client, 
column_type, is_correct", + [ + ({"folder_id": "my_folder1"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32), True), + ({"folder_id": "my_folder2"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32), True), + ({"folder_id": "my_folder3"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64), True), + ({"folder_id": "my_folder4"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE), False), + ({"folder_id": "my_folder5"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64), True), + ({"folder_id": "my_folder6"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING), True), + ({"folder_id": "my_folder7"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8), True), + ( + {"folder_id": "my_folder8"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))), + False, + ), + ( + {"folder_id": "my_folder9"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32))), + False, + ), + ( + {"folder_id": "my_folder10"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64))), + False, + ), + ( + {"folder_id": "my_folder11"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE))), + False, + ), + ( + {"folder_id": "my_folder12"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64))), + False, + ), + ( + {"folder_id": "my_folder13"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))), + False, + ), + ( + {"folder_id": "my_folder14"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))), + False, + ), + ], + indirect=["client"], + ) + def test_binding_projection_integer_type_validation( + self, kikimr, s3, client, column_type, is_correct, unique_prefix + ): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("test_binding_projection_integer_type_validation") + bucket.create(ACL='public-read') + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = R'''Fruit,Price,Duration +Banana,3,100''' + s3_client.put_object( + Body=fruits, + Bucket='test_binding_projection_integer_type_validation', + Key='2022-03-05/fruits.csv', + ContentType='text/plain', + ) + kikimr.control_plane.wait_bootstrap(1) + connection_response = client.create_storage_connection( + unique_prefix + "fruitbucket", "test_binding_projection_integer_type_validation" + ) + + year = ydb.Column(name="year", type=column_type) + fruitType = ydb.Column(name="Fruit", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) + binding_response = client.create_object_storage_binding( + name=unique_prefix + "my_binding", + path="/", + format="csv_with_names", + connection_id=connection_response.result.connection_id, + columns=[year, fruitType], + projection={ + "projection.enabled": "true", + "projection.year.type": "integer", + "projection.year.min": "2022", + "projection.year.max": "2022", + "storage.location.template": "${year}-03-05", + }, + partitioned_by=["year"], + check_issues=is_correct, + ) + if not is_correct: + assert "Column \\\"year\\\" from projection does not support" in str(binding_response.issues) + + @yq_all + @pytest.mark.parametrize( + "client, column_type, is_correct", + [ + ({"folder_id": "my_folder1"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32), False), + 
({"folder_id": "my_folder2"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32), False), + ({"folder_id": "my_folder3"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64), False), + ({"folder_id": "my_folder4"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE), False), + ({"folder_id": "my_folder5"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64), False), + ({"folder_id": "my_folder6"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING), True), + ({"folder_id": "my_folder7"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8), False), + ( + {"folder_id": "my_folder8"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))), + False, + ), + ( + {"folder_id": "my_folder9"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32))), + False, + ), + ( + {"folder_id": "my_folder10"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64))), + False, + ), + ( + {"folder_id": "my_folder11"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE))), + False, + ), + ( + {"folder_id": "my_folder12"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64))), + False, + ), + ( + {"folder_id": "my_folder13"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))), + False, + ), + ( + {"folder_id": "my_folder14"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))), + False, + ), + ], + indirect=["client"], + ) + def test_binding_projection_enum_type_validation(self, kikimr, s3, client, column_type, is_correct, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("test_binding_projection_enum_type_validation") + bucket.create(ACL='public-read') + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = R'''Fruit,Price,Duration +Banana,3,100''' + s3_client.put_object( + Body=fruits, + Bucket='test_binding_projection_enum_type_validation', + Key='2022-03-05/fruits.csv', + ContentType='text/plain', + ) + kikimr.control_plane.wait_bootstrap(1) + connection_response = client.create_storage_connection( + unique_prefix + "fruitbucket", "test_binding_projection_enum_type_validation" + ) + + year = ydb.Column(name="year", type=column_type) + fruitType = ydb.Column(name="Fruit", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) + binding_response = client.create_object_storage_binding( + name=unique_prefix + "my_binding", + path="/", + format="csv_with_names", + connection_id=connection_response.result.connection_id, + columns=[year, fruitType], + projection={ + "projection.enabled": "true", + "projection.year.type": "enum", + "projection.year.values": "2022", + "storage.location.template": "${year}-03-05", + }, + partitioned_by=["year"], + check_issues=is_correct, + ) + if not is_correct: + assert "Column \\\"year\\\" from projection does not support" in str(binding_response.issues) + + @yq_all + @pytest.mark.parametrize( + "client, column_type, is_correct", + [ + ({"folder_id": "my_folder1"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32), False), + ({"folder_id": "my_folder2"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32), True), + ({"folder_id": "my_folder3"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64), 
False), + ({"folder_id": "my_folder4"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE), True), + ({"folder_id": "my_folder5"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATETIME), True), + ({"folder_id": "my_folder6"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64), False), + ({"folder_id": "my_folder7"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING), True), + ({"folder_id": "my_folder8"}, ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8), True), + ( + {"folder_id": "my_folder9"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))), + False, + ), + ( + {"folder_id": "my_folder10"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT32))), + False, + ), + ( + {"folder_id": "my_folder11"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UINT64))), + False, + ), + ( + {"folder_id": "my_folder12"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATE))), + False, + ), + ( + {"folder_id": "my_folder13"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.DATETIME))), + False, + ), + ( + {"folder_id": "my_folder14"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT64))), + False, + ), + ( + {"folder_id": "my_folder15"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))), + False, + ), + ( + {"folder_id": "my_folder16"}, + ydb.Type(optional_type=ydb.OptionalType(item=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))), + False, + ), + ], + indirect=["client"], + ) + def test_binding_projection_date_type_validation(self, kikimr, s3, client, column_type, is_correct, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("test_binding_projection_date_type_validation") + bucket.create(ACL='public-read') + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = R'''Fruit,Price,Duration +Banana,3,100''' + s3_client.put_object( + Body=fruits, + Bucket='test_binding_projection_date_type_validation', + Key='2022-03-05/fruits.csv', + ContentType='text/plain', + ) + kikimr.control_plane.wait_bootstrap(1) + connection_response = client.create_storage_connection( + unique_prefix + "fruitbucket", "test_binding_projection_date_type_validation" + ) + + year = ydb.Column(name="year", type=column_type) + fruitType = ydb.Column(name="Fruit", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) + binding_response = client.create_object_storage_binding( + name=unique_prefix + "my_binding", + path="/", + format="csv_with_names", + connection_id=connection_response.result.connection_id, + columns=[year, fruitType], + projection={ + "projection.enabled": "true", + "projection.year.type": "date", + "projection.year.min": "2022-01-01", + "projection.year.max": "2022-01-01", + "projection.year.format": "%Y", + "projection.year.interval": "1", + "projection.year.unit": "DAYS", + "storage.location.template": "${year}-03-05", + }, + partitioned_by=["year"], + check_issues=is_correct, + ) + if not is_correct: + assert "Column \\\"year\\\" from projection does not support" in str(binding_response.issues) + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + 
@pytest.mark.parametrize("runtime_listing", ["false", "true"]) + def test_raw_format(self, kikimr, s3, client, runtime_listing, yq_version, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("raw_bucket") + bucket.create(ACL='public-read') + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + s3_client.put_object( + Body='text', + Bucket='raw_bucket', + Key='raw_format/year=2023/month=01/day=14/file1.txt', + ContentType='text/plain', + ) + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "rawbucket" + client.create_storage_connection(storage_connection_name, "raw_bucket") + + sql = ( + f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + ''' + + R''' + $projection = @@ { + "projection.enabled" : "true", + "storage.location.template" : "/${timestamp}", + "projection.timestamp.type" : "date", + "projection.timestamp.min" : "2023-01-14", + "projection.timestamp.max" : "2023-01-14", + "projection.timestamp.interval" : "1", + "projection.timestamp.format" : "year=%Y/month=%m/day=%d", + "projection.timestamp.unit" : "DAYS" + } + @@; + ''' + + fR''' + SELECT + data, + timestamp + FROM + `{storage_connection_name}`.`raw_format` + WITH + ( + format="raw", + schema=( + `data` String NOT NULL, + `timestamp` Date NOT NULL + ), + partitioned_by=(`timestamp`), + projection=$projection + ) + ''' + ) + + # temporary fix for dynamic listing + if yq_version == "v1": + sql = 'pragma dq.MaxTasksPerStage="10"; ' + sql + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + describe_result = client.describe_query(query_id).result + logging.info("AST: {}".format(describe_result.query.ast.data)) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + + assert len(result_set.columns) == 2 + assert result_set.columns[0].name == "data" + assert result_set.columns[0].type.type_id == ydb.Type.STRING + assert result_set.columns[1].name == "timestamp" + assert result_set.columns[1].type.type_id == ydb.Type.DATE + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].bytes_value == b"text" + assert result_set.rows[0].items[1].uint32_value == 19371 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) + def test_parquet(self, kikimr, s3, client, runtime_listing, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("parquets") + bucket.create(ACL='public-read-write') + bucket.objects.all().delete() + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "pb" + client.create_storage_connection(storage_connection_name, "parquets") + + sql = fR''' + insert into `{storage_connection_name}`.`part/x=1/` with (format="parquet") + select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]); + ''' + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + sql = fR''' + insert into `{storage_connection_name}`.`part/x=2/` with 
(format="parquet") + select * from AS_TABLE([<|foo:234, bar:"zzz"u|>,<|foo:567, bar:"ttt"u|>]); + ''' + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + sql = f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + + SELECT foo, bar, x FROM `{storage_connection_name}`.`part/` + WITH + ( + format="parquet", + schema=( + foo Int NOT NULL, + bar String NOT NULL, + x Int NOT NULL, + ), + partitioned_by=(`x`) + ) + ORDER BY foo + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 3 + assert result_set.columns[0].name == "foo" + print(str(result_set.columns[0])) + assert result_set.columns[0].type.type_id == ydb.Type.INT32 + assert result_set.columns[1].name == "bar" + assert result_set.columns[1].type.type_id == ydb.Type.STRING + assert result_set.columns[2].name == "x" + assert result_set.columns[2].type.type_id == ydb.Type.INT32 + assert len(result_set.rows) == 4 + assert result_set.rows[0].items[0].int32_value == 123 + assert result_set.rows[1].items[0].int32_value == 234 + assert result_set.rows[2].items[0].int32_value == 456 + assert result_set.rows[3].items[0].int32_value == 567 + assert result_set.rows[0].items[1].bytes_value == b"xxx" + assert result_set.rows[1].items[1].bytes_value == b"zzz" + assert result_set.rows[2].items[1].bytes_value == b"yyy" + assert result_set.rows[3].items[1].bytes_value == b"ttt" + assert result_set.rows[0].items[2].int32_value == 1 + assert result_set.rows[1].items[2].int32_value == 2 + assert result_set.rows[2].items[2].int32_value == 1 + assert result_set.rows[3].items[2].int32_value == 2 diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py new file mode 100644 index 000000000000..5bd19f303a95 --- /dev/null +++ b/ydb/tests/fq/s3/test_s3_0.py @@ -0,0 +1,456 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import boto3 +import logging +import os +import pytest +import time +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.public.api.protos.ydb_value_pb2 as ydb +import ydb.tests.library.common.yatest_common as yatest_common +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_v2, yq_all + + +class TestS3(TestYdsBase): + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) + def test_csv(self, kikimr, s3, client, runtime_listing, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = '''Fruit,Price,Weight +Banana,3,100 +Apple,2,22 +Pear,15,33''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + 
pragma s3.UseRuntimeListing="{runtime_listing}"; + + SELECT * + FROM `{storage_connection_name}`.`fruits.csv` + WITH (format=csv_with_names, SCHEMA ( + Fruit String NOT NULL, + Price Int NOT NULL, + Weight Int NOT NULL + )); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 3 + assert result_set.columns[0].name == "Fruit" + assert result_set.columns[0].type.type_id == ydb.Type.STRING + assert result_set.columns[1].name == "Price" + assert result_set.columns[1].type.type_id == ydb.Type.INT32 + assert result_set.columns[2].name == "Weight" + assert result_set.columns[2].type.type_id == ydb.Type.INT32 + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].bytes_value == b"Banana" + assert result_set.rows[0].items[1].int32_value == 3 + assert result_set.rows[0].items[2].int32_value == 100 + assert result_set.rows[1].items[0].bytes_value == b"Apple" + assert result_set.rows[1].items[1].int32_value == 2 + assert result_set.rows[1].items[2].int32_value == 22 + assert result_set.rows[2].items[0].bytes_value == b"Pear" + assert result_set.rows[2].items[1].int32_value == 15 + assert result_set.rows[2].items[2].int32_value == 33 + assert sum(kikimr.control_plane.get_metering(1)) == 10 + + @yq_v2 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_inference(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = '''Fruit,Price,Weight,Date +Banana,3,100,2024-01-02 +Apple,2,22,2024-03-04 +Pear,15,33,2024-05-06''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + SELECT * + FROM `{storage_connection_name}`.`fruits.csv` + WITH (format=csv_with_names, with_infer='true'); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 4 + assert result_set.columns[0].name == "Date" + assert result_set.columns[0].type.type_id == ydb.Type.DATE + assert result_set.columns[1].name == "Fruit" + assert result_set.columns[1].type.type_id == ydb.Type.UTF8 + assert result_set.columns[2].name == "Price" + assert result_set.columns[2].type.type_id == ydb.Type.INT64 + assert result_set.columns[3].name == "Weight" + assert result_set.columns[3].type.type_id == ydb.Type.INT64 + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].uint32_value == 19724 + assert result_set.rows[0].items[1].text_value == "Banana" + assert result_set.rows[0].items[2].int64_value == 3 + assert result_set.rows[0].items[3].int64_value == 100 + assert 
result_set.rows[1].items[0].uint32_value == 19786 + assert result_set.rows[1].items[1].text_value == "Apple" + assert result_set.rows[1].items[2].int64_value == 2 + assert result_set.rows[1].items[3].int64_value == 22 + assert result_set.rows[2].items[0].uint32_value == 19849 + assert result_set.rows[2].items[1].text_value == "Pear" + assert result_set.rows[2].items[2].int64_value == 15 + assert result_set.rows[2].items[3].int64_value == 33 + assert sum(kikimr.control_plane.get_metering(1)) == 10 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = '''Time,Fruit,Price +0,Banana,3 +1,Apple,2 +2,Pear,15''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = fR''' + SELECT COUNT(*) as count, + FROM `{storage_connection_name}`.`fruits.csv` + WITH (format=csv_with_names, SCHEMA ( + Time UInt64 NOT NULL, + Fruit String NOT NULL, + Price Int NOT NULL + )) + GROUP BY HOP(CAST(Time AS Timestamp?), "PT1M", "PT1M", "PT1M") + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].uint64_value == 3 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) + def test_raw(self, kikimr, s3, client, runtime_listing, yq_version, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("rbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + s3_client.put_object(Body="text1", Bucket='rbucket', Key='file1.txt', ContentType='text/plain') + s3_client.put_object(Body="text3", Bucket='rbucket', Key='file3.txt', ContentType='text/plain') + s3_client.put_object(Body="text2", Bucket='rbucket', Key='file2.txt', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "rawbucket" + client.create_storage_connection(storage_connection_name, "rbucket") + + sql = f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + + SELECT Data + FROM `{storage_connection_name}`.`*` + WITH (format=raw, SCHEMA ( + Data String NOT NULL + )) + ORDER BY Data DESC + ''' + + if yq_version == "v1": + sql = 'pragma dq.MaxTasksPerStage="10"; ' + sql + else: + sql = 'pragma ydb.MaxTasksPerStage="10"; ' + sql + + query_id = client.create_query("simple", sql, 
type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert result_set.columns[0].name == "Data" + assert result_set.columns[0].type.type_id == ydb.Type.STRING + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].bytes_value == b"text3" + assert result_set.rows[1].items[0].bytes_value == b"text2" + assert result_set.rows[2].items[0].bytes_value == b"text1" + assert sum(kikimr.control_plane.get_metering(1)) == 10 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("kikimr_params", [{"raw": 3, "": 4}], indirect=True) + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) + def test_limit(self, kikimr, s3, client, runtime_listing, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("lbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + s3_client.put_object(Body="text1", Bucket='lbucket', Key='file1.txt', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "limbucket" + client.create_storage_connection(storage_connection_name, "lbucket") + + sql = f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + + SELECT Data + FROM `{storage_connection_name}`.`*` + WITH (format=raw, SCHEMA ( + Data String + )) + ORDER BY Data DESC + ''' + + query_id = client.create_query("simple", sql).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + assert "Size of object file1.txt = 5 and exceeds limit = 3 specified for format raw" in str( + client.describe_query(query_id).result + ) + + sql = f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + + SELECT * + FROM `{storage_connection_name}`.`*` + WITH (format=csv_with_names, SCHEMA ( + Fruit String + )); + ''' + + query_id = client.create_query("simple", sql).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + assert "Size of object file1.txt = 5 and exceeds limit = 4 specified for format csv_with_names" in str( + client.describe_query(query_id).result + ) + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) + def test_bad_format(self, kikimr, s3, client, runtime_listing, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("bbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + s3_client.put_object(Body="blah blah blah", Bucket='bbucket', Key='file1.txt', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "badbucket" + client.create_storage_connection(storage_connection_name, "bbucket") + + sql = f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + + select * from `{storage_connection_name}`.`*.*` with (format=json_list, schema (data 
string)) limit 1; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + + @yq_v1 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True) + def test_checkpoints_on_join_s3_with_yds(self, kikimr, s3, client, unique_prefix): + # Prepare S3 + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket_name = "join_s3_with_yds" + bucket = resource.Bucket(bucket_name) + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + def put_kv(k, v): + json = '{}"key": {}, "value": "{}"{}'.format("{", k, v, "}") + s3_client.put_object(Body=json, Bucket=bucket_name, Key='a/b/c/{}.json'.format(k), ContentType='text/json') + + put_kv(1, "one") + put_kv(2, "two") + put_kv(3, "three") + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "s3_dict" + client.create_storage_connection(storage_connection_name, bucket_name) + + # Prepare YDS + self.init_topics("yds_dict") + yds_connection_name = unique_prefix + "yds" + client.create_yds_connection(name=yds_connection_name, database_id="FakeDatabaseId") + + # Run query + sql = f''' + PRAGMA dq.MaxTasksPerStage="2"; + + $s3_dict_raw = + SELECT cast(Data AS json) AS data + FROM `{storage_connection_name}`.`*` + WITH (format=raw, SCHEMA ( + Data String NOT NULL + )); + + $s3_dict = + SELECT + cast(JSON_VALUE(data, '$.key') AS int64) AS key, + cast(JSON_VALUE(data, '$.value') AS String) AS value + FROM $s3_dict_raw; + + $parsed_yson_topic = + SELECT + Yson::LookupInt64(yson_data, "key") AS key, + Yson::LookupString(yson_data, "val") AS val + FROM ( + SELECT + Yson::Parse(Data) AS yson_data + FROM `{yds_connection_name}`.`{self.input_topic}` WITH SCHEMA (Data String NOT NULL)); + + $joined_seq = + SELECT + s3_dict.value AS num, + yds_seq.val AS word + FROM $parsed_yson_topic AS yds_seq + INNER JOIN $s3_dict AS s3_dict + ON yds_seq.key = s3_dict.key; + + INSERT INTO `{yds_connection_name}`.`{self.output_topic}` + SELECT + Yson::SerializeText(Yson::From(TableRow())) + FROM $joined_seq; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.control_plane.wait_zero_checkpoint(query_id) + + yds_data = [ + '{"key" = 1; "val" = "January";}', + '{"key" = 2; "val" = "February";}', + '{"key" = 3; "val" = "March";}', + '{"key" = 1; "val" = "Monday";}', + '{"key" = 2; "val" = "Tuesday";}', + '{"key" = 3; "val" = "Wednesday";}', + '{"key" = 1; "val" = "Gold";}', + '{"key" = 2; "val" = "Silver";}', + '{"key" = 3; "val" = "Bronze";}', + ] + self.write_stream(yds_data) + + expected = [ + '{"num" = "one"; "word" = "January"}', + '{"num" = "two"; "word" = "February"}', + '{"num" = "three"; "word" = "March"}', + '{"num" = "one"; "word" = "Monday"}', + '{"num" = "two"; "word" = "Tuesday"}', + '{"num" = "three"; "word" = "Wednesday"}', + '{"num" = "one"; "word" = "Gold"}', + '{"num" = "two"; "word" = "Silver"}', + '{"num" = "three"; "word" = "Bronze"}', + ] + assert self.read_stream(len(expected)) == expected + + # Check that checkpointing is finished + def 
wait_checkpoints(require_query_is_on=False): + deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900) + while True: + completed = kikimr.control_plane.get_completed_checkpoints(query_id, require_query_is_on) + if completed >= 3: + break + assert time.time() < deadline, "Completed: {}".format(completed) + time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2)) + + logging.debug("Wait checkpoints") + wait_checkpoints(True) + logging.debug("Wait checkpoints success") + + kikimr.control_plane.kikimr_cluster.nodes[1].stop() + kikimr.control_plane.kikimr_cluster.nodes[1].start() + kikimr.control_plane.wait_bootstrap(1) + + logging.debug("Wait checkpoints after restore") + wait_checkpoints(False) + logging.debug("Wait checkpoints after restore success") + + client.abort_query(query_id) + client.wait_query(query_id) diff --git a/ydb/tests/fq/s3/test_s3.py b/ydb/tests/fq/s3/test_s3_1.py similarity index 51% rename from ydb/tests/fq/s3/test_s3.py rename to ydb/tests/fq/s3/test_s3_1.py index 8c265ef90437..2a117867c7a5 100644 --- a/ydb/tests/fq/s3/test_s3.py +++ b/ydb/tests/fq/s3/test_s3_1.py @@ -3,458 +3,15 @@ import boto3 import logging -import os import pytest import time import ydb.public.api.protos.draft.fq_pb2 as fq import ydb.public.api.protos.ydb_value_pb2 as ydb -import ydb.tests.library.common.yatest_common as yatest_common from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase -from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_v2, yq_all +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all class TestS3(TestYdsBase): - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("runtime_listing", ["false", "true"]) - def test_csv(self, kikimr, s3, client, runtime_listing, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("fbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - fruits = '''Fruit,Price,Weight -Banana,3,100 -Apple,2,22 -Pear,15,33''' - s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "fruitbucket" - client.create_storage_connection(storage_connection_name, "fbucket") - - sql = f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - - SELECT * - FROM `{storage_connection_name}`.`fruits.csv` - WITH (format=csv_with_names, SCHEMA ( - Fruit String NOT NULL, - Price Int NOT NULL, - Weight Int NOT NULL - )); - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 3 - assert result_set.columns[0].name == "Fruit" - assert result_set.columns[0].type.type_id == ydb.Type.STRING - assert result_set.columns[1].name == "Price" - assert result_set.columns[1].type.type_id == ydb.Type.INT32 - assert result_set.columns[2].name == "Weight" - assert result_set.columns[2].type.type_id == ydb.Type.INT32 - assert len(result_set.rows) == 3 - assert result_set.rows[0].items[0].bytes_value == b"Banana" - assert 
result_set.rows[0].items[1].int32_value == 3 - assert result_set.rows[0].items[2].int32_value == 100 - assert result_set.rows[1].items[0].bytes_value == b"Apple" - assert result_set.rows[1].items[1].int32_value == 2 - assert result_set.rows[1].items[2].int32_value == 22 - assert result_set.rows[2].items[0].bytes_value == b"Pear" - assert result_set.rows[2].items[1].int32_value == 15 - assert result_set.rows[2].items[2].int32_value == 33 - assert sum(kikimr.control_plane.get_metering(1)) == 10 - - @yq_v2 - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_inference(self, kikimr, s3, client, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("fbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - fruits = '''Fruit,Price,Weight,Date -Banana,3,100,2024-01-02 -Apple,2,22,2024-03-04 -Pear,15,33,2024-05-06''' - s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "fruitbucket" - client.create_storage_connection(storage_connection_name, "fbucket") - - sql = f''' - SELECT * - FROM `{storage_connection_name}`.`fruits.csv` - WITH (format=csv_with_names, with_infer='true'); - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 4 - assert result_set.columns[0].name == "Date" - assert result_set.columns[0].type.type_id == ydb.Type.DATE - assert result_set.columns[1].name == "Fruit" - assert result_set.columns[1].type.type_id == ydb.Type.UTF8 - assert result_set.columns[2].name == "Price" - assert result_set.columns[2].type.type_id == ydb.Type.INT64 - assert result_set.columns[3].name == "Weight" - assert result_set.columns[3].type.type_id == ydb.Type.INT64 - assert len(result_set.rows) == 3 - assert result_set.rows[0].items[0].uint32_value == 19724 - assert result_set.rows[0].items[1].text_value == "Banana" - assert result_set.rows[0].items[2].int64_value == 3 - assert result_set.rows[0].items[3].int64_value == 100 - assert result_set.rows[1].items[0].uint32_value == 19786 - assert result_set.rows[1].items[1].text_value == "Apple" - assert result_set.rows[1].items[2].int64_value == 2 - assert result_set.rows[1].items[3].int64_value == 22 - assert result_set.rows[2].items[0].uint32_value == 19849 - assert result_set.rows[2].items[1].text_value == "Pear" - assert result_set.rows[2].items[2].int64_value == 15 - assert result_set.rows[2].items[3].int64_value == 33 - assert sum(kikimr.control_plane.get_metering(1)) == 10 - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("fbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", 
aws_secret_access_key="secret_key" - ) - - fruits = '''Time,Fruit,Price -0,Banana,3 -1,Apple,2 -2,Pear,15''' - s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "fruitbucket" - client.create_storage_connection(storage_connection_name, "fbucket") - - sql = fR''' - SELECT COUNT(*) as count, - FROM `{storage_connection_name}`.`fruits.csv` - WITH (format=csv_with_names, SCHEMA ( - Time UInt64 NOT NULL, - Fruit String NOT NULL, - Price Int NOT NULL - )) - GROUP BY HOP(CAST(Time AS Timestamp?), "PT1M", "PT1M", "PT1M") - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 1 - assert len(result_set.rows) == 1 - assert result_set.rows[0].items[0].uint64_value == 3 - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("runtime_listing", ["false", "true"]) - def test_raw(self, kikimr, s3, client, runtime_listing, yq_version, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("rbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - s3_client.put_object(Body="text1", Bucket='rbucket', Key='file1.txt', ContentType='text/plain') - s3_client.put_object(Body="text3", Bucket='rbucket', Key='file3.txt', ContentType='text/plain') - s3_client.put_object(Body="text2", Bucket='rbucket', Key='file2.txt', ContentType='text/plain') - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "rawbucket" - client.create_storage_connection(storage_connection_name, "rbucket") - - sql = f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - - SELECT Data - FROM `{storage_connection_name}`.`*` - WITH (format=raw, SCHEMA ( - Data String NOT NULL - )) - ORDER BY Data DESC - ''' - - if yq_version == "v1": - sql = 'pragma dq.MaxTasksPerStage="10"; ' + sql - else: - sql = 'pragma ydb.MaxTasksPerStage="10"; ' + sql - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - logging.debug(str(result_set)) - assert len(result_set.columns) == 1 - assert result_set.columns[0].name == "Data" - assert result_set.columns[0].type.type_id == ydb.Type.STRING - assert len(result_set.rows) == 3 - assert result_set.rows[0].items[0].bytes_value == b"text3" - assert result_set.rows[1].items[0].bytes_value == b"text2" - assert result_set.rows[2].items[0].bytes_value == b"text1" - assert sum(kikimr.control_plane.get_metering(1)) == 10 - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("kikimr_params", [{"raw": 3, "": 4}], indirect=True) - @pytest.mark.parametrize("runtime_listing", ["false", "true"]) - def test_limit(self, kikimr, s3, client, runtime_listing, unique_prefix): - resource = boto3.resource( - "s3", 
endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("lbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - s3_client.put_object(Body="text1", Bucket='lbucket', Key='file1.txt', ContentType='text/plain') - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "limbucket" - client.create_storage_connection(storage_connection_name, "lbucket") - - sql = f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - - SELECT Data - FROM `{storage_connection_name}`.`*` - WITH (format=raw, SCHEMA ( - Data String - )) - ORDER BY Data DESC - ''' - - query_id = client.create_query("simple", sql).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.FAILED) - assert "Size of object file1.txt = 5 and exceeds limit = 3 specified for format raw" in str( - client.describe_query(query_id).result - ) - - sql = f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - - SELECT * - FROM `{storage_connection_name}`.`*` - WITH (format=csv_with_names, SCHEMA ( - Fruit String - )); - ''' - - query_id = client.create_query("simple", sql).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.FAILED) - assert "Size of object file1.txt = 5 and exceeds limit = 4 specified for format csv_with_names" in str( - client.describe_query(query_id).result - ) - - @yq_all - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("runtime_listing", ["false", "true"]) - def test_bad_format(self, kikimr, s3, client, runtime_listing, unique_prefix): - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket = resource.Bucket("bbucket") - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - s3_client.put_object(Body="blah blah blah", Bucket='bbucket', Key='file1.txt', ContentType='text/plain') - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "badbucket" - client.create_storage_connection(storage_connection_name, "bbucket") - - sql = f''' - pragma s3.UseRuntimeListing="{runtime_listing}"; - - select * from `{storage_connection_name}`.`*.*` with (format=json_list, schema (data string)) limit 1; - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.FAILED) - - @yq_v1 - @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True) - def test_checkpoints_on_join_s3_with_yds(self, kikimr, s3, client, unique_prefix): - # Prepare S3 - resource = boto3.resource( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - s3_client = boto3.client( - "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" - ) - - bucket_name = "join_s3_with_yds" - bucket = resource.Bucket(bucket_name) - bucket.create(ACL='public-read') - bucket.objects.all().delete() - - def put_kv(k, v): - json = '{}"key": {}, "value": "{}"{}'.format("{", k, v, "}") - s3_client.put_object(Body=json, 
Bucket=bucket_name, Key='a/b/c/{}.json'.format(k), ContentType='text/json') - - put_kv(1, "one") - put_kv(2, "two") - put_kv(3, "three") - - kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "s3_dict" - client.create_storage_connection(storage_connection_name, bucket_name) - - # Prepare YDS - self.init_topics("yds_dict") - yds_connection_name = unique_prefix + "yds" - client.create_yds_connection(name=yds_connection_name, database_id="FakeDatabaseId") - - # Run query - sql = f''' - PRAGMA dq.MaxTasksPerStage="2"; - - $s3_dict_raw = - SELECT cast(Data AS json) AS data - FROM `{storage_connection_name}`.`*` - WITH (format=raw, SCHEMA ( - Data String NOT NULL - )); - - $s3_dict = - SELECT - cast(JSON_VALUE(data, '$.key') AS int64) AS key, - cast(JSON_VALUE(data, '$.value') AS String) AS value - FROM $s3_dict_raw; - - $parsed_yson_topic = - SELECT - Yson::LookupInt64(yson_data, "key") AS key, - Yson::LookupString(yson_data, "val") AS val - FROM ( - SELECT - Yson::Parse(Data) AS yson_data - FROM `{yds_connection_name}`.`{self.input_topic}` WITH SCHEMA (Data String NOT NULL)); - - $joined_seq = - SELECT - s3_dict.value AS num, - yds_seq.val AS word - FROM $parsed_yson_topic AS yds_seq - INNER JOIN $s3_dict AS s3_dict - ON yds_seq.key = s3_dict.key; - - INSERT INTO `{yds_connection_name}`.`{self.output_topic}` - SELECT - Yson::SerializeText(Yson::From(TableRow())) - FROM $joined_seq; - ''' - - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id - client.wait_query_status(query_id, fq.QueryMeta.RUNNING) - kikimr.control_plane.wait_zero_checkpoint(query_id) - - yds_data = [ - '{"key" = 1; "val" = "January";}', - '{"key" = 2; "val" = "February";}', - '{"key" = 3; "val" = "March";}', - '{"key" = 1; "val" = "Monday";}', - '{"key" = 2; "val" = "Tuesday";}', - '{"key" = 3; "val" = "Wednesday";}', - '{"key" = 1; "val" = "Gold";}', - '{"key" = 2; "val" = "Silver";}', - '{"key" = 3; "val" = "Bronze";}', - ] - self.write_stream(yds_data) - - expected = [ - '{"num" = "one"; "word" = "January"}', - '{"num" = "two"; "word" = "February"}', - '{"num" = "three"; "word" = "March"}', - '{"num" = "one"; "word" = "Monday"}', - '{"num" = "two"; "word" = "Tuesday"}', - '{"num" = "three"; "word" = "Wednesday"}', - '{"num" = "one"; "word" = "Gold"}', - '{"num" = "two"; "word" = "Silver"}', - '{"num" = "three"; "word" = "Bronze"}', - ] - assert self.read_stream(len(expected)) == expected - - # Check that checkpointing is finished - def wait_checkpoints(require_query_is_on=False): - deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900) - while True: - completed = kikimr.control_plane.get_completed_checkpoints(query_id, require_query_is_on) - if completed >= 3: - break - assert time.time() < deadline, "Completed: {}".format(completed) - time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2)) - - logging.debug("Wait checkpoints") - wait_checkpoints(True) - logging.debug("Wait checkpoints success") - - kikimr.control_plane.kikimr_cluster.nodes[1].stop() - kikimr.control_plane.kikimr_cluster.nodes[1].start() - kikimr.control_plane.wait_bootstrap(1) - - logging.debug("Wait checkpoints after restore") - wait_checkpoints(False) - logging.debug("Wait checkpoints after restore success") - - client.abort_query(query_id) - client.wait_query(query_id) - @yq_v1 # v2 compute with multiple nodes is not supported yet @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) 
@pytest.mark.parametrize("kikimr_params", [{"compute": 3}], indirect=True) diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make index 994f072eeef6..502dc1455f1e 100644 --- a/ydb/tests/fq/s3/ya.make +++ b/ydb/tests/fq/s3/ya.make @@ -20,18 +20,21 @@ DEPENDS( ) TEST_SRCS( - test_bindings.py + test_bindings_0.py + test_bindings_1.py test_compressions.py test_early_finish.py test_empty.py - test_explicit_partitioning.py + test_explicit_partitioning_0.py + test_explicit_partitioning_1.py test_format_setting.py test_formats.py test_inflight.py test_insert.py test_public_metrics.py test_push_down.py - test_s3.py + test_s3_0.py + test_s3_1.py test_size_limit.py test_statistics.py test_test_connection.py @@ -48,9 +51,12 @@ DATA( arcadia/ydb/tests/fq/s3 ) -TIMEOUT(3600) -SIZE(LARGE) -TAG(ya:fat) +IF (SANITIZER_TYPE == "thread" OR SANITIZER_TYPE == "address") + SIZE(LARGE) + TAG(ya:fat) +ELSE() + SIZE(MEDIUM) +ENDIF() REQUIREMENTS(ram:16) diff --git a/ydb/tests/library/harness/kikimr_runner.py b/ydb/tests/library/harness/kikimr_runner.py index bbc10df0b067..8d17c731aee8 100644 --- a/ydb/tests/library/harness/kikimr_runner.py +++ b/ydb/tests/library/harness/kikimr_runner.py @@ -421,10 +421,13 @@ def unregister_and_stop_slots(self, slots): for i in slots: i.stop() - def __stop_node(self, node): + def __stop_node(self, node, kill=False): ret = None try: - node.stop() + if kill: + node.kill() + else: + node.stop() except daemon.DaemonError as exceptions: ret = exceptions else: @@ -434,16 +437,16 @@ def __stop_node(self, node): shutil.rmtree(self.__common_udfs_dir, ignore_errors=True) return ret - def stop(self): + def stop(self, kill=False): saved_exceptions = [] for slot in self.slots.values(): - exception = self.__stop_node(slot) + exception = self.__stop_node(slot, kill) if exception is not None: saved_exceptions.append(exception) for node in self.nodes.values(): - exception = self.__stop_node(node) + exception = self.__stop_node(node, kill) if exception is not None: saved_exceptions.append(exception) diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index a928e15cc89c..f1ed9a18d6d9 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -52,7 +52,7 @@ def start(self): def stop(self): if self.kikimr_cluster: - self.kikimr_cluster.stop() + self.kikimr_cluster.stop(kill=False) def endpoint(self, node_index=None): return "localhost:{}".format(