Skip to content

Commit 130e37c

Browse files
authored
Merge 5468184 into 44b7587
2 parents 44b7587 + 5468184 commit 130e37c

File tree

11 files changed

+215
-72
lines changed

11 files changed

+215
-72
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include "common.h"
2+
3+
#include <ydb/core/kqp/ut/federated_query/common/common.h>
4+
#include <ydb-cpp-sdk/client/draft/ydb_scripting.h>
5+
#include <ydb-cpp-sdk/client/operation/operation.h>
6+
#include <ydb-cpp-sdk/client/proto/accessor.h>
7+
#include <ydb-cpp-sdk/client/table/table.h>
8+
#include <ydb-cpp-sdk/client/types/operation/operation.h>
9+
10+
#include <fmt/format.h>
11+
12+
namespace NKikimr::NKqp {
13+
14+
using namespace NYdb;
15+
using namespace NYdb::NQuery;
16+
using namespace NKikimr::NKqp::NFederatedQueryTest;
17+
using namespace fmt::literals;
18+
19+
TString Exec(const TString& cmd) {
20+
std::array<char, 128> buffer;
21+
TString result;
22+
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(cmd.c_str(), "r"), pclose);
23+
if (!pipe) {
24+
throw std::runtime_error("popen() failed!");
25+
}
26+
while (fgets(buffer.data(), static_cast<int>(buffer.size()), pipe.get()) != nullptr) {
27+
result += buffer.data();
28+
}
29+
return result;
30+
}
31+
32+
TString GetExternalPort(const TString& service, const TString& port) {
33+
auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose");
34+
auto composeFileYml = ArcadiaFromCurrentLocation(__SOURCE_FILE__, "docker-compose.yml");
35+
auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList<TString>();
36+
return result ? Strip(result.back()) : TString{};
37+
}
38+
39+
void WaitBucket(std::shared_ptr<TKikimrRunner> kikimr, const TString& externalDataSourceName) {
40+
auto db = kikimr->GetQueryClient();
41+
for (size_t i = 0; i < 100; i++) {
42+
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
43+
SELECT * FROM `{external_source}`.`/a/` WITH (
44+
format="json_each_row",
45+
schema(
46+
key Utf8 NOT NULL,
47+
value Utf8 NOT NULL
48+
)
49+
)
50+
)", "external_source"_a = externalDataSourceName)).ExtractValueSync();
51+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
52+
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());
53+
54+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
55+
if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) {
56+
return;
57+
}
58+
Sleep(TDuration::Seconds(1));
59+
}
60+
UNIT_FAIL("Bucket isn't ready");
61+
}
62+
63+
} // namespace NKikimr::NKqp
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
4+
5+
namespace NKikimr::NKqp {
6+
7+
TString Exec(const TString& cmd);
8+
9+
TString GetExternalPort(const TString& service, const TString& port);
10+
11+
void WaitBucket(std::shared_ptr<TKikimrRunner> kikimr, const TString& externalDataSourceName);
12+
13+
} // namespace NKikimr::NKqp

ydb/core/external_sources/s3/ut/s3_aws_credentials_ut.cpp

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "common.h"
2+
13
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
24
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
35
#include <ydb/core/kqp/ut/federated_query/common/common.h>
@@ -24,50 +26,6 @@ using namespace NYdb::NQuery;
2426
using namespace NKikimr::NKqp::NFederatedQueryTest;
2527
using namespace fmt::literals;
2628

