Skip to content

Commit 059614c

Browse files
authored
fix sink test (#1468)
1 parent d4c6fda commit 059614c

File tree

4 files changed

+137
-17
lines changed

4 files changed

+137
-17
lines changed

ydb/core/base/appdata_fwd.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ struct TAppData {
236236
bool EnableMvccSnapshotWithLegacyDomainRoot = false;
237237
bool UsePartitionStatsCollectorForTests = false;
238238
bool DisableCdcAutoSwitchingToReadyStateForTests = false;
239-
bool EnableOlapSink = false;
240239
TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks
241240
TVector<TString> DefaultUserSIDs;
242241
TString AllAuthenticatedUsers = "all-users@well-known";

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1711,7 +1711,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
17111711
}
17121712

17131713
if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))
1714-
|| (EnableOlapSink && HasOlapSink(stage))) {
1714+
|| (!EnableOlapSink && HasOlapSink(stage))) {
17151715
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
17161716
LOG_E(error);
17171717
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,

ydb/core/kqp/session_actor/kqp_tx.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
207207
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
208208
for (const auto &tx : physicalQuery.GetTransactions()) {
209209
for (const auto &stage : tx.GetStages()) {
210+
for (const auto &source : stage.GetSources()) {
211+
if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource){
212+
return true;
213+
}
214+
}
210215
for (const auto &tableOp : stage.GetTableOps()) {
211216
switch (tableOp.GetTypeCase()) {
212217
case NKqpProto::TKqpPhyTableOperation::kReadRange:

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 131 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5549,11 +5549,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
55495549
testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[110]]");
55505550
}
55515551

5552-
Y_UNIT_TEST(OlapReplace_FromSelect) {
5552+
Y_UNIT_TEST(OlapReplace_FromSelectSimple) {
5553+
NKikimrConfig::TAppConfig appConfig;
5554+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
55535555
auto settings = TKikimrSettings()
5556+
.SetAppConfig(appConfig)
55545557
.SetWithSampleTables(false);
55555558
TKikimrRunner kikimr(settings);
5556-
kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
55575559
Tests::NCommon::TLoggerInit(kikimr).Initialize();
55585560
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
55595561

@@ -5662,18 +5664,76 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
56625664
output,
56635665
R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])");
56645666
}
5667+
}
5668+
5669+
Y_UNIT_TEST(OlapReplace_BadTransactions) {
5670+
NKikimrConfig::TAppConfig appConfig;
5671+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
5672+
auto settings = TKikimrSettings()
5673+
.SetAppConfig(appConfig)
5674+
.SetWithSampleTables(false);
5675+
TKikimrRunner kikimr(settings);
5676+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
5677+
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
5678+
5679+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
56655680

5681+
const TString query = R"(
5682+
CREATE TABLE `/Root/ColumnShard` (
5683+
Col1 Uint64 NOT NULL,
5684+
Col2 String,
5685+
Col3 Int32 NOT NULL,
5686+
PRIMARY KEY (Col1)
5687+
)
5688+
PARTITION BY HASH(Col1)
5689+
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
5690+
5691+
CREATE TABLE `/Root/DataShard` (
5692+
Col1 Uint64 NOT NULL,
5693+
Col2 String,
5694+
Col3 Int32 NOT NULL,
5695+
PRIMARY KEY (Col1)
5696+
)
5697+
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
5698+
)";
5699+
5700+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
5701+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
5702+
5703+
auto client = kikimr.GetQueryClient();
56665704
{
56675705
auto prepareResult = client.ExecuteQuery(R"(
5668-
REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES
5706+
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
56695707
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
56705708
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
56715709
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
5710+
}
5711+
{
5712+
auto prepareResult = client.ExecuteQuery(R"(
5713+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
5714+
(10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13);
5715+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
5716+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
5717+
}
5718+
5719+
{
5720+
// column -> row
5721+
const TString sql = R"(
5722+
REPLACE INTO `/Root/DataShard`
5723+
SELECT * FROM `/Root/ColumnShard`
5724+
)";
5725+
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
5726+
UNIT_ASSERT(!insertResult.IsSuccess());
5727+
UNIT_ASSERT_C(
5728+
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
5729+
insertResult.GetIssues().ToString());
5730+
}
56725731

5732+
{
56735733
// row -> column
56745734
const TString sql = R"(
5675-
REPLACE INTO `/Root/ColumnShard3`
5676-
SELECT * FROM `/Root/DataShard1`
5735+
REPLACE INTO `/Root/ColumnShard`
5736+
SELECT * FROM `/Root/DataShard`
56775737
)";
56785738
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
56795739
UNIT_ASSERT(!insertResult.IsSuccess());
@@ -5683,10 +5743,53 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
56835743
}
56845744

