Skip to content

Commit 72e974f

Browse files
authored
24-3: Datashard + Columnshard Reads (#6858)
1 parent 68915a1 commit 72e974f

File tree

5 files changed

+106
-26
lines changed

5 files changed

+106
-26
lines changed

.github/config/muted_ya.txt

-2
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
99
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
1010
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
1111
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
12-
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
1312
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
14-
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
1513
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
1614
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
1715
ydb/core/kqp/ut/pg KqpPg.CreateIndex

ydb/core/kqp/common/kqp_tx.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
166166
for (const auto &input : stage.GetInputs()) {
167167
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
168168
}
169+
170+
for (const auto &tableOp : stage.GetTableOps()) {
171+
if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
172+
// always need snapshot for OLAP reads
173+
return true;
174+
}
175+
}
169176
}
170177
}
171178

ydb/core/kqp/session_actor/kqp_query_state.h

-4
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,6 @@ class TKqpQueryState : public TNonCopyable {
314314

315315
bool NeedPersistentSnapshot() const {
316316
auto type = GetType();
317-
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
318-
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
319-
return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
320-
}
321317
return (
322318
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
323319
type == NKikimrKqp::QUERY_TYPE_AST_SCAN

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -844,9 +844,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
844844
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
845845
HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
846846
HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
847-
if (HasOlapTable && HasOltpTable) {
847+
HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
848+
if (HasOlapTable && HasOltpTable && HasTableWrite) {
848849
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
849-
"Transactions between column and row tables are disabled at current time.");
850+
"Write transactions between column and row tables are disabled at current time.");
850851
return false;
851852
}
852853
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
@@ -2551,6 +2552,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
25512552

25522553
bool HasOlapTable = false;
25532554
bool HasOltpTable = false;
2555+
bool HasTableWrite = false;
25542556

25552557
TGUCSettings::TPtr GUCSettings;
25562558
};

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

+95-18
Original file line numberDiff line numberDiff line change
@@ -2790,7 +2790,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
27902790
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
27912791
UNIT_ASSERT(!insertResult.IsSuccess());
27922792
UNIT_ASSERT_C(
2793-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2793+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
27942794
insertResult.GetIssues().ToString());
27952795
}
27962796

@@ -2803,20 +2803,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28032803
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28042804
UNIT_ASSERT(!insertResult.IsSuccess());
28052805
UNIT_ASSERT_C(
2806-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2807-
insertResult.GetIssues().ToString());
2808-
}
2809-
2810-
{
2811-
// column & row read
2812-
const TString sql = R"(
2813-
SELECT * FROM `/Root/DataShard`;
2814-
SELECT * FROM `/Root/ColumnShard`;
2815-
)";
2816-
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
2817-
UNIT_ASSERT(!insertResult.IsSuccess());
2818-
UNIT_ASSERT_C(
2819-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2806+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28202807
insertResult.GetIssues().ToString());
28212808
}
28222809

@@ -2831,7 +2818,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28312818
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28322819
UNIT_ASSERT(!insertResult.IsSuccess());
28332820
UNIT_ASSERT_C(
2834-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2821+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28352822
insertResult.GetIssues().ToString());
28362823
}
28372824

@@ -2845,7 +2832,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28452832
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28462833
UNIT_ASSERT(!insertResult.IsSuccess());
28472834
UNIT_ASSERT_C(
2848-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2835+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28492836
insertResult.GetIssues().ToString());
28502837
}
28512838

@@ -2859,7 +2846,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28592846
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28602847
UNIT_ASSERT(!insertResult.IsSuccess());
28612848
UNIT_ASSERT_C(
2862-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2849+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28632850
insertResult.GetIssues().ToString());
28642851
}
28652852
}
@@ -3541,6 +3528,96 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
35413528
}
35423529
}
35433530

3531+
Y_UNIT_TEST(ReadDatashardAndColumnshard) {
3532+
NKikimrConfig::TAppConfig appConfig;
3533+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
3534+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
3535+
auto settings = TKikimrSettings()
3536+
.SetAppConfig(appConfig)
3537+
.SetWithSampleTables(false);
3538+
3539+
TKikimrRunner kikimr(settings);
3540+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
3541+
3542+
auto client = kikimr.GetQueryClient();
3543+
3544+
{
3545+
auto createTable = client.ExecuteQuery(R"sql(
3546+
CREATE TABLE `/Root/DataShard` (
3547+
Col1 Uint64 NOT NULL,
3548+
Col2 Int32,
3549+
Col3 String,
3550+
PRIMARY KEY (Col1)
3551+
) WITH (
3552+
STORE = ROW,
3553+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
3554+
);
3555+
CREATE TABLE `/Root/ColumnShard` (
3556+
Col1 Uint64 NOT NULL,
3557+
Col2 Int32,
3558+
Col3 String,
3559+
PRIMARY KEY (Col1)
3560+
) WITH (
3561+
STORE = COLUMN,
3562+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
3563+
);
3564+
)sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
3565+
UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
3566+
}
3567+
3568+
{
3569+
auto replaceValues = client.ExecuteQuery(R"sql(
3570+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
3571+
(1u, 1, "row");
3572+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3573+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
3574+
}
3575+
3576+
{
3577+
auto replaceValues = client.ExecuteQuery(R"sql(
3578+
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
3579+
(2u, 2, "column");
3580+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3581+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
3582+
}
3583+
3584+
{
3585+
auto it = client.StreamExecuteQuery(R"sql(
3586+
SELECT * FROM `/Root/ColumnShard`;
3587+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3588+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3589+
TString output = StreamResultToYson(it);
3590+
CompareYson(
3591+
output,
3592+
R"([[2u;[2];["column"]]])");
3593+
}
3594+
3595+
{
3596+
auto it = client.StreamExecuteQuery(R"sql(
3597+
SELECT * FROM `/Root/DataShard`
3598+
UNION ALL
3599+
SELECT * FROM `/Root/ColumnShard`;
3600+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3601+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3602+
TString output = StreamResultToYson(it);
3603+
CompareYson(
3604+
output,
3605+
R"([[1u;[1];["row"]];[2u;[2];["column"]]])");
3606+
}
3607+
3608+
{
3609+
auto it = client.StreamExecuteQuery(R"sql(
3610+
SELECT r.Col3, c.Col3 FROM `/Root/DataShard` AS r
3611+
JOIN `/Root/ColumnShard` AS c ON r.Col1 + 1 = c.Col1;
3612+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3613+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3614+
TString output = StreamResultToYson(it);
3615+
CompareYson(
3616+
output,
3617+
R"([[["row"];["column"]]])");
3618+
}
3619+
}
3620+
35443621
Y_UNIT_TEST(ReplaceIntoWithDefaultValue) {
35453622
NKikimrConfig::TAppConfig appConfig;
35463623
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);

0 commit comments

Comments
 (0)