Skip to content

Commit b62f247

Browse files
authored
YQ splitted fq/s3/test_s3.py tests (#5798)
1 parent f33e399 commit b62f247

10 files changed

+1364
-1293
lines changed

ydb/tests/fq/s3/conftest.py

+31-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,19 @@
2727
S3_PID_FILE = "s3.pid"
2828

2929

30+
class TestCounter:
31+
def __init__(self, tests_count_limit, error_string):
32+
self.tests_count_limit = tests_count_limit
33+
self.error_string = error_string
34+
self.number_tests = 0
35+
36+
def on_test_start(self):
37+
self.number_tests += 1
38+
assert self.number_tests <= self.tests_count_limit, \
39+
f"{self.error_string} exceeded limit {self.number_tests} vs {self.tests_count_limit}, " \
40+
"this may lead timeouts on CI, please split this file"
41+
42+
3043
@pytest.fixture(scope="module")
3144
def mvp_external_ydb_endpoint(request) -> str:
3245
return request.param["endpoint"] if request is not None and hasattr(request, 'param') else None
@@ -86,14 +99,21 @@ def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external
8699

87100

88101
@pytest.fixture(scope="module")
89-
def kikimr_yqv1(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint):
102+
def kikimr_starts_counter():
103+
return TestCounter(10, "Number kikimr restarts in one module")
104+
105+
106+
@pytest.fixture(scope="module")
107+
def kikimr_yqv1(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
108+
kikimr_starts_counter.on_test_start()
90109
kikimr_extensions = get_kikimr_extensions(s3, YQV1_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
91110
with start_kikimr(kikimr_params, kikimr_extensions) as kikimr:
92111
yield kikimr
93112

94113

95114
@pytest.fixture(scope="module")
96-
def kikimr_yqv2(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint):
115+
def kikimr_yqv2(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
116+
kikimr_starts_counter.on_test_start()
97117
kikimr_extensions = get_kikimr_extensions(s3, YQV2_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
98118
with start_kikimr(kikimr_params, kikimr_extensions) as kikimr:
99119
yield kikimr
@@ -115,8 +135,14 @@ def kikimr(yq_version: str, kikimr_yqv1, kikimr_yqv2):
115135
return kikimr
116136

117137

138+
@pytest.fixture(scope="module")
139+
def tests_counter():
140+
return TestCounter(200, "Number tests in one module")
141+
142+
118143
@pytest.fixture
119-
def client(kikimr, request=None):
144+
def client(kikimr, tests_counter, request=None):
145+
tests_counter.on_test_start()
120146
client = FederatedQueryClient(
121147
request.param["folder_id"] if request is not None else "my_folder", streaming_over_kikimr=kikimr
122148
)
@@ -128,8 +154,5 @@ def client(kikimr, request=None):
128154

129155
@pytest.fixture
130156
def unique_prefix(request: pytest.FixtureRequest):
131-
name_hash = hash(request.node.name)
132-
if name_hash >= 0:
133-
return f"p{name_hash}_"
134-
else:
135-
return f"n{-name_hash}_"
157+
name_hash = abs(hash(request.node.name))
158+
return f"h{name_hash}_{request.function.__name__}"

ydb/tests/fq/s3/test_bindings.py renamed to ydb/tests/fq/s3/test_bindings_0.py

-268
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
# -*- coding: utf-8 -*-
33

44
import boto3
5-
import logging
65
import pytest
76

87
import ydb.public.api.protos.ydb_value_pb2 as ydb
@@ -346,270 +345,3 @@ def test_name_uniqueness_constraint(self, kikimr, client: FederatedQueryClient,
346345
== "Connection with the same name already exists. Please choose another name"
347346
)
348347
assert modify_binding_result.issues[0].severity == 1
349-
350-
@yq_all
351-
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
352-
@pytest.mark.parametrize("kikimr_settings", [{"bindings_mode": "BM_DROP_WITH_WARNING"}], indirect=True)
353-
def test_s3_insert(self, kikimr, s3, client, yq_version, unique_prefix):
354-
resource = boto3.resource(
355-
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
356-
)
357-
358-
bucket = resource.Bucket("bindbucket")
359-
bucket.create(ACL='public-read')
360-
bucket.objects.all().delete()
361-
362-
kikimr.control_plane.wait_bootstrap(1)
363-
connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id
364-
365-
fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
366-
barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
367-
storage_binding_name = unique_prefix + "s3binding"
368-
client.create_object_storage_binding(
369-
name=storage_binding_name,
370-
path="path1/",
371-
format="csv_with_names",
372-
connection_id=connection_id,
373-
columns=[fooType, barType],
374-
)
375-
376-
sql = fR'''
377-
insert into bindings.`{storage_binding_name}`
378-
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
379-
'''
380-
381-
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
382-
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
383-
if yq_version == "v2":
384-
issues = str(client.describe_query(query_id).result.query.issue)
385-
assert (
386-
"message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon"
387-
in issues
388-
)
389-
assert "severity: 2" in issues
390-
391-
sql = fR'''
392-
select foo, bar from bindings.`{storage_binding_name}`;
393-
'''
394-
395-
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
396-
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
397-
if yq_version == "v2":
398-
issues = str(client.describe_query(query_id).result.query.issue)
399-
assert (
400-
"message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon"
401-
in issues
402-
)
403-
assert "severity: 2" in issues
404-
405-
data = client.get_result_data(query_id)
406-
result_set = data.result.result_set
407-
assert len(result_set.columns) == 2
408-
assert result_set.columns[0].name == "foo"
409-
assert result_set.columns[0].type.type_id == ydb.Type.INT32
410-
assert result_set.columns[1].name == "bar"
411-
assert result_set.columns[1].type.type_id == ydb.Type.UTF8
412-
assert len(result_set.rows) == 2
413-
assert result_set.rows[0].items[0].int32_value == 123
414-
assert result_set.rows[0].items[1].text_value == 'xxx'
415-
assert result_set.rows[1].items[0].int32_value == 456
416-
assert result_set.rows[1].items[1].text_value == 'yyy'
417-
418-
@yq_all
419-
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
420-
def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix):
421-
resource = boto3.resource(
422-
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
423-
)
424-
425-
bucket = resource.Bucket("bindbucket")
426-
bucket.create(ACL='public-read')
427-
428-
kikimr.control_plane.wait_bootstrap(1)
429-
connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id
430-
431-
fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
432-
barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
433-
storage_binding_name = unique_prefix + "s3binding"
434-
client.create_object_storage_binding(
435-
name=storage_binding_name,
436-
path="path2/",
437-
format="csv_with_names",
438-
connection_id=connection_id,
439-
columns=[fooType, barType],
440-
)
441-
442-
sql = fR'''
443-
insert into bindings.`{storage_binding_name}`
444-
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
445-
'''
446-
447-
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
448-
client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED])
449-
450-
describe_result = client.describe_query(query_id).result
451-
describe_string = "{}".format(describe_result)
452-
assert "Type mismatch between schema type" in describe_string, describe_string
453-
454-
@yq_all
455-
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
456-
def test_pg_binding(self, kikimr, s3, client, unique_prefix):
457-
resource = boto3.resource(
458-
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
459-
)
460-
461-
bucket = resource.Bucket("fbucket")
462-
bucket.create(ACL='public-read')
463-
464-
s3_client = boto3.client(
465-
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
466-
)
467-
468-
fruits = R'''Fruit,Price
469-
Banana,3
470-
Apple,2
471-
Pear,15'''
472-
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='a/fruits.csv', ContentType='text/plain')
473-
474-
kikimr.control_plane.wait_bootstrap(1)
475-
connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")
476-
477-
fruitType = ydb.Column(name="Fruit", type=ydb.Type(pg_type=ydb.PgType(oid=25)))
478-
priceType = ydb.Column(name="Price", type=ydb.Type(pg_type=ydb.PgType(oid=23)))
479-
storage_binding_name = unique_prefix + "my_binding"
480-
client.create_object_storage_binding(
481-
name=storage_binding_name,
482-
path="a/",
483-
format="csv_with_names",
484-
connection_id=connection_response.result.connection_id,
485-
columns=[fruitType, priceType],
486-
format_setting={"file_pattern": "*.csv"},
487-
)
488-
489-
sql = fR'''
490-
SELECT *
491-
FROM bindings.{storage_binding_name};
492-
'''
493-
494-
query_id = client.create_query(
495-
"simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=True
496-
).result.query_id
497-
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
498-
499-
data = client.get_result_data(query_id)
500-
result_set = data.result.result_set
501-
logging.debug(str(result_set))
502-
assert len(result_set.columns) == 2
503-
assert result_set.columns[0].name == "Fruit"
504-
assert result_set.columns[0].type.pg_type.oid == 25
505-
assert result_set.columns[1].name == "Price"
506-
assert result_set.columns[1].type.pg_type.oid == 23
507-
assert len(result_set.rows) == 3
508-
assert result_set.rows[0].items[0].text_value == "Banana"
509-
assert result_set.rows[0].items[1].text_value == "3"
510-
assert result_set.rows[1].items[0].text_value == "Apple"
511-
assert result_set.rows[1].items[1].text_value == "2"
512-
assert result_set.rows[2].items[0].text_value == "Pear"
513-
assert result_set.rows[2].items[1].text_value == "15"
514-
515-
@yq_all
516-
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
517-
@pytest.mark.parametrize("pg_syntax", [False, True], ids=["yql_syntax", "pg_syntax"])
518-
def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax, unique_prefix):
519-
resource = boto3.resource(
520-
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
521-
)
522-
523-
bucket = resource.Bucket("count_for_pg_binding")
524-
bucket.create(ACL='public-read')
525-
526-
s3_client = boto3.client(
527-
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
528-
)
529-
530-
row = R'''{"a": 42, "b": 3.14, "c": "text"}'''
531-
s3_client.put_object(Body=row, Bucket='count_for_pg_binding', Key='abc.json', ContentType='text/json')
532-
533-
kikimr.control_plane.wait_bootstrap(1)
534-
connection_response = client.create_storage_connection(unique_prefix + "abc", "count_for_pg_binding")
535-
536-
aType = ydb.Column(name="a", type=ydb.Type(pg_type=ydb.PgType(oid=23)))
537-
bType = ydb.Column(name="b", type=ydb.Type(pg_type=ydb.PgType(oid=701)))
538-
cType = ydb.Column(name="c", type=ydb.Type(pg_type=ydb.PgType(oid=25)))
539-
storage_binding_name = unique_prefix + "binding_for_count"
540-
client.create_object_storage_binding(
541-
name=storage_binding_name,
542-
path="abc.json",
543-
format="json_each_row",
544-
connection_id=connection_response.result.connection_id,
545-
columns=[aType, bType, cType],
546-
)
547-
548-
sql = fR'''
549-
SELECT COUNT(*)
550-
FROM bindings.{storage_binding_name};
551-
'''
552-
553-
query_id = client.create_query(
554-
"simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=pg_syntax
555-
).result.query_id
556-
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
557-
558-
data = client.get_result_data(query_id)
559-
result_set = data.result.result_set
560-
logging.debug(str(result_set))
561-
assert len(result_set.columns) == 1
562-
assert len(result_set.rows) == 1
563-
if pg_syntax:
564-
assert result_set.columns[0].type.pg_type.oid == 20
565-
assert result_set.rows[0].items[0].text_value == "1"
566-
else:
567-
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
568-
assert result_set.rows[0].items[0].uint64_value == 1
569-
570-
@yq_all
571-
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
572-
def test_ast_in_failed_query_compilation(self, kikimr, s3, client, unique_prefix):
573-
resource = boto3.resource(
574-
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
575-
)
576-
577-
bucket = resource.Bucket("bindbucket")
578-
bucket.create(ACL='public-read')
579-
bucket.objects.all().delete()
580-
581-
kikimr.control_plane.wait_bootstrap(1)
582-
connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id
583-
584-
data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))
585-
storage_binding_name = unique_prefix + "s3binding"
586-
client.create_object_storage_binding(
587-
name=storage_binding_name, path="/", format="raw", connection_id=connection_id, columns=[data_column]
588-
)
589-
590-
sql = fR'''
591-
SELECT some_unknown_column FROM bindings.`{storage_binding_name}`;
592-
'''
593-
594-
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
595-
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
596-
597-
ast = client.describe_query(query_id).result.query.ast.data
598-
assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"
599-
600-
@yq_all
601-
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
602-
def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix):
603-
kikimr.control_plane.wait_bootstrap(1)
604-
connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")
605-
binding_response = client.create_object_storage_binding(
606-
name=unique_prefix + "my_binding",
607-
path="fruits.csv",
608-
format="raw",
609-
connection_id=connection_response.result.connection_id,
610-
columns=[],
611-
check_issues=False,
612-
)
613-
assert "Only one column in schema supported in raw format" in str(binding_response.issues), str(
614-
binding_response.issues
615-
)

0 commit comments

Comments
 (0)