Skip to content

Commit 64afe2b

Browse files
authored
Merge f80b373 into 1f915f6
2 parents 1f915f6 + f80b373 commit 64afe2b

File tree

5 files changed

+267
-0
lines changed

5 files changed

+267
-0
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import logging
2+
from .base import ColumnFamilyTestBase, ColumnTableHelper
3+
from typing import Callable
4+
from ydb.tests.library.common.helpers import plain_or_under_sanitizer
5+
from ydb.tests.olap.scenario.helpers.thread_helper import TestThread
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class TestAlterCompression(ColumnFamilyTestBase):
    """Checks that column-family COMPRESSION settings actually shrink stored data.

    Creates one column table per supported codec (off / lz4 / zstd with every
    supported level), upserts identical data into all of them in parallel and
    asserts that every compressed table occupies fewer blob bytes than the
    uncompressed baseline table.
    """

    class_name = "alter_compression"
    # Rows inserted by a single UPSERT statement.
    single_upsert_row_count: int = 10**5
    # Number of UPSERT statements issued per table.
    count_upsert: int = 10

    @classmethod
    def setup_class(cls):
        super(TestAlterCompression, cls).setup_class()

    def test_all_supported_compression(self):
        test_name: str = "all_supported_compression"
        test_dir: str = f"{self.ydb_client.database}/{self.class_name}/{test_name}"
        tables_path: list[str] = [
            f"{test_dir}/off_compression",
            f"{test_dir}/lz4_compression",
            f"{test_dir}/zstd_compression",
        ]

        def add_default_family(settings: str) -> str:
            # Wrap codec settings into the default column-family clause of CREATE TABLE.
            return f"FAMILY default ({settings})"

        tables_family: list[str] = [
            add_default_family('COMPRESSION = "off"'),
            add_default_family('COMPRESSION = "lz4"'),
            add_default_family('COMPRESSION = "zstd"'),
        ]

        # zstd supports explicit compression levels; cover levels 2..21.
        for level in range(2, 22):
            tables_path.append(f"{test_dir}/zstd_{level}_compression")
            tables_family.append(add_default_family(f'COMPRESSION = "zstd", COMPRESSION_LEVEL = {level}'))

        assert len(tables_path) == len(tables_family)

        tables: list[ColumnTableHelper] = []
        for path, family in zip(tables_path, tables_family):
            self.ydb_client.query(
                f"""
                CREATE TABLE `{path}` (
                    value Uint64 NOT NULL,
                    value1 Uint64,
                    PRIMARY KEY(value),
                    {family}
                )
                WITH (STORE = COLUMN)
                """
            )
            logger.info(f"Table {path} created")
            tables.append(ColumnTableHelper(self.ydb_client, path))

        def upsert_and_wait_portions(table: ColumnTableHelper, number_rows_for_insert: int, count_upsert: int):
            # Upsert `count_upsert` batches of `number_rows_for_insert` rows with
            # monotonically increasing keys, then wait until the portion stats
            # reflect every inserted row.
            prev_number_rows: int = table.get_row_count()
            for _ in range(count_upsert):
                self.ydb_client.query(
                    """
                    $row_count = %i;
                    $prev_count = %i;
                    $rows= ListMap(ListFromRange(0, $row_count), ($i) -> {
                        return <|
                            value: $i + $prev_count,
                            value1: $i + $prev_count,
                        |>;
                    });
                    UPSERT INTO `%s`
                    SELECT * FROM AS_TABLE($rows);
                    """
                    % (number_rows_for_insert, prev_number_rows, table.path)
                )
                prev_number_rows += number_rows_for_insert
                logger.info(
                    f"{prev_number_rows} rows inserted in {table.path}. portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}"
                )
            assert table.get_row_count() == prev_number_rows

            # First wait for portion stats to appear at all, then for the full row count.
            if not self.wait_for(
                lambda: len(table.get_portion_stat_by_tier()) != 0, plain_or_under_sanitizer(70, 140)
            ):
                raise Exception("portion stats did not appear in time")

            if not self.wait_for(
                lambda: table.get_portion_stat_by_tier()['__DEFAULT']['Rows'] == self.single_upsert_row_count * self.count_upsert, plain_or_under_sanitizer(70, 140)
            ):
                raise Exception("not all rows reached the __DEFAULT tier portions")

        assert len(tables) == len(tables_path)

        # Load every table concurrently to keep the total test runtime down.
        tasks: list[TestThread] = [
            TestThread(target=upsert_and_wait_portions, args=[table, self.single_upsert_row_count, self.count_upsert])
            for table in tables
        ]
        for task in tasks:
            task.start()
        for task in tasks:
            task.join()

        # tables[0] is the uncompressed baseline ('off' codec).
        volumes_without_compression: tuple[int, int] = tables[0].get_volumes_column("value")
        for table in tables:
            assert table.get_portion_stat_by_tier()['__DEFAULT']['Rows'] == self.single_upsert_row_count * self.count_upsert
        # `value` is a Uint64, i.e. 8 raw bytes per row before compression.
        assert self.count_upsert * self.single_upsert_row_count * 8 == volumes_without_compression[0]

        for i in range(1, len(tables_path)):
            volumes: tuple[int, int] = tables[i].get_volumes_column("value")
            koef: float = volumes_without_compression[1] / volumes[1]
            logger.info(
                f"compression in `{tables[i].path}` {volumes_without_compression[1]} / {volumes[1]}: {koef}"
            )
            # Every compressed table must be strictly smaller than the baseline.
            assert koef > 1
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import yatest.common
2+
import os
3+
import time
4+
import ydb
5+
import logging
6+
7+
from ydb.tests.library.harness.kikimr_runner import KiKiMR
8+
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
9+
from ydb.tests.library.harness.util import LogLevels
10+
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class YdbClient:
    """Thin convenience wrapper around a YDB driver plus a query session pool."""

    def __init__(self, endpoint, database):
        """Create a driver for `endpoint`/`database` and open a session pool on it."""
        driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
        self.driver = driver
        self.database = database
        self.session_pool = ydb.QuerySessionPool(driver)

    def stop(self):
        """Shut down the session pool first, then the underlying driver."""
        self.session_pool.stop()
        self.driver.stop()

    def wait_connection(self, timeout=5):
        """Block until the driver is connected; fail fast after `timeout` seconds."""
        self.driver.wait(timeout, fail_fast=True)

    def query(self, statement):
        """Execute `statement` through the pool with the SDK's automatic retries."""
        return self.session_pool.execute_with_retries(statement)
