Skip to content

Commit 1237379

Browse files
authored
YQ-4148 support yt parallel reading (#15342)
1 parent a790a6d commit 1237379

File tree

17 files changed

+143
-57
lines changed

17 files changed

+143
-57
lines changed

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ void TKqpComputeActor::DoBootstrap() {
7676
settings.ReadRanges.push_back(readRange);
7777
}
7878

79+
if (FederatedQuerySetup && FederatedQuerySetup->DqTaskTransformFactory) {
80+
execCtx.FuncProvider = FederatedQuerySetup->DqTaskTransformFactory({settings.TaskParams, settings.ReadRanges}, TBase::FunctionRegistry);
81+
}
82+
7983
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
8084
SetTaskRunner(taskRunner);
8185

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,10 +1024,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
10241024
for (ui32 i = 0; i < taskCount; i++) {
10251025
auto& task = TasksGraph.AddTask(stageInfo);
10261026

1027-
auto& input = task.Inputs[stageSource.GetInputIndex()];
1028-
input.ConnectionInfo = NYql::NDq::TSourceInput{};
1029-
input.SourceSettings = externalSource.GetSettings();
1030-
input.SourceType = externalSource.GetType();
1027+
if (!externalSource.GetEmbedded()) {
1028+
auto& input = task.Inputs[stageSource.GetInputIndex()];
1029+
input.ConnectionInfo = NYql::NDq::TSourceInput{};
1030+
input.SourceSettings = externalSource.GetSettings();
1031+
input.SourceType = externalSource.GetType();
1032+
}
10311033

10321034
if (structuredToken) {
10331035
task.Meta.SecureParams.emplace(sourceName, structuredToken);

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,30 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
9999

100100
TStageInfoMeta meta(tx);
101101

102-
for (auto& source : stage.GetSources()) {
103-
if (source.HasReadRangesSource()) {
104-
YQL_ENSURE(source.GetInputIndex() == 0);
105-
YQL_ENSURE(stage.SourcesSize() == 1);
106-
meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable());
107-
meta.TablePath = source.GetReadRangesSource().GetTable().GetPath();
108-
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read);
109-
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
102+
ui64 stageSourcesCount = 0;
103+
for (const auto& source : stage.GetSources()) {
104+
switch (source.GetTypeCase()) {
105+
case NKqpProto::TKqpSource::kReadRangesSource: {
106+
YQL_ENSURE(source.GetInputIndex() == 0);
107+
YQL_ENSURE(stage.SourcesSize() == 1);
108+
meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable());
109+
meta.TablePath = source.GetReadRangesSource().GetTable().GetPath();
110+
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read);
111+
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
112+
stageSourcesCount++;
113+
break;
114+
}
115+
116+
case NKqpProto::TKqpSource::kExternalSource: {
117+
if (!source.GetExternalSource().GetEmbedded()) {
118+
stageSourcesCount++;
119+
}
120+
break;
121+
}
122+
123+
default: {
124+
YQL_ENSURE(false, "unknown source type");
125+
}
110126
}
111127
}
112128

@@ -144,7 +160,7 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
144160
}
145161

146162
bool stageAdded = tasksGraph.AddStageInfo(
147-
TStageInfo(stageId, stage.InputsSize() + stage.SourcesSize(), stage.GetOutputsCount(), std::move(meta)));
163+
TStageInfo(stageId, stage.InputsSize() + stageSourcesCount, stage.GetOutputsCount(), std::move(meta)));
148164
YQL_ENSURE(stageAdded);
149165

150166
auto& stageInfo = tasksGraph.GetStageInfo(stageId);

ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,24 @@
11
#include "kqp_federated_query_helpers.h"
22

3-
#include <ydb/library/actors/http/http_proxy.h>
4-
53
#include <ydb/core/base/counters.h>
64
#include <ydb/core/base/feature_flags.h>
5+
#include <ydb/core/protos/auth.pb.h>
76
#include <ydb/core/protos/config.pb.h>
87

98
#include <ydb/core/fq/libs/actors/database_resolver.h>
109
#include <ydb/core/fq/libs/actors/proxy.h>
1110
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
1211
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
12+
#include <ydb/library/actors/http/http_proxy.h>
1313

1414
#include <yt/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
1515
#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
1616
#include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
17+
#include <yt/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
1718

1819
#include <util/system/file.h>
1920
#include <util/stream/file.h>
2021

