Skip to content

Commit 661e989

Browse files
nsofyansofyaSofya Novozhilova
authored
Clean. Prepare validation. (#2586)
Co-authored-by: nsofya <[email protected]> Co-authored-by: Sofya Novozhilova <[email protected]>
1 parent 152f011 commit 661e989

File tree

18 files changed

+210
-84
lines changed

18 files changed

+210
-84
lines changed

ydb/core/kqp/ut/common/columnshard.cpp

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,27 @@
33

44
namespace NKikimr {
55
namespace NKqp {
6+
7+
TString GetConfigProtoWithName(const TString & tierName) {
8+
return TStringBuilder() << "Name : \"" << tierName << "\"\n" <<
9+
R"(
10+
ObjectStorage : {
11+
Endpoint: "fake"
12+
Bucket: "fake"
13+
SecretableAccessKey: {
14+
Value: {
15+
Data: "secretAccessKey"
16+
}
17+
}
18+
SecretableSecretKey: {
19+
Value: {
20+
Data: "secretSecretKey"
21+
}
22+
}
23+
}
24+
)";
25+
}
26+
627
using namespace NYdb;
728

829
TTestHelper::TTestHelper(const TKikimrSettings& settings)
@@ -29,6 +50,38 @@ namespace NKqp {
2950
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3051
}
3152

53+
void TTestHelper::CreateTier(const TString& tierName) {
54+
auto result = Session.ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
55+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
56+
}
57+
58+
TString TTestHelper::CreateTieringRule(const TString& tierName, const TString& columnName) {
59+
const TString ruleName = tierName + "_" + columnName;
60+
const TString configTieringStr = TStringBuilder() << R"({
61+
"rules" : [
62+
{
63+
"tierName" : ")" << tierName << R"(",
64+
"durationForEvict" : "10d"
65+
}
66+
]
67+
})";
68+
auto result = Session.ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
69+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
70+
return ruleName;
71+
}
72+
73+
void TTestHelper::SetTiering(const TString& tableName, const TString& ruleName) {
74+
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET (TIERING = '" << ruleName << "')";
75+
auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync();
76+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
77+
}
78+
79+
void TTestHelper::ResetTiering(const TString& tableName) {
80+
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TIERING)";
81+
auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync();
82+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
83+
}
84+
3285
void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) {
3386
Y_UNUSED(opStatus);
3487
NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer());
@@ -62,7 +115,8 @@ namespace NKqp {
62115
}
63116
}
64117
for (auto shard : shards) {
65-
RebootTablet(*runtime, shard, sender);
118+
Kikimr.GetTestServer().GetRuntime()->Send(MakePipePeNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(
119+
new TEvents::TEvPoisonPill(), shard, false));
66120
}
67121
}
68122

ydb/core/kqp/ut/common/columnshard.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ namespace NKqp {
7171
TTestActorRuntime& GetRuntime();
7272
NYdb::NTable::TSession& GetSession();
7373
void CreateTable(const TColumnTableBase& table);
74+
void CreateTier(const TString& tierName);
75+
TString CreateTieringRule(const TString& tierName, const TString& columnName);
76+
void SetTiering(const TString& tableName, const TString& ruleName);
77+
void ResetTiering(const TString& tableName);
7478
void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
7579
void BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
7680
void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS);

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22
#include <ydb/core/kqp/ut/common/columnshard.h>
3+
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
34
#include <ydb/core/formats/arrow/arrow_helpers.h>
45
#include <ydb/core/tx/tx_proxy/proxy.h>
56
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -5258,6 +5259,52 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
52585259
}
52595260
}
52605261

