Skip to content

Commit a01e11a

Browse files
committed
Fixed naming
1 parent 669aa7d commit a01e11a

File tree

4 files changed

+552
-552
lines changed

4 files changed

+552
-552
lines changed

ydb/tests/fq/s3/conftest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def on_test_start(self):
3737
self.number_tests += 1
3838
assert self.number_tests <= self.tests_count_limit, \
3939
f"{self.error_string} exceeded limit {self.number_tests} vs {self.tests_count_limit}, " \
40-
"this may lead timeouts on CI, please split this class"
40+
"this may lead timeouts on CI, please split this file"
4141

4242

4343
@pytest.fixture(scope="module")

ydb/tests/fq/s3/test_bindings_0.py

+348
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import boto3
5+
import logging
6+
import pytest
7+
8+
import ydb.public.api.protos.ydb_value_pb2 as ydb
9+
import ydb.public.api.protos.draft.fq_pb2 as fq
10+
from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
11+
from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
12+
13+
# Maps a YDB primitive type id to a callable that reads the matching value
# field from a result cell. Only the three types listed here are supported.
ValueByTypeExtractors = {
    type_id: (lambda cell, field_name=field_name: getattr(cell, field_name))
    for type_id, field_name in (
        (ydb.Type.PrimitiveTypeId.INT32, "int32_value"),
        (ydb.Type.PrimitiveTypeId.UINT64, "uint64_value"),
        (ydb.Type.PrimitiveTypeId.UTF8, "text_value"),
    )
}
18+
19+
20+
class TestBindings:
21+
@staticmethod
22+
def _preprocess_query(sql: str, yq_version: str) -> str:
23+
if yq_version == 'v1':
24+
return f"PRAGMA dq.EnableDqReplicate='true'; PRAGMA dq.MaxTasksPerOperation='110'; {sql}"
25+
26+
return sql
27+
28+
@staticmethod
def _assert_connections(client: FederatedQueryClient, expected_connection_names: list[str]):
    """Assert that the client lists exactly the expected connection names.

    Connections are fetched with SCOPE visibility and compared as sets,
    so ordering and duplicates do not matter.
    """
    listing = client.list_connections(fq.Acl.Visibility.SCOPE)
    found_names = {item.content.name for item in listing.result.connection}
    assert found_names == set(expected_connection_names)
34+
35+
@staticmethod
def _assert_bindings(client: FederatedQueryClient, expected_binding_names: list[str]):
    """Assert that the client lists exactly the expected binding names.

    Bindings are fetched with SCOPE visibility and compared as sets,
    so ordering and duplicates do not matter.
    """
    listing = client.list_bindings(fq.Acl.Visibility.SCOPE)
    found_names = {item.name for item in listing.result.binding}
    assert found_names == set(expected_binding_names)
39+
40+
@staticmethod
def _assert_query_results(client: FederatedQueryClient, sql: str, yq_version: str, expected_result_set):
    """Run *sql* as an analytics query and compare its result set with the expectation.

    ``expected_result_set`` is a dict with two keys:
      * ``'columns'`` - sequence of ``(name, type_id)`` pairs, in column order;
      * ``'rows'`` - sequence of rows, each a sequence of expected cell values.

    The query is created under the name "simple", awaited until it reaches
    the COMPLETED status, and then its columns and cells are checked one by one.
    """
    prepared_sql = TestBindings._preprocess_query(sql, yq_version)
    created = client.create_query("simple", prepared_sql, type=fq.QueryContent.QueryType.ANALYTICS)
    query_id = created.result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    wanted_columns = expected_result_set['columns']
    wanted_rows = expected_result_set['rows']

    actual = client.get_result_data(query_id).result.result_set

    # Column metadata: same count, same names, same primitive type ids.
    assert len(actual.columns) == len(wanted_columns)
    for column_index, (wanted_name, wanted_type_id) in enumerate(wanted_columns):
        actual_column = actual.columns[column_index]
        assert actual_column.name == wanted_name
        assert actual_column.type.type_id == wanted_type_id

    # Cell values: the extractor is picked from the actual column type.
    assert len(actual.rows) == len(wanted_rows)
    for row_index, wanted_row in enumerate(wanted_rows):
        for column_index, wanted_value in enumerate(wanted_row):
            read_cell = ValueByTypeExtractors[actual.columns[column_index].type.type_id]
            assert read_cell(actual.rows[row_index].items[column_index]) == wanted_value
