Skip to content

Commit 64f36f1

Browse files
authored
Base functional tests for vector index (#16709)
1 parent c172b3e commit 64f36f1

File tree

6 files changed

+444
-133
lines changed

6 files changed

+444
-133
lines changed

ydb/tests/stress/oltp_workload/tests/test_workload.py

+2-1
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,8 @@ def setup_class(cls):
1111
config = KikimrConfigGenerator(
1212
extra_feature_flags={
1313
"enable_parameterized_decimal": True,
14-
"enable_table_datetime64" : True
14+
"enable_table_datetime64": True,
15+
"enable_vector_index": True,
1516
}
1617
)
1718
cls.cluster = KiKiMR(config)

ydb/tests/stress/oltp_workload/workload/__init__.py

+3-132
Original file line number | Diff line number | Diff line change
@@ -3,140 +3,11 @@
33
import time
44
import threading
55

6-
from ydb.tests.stress.common.common import WorkloadBase
6+
from ydb.tests.stress.oltp_workload.workload.type.vector_index import WorkloadVectorIndex
7+
from ydb.tests.stress.oltp_workload.workload.type.insert_delete_all_types import WorkloadInsertDeleteAllTypes
78

89
ydb.interceptor.monkey_patch_event_handler()
910

10-
digits = 2 # should be consistent with format string below
11-
pk_types = {
12-
"Int64": "CAST({} AS Int64)",
13-
"Uint64": "CAST({} AS Uint64)",
14-
"Int32": "CAST({} AS Int32)",
15-
"Uint32": "CAST({} AS Uint32)",
16-
"Int16": "CAST({} AS Int16)",
17-
"Uint16": "CAST({} AS Uint16)",
18-
"Int8": "CAST({} AS Int8)",
19-
"Uint8": "CAST({} AS Uint8)",
20-
"Bool": "CAST({} AS Bool)",
21-
"Decimal(15,0)": "CAST('{}.0' AS Decimal(15,0))",
22-
"Decimal(22,9)": "CAST('{}.123' AS Decimal(22,9))",
23-
"Decimal(35,10)": "CAST('{}.123456' AS Decimal(35,10))",
24-
"DyNumber": "CAST('{}E1' AS DyNumber)",
25-
26-
"String": "'String {}'",
27-
"Utf8": "'Uft8 {}'",
28-
"Uuid": "CAST('{:2}345678-e89b-12d3-a456-556642440000' AS UUID)",
29-
30-
"Date": "CAST('20{:02}-01-01' AS Date)",
31-
"Datetime": "CAST('20{:02}-10-02T11:00:00Z' AS Datetime)",
32-
"Timestamp": "CAST(169624{:02}00000000 AS Timestamp)",
33-
"Interval": "CAST({} AS Interval)",
34-
"Date32": "CAST('20{:02}-01-01' AS Date32)",
35-
"Datetime64": "CAST('20{:02}-10-02T11:00:00Z' AS Datetime64)",
36-
"Timestamp64": "CAST(169624{:02}00000000 AS Timestamp64)",
37-
"Interval64": "CAST({} AS Interval64)"
38-
}
39-
40-
non_pk_types = {
41-
"Float": "CAST('{}.1' AS Float)",
42-
"Double": "CAST('{}.2' AS Double)",
43-
"Json": "CAST('{{\"another_key\":{}}}' AS Json)",
44-
"JsonDocument": "CAST('{{\"another_doc_key\":{}}}' AS JsonDocument)",
45-
"Yson": "CAST('[{}]' AS Yson)"
46-
}
47-
48-
null_types = {
49-
"Int64": "CAST({} AS Int64)",
50-
"Decimal(22,9)": "CAST('{}.123' AS Decimal(22,9))",
51-
"Decimal(35,10)": "CAST('{}.123456' AS Decimal(35,10))",
52-
"String": "'{}'",
53-
}
54-
55-
56-
def cleanup_type_name(type_name):
57-
return type_name.replace('(', '').replace(')', '').replace(',', '')
58-
59-
60-
class WorkloadInsertDeleteAllTypes(WorkloadBase):
61-
def __init__(self, client, prefix, stop):
62-
super().__init__(client, prefix, "insert_delete_all_types", stop)
63-
self.inserted = 0
64-
self.table_name = "table"
65-
self.lock = threading.Lock()
66-
67-
def get_stat(self):
68-
with self.lock:
69-
return f"Inserted: {self.inserted}"
70-
71-
def _loop(self):
72-
table_path = self.get_table_path(self.table_name)
73-
create_sql = f"""
74-
CREATE TABLE `{table_path}` (
75-
pk Uint64,
76-
{", ".join(["pk_" + cleanup_type_name(type_name) + " " + type_name for type_name in pk_types.keys()])},
77-
{", ".join(["null_pk_" + cleanup_type_name(type_name) + " " + type_name for type_name in null_types.keys()])},
78-
{", ".join(["col_" + cleanup_type_name(type_name) + " " + type_name for type_name in non_pk_types.keys()])},
79-
{", ".join(["null_col_" + cleanup_type_name(type_name) + " " + type_name for type_name in null_types.keys()])},
80-
PRIMARY KEY(
81-
{", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])},
82-
{", ".join(["null_pk_" + cleanup_type_name(type_name) for type_name in null_types.keys()])}
83-
)
84-
)
85-
"""
86-
87-
self.client.query(create_sql, True,)
88-
inflight = 10
89-
i = 0
90-
sum = 0
91-
while not self.is_stop_requested():
92-
value = i % 100
93-
insert_sql = f"""
94-
INSERT INTO `{table_path}` (
95-
pk,
96-
{", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])},
97-
{", ".join(["null_pk_" + cleanup_type_name(type_name) for type_name in null_types.keys()])},
98-
{", ".join(["col_" + cleanup_type_name(type_name) for type_name in non_pk_types.keys()])},
99-
{", ".join(["null_col_" + cleanup_type_name(type_name) for type_name in null_types.keys()])}
100-
)
101-
VALUES
102-
(
103-
{i},
104-
{", ".join([pk_types[type_name].format(value) for type_name in pk_types.keys()])},
105-
{", ".join(['NULL' for type_name in null_types.keys()])},
106-
{", ".join([non_pk_types[type_name].format(value) for type_name in non_pk_types.keys()])},
107-
{", ".join(['NULL' for type_name in null_types.keys()])}
108-
)
109-
;
110-
"""
111-
self.client.query(insert_sql, False,)
112-
sum += i
113-
114-
if (i >= inflight):
115-
self.client.query(
116-
f"""
117-
DELETE FROM `{table_path}`
118-
WHERE pk == {i - inflight} AND null_pk_Int64 IS NULL
119-
""",
120-
False,
121-
)
122-
sum -= (i - inflight)
123-
124-
actual = self.client.query(
125-
f"""
126-
SELECT COUNT(*) as cnt, SUM(pk) as sum FROM `{table_path}`
127-
""",
128-
False,
129-
)[0].rows[0]
130-
expected = {"cnt": inflight, "sum": sum}
131-
if actual != expected:
132-
raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}")
133-
i += 1
134-
with self.lock:
135-
self.inserted += 1
136-
137-
def get_workload_thread_funcs(self):
138-
return [self._loop]
139-
14011

