Skip to content

Commit a378163

Browse files
authored
YQ-4108 Add topic session thread num settings (#14532)
1 parent adbecd5 commit a378163

25 files changed

+180
-45
lines changed

ydb/core/fq/libs/actors/pending_fetcher.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
158158
const TString& tenantName,
159159
NActors::TMon* monitoring,
160160
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
161-
NYql::IPqGateway::TPtr defaultPqGateway
161+
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
162162
)
163163
: YqSharedResources(yqSharedResources)
164164
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -181,7 +181,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
181181
, Monitoring(monitoring)
182182
, ComputeConfig(config.GetCompute())
183183
, S3ActorsFactory(std::move(s3ActorsFactory))
184-
, DefaultPqGateway(std::move(defaultPqGateway))
184+
, PqGatewayFactory(std::move(pqGatewayFactory))
185185
{
186186
Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver));
187187
}
@@ -475,7 +475,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
475475
std::map<TString, Ydb::TypedValue>(task.parameters().begin(), task.parameters().end()),
476476
S3ActorsFactory,
477477
ComputeConfig.GetWorkloadManagerConfig(task.scope()),
478-
DefaultPqGateway
478+
PqGatewayFactory
479479
);
480480

481481
auto runActorId =
@@ -551,7 +551,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
551551
NActors::TMon* Monitoring;
552552
TComputeConfig ComputeConfig;
553553
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
554-
NYql::IPqGateway::TPtr DefaultPqGateway;
554+
NYql::IPqGatewayFactory::TPtr PqGatewayFactory;
555555
};
556556

557557

@@ -572,7 +572,7 @@ NActors::IActor* CreatePendingFetcher(
572572
const TString& tenantName,
573573
NActors::TMon* monitoring,
574574
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
575-
NYql::IPqGateway::TPtr defaultPqGateway)
575+
NYql::IPqGatewayFactory::TPtr pqGatewayFactory)
576576
{
577577
return new TPendingFetcher(
578578
yqSharedResources,
@@ -591,7 +591,7 @@ NActors::IActor* CreatePendingFetcher(
591591
tenantName,
592592
monitoring,
593593
std::move(s3ActorsFactory),
594-
defaultPqGateway);
594+
std::move(pqGatewayFactory));
595595
}
596596

597597
TActorId MakePendingFetcherId(ui32 nodeId) {

ydb/core/fq/libs/actors/proxy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ NActors::IActor* CreatePendingFetcher(
5555
const TString& tenantName,
5656
NActors::TMon* monitoring,
5757
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
58-
NYql::IPqGateway::TPtr defaultPqGateway
58+
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
5959
);
6060

6161
NActors::IActor* CreateRunActor(

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
571571
SelfId(),
572572
Params.QueryId,
573573
Params.YqSharedResources->UserSpaceYdbDriver,
574+
Params.PqGatewayFactory->CreatePqGateway(),
574575
Params.Resources.topic_consumers(),
575576
PrepareReadRuleCredentials()
576577
)
@@ -1435,6 +1436,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
14351436
SelfId(),
14361437
Params.QueryId,
14371438
Params.YqSharedResources->UserSpaceYdbDriver,
1439+
Params.PqGatewayFactory->CreatePqGateway(),
14381440
Params.Resources.topic_consumers(),
14391441
PrepareReadRuleCredentials()
14401442
)
@@ -1970,14 +1972,8 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
19701972
}
19711973

19721974
{
1973-
NYql::TPqGatewayServices pqServices(
1974-
Params.YqSharedResources->UserSpaceYdbDriver,
1975-
Params.PqCmConnections,
1976-
Params.CredentialsFactory,
1977-
std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),
1978-
Params.FunctionRegistry
1979-
);
1980-
const auto pqGateway = Params.DefaultPqGateway ? Params.DefaultPqGateway : NYql::CreatePqNativeGateway(pqServices);
1975+
auto pqGateway = Params.PqGatewayFactory->CreatePqGateway();
1976+
pqGateway->UpdateClusterConfigs(std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()));
19811977
dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver));
19821978
}
19831979

ydb/core/fq/libs/compute/common/run_actor_params.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ TRunActorParams::TRunActorParams(
6161
std::map<TString, Ydb::TypedValue>&& queryParameters,
6262
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
6363
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
64-
NYql::IPqGateway::TPtr defaultPqGateway
64+
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
6565
)
6666
: YqSharedResources(yqSharedResources)
6767
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -118,7 +118,7 @@ TRunActorParams::TRunActorParams(
118118
, QueryParameters(std::move(queryParameters))
119119
, S3ActorsFactory(std::move(s3ActorsFactory))
120120
, WorkloadManager(workloadManager)
121-
, DefaultPqGateway(defaultPqGateway)
121+
, PqGatewayFactory(std::move(pqGatewayFactory))
122122
{
123123
}
124124

ydb/core/fq/libs/compute/common/run_actor_params.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ struct TRunActorParams { // TODO2 : Change name
8181
std::map<TString, Ydb::TypedValue>&& queryParameters,
8282
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
8383
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
84-
NYql::IPqGateway::TPtr defaultPqGateway
84+
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
8585
);
8686

8787
TRunActorParams(const TRunActorParams& params) = default;
@@ -147,7 +147,7 @@ struct TRunActorParams { // TODO2 : Change name
147147
std::map<TString, Ydb::TypedValue> QueryParameters;
148148
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
149149
::NFq::NConfig::TWorkloadManagerConfig WorkloadManager;
150-
NYql::IPqGateway::TPtr DefaultPqGateway;
150+
NYql::IPqGatewayFactory::TPtr PqGatewayFactory;
151151
};
152152

153153
} /* NFq */

