Skip to content

Commit acfae2f

Browse files
authored
Merge cb4d5b9 into 42b4fda
2 parents 42b4fda + cb4d5b9 commit acfae2f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+418
-225
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@
169169
#include <ydb/library/folder_service/folder_service.h>
170170
#include <ydb/library/folder_service/proto/config.pb.h>
171171

172+
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
173+
172174
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
173175
#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h>
174176
#include <ydb/library/yql/utils/actor_log/log.h>
@@ -2146,16 +2148,19 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
21462148

21472149
auto federatedQuerySetupFactory = NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config);
21482150

2151+
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
21492152
auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(),
21502153
Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources),
2151-
federatedQuerySetupFactory
2154+
federatedQuerySetupFactory, s3ActorsFactory
21522155
);
21532156
setup->LocalServices.push_back(std::make_pair(
21542157
NKqp::MakeKqpProxyID(NodeId),
21552158
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));
21562159

21572160
// Create finalize script service
2158-
auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory);
2161+
auto finalize = NKqp::CreateKqpFinalizeScriptService(
2162+
Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory, s3ActorsFactory
2163+
);
21592164
setup->LocalServices.push_back(std::make_pair(
21602165
NKqp::MakeKqpFinalizeScriptServiceId(NodeId),
21612166
TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId)));

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ PEERDIR(
138138
ydb/library/yql/providers/yt/codec/codegen
139139
ydb/library/yql/providers/yt/comp_nodes/llvm14
140140
ydb/library/yql/providers/pq/cm_client
141+
ydb/library/yql/providers/s3/actors
141142
ydb/library/yql/public/udf/service/exception_policy
142143
ydb/public/lib/base
143144
ydb/public/lib/deprecated/client

ydb/core/fq/libs/actors/pending_fetcher.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
155155
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
156156
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
157157
const TString& tenantName,
158-
NActors::TMon* monitoring
158+
NActors::TMon* monitoring,
159+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
159160
)
160161
: YqSharedResources(yqSharedResources)
161162
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -177,6 +178,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
177178
, InternalServiceId(MakeInternalServiceActorId())
178179
, Monitoring(monitoring)
179180
, ComputeConfig(config.GetCompute())
181+
, S3ActorsFactory(std::move(s3ActorsFactory))
180182
{
181183
Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver));
182184
}
@@ -441,7 +443,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
441443
task.operation_id(),
442444
computeConnection,
443445
NProtoInterop::CastFromProto(task.result_ttl()),
444-
std::map<TString, Ydb::TypedValue>(task.parameters().begin(), task.parameters().end())
446+
std::map<TString, Ydb::TypedValue>(task.parameters().begin(), task.parameters().end()),
447+
S3ActorsFactory
445448
);
446449

447450
auto runActorId =
@@ -518,6 +521,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
518521
TActorId InternalServiceId;
519522
NActors::TMon* Monitoring;
520523
TComputeConfig ComputeConfig;
524+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
521525
};
522526

523527

@@ -536,7 +540,8 @@ NActors::IActor* CreatePendingFetcher(
536540
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
537541
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
538542
const TString& tenantName,
539-
NActors::TMon* monitoring)
543+
NActors::TMon* monitoring,
544+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory)
540545
{
541546
return new TPendingFetcher(
542547
yqSharedResources,
@@ -553,7 +558,8 @@ NActors::IActor* CreatePendingFetcher(
553558
std::move(pqCmConnections),
554559
clientCounters,
555560
tenantName,
556-
monitoring);
561+
monitoring,
562+
std::move(s3ActorsFactory));
557563
}
558564

559565
TActorId MakePendingFetcherId(ui32 nodeId) {

ydb/core/fq/libs/actors/proxy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
1717
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
1818
#include <ydb/library/yql/providers/pq/cm_client/client.h>
19+
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
1920

2021
#include <ydb/public/lib/fq/scope.h>
2122

@@ -51,7 +52,8 @@ NActors::IActor* CreatePendingFetcher(
5152
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
5253
const ::NMonitoring::TDynamicCounterPtr& clientCounters,
5354
const TString& tenantName,
54-
NActors::TMon* monitoring
55+
NActors::TMon* monitoring,
56+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
5557
);
5658

5759
NActors::IActor* CreateRunActor(

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
3333
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
3434
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
35-
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
3635
#include <ydb/library/yql/providers/s3/proto/sink.pb.h>
3736
#include <ydb/library/yql/sql/settings/translation_settings.h>
3837
#include <ydb/library/yql/minikql/mkql_alloc.h>
@@ -1782,7 +1781,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
17821781
}
17831782
}
17841783