14112
class WorkloadRunner:
14213
def __init__(self, client, path, duration):
@@ -160,9 +31,9 @@ def _cleanup(self):
16031

16132
def run(self):
16233
stop = threading.Event()
163-
16434
workloads = [
16535
WorkloadInsertDeleteAllTypes(self.client, self.name, stop),
36+
WorkloadVectorIndex(self.client, self.name, stop)
16637
]
16738

16839
for w in workloads:
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,133 @@
1+
import threading
2+
from ydb.tests.stress.common.common import WorkloadBase
3+
4+
5+
# Width of the zero-padded numeric fragment substituted into the fixed-width
# templates below (the `{:02}` placeholders); keep the two in sync.
digits = 2  # should be consistent with format string below

# YQL expression templates for every type used as a primary-key column.
# Each template takes one integer via str.format() and must yield a distinct,
# well-formed literal for every value the workload feeds in (0..99).
pk_types = {
    "Int64": "CAST({} AS Int64)",
    "Uint64": "CAST({} AS Uint64)",
    "Int32": "CAST({} AS Int32)",
    "Uint32": "CAST({} AS Uint32)",
    "Int16": "CAST({} AS Int16)",
    "Uint16": "CAST({} AS Uint16)",
    "Int8": "CAST({} AS Int8)",
    "Uint8": "CAST({} AS Uint8)",
    "Bool": "CAST({} AS Bool)",
    "Decimal(15,0)": "CAST('{}.0' AS Decimal(15,0))",
    "Decimal(22,9)": "CAST('{}.123' AS Decimal(22,9))",
    "Decimal(35,10)": "CAST('{}.123456' AS Decimal(35,10))",
    "DyNumber": "CAST('{}E1' AS DyNumber)",

    "String": "'String {}'",
    # NOTE(review): 'Uft8' looks like a typo of 'Utf8'; it is only a data
    # value, so it is intentionally left unchanged.
    "Utf8": "'Uft8 {}'",
    # Zero-padded on purpose: a space-padded fragment ('{:2}' -> ' 5') would
    # put a blank inside the literal, which is not a well-formed UUID.
    "Uuid": "CAST('{:02}345678-e89b-12d3-a456-556642440000' AS UUID)",

    "Date": "CAST('20{:02}-01-01' AS Date)",
    "Datetime": "CAST('20{:02}-10-02T11:00:00Z' AS Datetime)",
    "Timestamp": "CAST(169624{:02}00000000 AS Timestamp)",
    "Interval": "CAST({} AS Interval)",
    "Date32": "CAST('20{:02}-01-01' AS Date32)",
    "Datetime64": "CAST('20{:02}-10-02T11:00:00Z' AS Datetime64)",
    "Timestamp64": "CAST(169624{:02}00000000 AS Timestamp64)",
    "Interval64": "CAST({} AS Interval64)",
}