30+
31+
32+
class ColumnTableHelper:
    """Helpers for inspecting a column table through its `.sys` statistics views."""

    def __init__(self, ydb_client: "YdbClient", path: str):
        self.ydb_client = ydb_client
        self.path = path

    def get_row_count(self) -> int:
        """Return the total number of rows currently in the table."""
        count_row: int = 0
        result_set = self.ydb_client.query(f"SELECT COUNT(*) AS Rows FROM `{self.path}`")
        for result in result_set:
            for row in result.rows:
                count_row += row["Rows"]
        return count_row

    def get_portion_stat_by_tier(self) -> dict[str, dict[str, int]]:
        """Return {tier name: {"Rows": total rows, "Portions": portion count}}."""
        results = self.ydb_client.query(
            f"select TierName, sum(Rows) as Rows, count(*) as Portions from `{self.path}/.sys/primary_index_portion_stats` group by TierName"
        )
        return {
            row["TierName"]: {"Rows": row["Rows"], "Portions": row["Portions"]}
            for result_set in results
            for row in result_set.rows
        }

    def get_blob_stat_by_tier(self) -> dict[str, dict[str, int]]:
        """Return {tier name: {"Portions": ..., "BlobSize": ..., "BlobCount": ...}}.

        Blob sizes/counts are aggregated per portion first, then per tier.
        """
        stmt = f"""
            select TierName, count(*) as Portions, sum(BlobSize) as BlobSize, sum(BlobCount) as BlobCount from (
                select TabletId, PortionId, TierName, sum(BlobRangeSize) as BlobSize, count(*) as BlobCount from `{self.path}/.sys/primary_index_stats` group by TabletId, PortionId, TierName
            ) group by TierName
        """
        results = self.ydb_client.query(stmt)
        return {
            row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]}
            for result_set in results
            for row in result_set.rows
        }

    def _collect_volumes_column(self, column_name: str) -> tuple[int, int]:
        """Return (raw_bytes, blob_bytes) currently stored for `column_name`.

        Only active entities (Activity == 1) are counted.
        """
        query = f'SELECT * FROM `{self.path}/.sys/primary_index_stats` WHERE Activity == 1 AND EntityName = \"{column_name}\"'
        result_set = self.ydb_client.query(query)
        raw_bytes, blob_bytes = 0, 0
        for result in result_set:
            for row in result.rows:
                raw_bytes += row["RawBytes"]
                blob_bytes += row["BlobRangeSize"]
        return raw_bytes, blob_bytes

    def get_volumes_column(self, column_name: str) -> tuple[int, int]:
        """Poll the column's volumes until they stop changing, then return them.

        Returns (raw_bytes, blob_bytes) once a stable reading is observed,
        re-sampling every 10 seconds while values keep moving (e.g. while
        compaction is still rewriting portions).
        """
        pred_raw_bytes, pred_bytes = 0, 0
        raw_bytes, blob_bytes = self._collect_volumes_column(column_name)
        # NOTE(review): this exits as soon as EITHER value stabilizes; if the
        # intent is to wait until BOTH stop changing, `and` should be `or` —
        # confirm with the original author before changing.
        while pred_raw_bytes != raw_bytes and pred_bytes != blob_bytes:
            pred_raw_bytes = raw_bytes
            pred_bytes = blob_bytes
            time.sleep(10)
            raw_bytes, blob_bytes = self._collect_volumes_column(column_name)
        logger.info(f"Table `{self.path}`, volumes `{column_name}` ({raw_bytes}, {blob_bytes})")
        return raw_bytes, blob_bytes
