Skip to content

YQ-3561 FQrun supported connections / bindings #14552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ std::pair<TString, bool> ParseHttpEndpoint(const TString& endpoint) {
return std::make_pair(ToString(host), scheme != "http");
}

std::pair<TString, TIpPort> ParseGrpcEndpoint(const TString& endpoint) {
TStringBuf scheme;
TStringBuf address;
TStringBuf uri;
NHttp::CrackURL(endpoint, scheme, address, uri);

TString hostname;
TIpPort port;
NHttp::CrackAddress(TString(address), hostname, port);

return {hostname, port};
}

void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig,
const TString& name,
const TString& authToken,
Expand Down Expand Up @@ -230,8 +243,18 @@ void AddClustersFromConnections(
clusterCfg->SetKind(NYql::EGenericDataSourceKind::YDB);
clusterCfg->SetProtocol(NYql::EGenericProtocol::NATIVE);
clusterCfg->SetName(connectionName);
clusterCfg->SetDatabaseId(db.database_id());
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
if (const auto& databaseId = db.database_id()) {
clusterCfg->SetDatabaseId(databaseId);
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
} else {
const auto& [host, port] = ParseGrpcEndpoint(db.endpoint());

auto& endpoint = *clusterCfg->MutableEndpoint();
endpoint.set_host(host);
endpoint.set_port(port);
clusterCfg->SetUseSsl(db.secure());
clusterCfg->SetDatabaseName(db.database());
}
FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
clusters.emplace(connectionName, GenericProviderName);
break;
Expand Down
14 changes: 10 additions & 4 deletions ydb/core/fq/libs/actors/pending_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
const TString& tenantName,
NActors::TMon* monitoring,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
NYql::IPqGateway::TPtr defaultPqGateway
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
Expand All @@ -180,6 +181,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
, Monitoring(monitoring)
, ComputeConfig(config.GetCompute())
, S3ActorsFactory(std::move(s3ActorsFactory))
, DefaultPqGateway(std::move(defaultPqGateway))
{
Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver));
}
Expand Down Expand Up @@ -472,7 +474,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
NProtoInterop::CastFromProto(task.result_ttl()),
std::map<TString, Ydb::TypedValue>(task.parameters().begin(), task.parameters().end()),
S3ActorsFactory,
ComputeConfig.GetWorkloadManagerConfig(task.scope())
ComputeConfig.GetWorkloadManagerConfig(task.scope()),
DefaultPqGateway
);

auto runActorId =
Expand Down Expand Up @@ -548,6 +551,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
NActors::TMon* Monitoring;
TComputeConfig ComputeConfig;
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
NYql::IPqGateway::TPtr DefaultPqGateway;
};


Expand All @@ -567,7 +571,8 @@ NActors::IActor* CreatePendingFetcher(
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
const TString& tenantName,
NActors::TMon* monitoring,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory)
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
NYql::IPqGateway::TPtr defaultPqGateway)
{
return new TPendingFetcher(
yqSharedResources,
Expand All @@ -585,7 +590,8 @@ NActors::IActor* CreatePendingFetcher(
clientCounters,
tenantName,
monitoring,
std::move(s3ActorsFactory));
std::move(s3ActorsFactory),
defaultPqGateway);
}

TActorId MakePendingFetcherId(ui32 nodeId) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/actors/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <yql/essentials/providers/common/metrics/service_counters.h>
#include <ydb/library/yql/providers/pq/cm_client/client.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>

#include <ydb/public/lib/fq/scope.h>
Expand Down Expand Up @@ -53,7 +54,8 @@ NActors::IActor* CreatePendingFetcher(
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
const TString& tenantName,
NActors::TMon* monitoring,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
NYql::IPqGateway::TPtr defaultPqGateway
);