56855745
{
5686-
// column -> row
5746+
// column & row read
56875747
const TString sql = R"(
5688-
REPLACE INTO `/Root/DataShard2`
5689-
SELECT * FROM `/Root/ColumnSource`
5748+
SELECT * FROM `/Root/DataShard`;
5749+
SELECT * FROM `/Root/ColumnShard`;
5750+
)";
5751+
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
5752+
UNIT_ASSERT(!insertResult.IsSuccess());
5753+
UNIT_ASSERT_C(
5754+
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
5755+
insertResult.GetIssues().ToString());
5756+
}
5757+
5758+
{
5759+
// column & row write
5760+
const TString sql = R"(
5761+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
5762+
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
5763+
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
5764+
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
5765+
)";
5766+
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
5767+
UNIT_ASSERT(!insertResult.IsSuccess());
5768+
UNIT_ASSERT_C(
5769+
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
5770+
insertResult.GetIssues().ToString());
5771+
}
5772+
5773+
{
5774+
// column read & row write
5775+
const TString sql = R"(
5776+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
5777+
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
5778+
SELECT * FROM `/Root/ColumnShard`;
5779+
)";
5780+
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
5781+
UNIT_ASSERT(!insertResult.IsSuccess());
5782+
UNIT_ASSERT_C(
5783+
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
5784+
insertResult.GetIssues().ToString());
5785+
}
5786+
5787+
{
5788+
// column write & row read
5789+
const TString sql = R"(
5790+
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
5791+
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
5792+
SELECT * FROM `/Root/DataShard`;
56905793
)";
56915794
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
56925795
UNIT_ASSERT(!insertResult.IsSuccess());
@@ -5697,13 +5800,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
56975800
}
56985801

56995802
Y_UNIT_TEST(OlapReplace_FromSelectLarge) {
5803+
NKikimrConfig::TAppConfig appConfig;
5804+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
57005805
auto settings = TKikimrSettings()
5806+
.SetAppConfig(appConfig)
57015807
.SetWithSampleTables(false);
57025808

57035809
TTestHelper testHelper(settings);
57045810

57055811
TKikimrRunner& kikimr = testHelper.GetKikimr();
5706-
testHelper.GetRuntime().GetAppData(0).EnableOlapSink = true;
57075812
Tests::NCommon::TLoggerInit(kikimr).Initialize();
57085813

57095814
TVector<TTestHelper::TColumnSchema> schema = {
@@ -5712,11 +5817,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
57125817
};
57135818

57145819
TTestHelper::TColumnTable testTable1;
5715-
testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema);
5820+
testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema).SetMinPartitionsCount(1000);
57165821
testHelper.CreateTable(testTable1);
57175822

57185823
TTestHelper::TColumnTable testTable2;
5719-
testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema);
5824+
testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema).SetMinPartitionsCount(1000);
57205825
testHelper.CreateTable(testTable2);
57215826

57225827
{
@@ -5750,10 +5855,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
57505855
}
57515856

57525857
Y_UNIT_TEST(OlapReplace_Simple) {
5858+
NKikimrConfig::TAppConfig appConfig;
5859+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
57535860
auto settings = TKikimrSettings()
5861+
.SetAppConfig(appConfig)
57545862
.SetWithSampleTables(false);
57555863
TKikimrRunner kikimr(settings);
5756-
kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
57575864
Tests::NCommon::TLoggerInit(kikimr).Initialize();
57585865
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
57595866

@@ -5801,10 +5908,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
58015908
}
58025909

58035910
Y_UNIT_TEST(OlapReplace_InsertUpsertError) {
5911+
NKikimrConfig::TAppConfig appConfig;
5912+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
58045913
auto settings = TKikimrSettings()
5914+
.SetAppConfig(appConfig)
58055915
.SetWithSampleTables(false);
5916+
58065917
TKikimrRunner kikimr(settings);
5807-
kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
58085918
Tests::NCommon::TLoggerInit(kikimr).Initialize();
58095919
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
58105920

@@ -5855,10 +5965,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
58555965
}
58565966

58575967
Y_UNIT_TEST(OlapReplace_Duplicates) {
5968+
NKikimrConfig::TAppConfig appConfig;
5969+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
58585970
auto settings = TKikimrSettings()
5971+
.SetAppConfig(appConfig)
58595972
.SetWithSampleTables(false);
5973+
58605974
TKikimrRunner kikimr(settings);
5861-
kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
58625975
Tests::NCommon::TLoggerInit(kikimr).Initialize();
58635976
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
58645977

@@ -5903,10 +6016,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
59036016
}
59046017

59056018
Y_UNIT_TEST(OlapReplace_DisableOlapSink) {
6019+
NKikimrConfig::TAppConfig appConfig;
6020+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);
59066021
auto settings = TKikimrSettings()
6022+
.SetAppConfig(appConfig)
59076023
.SetWithSampleTables(false);
6024+
59086025
TKikimrRunner kikimr(settings);
5909-
kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = false;
59106026
Tests::NCommon::TLoggerInit(kikimr).Initialize();
59116027
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
59126028

0 commit comments

Comments
 (0)