Skip to content

Commit c22a37f

Browse files
authored
Merge 44bacec into 0d2f9d1
2 parents 0d2f9d1 + 44bacec commit c22a37f

26 files changed

+614
-60
lines changed

ydb/core/fq/libs/actors/clusters_from_connections.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,19 @@ std::pair<TString, bool> ParseHttpEndpoint(const TString& endpoint) {
8888
return std::make_pair(ToString(host), scheme != "http");
8989
}
9090

91+
std::pair<TString, TIpPort> ParseGrpcEndpoint(const TString& endpoint) {
92+
TStringBuf scheme;
93+
TStringBuf address;
94+
TStringBuf uri;
95+
NHttp::CrackURL(endpoint, scheme, address, uri);
96+
97+
TString hostname;
98+
TIpPort port;
99+
NHttp::CrackAddress(TString(address), hostname, port);
100+
101+
return {hostname, port};
102+
}
103+
91104
void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig,
92105
const TString& name,
93106
const TString& authToken,
@@ -230,8 +243,18 @@ void AddClustersFromConnections(
230243
clusterCfg->SetKind(NYql::EGenericDataSourceKind::YDB);
231244
clusterCfg->SetProtocol(NYql::EGenericProtocol::NATIVE);
232245
clusterCfg->SetName(connectionName);
233-
clusterCfg->SetDatabaseId(db.database_id());
234-
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
246+
if (const auto& databaseId = db.database_id()) {
247+
clusterCfg->SetDatabaseId(databaseId);
248+
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
249+
} else {
250+
const auto& [host, port] = ParseGrpcEndpoint(db.endpoint());
251+
252+
auto& endpoint = *clusterCfg->MutableEndpoint();
253+
endpoint.set_host(host);
254+
endpoint.set_port(port);
255+
clusterCfg->SetUseSsl(db.secure());
256+
clusterCfg->SetDatabaseName(db.database());
257+
}
235258
FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
236259
clusters.emplace(connectionName, GenericProviderName);
237260
break;

ydb/core/fq/libs/actors/pending_fetcher.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
157157
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
158158
const TString& tenantName,
159159
NActors::TMon* monitoring,
160-
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
160+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
161+
NYql::IPqGateway::TPtr defaultPqGateway
161162
)
162163
: YqSharedResources(yqSharedResources)
163164
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -180,6 +181,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
180181
, Monitoring(monitoring)
181182
, ComputeConfig(config.GetCompute())
182183
, S3ActorsFactory(std::move(s3ActorsFactory))
184+
, DefaultPqGateway(std::move(defaultPqGateway))
183185
{
184186
Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver));
185187
}
@@ -472,7 +474,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
472474
NProtoInterop::CastFromProto(task.result_ttl()),
473475
std::map<TString, Ydb::TypedValue>(task.parameters().begin(), task.parameters().end()),
474476
S3ActorsFactory,
475-
ComputeConfig.GetWorkloadManagerConfig(task.scope())
477+
ComputeConfig.GetWorkloadManagerConfig(task.scope()),
478+
DefaultPqGateway
476479
);
477480

478481
auto runActorId =
@@ -548,6 +551,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
548551
NActors::TMon* Monitoring;
549552
TComputeConfig ComputeConfig;
550553
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
554+
NYql::IPqGateway::TPtr DefaultPqGateway;
551555
};
552556

553557

@@ -567,7 +571,8 @@ NActors::IActor* CreatePendingFetcher(
567571
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
568572
const TString& tenantName,
569573
NActors::TMon* monitoring,
570-
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory)
574+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
575+
NYql::IPqGateway::TPtr defaultPqGateway)
571576
{
572577
return new TPendingFetcher(
573578
yqSharedResources,
@@ -585,7 +590,8 @@ NActors::IActor* CreatePendingFetcher(
585590
clientCounters,
586591
tenantName,
587592
monitoring,
588-
std::move(s3ActorsFactory));
593+
std::move(s3ActorsFactory),
594+
defaultPqGateway);
589595
}
590596

591597
TActorId MakePendingFetcherId(ui32 nodeId) {

ydb/core/fq/libs/actors/proxy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
1717
#include <yql/essentials/providers/common/metrics/service_counters.h>
1818
#include <ydb/library/yql/providers/pq/cm_client/client.h>
19+
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
1920
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
2021

2122
#include <ydb/public/lib/fq/scope.h>
@@ -53,7 +54,8 @@ NActors::IActor* CreatePendingFetcher(
5354
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
5455
const TString& tenantName,
5556
NActors::TMon* monitoring,
56-
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
57+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
58+
NYql::IPqGateway::TPtr defaultPqGateway
5759
);
5860

5961
NActors::IActor* CreateRunActor(

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1977,7 +1977,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
19771977
std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),
19781978
Params.FunctionRegistry
19791979
);
1980-
const auto pqGateway = NYql::CreatePqNativeGateway(pqServices);
1980+
const auto pqGateway = Params.DefaultPqGateway ? Params.DefaultPqGateway : NYql::CreatePqNativeGateway(pqServices);
19811981
dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver));
19821982
}
19831983

