Skip to content

Commit b56212c

Browse files
authored
get_metering(): added async support (v2) (#3856)
1 parent 7df3337 commit b56212c

File tree

7 files changed

+96
-93
lines changed

7 files changed

+96
-93
lines changed

ydb/tests/fq/plans/test_stats_mode.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,21 @@ def test_mode(self, kikimr, s3, client, stats_mode):
2626
kikimr.control_plane.wait_bootstrap(1)
2727
client.create_storage_connection("pb", "pbucket")
2828

29-
sql = R'''
29+
sql = '''
3030
insert into pb.`path/` with (format=csv_with_names)
3131
select * from AS_TABLE([<|foo:1, bar:"xxx"u|>,<|foo:2, bar:"yyy"u|>]);
3232
'''
3333
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
3434
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
3535

36-
sql = R'''
36+
sql = '''
3737
insert into pb.`path/` with (format=csv_with_names)
3838
select * from AS_TABLE([<|foo:3, bar:"xxx"u|>,<|foo:4, bar:"yyy"u|>]);
3939
'''
4040
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
4141
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
4242

43-
sql = R'''
43+
sql = '''
4444
select bar, count(foo) as foo_count, sum(foo) as foo_sum
4545
from pb.`path/` with (format=csv_with_names, schema(
4646
foo Int NOT NULL,
@@ -59,4 +59,4 @@ def test_mode(self, kikimr, s3, client, stats_mode):
5959
assert len(result_set.rows) == 2
6060
# assert result_set.rows[0].items[0].uint64_value == 1024 * 10
6161
# 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc
62-
# assert sum(kikimr.control_plane.get_metering()) == 11
62+
# assert sum(kikimr.control_plane.get_metering(1)) == 11

ydb/tests/fq/s3/test_insert.py

+30-30
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,21 @@ def test_insert(self, kikimr, s3, client, format, dataset_name, unique_prefix):
3838
storage_connection_name = unique_prefix + "ibucket"
3939
client.create_storage_connection(storage_connection_name, "insert_bucket")
4040

41-
sql = R'''
42-
insert into `{}`.`{}/` with (format={})
41+
sql = f'''
42+
insert into `{storage_connection_name}`.`{dataset_name}/` with (format={format})
4343
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
44-
'''.format(storage_connection_name, dataset_name, format)
44+
'''
4545

4646
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
4747
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
4848
prefix = client.describe_query(query_id).result.query.meta.last_job_id.split("-")[0] # cut _<query_id> part
4949

50-
sql = R'''
51-
select foo, bar from {0}.`{1}/{3}*` with (format={2}, schema(
50+
sql = f'''
51+
select foo, bar from {storage_connection_name}.`{dataset_name}/{prefix}*` with (format={format}, schema(
5252
foo Int NOT NULL,
5353
bar String NOT NULL
5454
))
55-
'''.format(storage_connection_name, dataset_name, format, prefix)
55+
'''
5656

5757
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
5858
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
@@ -70,7 +70,7 @@ def test_insert(self, kikimr, s3, client, format, dataset_name, unique_prefix):
7070
assert result_set.rows[0].items[1].bytes_value == b'xxx'
7171
assert result_set.rows[1].items[0].int32_value == 456
7272
assert result_set.rows[1].items[1].bytes_value == b'yyy'
73-
assert sum(kikimr.control_plane.get_metering()) == 20
73+
assert sum(kikimr.control_plane.get_metering(1)) == 20
7474

7575
@yq_all
7676
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -93,7 +93,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):
9393
aws_secret_access_key="secret_key"
9494
)
9595

96-
taxi = R'''VendorID'''
96+
taxi = '''VendorID'''
9797
for i in range(37):
9898
taxi += "\n" + str(i)
9999
s3_client.put_object(Body=taxi, Bucket='big_data_bucket', Key='src/taxi.csv', ContentType='text/plain')
@@ -119,7 +119,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):
119119

120120
client.create_storage_connection("ibucket", "insert_bucket")
121121

122-
sql = fR'''
122+
sql = f'''
123123
pragma s3.JsonListSizeLimit="10";
124124
INSERT INTO bindings.`{storage_sink_binding_name}`
125125
SELECT
@@ -130,7 +130,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):
130130
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
131131
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
132132