ydb/core/fq/libs/config/protos/common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@ message TCommonConfig {
3232
bool ShowQueryTimeline = 16;
3333
uint64 MaxQueryTimelineSize = 17; // default: 200KB
3434
string PqReconnectPeriod = 18; // default: disabled
35+
uint32 TopicClientHandlersExecutorThreadsNum = 19; // default: 0 that means use default from TopicClientSettings (1)
36+
uint32 TopicClientCompressionExecutorThreadsNum = 20; // default: 0 that means use default from TopicClientSettings (2)
3537
}

ydb/core/fq/libs/init/init.cpp

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ namespace NFq {
5656

5757
using namespace NKikimr;
5858

59+
NYdb::NTopic::TTopicClientSettings GetCommonTopicClientSettings(const NFq::NConfig::TCommonConfig& config) {
60+
NYdb::NTopic::TTopicClientSettings settings;
61+
if (config.GetTopicClientHandlersExecutorThreadsNum()) {
62+
settings.DefaultHandlersExecutor(NYdb::NTopic::CreateThreadPoolExecutor(config.GetTopicClientHandlersExecutorThreadsNum()));
63+
}
64+
if (config.GetTopicClientCompressionExecutorThreadsNum()) {
65+
settings.DefaultCompressionExecutor(NYdb::NTopic::CreateThreadPoolExecutor(config.GetTopicClientCompressionExecutorThreadsNum()));
66+
}
67+
return settings;
68+
}
69+
5970
void Init(
6071
const NFq::NConfig::TConfig& protoConfig,
6172
ui32 nodeId,
@@ -67,7 +78,7 @@ void Init(
6778
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
6879
ui32 icPort,
6980
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
70-
NYql::IPqGateway::TPtr defaultPqGateway
81+
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
7182
)
7283
{
7384
Y_ABORT_UNLESS(iyqSharedResources, "No YQ shared resources created");
@@ -190,22 +201,26 @@ void Init(
190201
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize());
191202
}
192203

204+
auto commonTopicClientSettings = GetCommonTopicClientSettings(protoConfig.GetCommon());
205+
193206
if (protoConfig.GetRowDispatcher().GetEnabled()) {
194207
NYql::TPqGatewayServices pqServices(
195208
yqSharedResources->UserSpaceYdbDriver,
196209
nullptr,
197210
nullptr,
198211
std::make_shared<NYql::TPqGatewayConfig>(),
199-
nullptr);
200-
212+
nullptr,
213+
nullptr,
214+
commonTopicClientSettings
215+
);
201216
auto rowDispatcher = NFq::NewRowDispatcherService(
202217
protoConfig.GetRowDispatcher(),
203218
NKikimr::CreateYdbCredentialsProviderFactory,
204219
yqSharedResources,
205220
credentialsFactory,
206221
tenant,
207222
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
208-
defaultPqGateway ? defaultPqGateway : CreatePqNativeGateway(pqServices),
223+
pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : CreatePqNativeGateway(pqServices),
209224
appData->Mon,
210225
appData->Counters);
211226
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
@@ -224,9 +239,11 @@ void Init(
224239
pqCmConnections,
225240
credentialsFactory,
226241
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
227-
appData->FunctionRegistry
242+
appData->FunctionRegistry,
243+
nullptr,
244+
commonTopicClientSettings
228245
);
229-
auto pqGateway = defaultPqGateway ? defaultPqGateway : NYql::CreatePqNativeGateway(std::move(pqServices));
246+
auto pqGateway = pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : NYql::CreatePqNativeGateway(std::move(pqServices));
230247
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway,
231248
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());
232249

@@ -330,6 +347,15 @@ void Init(
330347
}
331348

332349
if (protoConfig.GetPendingFetcher().GetEnabled()) {
350+
NYql::TPqGatewayServices pqServices(
351+
yqSharedResources->UserSpaceYdbDriver,
352+
pqCmConnections,
353+
credentialsFactory,
354+
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
355+
appData->FunctionRegistry,
356+
nullptr,
357+
commonTopicClientSettings
358+
);
333359
auto fetcher = CreatePendingFetcher(
334360
yqSharedResources,
335361
NKikimr::CreateYdbCredentialsProviderFactory,
@@ -347,7 +373,7 @@ void Init(
347373
tenant,
348374
appData->Mon,
349375
s3ActorsFactory,
350-
defaultPqGateway
376+
pqGatewayFactory ? pqGatewayFactory : NYql::CreatePqNativeGatewayFactory(pqServices)
351377
);
352378

353379
actorRegistrator(MakePendingFetcherId(nodeId), fetcher);

ydb/core/fq/libs/init/init.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ void Init(
3838
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
3939
ui32 icPort,
4040
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
41-
NYql::IPqGateway::TPtr defaultPqGateway = nullptr
41+
NYql::IPqGatewayFactory::TPtr pqGatewayFactory = nullptr
4242
);
4343

4444
} // NFq

ydb/core/fq/libs/read_rule/read_rule_creator.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>
7272
NActors::TActorId owner,
7373
TString queryId,
7474
NYdb::TDriver ydbDriver,
75+
const NYql::IPqGateway::TPtr& pqGateway,
7576
const Fq::Private::TopicConsumer& topicConsumer,
7677
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider,
7778
ui64 index
@@ -80,6 +81,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>
8081
, QueryId(std::move(queryId))
8182
, TopicConsumer(topicConsumer)
8283
, YdbDriver(std::move(ydbDriver))
84+
, PqGateway(pqGateway)
8385
, TopicClient(YdbDriver, GetTopicClientSettings(std::move(credentialsProvider)))
8486
, Index(index)
8587
{
@@ -184,7 +186,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>
184186

185187
private:
186188
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider) {
187-
return NYdb::NTopic::TTopicClientSettings()
189+
return PqGateway->GetTopicClientSettings()
188190
.Database(TopicConsumer.database())
189191
.DiscoveryEndpoint(TopicConsumer.cluster_endpoint())
190192
.CredentialsProviderFactory(std::move(credentialsProvider))
@@ -197,6 +199,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>
197199
const TString QueryId;
198200
const Fq::Private::TopicConsumer TopicConsumer;
199201
NYdb::TDriver YdbDriver;
202+
NYql::IPqGateway::TPtr PqGateway;
200203
NYdb::NTopic::TTopicClient TopicClient;
201204
ui64 Index = 0;
202205
NYdb::NTopic::IRetryPolicy::IRetryState::TPtr RetryState;
@@ -211,12 +214,14 @@ class TReadRuleCreator : public TActorBootstrapped<TReadRuleCreator> {
211214
NActors::TActorId owner,
212215
TString queryId,
213216
NYdb::TDriver ydbDriver,
217+
const NYql::IPqGateway::TPtr& pqGateway,
214218
const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
215219
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials
216220
)
217221
: Owner(owner)
218222
, QueryId(std::move(queryId))
219223
, YdbDriver(std::move(ydbDriver))
224+
, PqGateway(pqGateway)
220225
, TopicConsumers(VectorFromProto(topicConsumers))
221226
, Credentials(std::move(credentials))
222227
{
@@ -233,7 +238,7 @@ class TReadRuleCreator : public TActorBootstrapped<TReadRuleCreator> {
233238
Results.reserve(TopicConsumers.size());
234239
for (size_t i = 0; i < TopicConsumers.size(); ++i) {
235240
LOG_D("Create read rule creation actor for `" << TopicConsumers[i].topic_path() << "` [" << i << "]");
236-
Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, TopicConsumers[i], Credentials[i], i)));
241+
Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, PqGateway, TopicConsumers[i], Credentials[i], i)));
237242
}
238243
}
239244