ydb/core/fq/libs/compute/common/run_actor_params.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ TRunActorParams::TRunActorParams(
6060
TDuration resultTtl,
6161
std::map<TString, Ydb::TypedValue>&& queryParameters,
6262
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
63-
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager
63+
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
64+
NYql::IPqGateway::TPtr defaultPqGateway
6465
)
6566
: YqSharedResources(yqSharedResources)
6667
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -117,6 +118,7 @@ TRunActorParams::TRunActorParams(
117118
, QueryParameters(std::move(queryParameters))
118119
, S3ActorsFactory(std::move(s3ActorsFactory))
119120
, WorkloadManager(workloadManager)
121+
, DefaultPqGateway(defaultPqGateway)
120122
{
121123
}
122124

ydb/core/fq/libs/compute/common/run_actor_params.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
1414
#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h>
1515
#include <ydb/library/yql/providers/pq/cm_client/client.h>
16+
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
1617
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_gateway.h>
1718
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
1819

@@ -79,7 +80,8 @@ struct TRunActorParams { // TODO2 : Change name
7980
TDuration resultTtl,
8081
std::map<TString, Ydb::TypedValue>&& queryParameters,
8182
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
82-
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager
83+
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
84+
NYql::IPqGateway::TPtr defaultPqGateway
8385
);
8486

8587
TRunActorParams(const TRunActorParams& params) = default;
@@ -145,6 +147,7 @@ struct TRunActorParams { // TODO2 : Change name
145147
std::map<TString, Ydb::TypedValue> QueryParameters;
146148
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
147149
::NFq::NConfig::TWorkloadManagerConfig WorkloadManager;
150+
NYql::IPqGateway::TPtr DefaultPqGateway;
148151
};
149152

150153
} /* NFq */

ydb/core/fq/libs/init/init.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ void Init(
6666
const IYqSharedResources::TPtr& iyqSharedResources,
6767
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
6868
ui32 icPort,
69-
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
69+
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
70+
NYql::IPqGateway::TPtr defaultPqGateway
7071
)
7172
{
7273
Y_ABORT_UNLESS(iyqSharedResources, "No YQ shared resources created");
@@ -204,7 +205,7 @@ void Init(
204205
credentialsFactory,
205206
tenant,
206207
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
207-
CreatePqNativeGateway(pqServices),
208+
defaultPqGateway ? defaultPqGateway : CreatePqNativeGateway(pqServices),
208209
appData->Mon,
209210
appData->Counters);
210211
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
@@ -225,7 +226,7 @@ void Init(
225226
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
226227
appData->FunctionRegistry
227228
);
228-
auto pqGateway = NYql::CreatePqNativeGateway(std::move(pqServices));
229+
auto pqGateway = defaultPqGateway ? defaultPqGateway : NYql::CreatePqNativeGateway(std::move(pqServices));
229230
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway,
230231
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());
231232

@@ -345,7 +346,8 @@ void Init(
345346
clientCounters,
346347
tenant,
347348
appData->Mon,
348-
s3ActorsFactory
349+
s3ActorsFactory,
350+
defaultPqGateway
349351
);
350352

351353
actorRegistrator(MakePendingFetcherId(nodeId), fetcher);

ydb/core/fq/libs/init/init.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <ydb/core/fq/libs/config/protos/audit.pb.h>
1313

1414
#include <ydb/library/yql/providers/pq/cm_client/client.h>
15+
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
1516

1617
#include <ydb/library/actors/core/actor.h>
1718

@@ -36,7 +37,8 @@ void Init(
3637
const IYqSharedResources::TPtr& yqSharedResources,
3738
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
3839
ui32 icPort,
39-
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
40+
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
41+
NYql::IPqGateway::TPtr defaultPqGateway = nullptr
4042
);
4143

4244
} // NFq

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace NYql {
1111
NThreading::TFuture<void> TDummyPqGateway::OpenSession(const TString& sessionId, const TString& username) {
1212
with_lock (Mutex) {
1313
Y_ENSURE(sessionId);
14-
Y_ENSURE(username);
14+
Y_UNUSED(username);
1515

1616
Y_ENSURE(!IsIn(OpenedSessions, sessionId), "Session " << sessionId << " is already opened in pq gateway");
1717
OpenedSessions.insert(sessionId);

ydb/tests/tools/fqrun/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@ sync_dir
22

33
*.log
44
*.sql
5+
*.conf
6+
*.parquet
7+
*.json

ydb/tests/tools/fqrun/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# FQ run tool
2+
3+
Tool can be used to execute streaming queries by using FQ proxy infrastructure.
4+
5+
## Examples
6+
7+
### Queries
8+
9+
* Run select 42:
10+
```(bash)
11+
./fqrun -s "SELECT 42"
12+
```
13+
14+
### Logs
15+
16+
* Setup log settings:
17+
```(bash)
18+
./fqrun -s "SELECT 42" --log-default=warn --log FQ_RUN_ACTOR=trace --log-file query.log
19+
```
20+
21+
### Cluster
22+
23+
* Embedded UI:
24+
```(bash)
25+
./fqrun -M 32000
26+
```
27+
28+
Monitoring endpoint: https://localhost:32000
29+
30+
* gRPC endpoint:
31+
```(bash)
32+
./fqrun -G 32000
33+
```
34+
35+
Connect with ydb CLI: `ydb -e grpc://localhost:32000 -d /Root`

0 commit comments

Comments
 (0)