5262+
Y_UNIT_TEST(InvalidColumnInTieringRule) {
5263+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
5264+
5265+
TKikimrSettings runnerSettings;
5266+
runnerSettings.WithSampleTables = false;
5267+
TTestHelper testHelper(runnerSettings);
5268+
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
5269+
5270+
const TString tableName = "/Root/ColumnTableTest";
5271+
5272+
TVector<TTestHelper::TColumnSchema> schema = {
5273+
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
5274+
TTestHelper::TColumnSchema().SetName("id_second").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
5275+
TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32),
5276+
TTestHelper::TColumnSchema().SetName("created_at").SetType(NScheme::NTypeIds::Timestamp).SetNullable(false)
5277+
};
5278+
5279+
TTestHelper::TColumnTable testTable;
5280+
testTable.SetName(tableName).SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema).SetTTL("created_at", "Interval(\"PT1H\")");
5281+
testHelper.CreateTable(testTable);
5282+
testHelper.CreateTier("tier1");
5283+
5284+
{
5285+
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
5286+
tableInserter.AddRow().Add(1).Add(1).Add(7).Add((TInstant::Now() - TDuration::Days(30)).MilliSeconds());
5287+
tableInserter.AddRow().Add(1).Add(2).Add(7).Add((TInstant::Now() - TDuration::Days(30)).MilliSeconds());
5288+
testHelper.BulkUpsert(testTable, tableInserter);
5289+
}
5290+
5291+
while (csController->GetIndexations().Val() == 0) {
5292+
Cout << "Wait indexation..." << Endl;
5293+
Sleep(TDuration::Seconds(2));
5294+
}
5295+
5296+
// const auto ruleName = testHelper.CreateTieringRule("tier1", "created_att");
5297+
const auto ruleName = testHelper.CreateTieringRule("tier1", "created_at");
5298+
testHelper.SetTiering(tableName, ruleName);
5299+
5300+
while (csController->GetTieringUpdates().Val() == 0) {
5301+
Cout << "Wait tiering..." << Endl;
5302+
Sleep(TDuration::Seconds(2));
5303+
}
5304+
5305+
testHelper.RebootTablets(tableName);
5306+
}
5307+
52615308
Y_UNIT_TEST(AddColumnWithTtl) {
52625309
TKikimrSettings runnerSettings;
52635310
runnerSettings.WithSampleTables = false;
@@ -5290,7 +5337,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
52905337
UNIT_ASSERT_VALUES_EQUAL(description.GetTtlSettings()->GetDateTypeColumn().GetExpireAfter(), TDuration::Hours(1));
52915338
}
52925339
{
5293-
schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64));
52945340
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;";
52955341
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
52965342
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
@@ -5305,12 +5351,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
53055351
UNIT_ASSERT_VALUES_EQUAL(columns.size(), 5);
53065352
UNIT_ASSERT_VALUES_EQUAL(description.GetTtlSettings()->GetDateTypeColumn().GetExpireAfter(), TDuration::Hours(1));
53075353
}
5308-
{
5309-
schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64));
5310-
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << R"(` SET(TIERING = 'tiering1');)";
5311-
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
5312-
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
5313-
}
5354+
testHelper.SetTiering("/Root/ColumnTableTest", "tiering1");
53145355
{
53155356
auto settings = TDescribeTableSettings().WithTableStatistics(true);
53165357
auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync();
@@ -5322,7 +5363,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
53225363
UNIT_ASSERT_VALUES_EQUAL(description.GetTtlSettings()->GetDateTypeColumn().GetExpireAfter(), TDuration::Hours(1));
53235364
}
53245365
{
5325-
schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64));
53265366
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << R"(` RESET (TTL);)";
53275367
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
53285368
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
@@ -5337,12 +5377,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
53375377
UNIT_ASSERT_VALUES_EQUAL(*description.GetTiering(), "tiering1");
53385378
UNIT_ASSERT(!description.GetTtlSettings());
53395379
}
5340-
{
5341-
schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64));
5342-
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << R"(` RESET (TIERING);)";
5343-
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
5344-
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
5345-
}
5380+
testHelper.ResetTiering("/Root/ColumnTableTest");
53465381
{
53475382
auto settings = TDescribeTableSettings().WithTableStatistics(true);
53485383
auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync();
@@ -5416,7 +5451,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
54165451
TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
54175452
TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
54185453
};
5419-
5454+
54205455
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
54215456
TTestHelper::TColumnTable testTable;
54225457

@@ -5581,8 +5616,8 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
55815616
testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
55825617
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[[200u]]]");
55835618

5584-
// testHelper.RebootTablets(testTable.GetName());
5585-
// testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[[200u]]]");
5619+
testHelper.RebootTablets(testTable.GetName());
5620+
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[[200u]]]");
55865621
}
55875622