133-
sql = fR'''
133+
sql = f'''
134134
135135
SELECT
136136
count(*)
@@ -148,7 +148,7 @@ def test_big_json_list_insert(self, kikimr, s3, client, unique_prefix):
148148
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
149149
assert len(result_set.rows) == 1
150150
assert result_set.rows[0].items[0].uint64_value == 37
151-
assert sum(kikimr.control_plane.get_metering()) == 20
151+
assert sum(kikimr.control_plane.get_metering(1)) == 20
152152

153153
@yq_all
154154
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -167,7 +167,7 @@ def test_insert_csv_delimiter(self, kikimr, s3, client, unique_prefix):
167167
storage_connection_name = unique_prefix + "ibucket"
168168
client.create_storage_connection(storage_connection_name, "insert_bucket")
169169

170-
sql = fR'''
170+
sql = f'''
171171
insert into `{storage_connection_name}`.`csv_delim_out/` with (
172172
format=csv_with_names,
173173
csv_delimiter=";"
@@ -179,11 +179,11 @@ def test_insert_csv_delimiter(self, kikimr, s3, client, unique_prefix):
179179
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
180180
prefix = "" # client.describe_query(query_id).result.query.meta.last_job_id.split("-")[0] # cut _<query_id> part
181181

182-
sql = R'''
183-
select data from `{}`.`csv_delim_out/{}*` with (format=raw, schema(
182+
sql = f'''
183+
select data from `{storage_connection_name}`.`csv_delim_out/{prefix}*` with (format=raw, schema(
184184
data String NOT NULL
185185
))
186-
'''.format(storage_connection_name, prefix)
186+
'''
187187

188188
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
189189
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
@@ -196,7 +196,7 @@ def test_insert_csv_delimiter(self, kikimr, s3, client, unique_prefix):
196196
assert result_set.columns[0].type.type_id == ydb.Type.STRING
197197
assert len(result_set.rows) == 1
198198
assert result_set.rows[0].items[0].bytes_value == b'"bar";"foo"\n"xxx";123\n"yyy";456\n'
199-
assert sum(kikimr.control_plane.get_metering()) == 20
199+
assert sum(kikimr.control_plane.get_metering(1)) == 20
200200

201201
@yq_all
202202
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -215,23 +215,23 @@ def test_append(self, kikimr, s3, client, unique_prefix):
215215
storage_connection_name = unique_prefix + "abucket"
216216
client.create_storage_connection(storage_connection_name, "append_bucket")
217217

218-
sql = fR'''
218+
sql = f'''
219219
insert into `{storage_connection_name}`.`append/` with (format=json_each_row)
220220
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
221221
'''
222222

223223
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
224224
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
225225

226-
sql = fR'''
226+
sql = f'''
227227
insert into `{storage_connection_name}`.`append/` with (format=json_each_row)
228228
select * from AS_TABLE([<|foo:345, bar:"zzz"u|>]);
229229
'''
230230

231231
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
232232
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
233233

234-
sql = fR'''
234+
sql = f'''
235235
select foo, bar from `{storage_connection_name}`.`append/` with (format=json_each_row, schema(
236236
foo Int NOT NULL,
237237
bar String NOT NULL
@@ -256,7 +256,7 @@ def test_append(self, kikimr, s3, client, unique_prefix):
256256
assert result_set.rows[1].items[1].bytes_value == b'zzz'
257257
assert result_set.rows[2].items[0].int32_value == 456
258258
assert result_set.rows[2].items[1].bytes_value == b'yyy'
259-
assert sum(kikimr.control_plane.get_metering()) == 30
259+
assert sum(kikimr.control_plane.get_metering(1)) == 30
260260

261261
@yq_all
262262
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -275,15 +275,15 @@ def test_part_split(self, kikimr, s3, client, unique_prefix):
275275
storage_connection_name = unique_prefix + "sbucket"
276276
client.create_storage_connection(storage_connection_name, "split_bucket")
277277

278-
sql = fR'''
278+
sql = f'''
279279
insert into `{storage_connection_name}`.`part/` with (format=json_each_row, partitioned_by=(foo, bar))
280280
select * from AS_TABLE([<|foo:123, bar:"xxx"u, data:3.14|>,<|foo:456, bar:"yyy"u, data:2.72|>,<|foo:123, bar:"xxx"u, data:1.41|>]);
281281
'''
282282

283283
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
284284
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
285285

286-
sql = fR'''
286+
sql = f'''
287287
select data from `{storage_connection_name}`.`part/foo=123/bar=xxx/` with (format=json_each_row, schema(
288288
data Float NOT NULL,
289289
))
@@ -301,7 +301,7 @@ def test_part_split(self, kikimr, s3, client, unique_prefix):
301301
assert len(result_set.rows) == 2
302302
assert abs(result_set.rows[0].items[0].float_value - 3.14) < 0.01
303303
assert abs(result_set.rows[1].items[0].float_value - 1.41) < 0.01
304-
assert sum(kikimr.control_plane.get_metering()) == 20
304+
assert sum(kikimr.control_plane.get_metering(1)) == 20
305305

306306
@yq_all
307307
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -320,23 +320,23 @@ def test_part_merge(self, kikimr, s3, client, unique_prefix):
320320
storage_connection_name = unique_prefix + "mbucket"
321321
client.create_storage_connection(storage_connection_name, "merge_bucket")
322322

323-
sql = fR'''
323+
sql = f'''
324324
insert into `{storage_connection_name}`.`part/foo=123/bar=xxx/` with (format=json_each_row)
325325
select * from AS_TABLE([<|data:3.14|>,<|data:1.41|>]);
326326
'''
327327

328328
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
329329
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
330330

331-
sql = fR'''
331+
sql = f'''
332332
insert into `{storage_connection_name}`.`part/foo=456/bar=yyy/` with (format=json_each_row)
333333
select * from AS_TABLE([<|data:2.72|>]);
334334
'''
335335

336336
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
337337
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
338338

339-
sql = fR'''
339+
sql = f'''
340340
select foo, bar, data from `{storage_connection_name}`.`part` with (format=json_each_row, partitioned_by=(foo, bar), schema(
341341
foo Int NOT NULL,
342342
bar String NOT NULL,
@@ -367,7 +367,7 @@ def test_part_merge(self, kikimr, s3, client, unique_prefix):
367367
assert result_set.rows[2].items[0].int32_value == 456
368368
assert result_set.rows[2].items[1].bytes_value == b'yyy'
369369
assert abs(result_set.rows[2].items[2].float_value - 2.72) < 0.01
370-
assert sum(kikimr.control_plane.get_metering()) == 30
370+
assert sum(kikimr.control_plane.get_metering(1)) == 30
371371

372372
@yq_all
373373
@pytest.mark.parametrize("format", ["json_list", "json_each_row", "csv_with_names"])
@@ -403,15 +403,15 @@ def test_part_binding(self, kikimr, s3, client, format, unique_prefix):
403403
"file_pattern": "*{json,csv}"
404404
})
405405

406-
sql = fR'''
406+
sql = f'''
407407
insert into bindings.`{storage_binding_name}`
408408
select * from AS_TABLE([<|foo:123, bar:"xxx"u, data:3.14|>,<|foo:456, bar:"yyy"u, data:2.72|>,<|foo:123, bar:"xxx"u, data:1.41|>]);
409409
'''
410410

411411
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
412412
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
413413

414-
sql = fR'''
414+
sql = f'''
415415
select foo, bar, data from bindings.`{storage_binding_name}` order by foo, data
416416
'''
417417

@@ -438,7 +438,7 @@ def test_part_binding(self, kikimr, s3, client, format, unique_prefix):
438438
assert result_set.rows[2].items[0].int32_value == 456
439439
assert result_set.rows[2].items[1].text_value == 'yyy'
440440
assert abs(result_set.rows[2].items[2].double_value - 2.72) < 0.01
441-
assert sum(kikimr.control_plane.get_metering()) == 20
441+
assert sum(kikimr.control_plane.get_metering(1)) == 20
442442

443443
@yq_v1
444444
@pytest.mark.parametrize("format", ["json_each_row", "csv_with_names", "tsv_with_names", "parquet"])

ydb/tests/fq/s3/test_s3.py

+12-12
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def test_csv(self, kikimr, s3, client, runtime_listing, unique_prefix):
3636
aws_secret_access_key="secret_key"
3737
)
3838

39-
fruits = R'''Fruit,Price,Weight
39+
fruits = '''Fruit,Price,Weight
4040
Banana,3,100
4141
Apple,2,22
4242
Pear,15,33'''
@@ -81,7 +81,7 @@ def test_csv(self, kikimr, s3, client, runtime_listing, unique_prefix):
8181
assert result_set.rows[2].items[0].bytes_value == b"Pear"
8282
assert result_set.rows[2].items[1].int32_value == 15
8383
assert result_set.rows[2].items[2].int32_value == 33
84-
assert sum(kikimr.control_plane.get_metering()) == 10
84+
assert sum(kikimr.control_plane.get_metering(1)) == 10
8585

8686
@yq_all
8787
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -104,7 +104,7 @@ def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
104104
aws_secret_access_key="secret_key"
105105
)
106106

107-
fruits = R'''Time,Fruit,Price
107+
fruits = '''Time,Fruit,Price
108108
0,Banana,3
109109
1,Apple,2
110110
2,Pear,15'''
@@ -195,7 +195,7 @@ def test_raw(self, kikimr, s3, client, runtime_listing, yq_version, unique_prefi
195195
assert result_set.rows[0].items[0].bytes_value == b"text3"
196196
assert result_set.rows[1].items[0].bytes_value == b"text2"
197197
assert result_set.rows[2].items[0].bytes_value == b"text1"
198-
assert sum(kikimr.control_plane.get_metering()) == 10
198+
assert sum(kikimr.control_plane.get_metering(1)) == 10
199199

200200
@yq_all
201201
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -491,7 +491,7 @@ def test_write_result(self, kikimr, s3, client, unique_prefix):
491491
assert result_set.columns[2].name == "Weight"
492492
assert result_set.columns[2].type.type_id == ydb.Type.INT64
493493
assert len(result_set.rows) == 9
494-
assert sum(kikimr.control_plane.get_metering()) == 10
494+
assert sum(kikimr.control_plane.get_metering(1)) == 10
495495

496496
@yq_all
497497
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -553,7 +553,7 @@ def test_precompute(self, kikimr, s3, client, runtime_listing, unique_prefix):
553553
assert result_set.rows[0].items[0].uint64_value == 1
554554
assert result_set.rows[1].items[0].uint64_value == 1
555555
assert result_set.rows[2].items[0].uint64_value == 1
556-
assert sum(kikimr.control_plane.get_metering()) == 10
556+
assert sum(kikimr.control_plane.get_metering(1)) == 10
557557

558558
@yq_all
559559
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -719,7 +719,7 @@ def test_simple_hits_47(self, kikimr, s3, client, runtime_listing, unique_prefix
719719
assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.DOUBLE
720720
assert len(result_set.rows) == 1
721721
assert result_set.rows[0].items[0].double_value == 3
722-
assert sum(kikimr.control_plane.get_metering()) == 10
722+
assert sum(kikimr.control_plane.get_metering(1)) == 10
723723

724724
@yq_all
725725
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -749,7 +749,7 @@ def test_i18n_unpartitioned(self, kikimr, s3, client, raw, path_pattern, runtime
749749
i18n_directory = 'dataset/こんにちは/'
750750
i18n_name = i18n_directory + 'fruitand&+ %непечатное.csv'
751751

752-
fruits = R'''Data
752+
fruits = '''Data
753753
101
754754
102
755755
103'''
@@ -788,7 +788,7 @@ def test_i18n_unpartitioned(self, kikimr, s3, client, raw, path_pattern, runtime
788788
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
789789
assert len(result_set.rows) == 1
790790
assert result_set.rows[0].items[0].uint64_value == 1 if raw else 3
791-
assert sum(kikimr.control_plane.get_metering()) == 10
791+
assert sum(kikimr.control_plane.get_metering(1)) == 10
792792

793793
@yq_all
794794
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -817,7 +817,7 @@ def test_i18n_partitioning(self, kikimr, s3, client, raw, partitioning, runtime_
817817

818818
i18n_name = 'fruit and &{+}% непечатное.csv'
819819

820-
fruits = R'''Data
820+
fruits = '''Data
821821
101
822822
102
823823
103'''
@@ -883,7 +883,7 @@ def test_i18n_partitioning(self, kikimr, s3, client, raw, partitioning, runtime_
883883
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
884884
assert len(result_set.rows) == 1
885885
assert result_set.rows[0].items[0].uint64_value == 2 if raw else 6
886-
assert sum(kikimr.control_plane.get_metering()) == 10
886+
assert sum(kikimr.control_plane.get_metering(1)) == 10
887887

888888
@yq_all
889889
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@@ -934,4 +934,4 @@ def test_huge_source(self, kikimr, s3, client, runtime_listing, unique_prefix):
934934
assert len(result_set.rows) == 1
935935
assert result_set.rows[0].items[0].uint64_value == 1024 * 10
936936
# 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc
937-
assert sum(kikimr.control_plane.get_metering()) == 21
937+
assert sum(kikimr.control_plane.get_metering(1)) == 21

0 commit comments

Comments (0)