27-
TString Exec(const TString& cmd) {
28-
std::array<char, 128> buffer;
29-
TString result;
30-
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(cmd.c_str(), "r"), pclose);
31-
if (!pipe) {
32-
throw std::runtime_error("popen() failed!");
33-
}
34-
while (fgets(buffer.data(), static_cast<int>(buffer.size()), pipe.get()) != nullptr) {
35-
result += buffer.data();
36-
}
37-
return result;
38-
}
39-
40-
TString GetExternalPort(const TString& service, const TString& port) {
41-
auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose");
42-
auto composeFileYml = ArcadiaFromCurrentLocation(__SOURCE_FILE__, "docker-compose.yml");
43-
auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList<TString>();
44-
return result ? Strip(result.back()) : TString{};
45-
}
46-
47-
void WaitBucket(std::shared_ptr<TKikimrRunner> kikimr, const TString& externalDataSourceName) {
48-
auto db = kikimr->GetQueryClient();
49-
for (size_t i = 0; i < 100; i++) {
50-
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
51-
SELECT * FROM `{external_source}`.`/a/` WITH (
52-
format="json_each_row",
53-
schema(
54-
key Utf8 NOT NULL,
55-
value Utf8 NOT NULL
56-
)
57-
)
58-
)", "external_source"_a = externalDataSourceName)).ExtractValueSync();
59-
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
60-
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());
61-
62-
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
63-
if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) {
64-
return;
65-
}
66-
Sleep(TDuration::Seconds(1));
67-
}
68-
UNIT_FAIL("Bucket isn't ready");
69-
}
70-
7129
Y_UNIT_TEST_SUITE(S3AwsCredentials) {
7230
Y_UNIT_TEST(ExecuteScriptWithEqSymbol) {
7331
const TString externalDataSourceName = "/Root/external_data_source";
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#include "common.h"
2+
3+
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
4+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
5+
#include <ydb/core/kqp/ut/federated_query/common/common.h>
6+
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
7+
#include <yql/essentials/utils/log/log.h>
8+
#include <ydb-cpp-sdk/client/draft/ydb_scripting.h>
9+
#include <ydb-cpp-sdk/client/operation/operation.h>
10+
#include <ydb-cpp-sdk/client/proto/accessor.h>
11+
#include <ydb-cpp-sdk/client/table/table.h>
12+
#include <ydb-cpp-sdk/client/types/operation/operation.h>
13+
14+
#include <library/cpp/testing/unittest/registar.h>
15+
16+
#include <util/generic/strbuf.h>
17+
#include <util/generic/string.h>
18+
#include <util/system/env.h>
19+
20+
#include <fmt/format.h>
21+
22+
namespace NKikimr::NKqp {
23+
24+
using namespace NYdb;
25+
using namespace NYdb::NQuery;
26+
using namespace NKikimr::NKqp::NFederatedQueryTest;
27+
using namespace fmt::literals;
28+
29+
Y_UNIT_TEST_SUITE(S3Inset) {
30+
Y_UNIT_TEST(TestInsertEscaping) {
31+
const TString externalDataSourceName = "/Root/external_data_source";
32+
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
33+
auto kikimr = MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, s3ActorsFactory);
34+
35+
auto tc = kikimr->GetTableClient();
36+
auto session = tc.CreateSession().GetValueSync().GetSession();
37+
const TString query = fmt::format(R"(
38+
CREATE OBJECT id (TYPE SECRET) WITH (value=`minio`);
39+
CREATE OBJECT key (TYPE SECRET) WITH (value=`minio123`);
40+
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
41+
SOURCE_TYPE="ObjectStorage",
42+
LOCATION="{location}",
43+
AUTH_METHOD="AWS",
44+
AWS_ACCESS_KEY_ID_SECRET_NAME="id",
45+
AWS_SECRET_ACCESS_KEY_SECRET_NAME="key",
46+
AWS_REGION="ru-central-1"
47+
);
48+
)",
49+
"external_source"_a = externalDataSourceName,
50+
"location"_a = "localhost:" + GetExternalPort("minio", "9000") + "/datalake/"
51+
);
52+
53+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
54+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
55+
56+
WaitBucket(kikimr, externalDataSourceName);
57+
58+
auto db = kikimr->GetQueryClient();
59+
60+
// TODO: remove ';' from skip list
61+
TString path = TStringBuilder() << "exp_folder/some_" << EscapeC(GetSymbolsString(' ', '~', "*?{}`;")) << "\\`";
62+
63+
{
64+
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
65+
PRAGMA s3.AtomicUploadCommit = "true";
66+
INSERT INTO `{external_source}`.`{path}/` WITH (FORMAT = "csv_with_names")
67+
SELECT * FROM `{external_source}`.`/a/` WITH (
68+
format="json_each_row",
69+
schema(
70+
key Utf8 NOT NULL,
71+
value Utf8 NOT NULL
72+
)
73+
)
74+
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
75+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
76+
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());
77+
78+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
79+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
80+
}
81+
82+
{
83+
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
84+
SELECT * FROM `{external_source}`.`{path}/` WITH (
85+
format="csv_with_names",
86+
schema(
87+
key Utf8 NOT NULL,
88+
value Utf8 NOT NULL
89+
)
90+
)
91+
)", "external_source"_a = externalDataSourceName, "path"_a = path)).ExtractValueSync();
92+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
93+
UNIT_ASSERT(!scriptExecutionOperation.Metadata().ExecutionId.empty());
94+
95+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
96+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
97+
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
98+
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
99+
100+
TResultSetParser resultSet(results.ExtractResultSet());
101+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
102+
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
103+
UNIT_ASSERT(resultSet.TryNextRow());
104+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
105+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
106+
UNIT_ASSERT(resultSet.TryNextRow());
107+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
108+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
109+
}
110+
}
111+
}
112+
113+
} // namespace NKikimr::NKqp

ydb/core/external_sources/s3/ut/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ IF (OPENSOURCE)
5252
ENDIF()
5353

5454
SRCS(
55+
common.cpp
5556
s3_aws_credentials_ut.cpp
57+
s3_insert_ut.cpp
5658
)
5759