1785-
Register(NYql::NDq::MakeS3ApplicatorActor(SelfId()
1784+
Register(Params.S3ActorsFactory->CreateS3ApplicatorActor(SelfId()
17861785
, Params.S3Gateway
17871786
, Params.QueryId
17881787
, Params.JobId

ydb/core/fq/libs/compute/common/run_actor_params.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ TRunActorParams::TRunActorParams(
5757
const TString& operationId,
5858
const NFq::NConfig::TYdbStorageConfig& computeConnection,
5959
TDuration resultTtl,
60-
std::map<TString, Ydb::TypedValue>&& queryParameters
60+
std::map<TString, Ydb::TypedValue>&& queryParameters,
61+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
6162
)
6263
: YqSharedResources(yqSharedResources)
6364
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -111,6 +112,7 @@ TRunActorParams::TRunActorParams(
111112
, ComputeConnection(computeConnection)
112113
, ResultTtl(resultTtl)
113114
, QueryParameters(std::move(queryParameters))
115+
, S3ActorsFactory(std::move(s3ActorsFactory))
114116
{
115117
}
116118

ydb/core/fq/libs/compute/common/run_actor_params.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h>
1515
#include <ydb/library/yql/providers/pq/cm_client/client.h>
1616
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_gateway.h>
17+
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
1718

1819
#include <ydb/public/lib/fq/scope.h>
1920

@@ -75,7 +76,8 @@ struct TRunActorParams { // TODO2 : Change name
7576
const TString& operationId,
7677
const ::NFq::NConfig::TYdbStorageConfig& computeConnection,
7778
TDuration resultTtl,
78-
std::map<TString, Ydb::TypedValue>&& queryParameters
79+
std::map<TString, Ydb::TypedValue>&& queryParameters,
80+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
7981
);
8082

8183
TRunActorParams(const TRunActorParams& params) = default;
@@ -138,6 +140,7 @@ struct TRunActorParams { // TODO2 : Change name
138140
::NFq::NConfig::TYdbStorageConfig ComputeConnection;
139141
TDuration ResultTtl;
140142
std::map<TString, Ydb::TypedValue> QueryParameters;
143+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
141144
};
142145

143146
} /* NFq */

ydb/core/fq/libs/compute/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ PEERDIR(
1414
ydb/core/fq/libs/grpc
1515
ydb/core/fq/libs/shared_resources
1616
ydb/library/yql/public/issue
17+
ydb/library/yql/providers/common/http_gateway
1718
ydb/library/yql/providers/dq/provider
1819
ydb/library/yql/providers/generic/connector/api/service/protos
1920
ydb/library/yql/providers/generic/connector/libcpp

ydb/core/fq/libs/init/init.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@
3939
#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h>
4040
#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
4141
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
42-
#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
43-
#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
42+
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
4443
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
4544
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
4645
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
@@ -194,6 +193,8 @@ void Init(
194193
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize());
195194
}
196195

196+
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
197+
197198
if (protoConfig.GetPrivateApi().GetEnabled()) {
198199
const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
199200
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
@@ -224,12 +225,13 @@ void Init(
224225
RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
225226
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
226227
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
227-
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
228+
229+
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
228230
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
229-
RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
231+
s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
230232
httpGateway, s3HttpRetryPolicy);
231-
RegisterGenericProviderFactories(*asyncIoFactory, credentialsFactory, connectorClient);
232233

234+
RegisterGenericProviderFactories(*asyncIoFactory, credentialsFactory, connectorClient);
233235
RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSinkTracker"));
234236
RegisterDQSolomonWriteActorFactory(*asyncIoFactory, credentialsFactory);
235237
}
@@ -339,7 +341,8 @@ void Init(
339341
std::move(pqCmConnections),
340342
clientCounters,
341343
tenant,
342-
appData->Mon
344+
appData->Mon,
345+
s3ActorsFactory
343346
);
344347

345348
actorRegistrator(MakePendingFetcherId(nodeId), fetcher);

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
#include <ydb/core/kqp/runtime/kqp_sequencer_factory.h>
1111
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
1212
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
13-
#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
14-
#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
1513
#include <ydb/core/formats/arrow/protos/ssa.pb.h>
1614
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
1715

@@ -69,7 +67,9 @@ namespace NKqp {
6967

7068
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
7169
TIntrusivePtr<TKqpCounters> counters,
72-
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup) {
70+
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
71+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
72+
) {
7373
auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
7474
RegisterStreamLookupActorFactory(*factory, counters);
7575
RegisterKqpReadActor(*factory, counters);
@@ -78,8 +78,8 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
7878

7979
if (federatedQuerySetup) {
8080
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
81-
RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
82-
RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
81+
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
82+
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
8383

8484
if (federatedQuerySetup->ConnectorClient) {
8585
RegisterGenericProviderFactories(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->ConnectorClient);

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/scheme/scheme_tabledefs.h>
77
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
88
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
9+
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
910

1011
namespace NKikimr {
1112

@@ -59,7 +60,9 @@ IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vect
5960
const ui64 txId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
6061

6162
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
62-
TIntrusivePtr<TKqpCounters> counters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup);
63+
TIntrusivePtr<TKqpCounters> counters,
64+
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
65+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory);
6366

6467
} // namespace NKqp
6568
} // namespace NKikimr

ydb/core/kqp/compute_actor/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ PEERDIR(
2424
ydb/core/formats/arrow/protos
2525
ydb/library/yql/dq/actors/compute
2626
ydb/library/yql/providers/generic/actors
27-
ydb/library/yql/providers/s3/actors
2827
ydb/library/yql/public/issue
2928
)
3029

ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
2323
TScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
2424
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
2525
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
26-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
26+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
27+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactor)
2728
: ReplyActor(request->Sender)
2829
, ExecutionId(request->Get()->Description.ExecutionId)
2930
, Database(request->Get()->Description.Database)
@@ -33,6 +34,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
3334
, MaximalSecretsSnapshotWaitTime(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
3435
, FederatedQuerySetup(federatedQuerySetup)
3536
, Compressor(queryServiceConfig.GetQueryArtifactsCompressionMethod(), queryServiceConfig.GetQueryArtifactsCompressionMinSize())
37+
, S3ActorsFactor(std::move(s3ActorsFactor))
3638
{}
3739

3840
void CompressScriptArtifacts() const {
@@ -166,7 +168,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
166168
return;
167169
}
168170

169-
Register(NYql::NDq::MakeS3ApplicatorActor(
171+
Register(S3ActorsFactor->CreateS3ApplicatorActor(
170172
SelfId(),
171173
FederatedQuerySetup->HttpGateway,
172174
CreateGuidAsString(),
@@ -230,6 +232,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
230232
const TDuration MaximalSecretsSnapshotWaitTime;
231233
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
232234
const NFq::TCompressor Compressor;
235+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactor;
233236

234237
TString CustomerSuppliedId;
235238
std::vector<NKqpProto::TKqpExternalSink> Sinks;
@@ -244,8 +247,10 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
244247
IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
245248
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
246249
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
247-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup) {
248-
return new TScriptFinalizerActor(std::move(request), queryServiceConfig, metadataProviderConfig, federatedQuerySetup);
250+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
251+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory
252+
) {
253+
return new TScriptFinalizerActor(std::move(request), queryServiceConfig, metadataProviderConfig, federatedQuerySetup, std::move(s3ActorsFactory));
249254
}
250255

251256
} // namespace NKikimr::NKqp

ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
#include <ydb/core/kqp/common/events/script_executions.h>
44
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
5+
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
56

67

78
namespace NKikimr::NKqp {
89

910
IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
1011
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
1112
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
12-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
13+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
14+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory);
1315

1416
} // namespace NKikimr::NKqp

ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
1414
public:
1515
TKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
1616
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
17-
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory)
17+
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory,
18+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory)
1819
: QueryServiceConfig(queryServiceConfig)
1920
, MetadataProviderConfig(metadataProviderConfig)
2021
, FederatedQuerySetupFactory(federatedQuerySetupFactory)
22+
, S3ActorsFactory(std::move(s3ActorsFactory))
2123
{}
2224

2325
void Bootstrap(const TActorContext &ctx) {
@@ -81,7 +83,8 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
8183
std::move(request),
8284
QueryServiceConfig,
8385
MetadataProviderConfig,
84-
FederatedQuerySetup
86+
FederatedQuerySetup,
87+
S3ActorsFactory
8588
));
8689
}
8790

@@ -131,14 +134,17 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
131134
ui32 FinalizationRequestsInFlight = 0;
132135
std::queue<TString> WaitingFinalizationExecutions;
133136
std::unordered_map<TString, std::vector<TEvScriptFinalizeRequest::TPtr>> FinalizationRequestsQueue;
137+
138+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
134139
};
135140

136141
} // anonymous namespace
137142

138143
IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
139144
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
140-
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory) {
141-
return new TKqpFinalizeScriptService(queryServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory));
145+
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory,
146+
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory) {
147+
return new TKqpFinalizeScriptService(queryServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory), std::move(s3ActorsFactory));
142148
}
143149

144150
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)