Skip to content

YQ-4148 support yt parallel reading #15342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ void TKqpComputeActor::DoBootstrap() {
settings.ReadRanges.push_back(readRange);
}

if (FederatedQuerySetup && FederatedQuerySetup->DqTaskTransformFactory) {
execCtx.FuncProvider = FederatedQuerySetup->DqTaskTransformFactory({settings.TaskParams, settings.ReadRanges}, TBase::FunctionRegistry);
}

auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

Expand Down
10 changes: 6 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1017,10 +1017,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
for (ui32 i = 0; i < taskCount; i++) {
auto& task = TasksGraph.AddTask(stageInfo);

auto& input = task.Inputs[stageSource.GetInputIndex()];
input.ConnectionInfo = NYql::NDq::TSourceInput{};
input.SourceSettings = externalSource.GetSettings();
input.SourceType = externalSource.GetType();
if (!externalSource.GetEmbedded()) {
auto& input = task.Inputs[stageSource.GetInputIndex()];
input.ConnectionInfo = NYql::NDq::TSourceInput{};
input.SourceSettings = externalSource.GetSettings();
input.SourceType = externalSource.GetType();
}

if (structuredToken) {
task.Meta.SecureParams.emplace(sourceName, structuredToken);
Expand Down
34 changes: 25 additions & 9 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,30 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew

TStageInfoMeta meta(tx);

for (auto& source : stage.GetSources()) {
if (source.HasReadRangesSource()) {
YQL_ENSURE(source.GetInputIndex() == 0);
YQL_ENSURE(stage.SourcesSize() == 1);
meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable());
meta.TablePath = source.GetReadRangesSource().GetTable().GetPath();
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read);
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
ui64 stageSourcesCount = 0;
for (const auto& source : stage.GetSources()) {
switch (source.GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource: {
YQL_ENSURE(source.GetInputIndex() == 0);
YQL_ENSURE(stage.SourcesSize() == 1);
meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable());
meta.TablePath = source.GetReadRangesSource().GetTable().GetPath();
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read);
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
stageSourcesCount++;
break;
}

case NKqpProto::TKqpSource::kExternalSource: {
if (!source.GetExternalSource().GetEmbedded()) {
stageSourcesCount++;
}
break;
}

default: {
YQL_ENSURE(false, "unknown source type");
}
}
}

Expand Down Expand Up @@ -144,7 +160,7 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
}

bool stageAdded = tasksGraph.AddStageInfo(
TStageInfo(stageId, stage.InputsSize() + stage.SourcesSize(), stage.GetOutputsCount(), std::move(meta)));
TStageInfo(stageId, stage.InputsSize() + stageSourcesCount, stage.GetOutputsCount(), std::move(meta)));
YQL_ENSURE(stageAdded);

auto& stageInfo = tasksGraph.GetStageInfo(stageId);
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
#include "kqp_federated_query_helpers.h"

#include <ydb/library/actors/http/http_proxy.h>

#include <ydb/core/base/counters.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/protos/auth.pb.h>
#include <ydb/core/protos/config.pb.h>

#include <ydb/core/fq/libs/actors/database_resolver.h>
#include <ydb/core/fq/libs/actors/proxy.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
#include <ydb/library/actors/http/http_proxy.h>

#include <yt/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
#include <yt/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>

#include <util/system/file.h>
#include <util/stream/file.h>

#include <ydb/core/protos/auth.pb.h>