# Templates for types that are only used as ordinary (non-key) columns.
non_pk_types = {
    "Float": "CAST('{}.1' AS Float)",
    "Double": "CAST('{}.2' AS Double)",
    "Json": "CAST('{{\"another_key\":{}}}' AS Json)",
    "JsonDocument": "CAST('{{\"another_doc_key\":{}}}' AS JsonDocument)",
    "Yson": "CAST('[{}]' AS Yson)",
}

# Types for which extra columns are created and always filled with NULL
# (once as part of the primary key, once as a plain column).
null_types = {
    "Int64": "CAST({} AS Int64)",
    "Decimal(22,9)": "CAST('{}.123' AS Decimal(22,9))",
    "Decimal(35,10)": "CAST('{}.123456' AS Decimal(35,10))",
    "String": "'{}'",
}
49+
50+
51+
# Deletion table for the punctuation found in parameterized type names
# such as "Decimal(22,9)".
_TYPE_NAME_PUNCTUATION = str.maketrans("", "", "(),")


def cleanup_type_name(type_name: str) -> str:
    """Return *type_name* with '(', ')' and ',' removed.

    The result is used as a column-name suffix, e.g. "Decimal(22,9)"
    becomes "Decimal229". All other characters are kept as they are.
    """
    # One translate pass instead of three chained replace() calls.
    return type_name.translate(_TYPE_NAME_PUNCTUATION)