21-
#include <ydb/core/protos/auth.pb.h>
22-
2322
namespace NKikimr::NKqp {
2423
NYql::IYtGateway::TPtr MakeYtGateway(const NMiniKQL::IFunctionRegistry* functionRegistry, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) {
2524
NYql::TYtNativeServices ytServices;
@@ -79,6 +78,7 @@ namespace NKikimr::NKqp {
7978

8079
YtGatewayConfig = queryServiceConfig.GetYt();
8180
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);
81+
DqTaskTransformFactory = NYql::CreateYtDqTaskTransformFactory(true);
8282

8383
// Initialize Token Accessor
8484
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
@@ -135,7 +135,8 @@ namespace NKikimr::NKqp {
135135
SolomonGatewayConfig,
136136
SolomonGateway,
137137
nullptr,
138-
S3ReadActorFactoryConfig};
138+
S3ReadActorFactoryConfig,
139+
DqTaskTransformFactory};
139140

140141
// Init DatabaseAsyncResolver only if all requirements are met
141142
if (DatabaseResolverActorId && MdbEndpointGenerator &&

ydb/core/kqp/federated_query/kqp_federated_query_helpers.h

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
#pragma once
22

3-
#include <ydb/library/actors/core/actorsystem.h>
4-
53
#include <ydb/core/base/appdata.h>
6-
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
7-
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
4+
#include <ydb/library/actors/core/actorsystem.h>
85
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
96
#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
107
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
118
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
129
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
1310
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
11+
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
12+
13+
#include <yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h>
14+
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
15+
1416
#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
1517

1618
namespace NKikimrConfig {
@@ -35,6 +37,7 @@ namespace NKikimr::NKqp {
3537
NYql::ISolomonGateway::TPtr SolomonGateway;
3638
NMiniKQL::TComputationNodeFactory ComputationFactory;
3739
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
40+
NYql::TTaskTransformFactory DqTaskTransformFactory;
3841
};
3942

4043
struct IKqpFederatedQuerySetupFactory {
@@ -73,6 +76,7 @@ namespace NKikimr::NKqp {
7376
std::optional<NActors::TActorId> DatabaseResolverActorId;
7477
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
7578
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
79+
NYql::TTaskTransformFactory DqTaskTransformFactory;
7680
};
7781

7882
struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
@@ -89,7 +93,9 @@ namespace NKikimr::NKqp {
8993
NYql::IYtGateway::TPtr ytGateway,
9094
const NYql::TSolomonGatewayConfig& solomonGatewayConfig,
9195
const NYql::ISolomonGateway::TPtr& solomonGateway,
92-
NMiniKQL::TComputationNodeFactory computationFactories)
96+
NMiniKQL::TComputationNodeFactory computationFactory,
97+
const NYql::NDq::TS3ReadActorFactoryConfig& s3ReadActorFactoryConfig,
98+
NYql::TTaskTransformFactory dqTaskTransformFactory)
9399
: HttpGateway(httpGateway)
94100
, ConnectorClient(connectorClient)
95101
, CredentialsFactory(credentialsFactory)
@@ -100,13 +106,19 @@ namespace NKikimr::NKqp {
100106
, YtGateway(ytGateway)
101107
, SolomonGatewayConfig(solomonGatewayConfig)
102108
, SolomonGateway(solomonGateway)
103-
, ComputationFactories(computationFactories)
109+
, ComputationFactory(computationFactory)
110+
, S3ReadActorFactoryConfig(s3ReadActorFactoryConfig)
111+
, DqTaskTransformFactory(dqTaskTransformFactory)
104112
{
105113
}
106114

107115
std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
108116
return TKqpFederatedQuerySetup{
109-
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, YtGatewayConfig, YtGateway, SolomonGatewayConfig, SolomonGateway, ComputationFactories, S3ReadActorFactoryConfig};
117+
HttpGateway, ConnectorClient, CredentialsFactory,
118+
DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig,
119+
YtGatewayConfig, YtGateway, SolomonGatewayConfig,
120+
SolomonGateway, ComputationFactory, S3ReadActorFactoryConfig,
121+
DqTaskTransformFactory};
110122
}
111123

112124
private:
@@ -120,8 +132,9 @@ namespace NKikimr::NKqp {
120132
NYql::IYtGateway::TPtr YtGateway;
121133
NYql::TSolomonGatewayConfig SolomonGatewayConfig;
122134
NYql::ISolomonGateway::TPtr SolomonGateway;
123-
NMiniKQL::TComputationNodeFactory ComputationFactories;
135+
NMiniKQL::TComputationNodeFactory ComputationFactory;
124136
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
137+
NYql::TTaskTransformFactory DqTaskTransformFactory;
125138
};
126139

127140
IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(

ydb/core/kqp/federated_query/ya.make

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ SRCS(
77

88
PEERDIR(
99
ydb/core/base
10-
ydb/core/fq/libs/db_id_async_resolver_impl
1110
ydb/core/fq/libs/grpc
11+
ydb/core/fq/libs/db_id_async_resolver_impl
1212
ydb/library/db_pool/protos
1313
ydb/library/yql/providers/common/http_gateway
1414
ydb/library/yql/providers/generic/connector/libcpp
1515
ydb/library/yql/providers/solomon/gateway
16+
yql/essentials/core/dq_integration/transform
1617
yt/yql/providers/yt/gateway/native
1718
yt/yql/providers/yt/lib/yt_download
19+
yt/yql/providers/yt/mkql_dq
1820
)
1921

2022
YQL_LAST_ABI_VERSION()

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
806806
tableOp.MutableReadOlapRange()->SetReadType(NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS);
807807
} else if (auto maybeDqSourceWrapBase = node.Maybe<TDqSourceWrapBase>()) {
808808
stageCost += GetDqSourceWrapBaseCost(maybeDqSourceWrapBase.Cast(), TypesCtx);
809+
} else if (auto maybeDqReadWrapBase = node.Maybe<TDqReadWrapBase>()) {
810+
FillDqRead(maybeDqReadWrapBase.Cast(), stageProto, ctx);
809811
} else {
810812
YQL_ENSURE(!node.Maybe<TKqpReadTable>());
811813
}
@@ -1079,42 +1081,69 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10791081
if (dataSourceCategory == NYql::KikimrProviderName || dataSourceCategory == NYql::YdbProviderName || dataSourceCategory == NYql::KqpReadRangesSourceName) {
10801082
FillKqpSource(source, protoSource, allowSystemColumns, tablesMap);
10811083
} else {
1082-
// Delegate source filling to dq integration of specific provider
1083-
const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory);
1084-
YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\"");
1085-
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
1086-
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
1087-
auto& externalSource = *protoSource->MutableExternalSource();
1088-
1089-
// Partitioning
1090-
TVector<TString> partitionParams;
1091-
TString clusterName;
1092-
// In runtime, number of tasks with Sources is limited by 2x of node count
1093-
// We prepare a lot of partitions and distribute them between these tasks
1094-
// Constraint of 1 task per partition is NOT valid anymore
1095-
auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
1096-
IDqIntegration::TPartitionSettings pSettings;
1097-
pSettings.MaxPartitions = maxTasksPerStage;
1098-
pSettings.CanFallback = false;
1099-
dqIntegration->Partition(source.Ref(), partitionParams, &clusterName, ctx, pSettings);
1100-
externalSource.SetTaskParamKey(TString(dataSourceCategory));
1101-
for (const TString& partitionParam : partitionParams) {
1102-
externalSource.AddPartitionedTaskParams(partitionParam);
1103-
}
1084+
FillDqInput(source.Ptr(), protoSource, dataSourceCategory, ctx, true);
1085+
}
1086+
}
11041087

