Skip to content

Commit 3985116

Browse files
authored
tests/fq: add streamlookup (#7568)
1 parent 1ba20df commit 3985116

File tree

3 files changed

+298
-0
lines changed

3 files changed

+298
-0
lines changed

ydb/tests/fq/generic/test_streaming_join.py

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import pytest
22
import os
3+
import json
4+
import sys
35

46
import ydb.public.api.protos.draft.fq_pb2 as fq
57
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
@@ -8,6 +10,228 @@
810
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
911
from ydb.tests.fq.generic.utils.settings import Settings
1012

13+
TESTCASES = [
14+
# 0
15+
(
16+
R'''
17+
$input = SELECT * FROM myyds.`{input_topic}`;
18+
19+
$enriched = select
20+
e.Data as data, u.id as lookup
21+
from
22+
$input as e
23+
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
24+
on(e.Data = u.data)
25+
;
26+
27+
insert into myyds.`{output_topic}`
28+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
29+
''',
30+
[
31+
('ydb10', '{"data":"ydb10","lookup":1}'),
32+
('ydb20', '{"data":"ydb20","lookup":2}'),
33+
('ydb30', '{"data":"ydb30","lookup":3}'),
34+
('ydb40', '{"data":"ydb40","lookup":null}'),
35+
('ydb50', '{"data":"ydb50","lookup":null}'),
36+
('ydb10', '{"data":"ydb10","lookup":1}'),
37+
('ydb20', '{"data":"ydb20","lookup":2}'),
38+
('ydb30', '{"data":"ydb30","lookup":3}'),
39+
('ydb40', '{"data":"ydb40","lookup":null}'),
40+
('ydb50', '{"data":"ydb50","lookup":null}'),
41+
]
42+
* 10,
43+
),
44+
# 1
45+
(
46+
R'''
47+
$input = SELECT * FROM myyds.`{input_topic}`;
48+
49+
$enriched = select
50+
e.Data as data, CAST(e.Data AS Int32) as id, u.data as lookup
51+
from
52+
$input as e
53+
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
54+
on(CAST(e.Data AS Int32) = u.id)
55+
;
56+
57+
insert into myyds.`{output_topic}`
58+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
59+
''',
60+
[
61+
('1', '{"data":"1","id":1,"lookup":"ydb10"}'),
62+
('2', '{"data":"2","id":2,"lookup":"ydb20"}'),
63+
('3', '{"data":"3","id":3,"lookup":"ydb30"}'),
64+
('4', '{"data":"4","id":4,"lookup":null}'),
65+
('5', '{"data":"5","id":5,"lookup":null}'),
66+
('1', '{"data":"1","id":1,"lookup":"ydb10"}'),
67+
('2', '{"data":"2","id":2,"lookup":"ydb20"}'),
68+
('3', '{"data":"3","id":3,"lookup":"ydb30"}'),
69+
('4', '{"data":"4","id":4,"lookup":null}'),
70+
('5', '{"data":"5","id":5,"lookup":null}'),
71+
]
72+
* 3,
73+
),
74+
# 2
75+
(
76+
R'''
77+
$input = SELECT * FROM myyds.`{input_topic}`
78+
WITH (
79+
FORMAT=json_each_row,
80+
SCHEMA (
81+
id Int32,
82+
user Int32,
83+
)
84+
) ;
85+
86+
$enriched = select e.id as id,
87+
e.user as user_id,
88+
u.data as lookup
89+
from
90+
$input as e
91+
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
92+
on(e.user = u.id)
93+
;
94+
95+
insert into myyds.`{output_topic}`
96+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
97+
''',
98+
[
99+
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
100+
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
101+
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
102+
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
103+
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
104+
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
105+
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
106+
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
107+
]
108+
* 20,
109+
),
110+
# 3
111+
(
112+
R'''
113+
$input = SELECT * FROM myyds.`{input_topic}`
114+
WITH (
115+
FORMAT=json_each_row,
116+
SCHEMA (
117+
id Int32,
118+
ts String,
119+
ev_type String,
120+
user Int32,
121+
)
122+
) ;
123+
124+
$formatTime = DateTime::Format("%H:%M:%S");
125+
126+
$enriched = select e.id as id,
127+
$formatTime(DateTime::ParseIso8601(e.ts)) as ts,
128+
e.user as user_id,
129+
u.data as lookup
130+
from
131+
$input as e
132+
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
133+
on(e.user = u.id)
134+
;
135+
136+
insert into myyds.`{output_topic}`
137+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
138+
''',
139+
[
140+
(
141+
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
142+
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
143+
),
144+
(
145+
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
146+
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
147+
),
148+
(
149+
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
150+
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
151+
),
152+
(
153+
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
154+
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
155+
),
156+
(
157+
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
158+
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
159+
),
160+
(
161+
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
162+
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
163+
),
164+
(
165+
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
166+
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
167+
),
168+
]
169+
* 10,
170+
),
171+
# 4
172+
(
173+
R'''
174+
$input = SELECT * FROM myyds.`{input_topic}`
175+
WITH (
176+
FORMAT=json_each_row,
177+
SCHEMA (
178+
id Int32,
179+
ts String,
180+
ev_type String,
181+
user Int32,
182+
)
183+
) ;
184+
185+
$formatTime = DateTime::Format("%H:%M:%S");
186+
187+
$enriched = select e.id as id,
188+
$formatTime(DateTime::ParseIso8601(e.ts)) as ts,
189+
e.user as user_id,
190+
u.name as name,
191+
u.age as age
192+
from
193+
$input as e
194+
left join {streamlookup} ydb_conn_{table_name}.`users` as u
195+
on(e.user = u.id)
196+
;
197+
198+
insert into myyds.`{output_topic}`
199+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
200+
''',
201+
[
202+
(
203+
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
204+
'{"id":1,"ts":"11:33:44","user_id":2,"name":"Petr","age":25}',
205+
),
206+
(
207+
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
208+
'{"id":2,"ts":"11:22:33","user_id":1,"name":"Anya","age":15}',
209+
),
210+
(
211+
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
212+
'{"id":3,"ts":"11:33:55","user_id":100,"name":null,"age":null}',
213+
),
214+
(
215+
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
216+
'{"id":4,"ts":"11:33:56","user_id":3,"name":"Masha","age":17}',
217+
),
218+
(
219+
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
220+
'{"id":5,"ts":"11:33:57","user_id":3,"name":"Masha","age":17}',
221+
),
222+
(
223+
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
224+
'{"id":6,"ts":"11:22:38","user_id":1,"name":"Anya","age":15}',
225+
),
226+
(
227+
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
228+
'{"id":7,"ts":"11:33:49","user_id":2,"name":"Petr","age":25}',
229+
),
230+
]
231+
* 1000,
232+
),
233+
]
234+
11235

12236
class TestStreamingJoin(TestYdsBase):
13237
@yq_v1
@@ -59,3 +283,59 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
59283
status = describe_response.result.query.meta.status
60284
assert not describe_response.issues, str(describe_response.issues)
61285
assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status)
286+
287+
@yq_v1
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
@pytest.mark.parametrize("streamlookup", [False, True])
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
def test_streamlookup(
    self, kikimr, testcase, streamlookup, fq_client: FederatedQueryClient, settings: Settings, yq_version
):
    """Run one TESTCASES scenario as a streaming query and verify its output.

    The scenario's SQL template is formatted with the input/output topics,
    the lookup table name and, when ``streamlookup`` is true, the
    ``/*+ streamlookup() */`` join hint. Every input message of the scenario
    is written to the input topic, and each message read back from the
    output topic is compared (as parsed JSON) with the expected value at the
    same position.

    Args:
        kikimr: runner fixture; used to wait for the query's zero checkpoint.
        testcase: index into ``TESTCASES``.
        streamlookup: whether to add the streamlookup hint to the join.
        fq_client: federated query client used to create connections and
            to create, abort and describe the query.
        settings: test settings; ``settings.ydb.dbname`` is used as the
            database id of the YDB connection.
        yq_version: only used to build a unique topic name.
    """
    self.init_topics(f"pq_yq_streaming_test_lookup_{streamlookup}{testcase}_{yq_version}")
    fq_client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

    table_name = 'join_table'
    ydb_conn_name = f'ydb_conn_{table_name}'

    fq_client.create_ydb_connection(
        name=ydb_conn_name,
        database_id=settings.ydb.dbname,
    )

    sql, messages = TESTCASES[testcase]
    sql = sql.format(
        input_topic=self.input_topic,
        output_topic=self.output_topic,
        table_name=table_name,
        streamlookup=R'/*+ streamlookup() */' if streamlookup else '',
    )

    query_id = fq_client.create_query(
        f"streamlookup_{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
    ).result.query_id
    fq_client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
    kikimr.compute_plane.wait_zero_checkpoint(query_id)

    # Feed the input topic in bounded batches instead of one huge write.
    chunk_size = 500
    for offset in range(0, len(messages), chunk_size):
        self.write_stream([payload for payload, _ in messages[offset : offset + chunk_size]])

    read_data = self.read_stream(len(messages))
    # Diagnostics are printed before any assertion so they are available on failure.
    print(streamlookup, testcase, file=sys.stderr)
    print(sql, file=sys.stderr)
    print(*zip(messages, read_data), file=sys.stderr, sep="\n")

    # zip() stops at the shorter sequence, so without this check a short read
    # would leave the missing messages unverified and the test would still pass.
    # NOTE(review): assumes read_stream returns a sized sequence (it is already
    # iterated twice here) -- confirm against TestYdsBase.
    assert len(read_data) == len(messages), f"expected {len(messages)} output messages, got {len(read_data)}"
    for index, (actual, (_, expected)) in enumerate(zip(read_data, messages)):
        # Compare parsed JSON rather than raw text.
        assert json.loads(actual) == json.loads(expected), f"mismatch at output message #{index}"

    fq_client.abort_query(query_id)
    fq_client.wait_query(query_id)

    describe_response = fq_client.describe_query(query_id)
    status = describe_response.result.query.meta.status
    assert not describe_response.issues, str(describe_response.issues)
    assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status)

