Skip to content

Commit c84cdeb

Browse files
authored
Merge d44920d into 1aba0a7
2 parents 1aba0a7 + d44920d commit c84cdeb

File tree

9 files changed

+218
-22
lines changed

9 files changed

+218
-22
lines changed

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,10 @@ namespace {
187187
const TString createTableName = !isAtomicOperation
188188
? tableName
189189
: (TStringBuilder()
190-
<< "/Root/.tmp/sessions/"
190+
<< CanonizePath(AppData()->TenantName)
191+
<< "/.tmp/sessions/"
191192
<< sessionCtx->GetSessionId()
192-
<< tmpTableName);
193+
<< CanonizePath(tmpTableName));
193194

194195
create = exprCtx.ReplaceNode(std::move(create), *columns, exprCtx.NewList(pos, std::move(columnNodes)));
195196

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
2121
NYql::NConnector::IClient::TPtr connectorClient,
2222
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver,
2323
std::optional<NKikimrConfig::TAppConfig> appConfig,
24-
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory)
24+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
25+
const TString& domainRoot)
2526
{
2627
NKikimrConfig::TFeatureFlags featureFlags;
2728
featureFlags.SetEnableExternalDataSources(true);
@@ -53,7 +54,9 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
5354
.SetFeatureFlags(featureFlags)
5455
.SetFederatedQuerySetupFactory(federatedQuerySetupFactory)
5556
.SetKqpSettings({})
56-
.SetS3ActorsFactory(std::move(s3ActorsFactory));
57+
.SetS3ActorsFactory(std::move(s3ActorsFactory))
58+
.SetWithSampleTables(false)
59+
.SetDomainRoot(domainRoot);
5760

5861
settings = settings.SetAppConfig(appConfig.value());
5962

ydb/core/kqp/ut/federated_query/common/common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
1717
NYql::NConnector::IClient::TPtr connectorClient = nullptr,
1818
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver = nullptr,
1919
std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt,
20-
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory = nullptr);
20+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory = nullptr,
21+
const TString& domainRoot = "Root");
2122
}

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,6 +1792,175 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
17921792
Y_UNIT_TEST(ExecuteScriptWithThinFile) {
17931793
ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000);
17941794
}
1795+
1796+
std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) {
1797+
const TString bucket = "test_bucket3";
1798+
const TString object = "test_object";
1799+
1800+
NKikimrConfig::TAppConfig appConfig;
1801+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
1802+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
1803+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
1804+
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
1805+
appConfig.MutableFeatureFlags()->SetEnableTempTables(true);
1806+
auto kikimr = NTestUtils::MakeKikimrRunner(appConfig, "TestDomain");
1807+
1808+
CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
1809+
1810+
auto tc = kikimr->GetTableClient();
1811+
auto session = tc.CreateSession().GetValueSync().GetSession();
1812+
const TString query = fmt::format(R"(
1813+
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
1814+
SOURCE_TYPE="ObjectStorage",
1815+
LOCATION="{location}",
1816+
AUTH_METHOD="NONE"
1817+
);
1818+
CREATE EXTERNAL TABLE `{external_table}` (
1819+
key Utf8 NOT NULL,
1820+
value Utf8 NOT NULL
1821+
) WITH (
1822+
DATA_SOURCE="{external_source}",
1823+
LOCATION="{object}",
1824+
FORMAT="json_each_row"
1825+
);)",
1826+
"external_source"_a = externalDataSourceName,
1827+
"external_table"_a = externalTableName,
1828+
"location"_a = GetBucketLocation(bucket),
1829+
"object"_a = object
1830+
);
1831+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
1832+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1833+
1834+
return kikimr;
1835+
}
1836+
1837+
void ValidateResult(const TExecuteQueryResult& result) {
1838+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1839+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetResultSets().size(), 1, "Unexpected result sets count");
1840+
1841+
TResultSetParser resultSet(result.GetResultSet(0));
1842+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
1843+
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
1844+
1845+
UNIT_ASSERT(resultSet.TryNextRow());
1846+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
1847+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
1848+
1849+
UNIT_ASSERT(resultSet.TryNextRow());
1850+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
1851+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
1852+
1853+
}
1854+
1855+
void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) {
1856+
{
1857+
const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;";
1858+
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
1859+
}
1860+
1861+
{
1862+
const TString query = TStringBuilder() << "SELECT key, value FROM `" << olapTable << "` ORDER BY key;";
1863+
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
1864+
}
1865+
}
1866+
1867+
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) {
1868+
const TString externalDataSourceName = "external_data_source";
1869+
const TString externalTableName = "test_binding_resolve";
1870+
1871+
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
1872+
auto client = kikimr->GetQueryClient();
1873+
1874+
const TString oltpTable = "DestinationOltp";
1875+
{
1876+
const TString query = fmt::format(R"(
1877+
PRAGMA TablePathPrefix = "TestDomain";
1878+
CREATE TABLE `{destination}` (
1879+
PRIMARY KEY (key, value)
1880+
)
1881+
AS SELECT *
1882+
FROM `{external_source}`.`/` WITH (
1883+
format="json_each_row",
1884+
schema(
1885+
key Utf8 NOT NULL,
1886+
value Utf8 NOT NULL
1887+
)
1888+
);)",
1889+
"destination"_a = oltpTable,
1890+
"external_source"_a = externalDataSourceName
1891+
);
1892+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1893+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1894+
}
1895+
1896+
const TString olapTable = "DestinationOlap";
1897+
{
1898+
const TString query = fmt::format(R"(
1899+
PRAGMA TablePathPrefix = "TestDomain";
1900+
CREATE TABLE `{destination}` (
1901+
PRIMARY KEY (key, value)
1902+
)
1903+
WITH (STORE = COLUMN)
1904+
AS SELECT *
1905+
FROM `{external_source}`.`/` WITH (
1906+
format="json_each_row",
1907+
schema(
1908+
key Utf8 NOT NULL,
1909+
value Utf8 NOT NULL
1910+
)
1911+
);)",
1912+
"destination"_a = olapTable,
1913+
"external_source"_a = externalDataSourceName
1914+
);
1915+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1916+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1917+
}
1918+
1919+
ValidateTables(client, oltpTable, olapTable);
1920+
}
1921+
1922+
Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) {
1923+
const TString externalDataSourceName = "external_data_source";
1924+
const TString externalTableName = "test_binding_resolve";
1925+
1926+
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
1927+
auto client = kikimr->GetQueryClient();
1928+
1929+
const TString oltpTable = "DestinationOltp";
1930+
{
1931+
const TString query = fmt::format(R"(
1932+
PRAGMA TablePathPrefix = "TestDomain";
1933+
CREATE TABLE `{destination}` (
1934+
PRIMARY KEY (key, value)
1935+
)
1936+
AS SELECT *
1937+
FROM `{external_table}`;)",
1938+
"destination"_a = oltpTable,
1939+
"external_table"_a = externalTableName
1940+
);
1941+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1942+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1943+
}
1944+
1945+
const TString olapTable = "DestinationOlap";
1946+
{
1947+
const TString query = fmt::format(R"(
1948+
PRAGMA TablePathPrefix = "TestDomain";
1949+
CREATE TABLE `{destination}` (
1950+
PRIMARY KEY (key, value)
1951+
)
1952+
WITH (STORE = COLUMN)
1953+
AS SELECT *
1954+
FROM `{external_table}`;)",
1955+
"destination"_a = olapTable,
1956+
"external_table"_a = externalTableName
1957+
);
1958+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1959+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1960+
}
1961+
1962+
ValidateTables(client, oltpTable, olapTable);
1963+
}
17951964
}
17961965