53+
54+
55+
class WorkloadInsertDeleteAllTypes(WorkloadBase):
    """Workload that inserts and deletes rows in a table covering all listed types.

    One table is created with a column per entry of ``pk_types``,
    ``non_pk_types`` and ``null_types``. The loop then inserts one row per
    iteration and, once more than ``inflight`` rows exist, deletes the oldest
    one and verifies the row count and the sum of ``pk`` against the values
    tracked locally.
    """

    def __init__(self, client, prefix, stop):
        super().__init__(client, prefix, "insert_delete_all_types", stop)
        self.inserted = 0  # number of completed loop iterations
        self.table_name = "table"
        self.lock = threading.Lock()  # guards self.inserted

    def get_stat(self):
        """Return a short human-readable progress string."""
        with self.lock:
            return f"Inserted: {self.inserted}"

    def _loop(self):
        """Create the table, then insert/delete/verify until stop is requested.

        Raises:
            Exception: when the COUNT/SUM read back from the table differs
                from the locally tracked expectation.
        """
        table_path = self.get_table_path(self.table_name)

        def names(prefix, types):
            return [prefix + cleanup_type_name(type_name) for type_name in types]

        def definitions(prefix, types):
            return [f"{prefix}{cleanup_type_name(type_name)} {type_name}" for type_name in types]

        # Everything below depends only on the module-level type tables, so it
        # is computed once instead of on every loop iteration.
        column_definitions = ", ".join(
            definitions("pk_", pk_types)
            + definitions("null_pk_", null_types)
            + definitions("col_", non_pk_types)
            + definitions("null_col_", null_types)
        )
        key_columns = ", ".join(names("pk_", pk_types) + names("null_pk_", null_types))
        insert_columns = ", ".join(
            ["pk"]
            + names("pk_", pk_types)
            + names("null_pk_", null_types)
            + names("col_", non_pk_types)
            + names("null_col_", null_types)
        )
        null_values = ["NULL"] * len(null_types)

        create_sql = f"""
            CREATE TABLE `{table_path}` (
                pk Uint64,
                {column_definitions},
                PRIMARY KEY(
                    {key_columns}
                )
            )
        """

        # Second positional argument is True only for this schema statement
        # (presumably a DDL flag -- confirm against the client's query()).
        self.client.query(create_sql, True,)

        inflight = 10  # number of rows kept in the table once it is warmed up
        i = 0
        expected_sum = 0  # sum of `pk` over the rows that should currently exist
        while not self.is_stop_requested():
            # Typed columns only take values 0..99 (two-digit templates);
            # `pk` itself keeps growing.
            value = i % 100
            row_values = ", ".join(
                [str(i)]
                + [template.format(value) for template in pk_types.values()]
                + null_values
                + [template.format(value) for template in non_pk_types.values()]
                + null_values
            )
            insert_sql = f"""
                INSERT INTO `{table_path}` (
                    {insert_columns}
                )
                VALUES
                (
                    {row_values}
                )
                ;
            """
            self.client.query(insert_sql, False,)
            expected_sum += i

            if i >= inflight:
                # Drop the oldest row so exactly `inflight` rows remain.
                self.client.query(
                    f"""
                    DELETE FROM `{table_path}`
                    WHERE pk == {i - inflight} AND null_pk_Int64 IS NULL
                    """,
                    False,
                )
                expected_sum -= (i - inflight)

                # The count can only equal `inflight` after the first delete,
                # so the verification lives inside this branch.
                actual = self.client.query(
                    f"""
                    SELECT COUNT(*) as cnt, SUM(pk) as sum FROM `{table_path}`
                    """,
                    False,
                )[0].rows[0]
                expected = {"cnt": inflight, "sum": expected_sum}
                if actual != expected:
                    raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}")
            i += 1
            with self.lock:
                self.inserted += 1

    def get_workload_thread_funcs(self):
        """Return the callables to run as workload threads (just the main loop)."""
        return [self._loop]

0 commit comments

Comments
 (0)