88+
89+
90+
class ColumnFamilyTestBase(object):
    """Base class that boots a local YDB (KiKiMR) cluster for column-family tests."""

    @classmethod
    def setup_class(cls):
        """pytest hook: bring the cluster and client up once per test class."""
        cls._setup_ydb()

    @classmethod
    def teardown_class(cls):
        """pytest hook: stop the client before tearing the cluster down."""
        cls.ydb_client.stop()
        cls.cluster.stop()

    @classmethod
    def _setup_ydb(cls):
        """Start a KiKiMR cluster with olap compression enabled and connect to node 1."""
        driver_binary = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
        # Log the ydbd version to make CI failures easier to diagnose.
        version_output = yatest.common.execute([driver_binary, "-V"], wait=True).stdout.decode("utf-8")
        logger.info(version_output)
        config_generator = KikimrConfigGenerator(
            extra_feature_flags={"enable_olap_compression": True},
            column_shard_config={
                "lag_for_compaction_before_tierings_ms": 0,
                "compaction_actualization_lag_ms": 0,
                "optimizer_freshness_check_duration_ms": 0,
                "small_portion_detect_size_limit": 0,
                "max_read_staleness_ms": 5000,
            },
        )
        cls.cluster = KiKiMR(config_generator)
        cls.cluster.start()
        first_node = cls.cluster.nodes[1]
        cls.ydb_client = YdbClient(
            database=f"/{config_generator.domain_name}",
            endpoint=f"grpc://{first_node.host}:{first_node.port}",
        )
        cls.ydb_client.wait_connection()

    @staticmethod
    def wait_for(condition_func, timeout_seconds):
        """Poll `condition_func` once per second until it returns a truthy value.

        Returns True on success, False once `timeout_seconds` have elapsed.
        """
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            if condition_func():
                return True
            time.sleep(1)
        return False
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Test suite for olap column-family compression (see alter_compression.py).
PY3TEST()
# Path to the ydbd binary; consumed by the harness via os.environ in base.py.
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")

TEST_SRCS(
    base.py
    alter_compression.py
)

SIZE(MEDIUM)

PEERDIR(
    ydb/tests/library
    ydb/public/sdk/python
    ydb/public/sdk/python/enable_v3_new_behavior
    ydb/tests/olap/scenario/helpers
)

# The tests start a real ydbd cluster, so the binary must be built first.
DEPENDS(
    ydb/apps/ydbd
)

END()
23+

ydb/tests/olap/column_family/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Descend into the compression test suite.
RECURSE(
    compression
)

ydb/tests/olap/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ RECURSE(
3131
docs
3232
load
3333
ttl_tiering
34+
column_family
3435
)

0 commit comments

Comments
 (0)