1105-
if (const auto& secureParams = FindOneSecureParam(source.Ptr(), TypesCtx, "source", SecretNames)) {
1088+
void FillDqInput(TExprNode::TPtr source, NKqpProto::TKqpSource* protoSource, TStringBuf dataSourceCategory, TExprContext& ctx, bool isDqSource) {
1089+
// Delegate source filling to dq integration of specific provider
1090+
const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory);
1091+
YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\"");
1092+
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
1093+
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
1094+
auto& externalSource = *protoSource->MutableExternalSource();
1095+
1096+
// Partitioning
1097+
TVector<TString> partitionParams;
1098+
TString clusterName;
1099+
// In runtime, number of tasks with Sources is limited by 2x of node count
1100+
// We prepare a lot of partitions and distribute them between these tasks
1101+
// Constraint of 1 task per partition is NOT valid anymore
1102+
auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
1103+
IDqIntegration::TPartitionSettings pSettings;
1104+
pSettings.MaxPartitions = maxTasksPerStage;
1105+
pSettings.CanFallback = false;
1106+
pSettings.DataSizePerJob = NYql::TDqSettings::TDefault::DataSizePerJob;
1107+
dqIntegration->Partition(*source, partitionParams, &clusterName, ctx, pSettings);
1108+
externalSource.SetTaskParamKey(TString(dataSourceCategory));
1109+
for (const TString& partitionParam : partitionParams) {
1110+
externalSource.AddPartitionedTaskParams(partitionParam);
1111+
}
1112+
1113+
if (isDqSource) {
1114+
if (const auto& secureParams = FindOneSecureParam(source, TypesCtx, "source", SecretNames)) {
11061115
externalSource.SetSourceName(secureParams->first);
11071116
externalSource.SetAuthInfo(secureParams->second);
11081117
}
11091118

11101119
google::protobuf::Any& settings = *externalSource.MutableSettings();
11111120
TString& sourceType = *externalSource.MutableType();
1112-
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType, maxTasksPerStage, ctx);
1121+
dqIntegration->FillSourceSettings(*source, settings, sourceType, maxTasksPerStage, ctx);
11131122
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings for its dq source node");
11141123
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings type for its dq source node");
1124+
} else {
1125+
// Source is embedded into stage as lambda
1126+
externalSource.SetType(TString(dataSourceCategory));
1127+
externalSource.SetEmbedded(true);
11151128
}
11161129
}
11171130