namespace NKikimr::NKqp {
NYql::IYtGateway::TPtr MakeYtGateway(const NMiniKQL::IFunctionRegistry* functionRegistry, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) {
NYql::TYtNativeServices ytServices;
Expand Down Expand Up @@ -79,6 +78,7 @@ namespace NKikimr::NKqp {

YtGatewayConfig = queryServiceConfig.GetYt();
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);
DqTaskTransformFactory = NYql::CreateYtDqTaskTransformFactory(true);

// Initialize Token Accessor
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
Expand Down Expand Up @@ -135,7 +135,8 @@ namespace NKikimr::NKqp {
SolomonGatewayConfig,
SolomonGateway,
nullptr,
S3ReadActorFactoryConfig};
S3ReadActorFactoryConfig,
DqTaskTransformFactory};

// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && MdbEndpointGenerator &&
Expand Down
29 changes: 21 additions & 8 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
#pragma once

#include <ydb/library/actors/core/actorsystem.h>

#include <ydb/core/base/appdata.h>
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>

#include <yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h>
#include <yql/essentials/minikql/computation/mkql_computation_node.h>

#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>

namespace NKikimrConfig {
Expand All @@ -35,6 +37,7 @@ namespace NKikimr::NKqp {
NYql::ISolomonGateway::TPtr SolomonGateway;
NMiniKQL::TComputationNodeFactory ComputationFactory;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
NYql::TTaskTransformFactory DqTaskTransformFactory;
};

struct IKqpFederatedQuerySetupFactory {
Expand Down Expand Up @@ -73,6 +76,7 @@ namespace NKikimr::NKqp {
std::optional<NActors::TActorId> DatabaseResolverActorId;
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
NYql::TTaskTransformFactory DqTaskTransformFactory;
};

struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
Expand All @@ -89,7 +93,9 @@ namespace NKikimr::NKqp {
NYql::IYtGateway::TPtr ytGateway,
const NYql::TSolomonGatewayConfig& solomonGatewayConfig,
const NYql::ISolomonGateway::TPtr& solomonGateway,
NMiniKQL::TComputationNodeFactory computationFactories)
NMiniKQL::TComputationNodeFactory computationFactory,
const NYql::NDq::TS3ReadActorFactoryConfig& s3ReadActorFactoryConfig,
NYql::TTaskTransformFactory dqTaskTransformFactory)
: HttpGateway(httpGateway)
, ConnectorClient(connectorClient)
, CredentialsFactory(credentialsFactory)
Expand All @@ -100,13 +106,19 @@ namespace NKikimr::NKqp {
, YtGateway(ytGateway)
, SolomonGatewayConfig(solomonGatewayConfig)
, SolomonGateway(solomonGateway)
, ComputationFactories(computationFactories)
, ComputationFactory(computationFactory)
, S3ReadActorFactoryConfig(s3ReadActorFactoryConfig)
, DqTaskTransformFactory(dqTaskTransformFactory)
{
}

std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
return TKqpFederatedQuerySetup{
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, YtGatewayConfig, YtGateway, SolomonGatewayConfig, SolomonGateway, ComputationFactories, S3ReadActorFactoryConfig};
HttpGateway, ConnectorClient, CredentialsFactory,
DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig,
YtGatewayConfig, YtGateway, SolomonGatewayConfig,
SolomonGateway, ComputationFactory, S3ReadActorFactoryConfig,
DqTaskTransformFactory};
}

private:
Expand All @@ -120,8 +132,9 @@ namespace NKikimr::NKqp {
NYql::IYtGateway::TPtr YtGateway;
NYql::TSolomonGatewayConfig SolomonGatewayConfig;
NYql::ISolomonGateway::TPtr SolomonGateway;
NMiniKQL::TComputationNodeFactory ComputationFactories;
NMiniKQL::TComputationNodeFactory ComputationFactory;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
NYql::TTaskTransformFactory DqTaskTransformFactory;
};

IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/federated_query/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ SRCS(

PEERDIR(
ydb/core/base
ydb/core/fq/libs/db_id_async_resolver_impl
ydb/core/fq/libs/grpc
ydb/core/fq/libs/db_id_async_resolver_impl
ydb/library/db_pool/protos
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/generic/connector/libcpp
ydb/library/yql/providers/solomon/gateway
yql/essentials/core/dq_integration/transform
yt/yql/providers/yt/gateway/native
yt/yql/providers/yt/lib/yt_download
yt/yql/providers/yt/mkql_dq
)

YQL_LAST_ABI_VERSION()
Expand Down
77 changes: 53 additions & 24 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
tableOp.MutableReadOlapRange()->SetReadType(NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS);
} else if (auto maybeDqSourceWrapBase = node.Maybe<TDqSourceWrapBase>()) {
stageCost += GetDqSourceWrapBaseCost(maybeDqSourceWrapBase.Cast(), TypesCtx);
} else if (auto maybeDqReadWrapBase = node.Maybe<TDqReadWrapBase>()) {
FillDqRead(maybeDqReadWrapBase.Cast(), stageProto, ctx);
} else {
YQL_ENSURE(!node.Maybe<TKqpReadTable>());
}
Expand Down Expand Up @@ -1079,42 +1081,69 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
if (dataSourceCategory == NYql::KikimrProviderName || dataSourceCategory == NYql::YdbProviderName || dataSourceCategory == NYql::KqpReadRangesSourceName) {
FillKqpSource(source, protoSource, allowSystemColumns, tablesMap);
} else {
// Delegate source filling to dq integration of specific provider
const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory);
YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\"");
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
auto& externalSource = *protoSource->MutableExternalSource();