62+
63+
@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize(
    "kikimr_settings", [{"is_replace_if_exists": True}, {"is_replace_if_exists": False}], indirect=True
)
def test_binding_operations(self, kikimr, s3, client: FederatedQueryClient, yq_version, unique_prefix):
    """Take a connection and a binding through create, modify and delete.

    After each create/modify step the listing is checked and the same
    one-row JSON object is queried, either directly through the connection
    or through the binding.
    """
    scope = fq.Acl.Visibility.SCOPE
    s3_access = {"endpoint_url": s3.s3_url, "aws_access_key_id": "key", "aws_secret_access_key": "secret_key"}

    # Bucket with the single JSON object that every query below reads.
    boto3.resource("s3", **s3_access).Bucket("bindbucket").create(ACL='public-read')
    boto3.client("s3", **s3_access).put_object(
        Body=R'''{"a": 42, "b": "text"}''', Bucket='bindbucket', Key='abc.json', ContentType='text/json'
    )

    expected = {
        'columns': [('a', ydb.Type.PrimitiveTypeId.INT32), ('b', ydb.Type.PrimitiveTypeId.UTF8)],
        'rows': [(42, "text")],
    }

    def connection_query(connection_name):
        # Reads the object directly through a connection with an inline schema.
        return fR'''
        SELECT *
        FROM `{connection_name}`.`abc.json` WITH (
            FORMAT="json_each_row",
            SCHEMA (
                a Int32 NOT NULL,
                b Utf8 NOT NULL
            ));'''

    def binding_query(binding_name):
        # Reads the object through a named binding.
        return fR'SELECT * FROM bindings.`{binding_name}`;'

    def check_query(sql):
        self._assert_query_results(client, sql, yq_version, expected)

    # Test connection creation
    kikimr.control_plane.wait_bootstrap(1)
    first_connection_name = unique_prefix + "connection_name"
    connection_id = client.create_storage_connection(
        first_connection_name, "bindbucket", visibility=scope
    ).result.connection_id

    self._assert_connections(client, expected_connection_names=[first_connection_name])
    check_query(connection_query(first_connection_name))

    # Test binding creation
    column_a = ydb.Column(name="a", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
    column_b = ydb.Column(name="b", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))

    def binding_fields():
        # Fresh argument dict per call; everything except the binding name.
        return {
            "path": "abc.json",
            "format": "json_each_row",
            "connection_id": connection_id,
            "visibility": scope,
            "columns": [column_a, column_b],
        }

    first_binding_name = unique_prefix + "binding_name"
    binding_id = client.create_object_storage_binding(
        name=first_binding_name, **binding_fields()
    ).result.binding_id

    self._assert_bindings(client, expected_binding_names=[first_binding_name])
    check_query(binding_query(first_binding_name))

    # Test binding modification
    renamed_binding_name = unique_prefix + "new_binding_name"
    client.modify_object_storage_binding(
        binding_id=binding_id, name=renamed_binding_name, **binding_fields()
    ).result

    self._assert_bindings(client, expected_binding_names=[renamed_binding_name])
    check_query(binding_query(renamed_binding_name))

    # Test connection modification
    renamed_connection_name = unique_prefix + "new_connection_name"
    client.modify_object_storage_connection(
        connection_id, renamed_connection_name, "bindbucket", visibility=scope
    )

    self._assert_connections(client, expected_connection_names=[renamed_connection_name])
    check_query(binding_query(renamed_binding_name))
    check_query(connection_query(renamed_connection_name))

    # Test binding deletion
    client.delete_binding(binding_id)
    self._assert_bindings(client, expected_binding_names=[])

    # Test connection deletion
    client.delete_connection(connection_id)
    self._assert_connections(client, expected_connection_names=[])
