Skip to content

YDB FQ: support Managed Greenplum #5090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,44 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
Parsers[NYql::EDatabaseType::Greenplum] = [](
NJson::TJsonValue& databaseInfo,
const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
bool useTls,
NConnector::NApi::EProtocol protocol
) {
NYql::IMdbEndpointGenerator::TEndpoint endpoint;
TString aliveHost;

for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
const auto& hostMap = host.GetMap();

if (hostMap.at("health").GetString() != "ALIVE"){
// Host is not alive, skip it
continue;

}

// If the host is alive, add it to the list of alive hosts
aliveHost = hostMap.at("name").GetString();
break;
}

if (aliveHost == "") {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE Greenplum hosts found";
}

NYql::IMdbEndpointGenerator::TParams params = {
.DatabaseType = NYql::EDatabaseType::Greenplum,
.MdbHost = aliveHost,
.UseTls = useTls,
.Protocol = protocol,
};

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
}
Expand Down Expand Up @@ -500,13 +538,20 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL }, databaseType)) {
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")
.AddPathComponent(databaseId)
.AddPathComponent("hosts")
.Build();
} else if (NYql::EDatabaseType::Greenplum == databaseType) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")
.AddPathComponent(databaseId)
.AddPathComponent("master-hosts")
.Build();
}

NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url);
Expand Down
63 changes: 63 additions & 0 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,69 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
issues
);
}
Y_UNIT_TEST(Greenplum_MasterNode) {
Test(
NYql::EDatabaseType::Greenplum,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-greenplum/v1/clusters/etn021us5r9rhld1vgbh/master-hosts",
"200",
R"({
"hosts": [
{
"resources": {
"resourcePresetId": "s3-c8-m32",
"diskSize": "395136991232",
"diskTypeId": "local-ssd"
},
"assignPublicIp": false,
"name": "rc1d-51jc89m9q72vcdkn.mdb.yandexcloud.net",
"clusterId": "c9qfrvbs21vo0a56s5hm",
"zoneId": "ru-central1-d",
"type": "MASTER",
"health": "ALIVE",
"subnetId": "fl8vtt2td9qbtlqdj5ji"
}
]
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{""},
TString{"rc1d-51jc89m9q72vcdkn.db.yandex.net"},
6432,
TString(""),
true},
{});
}

Y_UNIT_TEST(Greenplum_PermissionDenied) {
NYql::TIssues issues{
NYql::TIssue(
TStringBuilder{} << MakeErrorPrefix(
"mdb.api.cloud.yandex.net:443",
"/managed-greenplum/v1/clusters/etn021us5r9rhld1vgbh/master-hosts",
"etn021us5r9rhld1vgbh",
NYql::EDatabaseType::Greenplum)
<< NoPermissionStr)};

Test(
NYql::EDatabaseType::Greenplum,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-greenplum/v1/clusters/etn021us5r9rhld1vgbh/master-hosts",
"403",
R"(
{
"code": 7,
"message": "Permission denied",
"details": [
{
"@type": "type.googleapis.com/google.rpc.RequestInfo",
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
}
]
}
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues);
}

Y_UNIT_TEST(DataStreams_PermissionDenied) {
NYql::TIssues issues{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace NFq {
// Managed PostgreSQL provides the only port both for secure and insecure connections
constexpr ui32 POSTGRESQL_PORT = 6432;

constexpr ui32 GREENPLUM_PORT = 6432;

// TMdbEndpointGeneratorLegacy implements behavior required by YQL legacy ClickHouse provider
class TMdbEndpointGeneratorLegacy: public NYql::IMdbEndpointGenerator {
TEndpoint ToEndpoint(const NYql::IMdbEndpointGenerator::TParams& params) const override {
Expand Down Expand Up @@ -71,7 +73,15 @@ namespace NFq {
case NYql::NConnector::NApi::EProtocol::NATIVE:
return TEndpoint(fixedHost, POSTGRESQL_PORT);
default:
ythrow yexception() << "Unexpected protocol for PostgreSQL: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
ythrow yexception() << "Unexpected protocol for PostgreSQL " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
case NYql::EDatabaseType::Greenplum:
// https://cloud.yandex.ru/docs/managed-postgresql/operations/connect
switch (params.Protocol) {
case NYql::NConnector::NApi::EProtocol::NATIVE:
return TEndpoint(fixedHost, GREENPLUM_PORT);
default:
ythrow yexception() << "Unexpected protocol for Greenplum: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
default:
ythrow yexception() << "Unexpected database type: " << ToString(params.DatabaseType);
Expand Down
Loading