17971966
} // namespace NKikimr::NKqp

ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ namespace NTestUtils {
2121

2222
extern const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";
2323

24-
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig) {
25-
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory());
24+
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig, const TString& domainRoot) {
25+
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory(), domainRoot);
2626
}
2727

2828
Aws::S3::S3Client MakeS3Client() {

ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ namespace NTestUtils {
2929
extern const TString TEST_SCHEMA;
3030
extern const TString TEST_SCHEMA_IDS;
3131

32-
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt);
32+
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt, const TString& domainRoot = "Root");
3333

3434
Aws::S3::S3Client MakeS3Client();
3535

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class TTestFeatureFlagsHolder {
4444
FEATURE_FLAG_SETTER(EnableTopicDiskSubDomainQuota)
4545
FEATURE_FLAG_SETTER(EnablePQConfigTransactionsAtSchemeShard)
4646
FEATURE_FLAG_SETTER(EnableScriptExecutionOperations)
47+
FEATURE_FLAG_SETTER(EnableExternalDataSources)
4748
FEATURE_FLAG_SETTER(EnableForceImmediateEffectsExecution)
4849
FEATURE_FLAG_SETTER(EnableTopicSplitMerge)
4950
FEATURE_FLAG_SETTER(EnableTempTables)

ydb/tests/tools/kqprun/configuration/app_config.conf

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ FeatureFlags {
22
EnableExternalDataSources: true
33
EnableScriptExecutionOperations: true
44
EnableExternalSourceSchemaInference: true
5-
EnableResourcePools: true
65
}
76

87
KQPConfig {

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,9 @@ struct TExecutionOptions {
6060
};
6161

6262

63-
void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) {
63+
void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) {
6464
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
6565

66-
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl;
67-
NKqpRun::TKqpRunner runner(runnerOptions);
68-
6966
if (executionOptions.SchemeQuery) {
7067
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing scheme query..." << colors.Default() << Endl;
7168
if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery, executionOptions.TraceId)) {
@@ -136,20 +133,45 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
136133
ythrow yexception() << "Failed to print script results, reason:\n" << CurrentExceptionMessage();
137134
}
138135
}
136+
}
139137

140-
if (runnerOptions.YdbSettings.MonitoringEnabled) {
141-
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Started reading commands" << colors.Default() << Endl;
142-
while (true) {
143-
TString command;
144-
Cin >> command;
145138

146-
if (command == "exit") {
147-
break;
148-
}
149-
Cerr << colors.Red() << TInstant::Now().ToIsoStringLocal() << " Invalid command '" << command << "'" << colors.Default() << Endl;
139+
void RunAsDaemon() {
140+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
141+
142+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Started reading commands" << colors.Default() << Endl;
143+
while (true) {
144+
TString command;
145+
Cin >> command;
146+
147+
if (command == "exit") {
148+
break;
149+
}
150+
Cerr << colors.Red() << TInstant::Now().ToIsoStringLocal() << " Invalid command '" << command << "'" << colors.Default() << Endl;
151+
}
152+
}
153+
154+
155+
void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) {
156+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
157+
158+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl;
159+
NKqpRun::TKqpRunner runner(runnerOptions);
160+
161+
try {
162+
RunArgumentQueries(executionOptions, runner);
163+
} catch (const yexception& exception) {
164+
if (runnerOptions.YdbSettings.MonitoringEnabled) {
165+
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
166+
} else {
167+
throw exception;
150168
}
151169
}
152170

171+
if (runnerOptions.YdbSettings.MonitoringEnabled) {
172+
RunAsDaemon();
173+
}
174+
153175
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Finalization of kqp runner..." << colors.Default() << Endl;
154176
}
155177

0 commit comments

Comments
 (0)