Skip to content

Commit d5bfcc0

Browse files
authored
YQ CTAS fix for script executions (#9228)
1 parent c09a984 commit d5bfcc0

File tree

4 files changed

+51
-25
lines changed

4 files changed

+51
-25
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
234234
break;
235235

236236
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
237-
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings);
237+
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings, SplitExpr);
238238
break;
239239

240240
default:

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,10 +1142,10 @@ class TKqpHost : public IKqpHost {
11421142
});
11431143
}
11441144

1145-
IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
1145+
IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) override {
11461146
return CheckedProcessQuery(*ExprCtx,
1147-
[this, &query, settings] (TExprContext& ctx) mutable {
1148-
return PrepareQueryInternal(query, nullptr, EKikimrQueryType::Script, settings, ctx);
1147+
[this, &query, settings, expr] (TExprContext& ctx) mutable {
1148+
return PrepareQueryInternal(query, expr, EKikimrQueryType::Script, settings, ctx);
11491149
});
11501150
}
11511151

ydb/core/kqp/host/kqp_host.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class IKqpHost : public TThrRefBase {
8989
virtual IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;
9090

9191
/* Federated queries */
92-
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
92+
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;
9393

9494
/* Scripting */
9595
virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0;

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,13 +1804,13 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
18041804
ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000);
18051805
}
18061806

1807-
std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) {
1807+
std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName, bool enableOltp) {
18081808
const TString bucket = "test_bucket3";
18091809
const TString object = "test_object";
18101810

18111811
NKikimrConfig::TAppConfig appConfig;
18121812
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
1813-
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
1813+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(enableOltp);
18141814
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
18151815
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
18161816
appConfig.MutableFeatureFlags()->SetEnableTempTables(true);
@@ -1863,8 +1863,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
18631863

18641864
}
18651865

1866-
void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) {
1867-
{
1866+
void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable, bool enableOltp) {
1867+
if (enableOltp) {
18681868
const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;";
18691869
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
18701870
}
@@ -1875,15 +1875,15 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
18751875
}
18761876
}
18771877

1878-
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) {
1878+
void DoCreateTableAsSelectFromExternalDataSource(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
18791879
const TString externalDataSourceName = "external_data_source";
18801880
const TString externalTableName = "test_binding_resolve";
18811881

1882-
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
1882+
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
18831883
auto client = kikimr->GetQueryClient();
18841884

18851885
const TString oltpTable = "DestinationOltp";
1886-
{
1886+
if (enableOltp) {
18871887
const TString query = fmt::format(R"(
18881888
PRAGMA TablePathPrefix = "TestDomain";
18891889
CREATE TABLE `{destination}` (
@@ -1900,8 +1900,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19001900
"destination"_a = oltpTable,
19011901
"external_source"_a = externalDataSourceName
19021902
);
1903-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1904-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1903+
requestRunner(query, client, kikimr->GetDriver());
19051904
}
19061905

19071906
const TString olapTable = "DestinationOlap";
@@ -1923,22 +1922,43 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19231922
"destination"_a = olapTable,
19241923
"external_source"_a = externalDataSourceName
19251924
);
1926-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1927-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1925+
requestRunner(query, client, kikimr->GetDriver());
19281926
}
19291927

1930-
ValidateTables(client, oltpTable, olapTable);
1928+
ValidateTables(client, oltpTable, olapTable, enableOltp);
1929+
}
1930+
1931+
void RunGenericQuery(const TString& query, TQueryClient& client, const TDriver&) {
1932+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1933+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1934+
}
1935+
1936+
void RunGenericScript(const TString& script, TQueryClient& client, const TDriver& driver) {
1937+
auto scriptExecutionOperation = client.ExecuteScript(script).ExtractValueSync();
1938+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
1939+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
1940+
1941+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), driver);
1942+
UNIT_ASSERT_VALUES_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToOneLineString());
19311943
}
19321944

1933-
Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) {
1945+
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericQuery) {
1946+
DoCreateTableAsSelectFromExternalDataSource(&RunGenericQuery, true);
1947+
}
1948+
1949+
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericScript) {
1950+
DoCreateTableAsSelectFromExternalDataSource(&RunGenericScript, false);
1951+
}
1952+
1953+
void DoCreateTableAsSelectFromExternalTable(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
19341954
const TString externalDataSourceName = "external_data_source";
19351955
const TString externalTableName = "test_binding_resolve";
19361956

1937-
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
1957+
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
19381958
auto client = kikimr->GetQueryClient();
19391959

19401960
const TString oltpTable = "DestinationOltp";
1941-
{
1961+
if (enableOltp) {
19421962
const TString query = fmt::format(R"(
19431963
PRAGMA TablePathPrefix = "TestDomain";
19441964
CREATE TABLE `{destination}` (
@@ -1949,8 +1969,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19491969
"destination"_a = oltpTable,
19501970
"external_table"_a = externalTableName
19511971
);
1952-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1953-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1972+
requestRunner(query, client, kikimr->GetDriver());
19541973
}
19551974

19561975
const TString olapTable = "DestinationOlap";
@@ -1966,11 +1985,18 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19661985
"destination"_a = olapTable,
19671986
"external_table"_a = externalTableName
19681987
);
1969-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1970-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1988+
requestRunner(query, client, kikimr->GetDriver());
19711989
}
19721990

1973-
ValidateTables(client, oltpTable, olapTable);
1991+
ValidateTables(client, oltpTable, olapTable, enableOltp);
1992+
}
1993+
1994+
Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericQuery) {
1995+
DoCreateTableAsSelectFromExternalTable(&RunGenericQuery, true);
1996+
}
1997+
1998+
Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericScript) {
1999+
DoCreateTableAsSelectFromExternalTable(&RunGenericScript, false);
19742000
}
19752001

19762002
Y_UNIT_TEST(OverridePlannerDefaults) {

0 commit comments

Comments
 (0)