55885623
Y_UNIT_TEST(AddColumnErrors) {

ydb/core/kqp/ut/scheme/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ PEERDIR(
2323
ydb/core/kqp
2424
ydb/core/kqp/ut/common
2525
ydb/library/yql/sql/pg_dummy
26+
ydb/core/tx/columnshard/hooks/testing
2627
)
2728

2829
YQL_LAST_ABI_VERSION()

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,14 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const
102102
const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
103103
for (ui64 pathId : ttlBody.GetPathIds()) {
104104
NOlap::TTiering tiering;
105-
tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName);
105+
tiering.Add(NOlap::TTierInfo::MakeTtl(now - unixTime, columnName));
106106
pathTtls.emplace(pathId, std::move(tiering));
107107
}
108108
}
109-
if (!Self->SetupTtl(pathTtls, true)) {
109+
if (!Self->SetupTtl(pathTtls)) {
110110
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL not started");
111111
}
112+
Self->TablesManager.MutablePrimaryIndex().OnTieringModified(Self->Tiers, Self->TablesManager.GetTtl());
112113

113114
return TTxController::TProposeResult();
114115
}

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -718,8 +718,8 @@ void TColumnShard::SetupCompaction() {
718718
LOG_S_DEBUG("ActiveCompactions: " << BackgroundController.GetCompactionsCount() << " at tablet " << TabletID());
719719
}
720720

721-
bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, const bool force) {
722-
if (!AppDataVerified().ColumnShardConfig.GetTTLEnabled()) {
721+
bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
722+
if (!AppDataVerified().ColumnShardConfig.GetTTLEnabled() || !NYDBTest::TControllers::GetColumnShardController()->IsTTLEnabled()) {
723723
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_ttl")("reason", "disabled");
724724
return false;
725725
}
@@ -728,9 +728,6 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con
728728
ACFL_DEBUG("background", "ttl")("skip_reason", "in_progress");
729729
return false;
730730
}
731-
if (force) {
732-
TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl());
733-
}
734731
THashMap<ui64, NOlap::TTiering> eviction = pathTtls;
735732
for (auto&& i : eviction) {
736733
ACFL_DEBUG("background", "ttl")("path", i.first)("info", i.second.GetDebugString());

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ class TColumnShard
538538
void StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex);
539539
void SetupIndexation();
540540
void SetupCompaction();
541-
bool SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {}, const bool force = false);
541+
bool SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {});
542542
void SetupCleanup();
543543
void SetupCleanupInsertTable();
544544
void SetupGC();

ydb/core/tx/columnshard/columnshard_ttl.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,13 @@ class TTtl {
6565
PathTtls.erase(pathId);
6666
}
6767

68-
void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction) const {
68+
bool AddTtls(THashMap<ui64, NOlap::TTiering>& eviction) const {
6969
for (auto& [pathId, descr] : PathTtls) {
70-
eviction[pathId].Ttl = Convert(descr);
70+
if (!eviction[pathId].Add(Convert(descr))) {
71+
return false;
72+
}
7173
}
74+
return true;
7275
}
7376

7477
const THashSet<TString>& TtlColumns() const { return Columns; }

ydb/core/tx/columnshard/columnshard_ut_common.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ struct TTestSchema {
5151
: Name(name)
5252
{}
5353

54+
TString DebugString() const {
55+
return TStringBuilder() << "{Column=" << TtlColumn << ";EvictAfter=" << EvictAfter.value_or(TDuration::Zero()) << ";Name=" << Name << ";Codec=" << Codec << "};";
56+
}
57+
5458
NKikimrSchemeOp::EColumnCodec GetCodecId() const {
5559
if (Codec == "none") {
5660
return NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain;
@@ -140,6 +144,15 @@ struct TTestSchema {
140144
EvictAfter = ttl;
141145
return *this;
142146
}
147+
148+
TString DebugString() const {
149+
auto result = TStringBuilder() << "WaitEmptyAfter=" << WaitEmptyAfter << ";Tiers=";
150+
for (auto&& tier : Tiers) {
151+
result << "{" << tier.DebugString() << "}";
152+
}
153+
result << ";TTL=" << TStorageTier::DebugString();
154+
return result;
155+
}
143156
};
144157
using TTestColumn = NArrow::NTest::TTestColumn;
145158
static auto YdbSchema(const TTestColumn& firstKeyItem = TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp))) {

0 commit comments

Comments
 (0)