@@ -282,6 +287,7 @@ class TReadRuleCreator : public TActorBootstrapped<TReadRuleCreator> {
282287
const NActors::TActorId Owner;
283288
const TString QueryId;
284289
NYdb::TDriver YdbDriver;
290+
NYql::IPqGateway::TPtr PqGateway;
285291
const TVector<Fq::Private::TopicConsumer> TopicConsumers;
286292
const TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> Credentials;
287293
size_t ResultsGot = 0;
@@ -296,6 +302,7 @@ NActors::IActor* MakeReadRuleCreatorActor(
296302
NActors::TActorId owner,
297303
TString queryId,
298304
NYdb::TDriver ydbDriver,
305+
const NYql::IPqGateway::TPtr& pqGateway,
299306
const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
300307
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials
301308
)
@@ -304,6 +311,7 @@ NActors::IActor* MakeReadRuleCreatorActor(
304311
owner,
305312
std::move(queryId),
306313
std::move(ydbDriver),
314+
pqGateway,
307315
topicConsumers,
308316
std::move(credentials)
309317
);

ydb/core/fq/libs/read_rule/read_rule_creator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
#include <ydb-cpp-sdk/client/driver/driver.h>
55

66
#include <ydb/library/actors/core/actor.h>
7+
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
78

89
namespace NFq {
910

1011
NActors::IActor* MakeReadRuleCreatorActor(
1112
NActors::TActorId owner,
1213
TString queryId,
1314
NYdb::TDriver ydbDriver,
15+
const NYql::IPqGateway::TPtr& pqGateway,
1416
const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
1517
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials // For each topic
1618
);

0 commit comments

Comments
 (0)