// Partitioning
TVector<TString> partitionParams;
TString clusterName;
// In runtime, number of tasks with Sources is limited by 2x of node count
// We prepare a lot of partitions and distribute them between these tasks
// Constraint of 1 task per partition is NOT valid anymore
auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
IDqIntegration::TPartitionSettings pSettings;
pSettings.MaxPartitions = maxTasksPerStage;
pSettings.CanFallback = false;
dqIntegration->Partition(source.Ref(), partitionParams, &clusterName, ctx, pSettings);
externalSource.SetTaskParamKey(TString(dataSourceCategory));
for (const TString& partitionParam : partitionParams) {
externalSource.AddPartitionedTaskParams(partitionParam);
}
FillDqInput(source.Ptr(), protoSource, dataSourceCategory, ctx, true);
}
}

if (const auto& secureParams = FindOneSecureParam(source.Ptr(), TypesCtx, "source", SecretNames)) {
// Fills the external-source section of `protoSource` for a source that is handled by
// a provider's DQ integration rather than by KQP itself.
//
// source             - expression node describing the source/read; passed to the provider's
//                      Partition() and (for dq sources) FillSourceSettings().
// protoSource        - proto message whose ExternalSource part is populated.
// dataSourceCategory - key used to look up the provider in TypesCtx.DataSourceMap; also stored
//                      as the task param key.
// ctx                - expression context forwarded to the DQ integration calls.
// isDqSource         - true: secure params (if found) and source settings/type are filled via the
//                      DQ integration; false: the source is only marked as embedded and its type
//                      is set to dataSourceCategory.
//
// Fails via YQL_ENSURE when the provider is unknown, has no DQ integration, or (for dq sources)
// leaves the settings type URL or the source type empty.
void FillDqInput(TExprNode::TPtr source, NKqpProto::TKqpSource* protoSource, TStringBuf dataSourceCategory, TExprContext& ctx, bool isDqSource) {
// Delegate source filling to dq integration of specific provider
const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory);
YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\"");
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
auto& externalSource = *protoSource->MutableExternalSource();

// Partitioning
TVector<TString> partitionParams;
TString clusterName;
// At runtime, the number of tasks with Sources is limited to 2x the node count.
// We prepare a lot of partitions and distribute them between these tasks.
// The constraint of 1 task per partition is NOT valid anymore.
auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
IDqIntegration::TPartitionSettings pSettings;
pSettings.MaxPartitions = maxTasksPerStage;
pSettings.CanFallback = false;
pSettings.DataSizePerJob = NYql::TDqSettings::TDefault::DataSizePerJob;
dqIntegration->Partition(*source, partitionParams, &clusterName, ctx, pSettings);
// The data source category doubles as the key under which partition params are stored.
externalSource.SetTaskParamKey(TString(dataSourceCategory));
for (const TString& partitionParam : partitionParams) {
externalSource.AddPartitionedTaskParams(partitionParam);
}