ydb/tests/fq/generic/ydb/01_basic.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,23 @@ set -ex
1818
(2, "ydb20"),
1919
(3, "ydb30");
2020
COMMIT;
21+
CREATE TABLE users (age Int32, id Int32, ip STRING, name STRING, region Int32, PRIMARY KEY(id));
22+
COMMIT;
23+
INSERT INTO users (age, id, ip, name, region) VALUES
24+
(15, 1, "95.106.17.32", "Anya", 213),
25+
(25, 2, "88.78.248.151", "Petr", 225),
26+
(17, 3, "93.94.183.63", "Masha", 1),
27+
(5, 4, "::ffff:193.34.173.188", "Alena", 225),
28+
(15, 5, "93.170.111.29", "Irina", 2),
29+
(13, 6, "93.170.111.28", "Inna", 21),
30+
(33, 7, "::ffff:193.34.173.173", "Ivan", 125),
31+
(45, 8, "::ffff:133.34.173.188", "Asya", 225),
32+
(27, 9, "::ffff:133.34.172.188", "German", 125),
33+
(41, 10, "::ffff:133.34.173.185", "Olya", 225),
34+
(35, 11, "::ffff:193.34.163.188", "Slava", 2),
35+
(56, 12, "2a02:1812:1713:4f00:517e:1d79:c88b:704", "Elena", 2),
36+
(18, 17, "ivalid ip", "newUser", 12);
37+
COMMIT;
2138
'
2239

2340
retVal=$?

ydb/tests/tools/fq_runner/kikimr_runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def fill_gateways_cfg(self, gateways):
146146
gateways['yql_core'] = {}
147147
gateways['yql_core']['flags'] = []
148148
gateways['yql_core']['flags'].append({'name': "_EnableMatchRecognize"})
149+
gateways['yql_core']['flags'].append({'name': "_EnableStreamLookupJoin"})
149150

150151
def fill_storage_config(self, storage, directory):
151152
storage['endpoint'] = os.getenv("YDB_ENDPOINT")

0 commit comments

Comments
 (0)