176+
177+
@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize(
    "kikimr_settings", [{"is_replace_if_exists": True}, {"is_replace_if_exists": False}], indirect=True
)
@pytest.mark.skip(reason="Ticket: YQ-2972")
def test_modify_connection_with_a_lot_of_bindings(
    self, kikimr, s3, client: FederatedQueryClient, yq_version, unique_prefix
):
    """Rename a connection that many bindings point at and check they all still work.

    Currently skipped (see the skip reason). The skip is declared as a
    marker rather than called inside the body, so the fixtures are not set
    up for a test that never runs and the body is not dead code after an
    unconditional ``pytest.skip()`` call.
    """
    bindings_count = 100

    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("bindbucket")
    bucket.create(ACL='public-read')

    s3_client = boto3.client(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )
    s3_client.put_object(
        Body=R'''{"a": 42, "b": "text"}''', Bucket='bindbucket', Key='abc.json', ContentType='text/json'
    )

    # Each binding exposes the same one-row object, so the union has one row per binding.
    expected_result_set = {'columns': [('count', ydb.Type.PrimitiveTypeId.UINT64)], 'rows': [(bindings_count,)]}

    # Built once and reused by both the creation and the post-rename checks.
    binding_names = [f"{unique_prefix}binding_name_{i}" for i in range(bindings_count)]
    union_of_bindings = ' UNION ALL '.join(f'SELECT * FROM bindings.`{name}`' for name in binding_names)
    count_query = f"SELECT COUNT(*) as count FROM ({union_of_bindings})"

    # Test connection creation
    kikimr.control_plane.wait_bootstrap(1)
    storage_connection_name = unique_prefix + "connection_name"
    connection_id = client.create_storage_connection(
        storage_connection_name, "bindbucket", visibility=fq.Acl.Visibility.SCOPE
    ).result.connection_id

    self._assert_connections(client, expected_connection_names=[storage_connection_name])

    # Test binding creation
    a_type = ydb.Column(name="a", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
    b_type = ydb.Column(name="b", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
    for binding_name in binding_names:
        client.create_object_storage_binding(
            name=binding_name,
            path="abc.json",
            format="json_each_row",
            connection_id=connection_id,
            visibility=fq.Acl.Visibility.SCOPE,
            columns=[a_type, b_type],
        )

    self._assert_bindings(client, expected_binding_names=binding_names)
    self._assert_query_results(client, count_query, yq_version, expected_result_set)

    # Test connection modification
    new_storage_connection_name = unique_prefix + "new_connection_name"
    client.modify_object_storage_connection(
        connection_id, new_storage_connection_name, "bindbucket", visibility=fq.Acl.Visibility.SCOPE
    )

    self._assert_connections(client, expected_connection_names=[new_storage_connection_name])
    self._assert_bindings(client, expected_binding_names=binding_names)
    self._assert_query_results(client, count_query, yq_version, expected_result_set)
247+
248+
@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_name_uniqueness_constraint(self, kikimr, client: FederatedQueryClient, unique_prefix):
    """Check that connections and bindings may not reuse each other's names.

    Names that are merely a prefix of an existing name are accepted.
    Creating or renaming a connection to an existing binding's name, or a
    binding to an existing connection's name, must yield exactly one issue
    with the expected message and severity 1.
    """
    scope = fq.Acl.Visibility.SCOPE
    binding_taken = "Binding with the same name already exists. Please choose another name"
    connection_taken = "Connection with the same name already exists. Please choose another name"

    def expect_single_issue(response, message):
        # A rejected request carries one issue with the given text and severity 1.
        assert len(response.issues) == 1
        assert response.issues[0].message == message
        assert response.issues[0].severity == 1

    # Test connection & binding creation
    kikimr.control_plane.wait_bootstrap(1)
    connection_name = unique_prefix + "connection_name"
    connection_id = client.create_storage_connection(
        connection_name, "bindbucket", visibility=scope
    ).result.connection_id

    self._assert_connections(client, expected_connection_names=[connection_name])

    # Test binding creation
    column_a = ydb.Column(name="a", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
    column_b = ydb.Column(name="b", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
    binding_name = unique_prefix + "binding_name"
    binding_id = client.create_object_storage_binding(
        name=binding_name,
        path="abc.json",
        format="json_each_row",
        connection_id=connection_id,
        visibility=scope,
        columns=[column_a, column_b],
    ).result.binding_id
    self._assert_bindings(client, expected_binding_names=[binding_name])

    # Test connection & binding creation with substring names
    short_connection_name = unique_prefix + "connection"
    short_connection_id = client.create_storage_connection(
        short_connection_name, "bindbucket", visibility=scope
    ).result.connection_id

    self._assert_connections(client, expected_connection_names=[connection_name, short_connection_name])

    # Test binding creation
    short_binding_name = unique_prefix + "binding"
    client.create_object_storage_binding(
        name=short_binding_name,
        path="abc.json",
        format="json_each_row",
        connection_id=short_connection_id,
        visibility=scope,
        columns=[column_a, column_b],
    )
    self._assert_bindings(client, expected_binding_names=[binding_name, short_binding_name])

    # Test uniqueness constraint
    expect_single_issue(
        client.create_storage_connection(binding_name, "bindbucket", visibility=scope, check_issues=False),
        binding_taken,
    )

    expect_single_issue(
        client.create_object_storage_binding(
            name=connection_name,
            path="abc.json",
            format="json_each_row",
            connection_id=connection_id,
            visibility=scope,
            columns=[column_a, column_b],
            check_issues=False,
        ),
        connection_taken,
    )

    expect_single_issue(
        client.modify_object_storage_connection(
            connection_id, binding_name, "bindbucket", visibility=scope, check_issues=False
        ),
        binding_taken,
    )

    expect_single_issue(
        client.modify_object_storage_binding(
            binding_id,
            name=connection_name,
            path="abc.json",
            format="json_each_row",
            connection_id=connection_id,
            visibility=scope,
            columns=[column_a, column_b],
            check_issues=False,
        ),
        connection_taken,
    )

0 commit comments

Comments
 (0)