5860
PEERDIR(

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,16 @@
33
#include <library/cpp/testing/unittest/registar.h>
44

55
namespace NKikimr::NKqp::NFederatedQueryTest {
6+
TString GetSymbolsString(char start, char end, const TString& skip) {
7+
TStringBuilder result;
8+
for (char symbol = start; symbol <= end; ++symbol) {
9+
if (skip.Contains(symbol)) {
10+
continue;
11+
}
12+
result << symbol;
13+
}
14+
return result;
15+
}
616

717
NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
818
NYdb::NOperation::TOperationClient client(ydbDriver);

ydb/core/kqp/ut/federated_query/common/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
namespace NKikimr::NKqp::NFederatedQueryTest {
99
using namespace NKikimr::NKqp;
1010

11+
TString GetSymbolsString(char start, char end, const TString& skip = "");
12+
1113
NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(
1214
const NYdb::TOperation::TOperationId& operationId,
1315
const NYdb::TDriver& ydbDriver);

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,6 @@ using namespace NTestUtils;
2020
using namespace fmt::literals;
2121

2222
Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
23-
TString GetSymbolsString(char start, char end, const TString& skip = "") {
24-
TStringBuilder result;
25-
for (char symbol = start; symbol <= end; ++symbol) {
26-
if (skip.Contains(symbol)) {
27-
continue;
28-
}
29-
result << symbol;
30-
}
31-
return result;
32-
}
33-
3423
Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) {
3524
const TString externalDataSourceName = "/Root/external_data_source";
3625
const TString externalTableName = "/Root/test_binding_resolve";

ydb/library/yql/providers/common/http_gateway/yql_aws_signature.cpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,12 @@ TString TAwsSignature::HashSHA256(TStringBuf data) {
138138
return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
139139
}
140140

141-
TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash) {
141+
TString TAwsSignature::UriEncode(const TStringBuf input, bool encodeSlash, bool encodePercent) {
142142
TStringStream result;
143143
for (const char ch : input) {
144144
if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || ch == '_' ||
145-
ch == '-' || ch == '~' || ch == '.') {
145+
ch == '-' || ch == '~' || ch == '.' || (ch == '/' && !encodeSlash) || (ch == '%' && !encodePercent)) {
146146
result << ch;
147-
} else if (ch == '/') {
148-
if (encodeSlash) {
149-
result << "%2F";
150-
} else {
151-
result << ch;
152-
}
153147
} else {
154148
result << "%" << HexEncode(&ch, 1);
155149
}
@@ -175,11 +169,10 @@ void TAwsSignature::PrepareCgiParameters() {
175169

176170
auto printSingleParam = [&canonicalCgi](const TString& key, const TVector<TString>& values) {
177171
auto it = values.begin();
178-
canonicalCgi << UriEncode(key, true) << "=" << UriEncode(*it, true);
172+
canonicalCgi << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
179173
while (++it != values.end()) {
180-
canonicalCgi << "&" << UriEncode(key, true) << "=" << UriEncode(*it, true);
174+
canonicalCgi << "&" << UriEncode(key, true, true) << "=" << UriEncode(*it, true, true);
181175
}
182-
183176
};
184177

185178
auto it = sortedCgi.begin();

ydb/library/yql/providers/common/http_gateway/yql_aws_signature.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ struct TAwsSignature {
3232

3333
static TString HashSHA256(TStringBuf data);
3434

35-
static TString UriEncode(const TStringBuf input, bool encodeSlash = false);
35+
static TString UriEncode(const TStringBuf input, bool encodeSlash = false, bool encodePercent = false);
3636

3737
void PrepareCgiParameters();
3838

ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ struct TCompleteMultipartUpload {
4949
}
5050

5151
TString BuildUrl() const {
52-
TUrlBuilder urlBuilder(Url);
52+
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
5353
urlBuilder.AddUrlParam("uploadId", UploadId);
5454
return urlBuilder.Build();
5555
}
@@ -87,7 +87,7 @@ struct TListMultipartUploads {
8787
// This requirement will be fixed in the curl library
8888
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
8989
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
90-
TUrlBuilder urlBuilder(Url);
90+
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
9191
if (KeyMarker) {
9292
urlBuilder.AddUrlParam("key-marker", KeyMarker);
9393
}
@@ -114,7 +114,7 @@ struct TAbortMultipartUpload {
114114
}
115115

116116
TString BuildUrl() const {
117-
TUrlBuilder urlBuilder(Url);
117+
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
118118
urlBuilder.AddUrlParam("uploadId", UploadId);
119119
return urlBuilder.Build();
120120
}
@@ -141,7 +141,7 @@ struct TListParts {
141141
// This requirement will be fixed in the curl library
142142
// https://github.com/curl/curl/commit/fc76a24c53b08cdf6eec8ba787d8eac64651d56e
143143
// https://github.com/curl/curl/commit/c87920353883ef9d5aa952e724a8e2589d76add5
144-
TUrlBuilder urlBuilder(Url);
144+
TUrlBuilder urlBuilder(NS3Util::UrlEscapeRet(Url));
145145
if (PartNumberMarker) {
146146
urlBuilder.AddUrlParam("part-number-marker", PartNumberMarker);
147147
}
@@ -682,4 +682,4 @@ THolder<NActors::IActor> MakeS3ApplicatorActor(
682682
);
683683
}
684684

685-
} // namespace NYql::NDq
685+
} // namespace NYql::NDq

0 commit comments

Comments
 (0)