1131+
void FillDqRead(const TDqReadWrapBase& readWrapBase, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx) {
1132+
for (const auto& flag : readWrapBase.Flags()) {
1133+
if (flag.Value() == "Solid") {
1134+
return;
1135+
}
1136+
}
1137+
1138+
const auto read = readWrapBase.Input();
1139+
const ui32 dataSourceChildIndex = 1;
1140+
YQL_ENSURE(read.Ref().ChildrenSize() > dataSourceChildIndex);
1141+
YQL_ENSURE(read.Ref().Child(dataSourceChildIndex)->IsCallable("DataSource"));
1142+
1143+
const TStringBuf dataSourceCategory = read.Ref().Child(dataSourceChildIndex)->Child(0)->Content();
1144+
FillDqInput(read.Ptr(), stageProto.AddSources(), dataSourceCategory, ctx, false);
1145+
}
1146+
11181147
THashMap<TStringBuf, ui32> CreateColumnToOrder(
11191148
const TVector<TStringBuf>& columns,
11201149
const TKikimrTableMetadataPtr& tableMeta,

ydb/core/kqp/session_actor/kqp_worker_common.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,15 @@ bool CanCacheQuery(const NKqpProto::TKqpPhyQuery& query) {
181181

182182
for (const auto& stage : tx.GetStages()) {
183183
for (const auto& source : stage.GetSources()) {
184-
// S3 provider stores S3 paths to read in AST, so we can't cache such queries
185-
if (source.HasExternalSource() && source.GetExternalSource().GetType() == "S3Source") {
184+
if (!source.HasExternalSource()) {
185+
continue;
186+
}
187+
const auto& externalSourceType = source.GetExternalSource().GetType();
188+
189+
// S3 provider stores S3 paths to read in AST,
190+
// YT provider opens read session during compilation,
191+
// so we can't cache such queries
192+
if (externalSourceType == "S3Source" || externalSourceType == YtProviderName) {
186193
return false;
187194
}
188195
}

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
6060
nullptr,
6161
appConfig->GetQueryServiceConfig().GetSolomon(),
6262
nullptr,
63+
nullptr,
64+
NYql::NDq::CreateReadActorFactoryConfig(appConfig->GetQueryServiceConfig().GetS3()),
6365
nullptr);
6466

6567
settings

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
5555
UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings));
5656
kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true);
5757

58-
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, {}, nullptr, nullptr, {}});
58+
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, {}, nullptr, nullptr, {}, nullptr});
5959
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
6060
federatedQuerySetup, nullptr, nullptr, NKikimrConfig::TQueryServiceConfig(), {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem, nullptr);
6161
}

ydb/core/protos/kqp_physical.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ message TKqpReadRangesSource {
351351
message TKqpExternalSource {
352352
string Type = 1;
353353
google.protobuf.Any Settings = 2;
354+
bool Embedded = 7;
354355

355356
// Partitioning
356357
string TaskParamKey = 3;

ydb/core/testlib/test_client.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1253,7 +1253,9 @@ namespace Tests {
12531253
Settings->YtGateway ? Settings->YtGateway : NKqp::MakeYtGateway(GetFunctionRegistry(), queryServiceConfig),
12541254
queryServiceConfig.GetSolomon(),
12551255
Settings->SolomonGateway ? Settings->SolomonGateway : NYql::CreateSolomonGateway(queryServiceConfig.GetSolomon()),
1256-
Settings->ComputationFactory
1256+
Settings->ComputationFactory,
1257+
NYql::NDq::CreateReadActorFactoryConfig(queryServiceConfig.GetS3()),
1258+
Settings->DqTaskTransformFactory
12571259
);
12581260
}
12591261

0 commit comments

Comments
 (0)