if (isDqSource) {
// Secure params are optional: name and auth info are recorded only when one is found.
if (const auto& secureParams = FindOneSecureParam(source, TypesCtx, "source", SecretNames)) {
externalSource.SetSourceName(secureParams->first);
externalSource.SetAuthInfo(secureParams->second);
}

// The provider writes its settings and source type directly into the proto fields.
google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
// NOTE(review): the next line looks like the pre-change (removed) side of the diff hunk,
// superseded by the `*source` call right after it - confirm against the merged file.
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType, maxTasksPerStage, ctx);
dqIntegration->FillSourceSettings(*source, settings, sourceType, maxTasksPerStage, ctx);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings type for its dq source node");
} else {
// Source is embedded into stage as lambda
externalSource.SetType(TString(dataSourceCategory));
externalSource.SetEmbedded(true);
}
}

// Adds a source entry to `stageProto` for a TDqReadWrapBase node found in the stage program.
//
// Nothing is added when any of the node's flags has the value "Solid".
// Otherwise the read's input node must have a "DataSource" callable as child #1; the content of
// that callable's first child is used as the data source category, and a new stage source is
// filled through FillDqInput with isDqSource = false.
void FillDqRead(const TDqReadWrapBase& readWrapBase, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx) {
// Reads flagged "Solid" get no stage source.
for (const auto& flag : readWrapBase.Flags()) {
if (flag.Value() == "Solid") {
return;
}
}

const auto read = readWrapBase.Input();
// Position of the DataSource callable among the read node's children.
const ui32 dataSourceChildIndex = 1;
YQL_ENSURE(read.Ref().ChildrenSize() > dataSourceChildIndex);
YQL_ENSURE(read.Ref().Child(dataSourceChildIndex)->IsCallable("DataSource"));

// First child of DataSource carries the category string used to look up the provider.
const TStringBuf dataSourceCategory = read.Ref().Child(dataSourceChildIndex)->Child(0)->Content();
FillDqInput(read.Ptr(), stageProto.AddSources(), dataSourceCategory, ctx, false);
}

THashMap<TStringBuf, ui32> CreateColumnToOrder(
const TVector<TStringBuf>& columns,
const TKikimrTableMetadataPtr& tableMeta,
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/session_actor/kqp_worker_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,15 @@ bool CanCacheQuery(const NKqpProto::TKqpPhyQuery& query) {

for (const auto& stage : tx.GetStages()) {
for (const auto& source : stage.GetSources()) {
// S3 provider stores S3 paths to read in AST, so we can't cache such queries
if (source.HasExternalSource() && source.GetExternalSource().GetType() == "S3Source") {
if (!source.HasExternalSource()) {
continue;
}
const auto& externalSourceType = source.GetExternalSource().GetType();

// S3 provider stores S3 paths to read in AST,
// YT provider opens read session during compilation,
// so we can't cache such queries
if (externalSourceType == "S3Source" || externalSourceType == YtProviderName) {
return false;
}
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
nullptr,
appConfig->GetQueryServiceConfig().GetSolomon(),
nullptr,
nullptr,
NYql::NDq::CreateReadActorFactoryConfig(appConfig->GetQueryServiceConfig().GetS3()),
nullptr);

settings
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings));
kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true);

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, {}, nullptr, nullptr, {}});
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, {}, nullptr, nullptr, {}, nullptr});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, nullptr, nullptr, NKikimrConfig::TQueryServiceConfig(), {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem, nullptr);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ message TKqpReadRangesSource {
message TKqpExternalSource {
string Type = 1;
google.protobuf.Any Settings = 2;
bool Embedded = 7;

// Partitioning
string TaskParamKey = 3;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,9 @@ namespace Tests {
Settings->YtGateway ? Settings->YtGateway : NKqp::MakeYtGateway(GetFunctionRegistry(), queryServiceConfig),
queryServiceConfig.GetSolomon(),
Settings->SolomonGateway ? Settings->SolomonGateway : NYql::CreateSolomonGateway(queryServiceConfig.GetSolomon()),
Settings->ComputationFactory
Settings->ComputationFactory,
NYql::NDq::CreateReadActorFactoryConfig(queryServiceConfig.GetS3()),
Settings->DqTaskTransformFactory
);
}

Expand Down
Loading
Loading