NActors::IActor* CreateRunActor(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1977,7 +1977,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),
Params.FunctionRegistry
);
const auto pqGateway = NYql::CreatePqNativeGateway(pqServices);
const auto pqGateway = Params.DefaultPqGateway ? Params.DefaultPqGateway : NYql::CreatePqNativeGateway(pqServices);
dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver));
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/compute/common/run_actor_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ TRunActorParams::TRunActorParams(
TDuration resultTtl,
std::map<TString, Ydb::TypedValue>&& queryParameters,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
NYql::IPqGateway::TPtr defaultPqGateway
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
Expand Down Expand Up @@ -117,6 +118,7 @@ TRunActorParams::TRunActorParams(
, QueryParameters(std::move(queryParameters))
, S3ActorsFactory(std::move(s3ActorsFactory))
, WorkloadManager(workloadManager)
, DefaultPqGateway(defaultPqGateway)
{
}

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/compute/common/run_actor_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h>
#include <ydb/library/yql/providers/pq/cm_client/client.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_gateway.h>
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>

Expand Down Expand Up @@ -79,7 +80,8 @@ struct TRunActorParams { // TODO2 : Change name
TDuration resultTtl,
std::map<TString, Ydb::TypedValue>&& queryParameters,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
NYql::IPqGateway::TPtr defaultPqGateway
);

TRunActorParams(const TRunActorParams& params) = default;
Expand Down Expand Up @@ -145,6 +147,7 @@ struct TRunActorParams { // TODO2 : Change name
std::map<TString, Ydb::TypedValue> QueryParameters;
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
::NFq::NConfig::TWorkloadManagerConfig WorkloadManager;
NYql::IPqGateway::TPtr DefaultPqGateway;
};

} /* NFq */
10 changes: 6 additions & 4 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void Init(
const IYqSharedResources::TPtr& iyqSharedResources,
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
ui32 icPort,
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
NYql::IPqGateway::TPtr defaultPqGateway
)
{
Y_ABORT_UNLESS(iyqSharedResources, "No YQ shared resources created");
Expand Down Expand Up @@ -204,7 +205,7 @@ void Init(
credentialsFactory,
tenant,
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
CreatePqNativeGateway(pqServices),
defaultPqGateway ? defaultPqGateway : CreatePqNativeGateway(pqServices),
appData->Mon,
appData->Counters);
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
Expand All @@ -225,7 +226,7 @@ void Init(
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
appData->FunctionRegistry
);
auto pqGateway = NYql::CreatePqNativeGateway(std::move(pqServices));
auto pqGateway = defaultPqGateway ? defaultPqGateway : NYql::CreatePqNativeGateway(std::move(pqServices));
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway,
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());

Expand Down Expand Up @@ -345,7 +346,8 @@ void Init(
clientCounters,
tenant,
appData->Mon,
s3ActorsFactory
s3ActorsFactory,
defaultPqGateway
);

actorRegistrator(MakePendingFetcherId(nodeId), fetcher);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/init/init.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <ydb/core/fq/libs/config/protos/audit.pb.h>

#include <ydb/library/yql/providers/pq/cm_client/client.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -36,7 +37,8 @@ void Init(
const IYqSharedResources::TPtr& yqSharedResources,
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
ui32 icPort,
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
NYql::IPqGateway::TPtr defaultPqGateway = nullptr
);

} // NFq
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NYql {
NThreading::TFuture<void> TDummyPqGateway::OpenSession(const TString& sessionId, const TString& username) {
with_lock (Mutex) {
Y_ENSURE(sessionId);
Y_ENSURE(username);
Y_UNUSED(username);

Y_ENSURE(!IsIn(OpenedSessions, sessionId), "Session " << sessionId << " is already opened in pq gateway");
OpenedSessions.insert(sessionId);
Expand Down
3 changes: 3 additions & 0 deletions ydb/tests/tools/fqrun/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ sync_dir

*.log
*.sql
*.conf
*.parquet
*.json
35 changes: 35 additions & 0 deletions ydb/tests/tools/fqrun/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# FQ run tool

Tool can be used to execute streaming queries by using FQ proxy infrastructure.

## Examples

### Queries

* Run select 42:
```(bash)
./fqrun -s "SELECT 42"
```

### Logs

* Setup log settings:
```(bash)
./fqrun -s "SELECT 42" --log-default=warn --log FQ_RUN_ACTOR=trace --log-file query.log
```

### Cluster

* Embedded UI:
```(bash)
./fqrun -M 32000
```

Monitoring endpoint: https://localhost:32000

* gRPC endpoint:
```(bash)
./fqrun -G 32000
```

Connect with ydb CLI: `ydb -e grpc://localhost:32000 -d /Root`
Loading
Loading