From d19338a0dc1c1306977cc94c3af06af8666e9d3d Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 13 Feb 2025 19:16:52 +0000 Subject: [PATCH 1/4] Supported sources in FQ run --- ydb/core/fq/libs/actors/pending_fetcher.cpp | 14 +- ydb/core/fq/libs/actors/proxy.h | 4 +- ydb/core/fq/libs/actors/run_actor.cpp | 2 +- .../libs/compute/common/run_actor_params.cpp | 4 +- .../fq/libs/compute/common/run_actor_params.h | 5 +- ydb/core/fq/libs/init/init.cpp | 10 +- ydb/core/fq/libs/init/init.h | 4 +- .../pq/gateway/dummy/yql_pq_dummy_gateway.cpp | 2 +- ydb/tests/tools/fqrun/.gitignore | 3 + ydb/tests/tools/fqrun/README.md | 35 ++++ .../tools/fqrun/configuration/fq_config.conf | 159 +++++++++++++++++- .../tools/fqrun/{fqprun.cpp => fqrun.cpp} | 112 ++++++++++++ ydb/tests/tools/fqrun/src/common.cpp | 11 ++ ydb/tests/tools/fqrun/src/common.h | 18 ++ ydb/tests/tools/fqrun/src/fq_runner.cpp | 85 ++++++++-- ydb/tests/tools/fqrun/src/fq_runner.h | 4 + ydb/tests/tools/fqrun/src/fq_setup.cpp | 66 +++++--- ydb/tests/tools/fqrun/src/fq_setup.h | 4 + ydb/tests/tools/fqrun/src/ya.make | 2 + ydb/tests/tools/fqrun/ya.make | 10 +- ydb/tests/tools/kqprun/README.md | 62 +++++++ ydb/tests/tools/kqprun/kqprun.cpp | 12 +- ydb/tests/tools/kqprun/runlib/application.h | 2 +- ydb/tests/tools/kqprun/src/kqp_runner.cpp | 8 +- 24 files changed, 582 insertions(+), 56 deletions(-) create mode 100644 ydb/tests/tools/fqrun/README.md rename ydb/tests/tools/fqrun/{fqprun.cpp => fqrun.cpp} (50%) create mode 100644 ydb/tests/tools/fqrun/src/common.cpp create mode 100644 ydb/tests/tools/kqprun/README.md diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index e9016c1806ef..3f2db94b8351 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -157,7 +157,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped { const ::NMonitoring::TDynamicCounterPtr& clientCounters, const TString& tenantName, NActors::TMon* monitoring, - std::shared_ptr s3ActorsFactory + std::shared_ptr s3ActorsFactory, + NYql::IPqGateway::TPtr defaultPqGateway ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) @@ -180,6 +181,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped { , Monitoring(monitoring) , ComputeConfig(config.GetCompute()) , S3ActorsFactory(std::move(s3ActorsFactory)) + , DefaultPqGateway(std::move(defaultPqGateway)) { Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver)); } @@ -472,7 +474,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped { NProtoInterop::CastFromProto(task.result_ttl()), std::map(task.parameters().begin(), task.parameters().end()), S3ActorsFactory, - ComputeConfig.GetWorkloadManagerConfig(task.scope()) + ComputeConfig.GetWorkloadManagerConfig(task.scope()), + DefaultPqGateway ); auto runActorId = @@ -548,6 +551,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped { NActors::TMon* Monitoring; TComputeConfig ComputeConfig; std::shared_ptr S3ActorsFactory; + NYql::IPqGateway::TPtr DefaultPqGateway; }; @@ -567,7 +571,8 @@ NActors::IActor* CreatePendingFetcher( const ::NMonitoring::TDynamicCounterPtr& clientCounters, const TString& tenantName, NActors::TMon* monitoring, - std::shared_ptr s3ActorsFactory) + std::shared_ptr s3ActorsFactory, + NYql::IPqGateway::TPtr defaultPqGateway) { return new TPendingFetcher( yqSharedResources, @@ -585,7 +590,8 @@ NActors::IActor* CreatePendingFetcher( clientCounters, tenantName, monitoring, - std::move(s3ActorsFactory)); + std::move(s3ActorsFactory), + defaultPqGateway); } TActorId MakePendingFetcherId(ui32 nodeId) { diff --git a/ydb/core/fq/libs/actors/proxy.h b/ydb/core/fq/libs/actors/proxy.h index 92cc9dd13faa..9f457859189b 100644 --- a/ydb/core/fq/libs/actors/proxy.h +++ b/ydb/core/fq/libs/actors/proxy.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -53,7 +54,8 @@ NActors::IActor* CreatePendingFetcher( const ::NMonitoring::TDynamicCounterPtr& clientCounters, const TString& tenantName, NActors::TMon* monitoring, - std::shared_ptr s3ActorsFactory + std::shared_ptr s3ActorsFactory, + NYql::IPqGateway::TPtr defaultPqGateway ); NActors::IActor* CreateRunActor( diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 111bf5e6f65b..914be2cfb8a7 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -1977,7 +1977,7 @@ class TRunActor : public NActors::TActorBootstrapped { std::make_shared(gatewaysConfig.GetPq()), Params.FunctionRegistry ); - const auto pqGateway = NYql::CreatePqNativeGateway(pqServices); + const auto pqGateway = Params.DefaultPqGateway ? Params.DefaultPqGateway : NYql::CreatePqNativeGateway(pqServices); dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver)); } diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp index 055e49bf4e18..df7a8a3d6b37 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp +++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp @@ -60,7 +60,8 @@ TRunActorParams::TRunActorParams( TDuration resultTtl, std::map&& queryParameters, std::shared_ptr s3ActorsFactory, - const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager + const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager, + NYql::IPqGateway::TPtr defaultPqGateway ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) @@ -117,6 +118,7 @@ TRunActorParams::TRunActorParams( , QueryParameters(std::move(queryParameters)) , S3ActorsFactory(std::move(s3ActorsFactory)) , WorkloadManager(workloadManager) + , DefaultPqGateway(defaultPqGateway) { } diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h index 7ae4eda044ff..1ff1db905cd5 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.h +++ b/ydb/core/fq/libs/compute/common/run_actor_params.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -79,7 +80,8 @@ struct TRunActorParams { // TODO2 : Change name TDuration resultTtl, std::map&& queryParameters, std::shared_ptr s3ActorsFactory, - const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager + const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager, + NYql::IPqGateway::TPtr defaultPqGateway ); TRunActorParams(const TRunActorParams& params) = default; @@ -145,6 +147,7 @@ struct TRunActorParams { // TODO2 : Change name std::map QueryParameters; std::shared_ptr S3ActorsFactory; ::NFq::NConfig::TWorkloadManagerConfig WorkloadManager; + NYql::IPqGateway::TPtr DefaultPqGateway; }; } /* NFq */ diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index edca3f733212..3bdc089b3677 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -66,7 +66,8 @@ void Init( const IYqSharedResources::TPtr& iyqSharedResources, const std::function& folderServiceFactory, ui32 icPort, - const std::vector& additionalCompNodeFactories + const std::vector& additionalCompNodeFactories, + NYql::IPqGateway::TPtr defaultPqGateway ) { Y_ABORT_UNLESS(iyqSharedResources, "No YQ shared resources created"); @@ -204,7 +205,7 @@ void Init( credentialsFactory, tenant, yqCounters->GetSubgroup("subsystem", "row_dispatcher"), - CreatePqNativeGateway(pqServices), + defaultPqGateway ? defaultPqGateway : CreatePqNativeGateway(pqServices), appData->Mon, appData->Counters); actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release()); @@ -225,7 +226,7 @@ void Init( std::make_shared(protoConfig.GetGateways().GetPq()), appData->FunctionRegistry ); - auto pqGateway = NYql::CreatePqNativeGateway(std::move(pqServices)); + auto pqGateway = defaultPqGateway ? defaultPqGateway : NYql::CreatePqNativeGateway(std::move(pqServices)); RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod()); @@ -345,7 +346,8 @@ void Init( clientCounters, tenant, appData->Mon, - s3ActorsFactory + s3ActorsFactory, + defaultPqGateway ); actorRegistrator(MakePendingFetcherId(nodeId), fetcher); diff --git a/ydb/core/fq/libs/init/init.h b/ydb/core/fq/libs/init/init.h index 6dcf35afd6d1..517557b04d8d 100644 --- a/ydb/core/fq/libs/init/init.h +++ b/ydb/core/fq/libs/init/init.h @@ -12,6 +12,7 @@ #include #include +#include #include @@ -36,7 +37,8 @@ void Init( const IYqSharedResources::TPtr& yqSharedResources, const std::function& folderServiceFactory, ui32 icPort, - const std::vector& additionalCompNodeFactories + const std::vector& additionalCompNodeFactories, + NYql::IPqGateway::TPtr defaultPqGateway = nullptr ); } // NFq diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp index f1575567e468..59add76e5ca8 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp @@ -11,7 +11,7 @@ namespace NYql { NThreading::TFuture TDummyPqGateway::OpenSession(const TString& sessionId, const TString& username) { with_lock (Mutex) { Y_ENSURE(sessionId); - Y_ENSURE(username); + Y_UNUSED(username); Y_ENSURE(!IsIn(OpenedSessions, sessionId), "Session " << sessionId << " is already opened in pq gateway"); OpenedSessions.insert(sessionId); diff --git a/ydb/tests/tools/fqrun/.gitignore b/ydb/tests/tools/fqrun/.gitignore index 51aaf6608d57..99dcb6bb049f 100644 --- a/ydb/tests/tools/fqrun/.gitignore +++ b/ydb/tests/tools/fqrun/.gitignore @@ -2,3 +2,6 @@ sync_dir *.log *.sql +*.conf +*.parquet +*.json diff --git a/ydb/tests/tools/fqrun/README.md b/ydb/tests/tools/fqrun/README.md new file mode 100644 index 000000000000..cd133168dcd4 --- /dev/null +++ b/ydb/tests/tools/fqrun/README.md @@ -0,0 +1,35 @@ +# FQ run tool + +Tool can be used to execute streaming queries by using FQ proxy infrastructure. + +## Examples + +### Queries + +* Run select 42: + ```(bash) + ./fqrun -s "SELECT 42" + ``` + +### Logs + +* Setup log settings: + ```(bash) + ./fqrun -s "SELECT 42" --log-default=warn --log FQ_RUN_ACTOR=trace --log-file query.log + ``` + +### Cluster + +* Embedded UI: + ```(bash) + ./fqrun -M 32000 + ``` + + Monitoring endpoint: https://localhost:32000 + +* gRPC endpoint: + ```(bash) + ./fqrun -G 32000 + ``` + + Connect with ydb CLI: `ydb -e grpc://localhost:32000 -d /Root` diff --git a/ydb/tests/tools/fqrun/configuration/fq_config.conf b/ydb/tests/tools/fqrun/configuration/fq_config.conf index 8a980fa5daec..0a6584bd0d13 100644 --- a/ydb/tests/tools/fqrun/configuration/fq_config.conf +++ b/ydb/tests/tools/fqrun/configuration/fq_config.conf @@ -14,14 +14,14 @@ CheckpointCoordinator { } Common { + YdbMvpCloudEndpoint: "https://ydbc.ydb.cloud.yandex.net:8789/ydbc/cloud-prod" MdbGateway: "https://mdb.api.cloud.yandex.net:443" - ObjectStorageEndpoint: "https://storage-internal.cloud.yandex.net" + ObjectStorageEndpoint: "https://storage.yandexcloud.net" IdsPrefix: "kr" QueryArtifactsCompressionMethod: "zstd_6" MonitoringEndpoint: "monitoring.api.cloud.yandex.net" KeepInternalErrors: true UseNativeProtocolForClickHouse: true - DisableSslForGenericDataSources: true ShowQueryTimeline: true } @@ -45,7 +45,11 @@ ControlPlaneStorage { AvailableStreamingConnection: "OBJECT_STORAGE" AvailableStreamingConnection: "DATA_STREAMS" AvailableStreamingConnection: "MONITORING" + AvailableStreamingConnection: "POSTGRESQL_CLUSTER" + AvailableStreamingConnection: "CLICKHOUSE_CLUSTER" AvailableStreamingConnection: "YDB_DATABASE" + AvailableStreamingConnection: "GREENPLUM_CLUSTER" + AvailableStreamingConnection: "MYSQL_CLUSTER" AvailableBinding: "OBJECT_STORAGE" AvailableBinding: "DATA_STREAMS" @@ -69,6 +73,157 @@ DbPool { } } +Gateways { + Enabled: true + + Dq { + DefaultSettings { + Name: "EnableComputeActor" + Value: "1" + } + DefaultSettings { + Name: "ComputeActorType" + Value: "async" + } + DefaultSettings { + Name: "AnalyzeQuery" + Value: "true" + } + DefaultSettings { + Name: "MaxTasksPerStage" + Value: "200" + } + DefaultSettings { + Name: "MaxTasksPerOperation" + Value: "200" + } + DefaultSettings { + Name: "EnableInsert" + Value: "true" + } + DefaultSettings { + Name: "_EnablePrecompute" + Value: "true" + } + DefaultSettings { + Name: "UseAggPhases" + Value: "true" + } + DefaultSettings { + Name: "HashJoinMode" + Value: "grace" + } + DefaultSettings { + Name: "UseFastPickleTransport" + Value: "true" + } + DefaultSettings { + Name: "UseOOBTransport" + Value: "true" + } + DefaultSettings { + Name: "UseWideChannels" + Value: "true" + } + DefaultSettings { + Name: "_SkipRevisionCheck" + Value: "true" + } + DefaultSettings { + Name: "EnableDqReplicate" + Value: "true" + } + DefaultSettings { + Name: "_TableTimeout" + Value: "600000" + } + } + + Generic { + MdbGateway: "https://mdb.api.cloud.yandex.net:443" + + Connector { + UseSsl: false + + Endpoint { + host: "localhost" + port: 50051 + } + } + + DefaultSettings { + Name: "DateTimeFormat" + Value: "string" + } + } + + HttpGateway { + BuffersSizePerStream: 5000000 + ConnectionTimeoutSeconds: 15 + LowSpeedBytesLimit: 1024 + LowSpeedTimeSeconds: 20 + MaxInFlightCount: 2000 + MaxSimulatenousDownloadsSize: 2000000000 + RequestTimeoutSeconds: 0 + } + + Pq { + ClusterMapping { + Name: "pq" + Endpoint: "localhost:2135" + Database: "local" + ClusterType: CT_DATA_STREAMS + UseSsl: True + SharedReading: True + ReadGroup: "fqrun" + } + } + + Solomon { + DefaultSettings { + Name: "_EnableReading" + Value: "true" + } + } + + S3 { + AllowConcurrentListings: true + AllowLocalFiles: true + FileSizeLimit: 100000000000 + GeneratorPathsLimit: 50000 + ListingCallbackPerThreadQueueSize: 100 + ListingCallbackThreadCount: 1 + MaxDirectoriesAndFilesPerQuery: 500000 + MaxDiscoveryFilesPerQuery: 1000 + MaxFilesPerQuery: 500000 + MaxInflightListsPerQuery: 100 + MinDesiredDirectoriesOfFilesPerQuery: 1000 + RegexpCacheSize: 100 + + DefaultSettings { + Name: "AtomicUploadCommit" + Value: "true" + } + DefaultSettings { + Name: "UseBlocksSource" + Value: "true" + } + DefaultSettings { + Name: "UseRuntimeListing" + Value: "true" + } + } + + YqlCore { + Flags { + Name: "_EnableMatchRecognize" + } + Flags { + Name: "_EnableStreamLookupJoin" + } + } +} + NodesManager { Enabled: true } diff --git a/ydb/tests/tools/fqrun/fqprun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp similarity index 50% rename from ydb/tests/tools/fqrun/fqprun.cpp rename to ydb/tests/tools/fqrun/fqrun.cpp index 277aa8d2ed90..c42fcec6e43c 100644 --- a/ydb/tests/tools/fqrun/fqprun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -3,6 +3,7 @@ #include +#include #include #include #include @@ -15,6 +16,8 @@ namespace { struct TExecutionOptions { TString Query; + std::vector Connections; + std::vector Bindings; bool HasResults() const { return !Query.empty(); @@ -36,6 +39,20 @@ struct TExecutionOptions { void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& runner) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); + if (!executionOptions.Connections.empty()) { + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Creating connections..." << colors.Default() << Endl; + if (!runner.CreateConnections(executionOptions.Connections)) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Failed to create connections"; + } + } + + if (!executionOptions.Bindings.empty()) { + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Creating bindings..." << colors.Default() << Endl; + if (!runner.CreateBindings(executionOptions.Bindings)) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Failed to create bindings"; + } + } + if (executionOptions.Query) { Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing query..." << colors.Default() << Endl; if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions())) { @@ -89,6 +106,8 @@ void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& } class TMain : public TMainBase { + using EVerbose = TFqSetupSettings::EVerbose; + protected: void RegisterOptions(NLastGetopt::TOpts& options) override { options.SetTitle("FqRun -- tool to execute stream queries through FQ proxy"); @@ -101,6 +120,33 @@ class TMain : public TMainBase { .RequiredArgument("file") .StoreMappedResult(&ExecutionOptions.Query, &LoadFile); + options.AddLongOption('s', "sql", "Query SQL text to execute") + .RequiredArgument("str") + .StoreResult(&ExecutionOptions.Query); + options.MutuallyExclusive("query", "sql"); + + options.AddLongOption('c', "connection", "External datasource connection protobuf FederatedQuery::ConnectionContent") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + auto& connection = ExecutionOptions.Connections.emplace_back(); + const TString file(TString(option->CurValOrDef())); + if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &connection)) { + ythrow yexception() << "Bad format of FQ connection in file '" << file << "'"; + } + SetupAcl(connection.mutable_acl()); + }); + + options.AddLongOption('b', "binding", "External datasource binding protobuf FederatedQuery::BindingContent") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + auto& binding = ExecutionOptions.Bindings.emplace_back(); + const TString file(TString(option->CurValOrDef())); + if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &binding)) { + ythrow yexception() << "Bad format of FQ binding in file '" << file << "'"; + } + SetupAcl(binding.mutable_acl()); + }); + options.AddLongOption("fq-cfg", "File with FQ config (NFq::NConfig::TConfig for FQ proxy)") .RequiredArgument("file") .DefaultValue("./configuration/fq_config.conf") @@ -110,6 +156,24 @@ class TMain : public TMainBase { } }); + options.AddLongOption("emulate-s3", "Enable readings by s3 provider from files, `bucket` value in connection - path to folder with files") + .NoArgument() + .SetFlag(&RunnerOptions.FqSettings.EmulateS3); + + options.AddLongOption("emulate-pq", "Emulate YDS with local file, accepts list of tables to emulate with following format: topic@file (can be used in query from cluster `pq`)") + .RequiredArgument("topic@file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TStringBuf topicName; + TStringBuf filePath; + TStringBuf(option->CurVal()).Split('@', topicName, filePath); + if (topicName.empty() || filePath.empty()) { + ythrow yexception() << "Incorrect PQ file mapping, expected form topic@file"; + } + if (!PqFilesMapping.emplace(topicName, filePath).second) { + ythrow yexception() << "Got duplicated topic name: " << topicName; + } + }); + // Outputs options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)") @@ -128,24 +192,72 @@ class TMain : public TMainBase { .Choices(resultFormat.GetChoices()) .StoreMappedResultT(&RunnerOptions.ResultOutputFormat, resultFormat); + // Pipeline settings + + options.AddLongOption("verbose", TStringBuilder() << "Common verbose level (max level " << static_cast(EVerbose::Max) - 1 << ")") + .RequiredArgument("uint") + .DefaultValue(static_cast(EVerbose::Info)) + .StoreMappedResultT(&RunnerOptions.FqSettings.VerboseLevel, [](ui8 value) { + return static_cast(std::min(value, static_cast(EVerbose::Max))); + }); + RegisterKikimrOptions(options, RunnerOptions.FqSettings); } int DoRun(NLastGetopt::TOptsParseResult&&) override { ExecutionOptions.Validate(RunnerOptions); + RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE); + + auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways(); + FillTokens(gatewayConfig.mutable_pq()); + FillTokens(gatewayConfig.mutable_s3()); + FillTokens(gatewayConfig.mutable_generic()); + FillTokens(gatewayConfig.mutable_ydb()); + FillTokens(gatewayConfig.mutable_solomon()); + + for (auto& gateway : *RunnerOptions.FqSettings.FqConfig.mutable_gateways()->mutable_pq()->mutable_clustermapping()) { + if (!gateway.GetToken()) { + gateway.SetToken(RunnerOptions.FqSettings.YqlToken); + } + } + for (auto& gateway : *RunnerOptions.FqSettings.FqConfig.mutable_gateways()->mutable_s3()->mutable_clustermapping()) { + if (!gateway.GetToken()) { + gateway.SetToken(RunnerOptions.FqSettings.YqlToken); + } + } + auto& logConfig = RunnerOptions.FqSettings.LogConfig; logConfig.SetDefaultLevel(NActors::NLog::EPriority::PRI_CRIT); FillLogConfig(logConfig); + if (!PqFilesMapping.empty()) { + auto fileGateway = MakeIntrusive(); + for (const auto& [topic, file] : PqFilesMapping) { + fileGateway->AddDummyTopic(NYql::TDummyTopic("pq", TString(topic), TString(file))); + } + RunnerOptions.FqSettings.PqGateway = std::move(fileGateway); + } + RunScript(ExecutionOptions, RunnerOptions); return 0; } +private: + template + void FillTokens(TGatewayConfig* gateway) const { + for (auto& cluster : *gateway->mutable_clustermapping()) { + if (!cluster.GetToken()) { + cluster.SetToken(RunnerOptions.FqSettings.YqlToken); + } + } + } + private: TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; + std::unordered_map PqFilesMapping; }; } // anonymous namespace diff --git a/ydb/tests/tools/fqrun/src/common.cpp b/ydb/tests/tools/fqrun/src/common.cpp new file mode 100644 index 000000000000..7d0fc1804532 --- /dev/null +++ b/ydb/tests/tools/fqrun/src/common.cpp @@ -0,0 +1,11 @@ +#include "common.h" + +namespace NFqRun { + +void SetupAcl(FederatedQuery::Acl* acl) { + if (acl->visibility() == FederatedQuery::Acl::VISIBILITY_UNSPECIFIED) { + acl->set_visibility(FederatedQuery::Acl::SCOPE); + } +} + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h index 5af4b019dbc3..53dfbe654e93 100644 --- a/ydb/tests/tools/fqrun/src/common.h +++ b/ydb/tests/tools/fqrun/src/common.h @@ -4,13 +4,29 @@ #include #include +#include #include namespace NFqRun { +constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; constexpr i64 MAX_RESULT_SET_ROWS = 1000; struct TFqSetupSettings : public NKikimrRun::TServerSettings { + enum class EVerbose { + None, + Info, + QueriesText, + InitLogs, + Max + }; + + bool EmulateS3 = false; + + EVerbose VerboseLevel = EVerbose::Info; + + TString YqlToken; + NYql::IPqGateway::TPtr PqGateway; NFq::NConfig::TConfig FqConfig; NKikimrConfig::TLogConfig LogConfig; }; @@ -26,4 +42,6 @@ struct TRequestOptions { TString Query; }; +void SetupAcl(FederatedQuery::Acl* acl); + } // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_runner.cpp b/ydb/tests/tools/fqrun/src/fq_runner.cpp index 6fe51cfb50f9..33995c17fac8 100644 --- a/ydb/tests/tools/fqrun/src/fq_runner.cpp +++ b/ydb/tests/tools/fqrun/src/fq_runner.cpp @@ -8,17 +8,24 @@ using namespace NKikimrRun; namespace NFqRun { class TFqRunner::TImpl { + using EVerbose = TFqSetupSettings::EVerbose; + static constexpr TDuration REFRESH_PERIOD = TDuration::Seconds(1); public: explicit TImpl(const TRunnerOptions& options) : Options(options) + , VerboseLevel(options.FqSettings.VerboseLevel) , FqSetup(options.FqSettings) , CerrColors(NColorizer::AutoColors(Cerr)) , CoutColors(NColorizer::AutoColors(Cout)) {} bool ExecuteStreamQuery(const TRequestOptions& query) { + if (VerboseLevel >= EVerbose::QueriesText) { + Cout << CoutColors.Cyan() << "Starting stream request:\n" << CoutColors.Default() << query.Query << Endl; + } + const TRequestResult status = FqSetup.StreamRequest(query, StreamQueryId); if (!status.IsSuccess()) { @@ -52,7 +59,7 @@ class TFqRunner::TImpl { if (Options.ResultOutput) { Cout << CoutColors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Writing query results..." << CoutColors.Default() << Endl; for (size_t i = 0; i < ResultSets.size(); ++i) { - if (ResultSets.size() > 1) { + if (ResultSets.size() > 1 && VerboseLevel >= EVerbose::Info) { *Options.ResultOutput << CoutColors.Cyan() << "Result set " << i + 1 << ":" << CoutColors.Default() << Endl; } PrintResultSet(Options.ResultOutputFormat, *Options.ResultOutput, ResultSets[i]); @@ -60,6 +67,53 @@ class TFqRunner::TImpl { } } + bool CreateConnections(const std::vector& connections) { + for (const auto& connection : connections) { + if (VerboseLevel >= EVerbose::QueriesText) { + Cout << CoutColors.Cyan() << "Creating connection:\n" << CoutColors.Default() << Endl << connection.DebugString() << Endl; + } + + TString connectionId; + const TRequestResult status = FqSetup.CreateConnection(connection, connectionId); + + if (!status.IsSuccess()) { + Cerr << CerrColors.Red() << "Failed to create connection '" << connection.name() << "', reason:" << CerrColors.Default() << Endl << status.ToString() << Endl; + return false; + } + + if (!ConnectionNameToId.emplace(connection.name(), connectionId).second) { + Cerr << CerrColors.Red() << "Got duplicated connection name '" << connection.name() << "'" << CerrColors.Default() << Endl; + return false; + } + } + + return true; + } + + bool CreateBindings(const std::vector& bindings) const { + for (auto binding : bindings) { + if (VerboseLevel >= EVerbose::QueriesText) { + Cout << CoutColors.Cyan() << "Creating binding:\n" << CoutColors.Default() << Endl << binding.DebugString() << Endl; + } + + const auto it = ConnectionNameToId.find(binding.connection_id()); + if (it == ConnectionNameToId.end()) { + Cerr << CerrColors.Red() << "Failed to create binding '" << binding.name() << "', connection with name '" << binding.connection_id() << "' not found" << CerrColors.Default() << Endl; + return false; + } + + binding.set_connection_id(it->second); + const TRequestResult status = FqSetup.CreateBinding(binding); + + if (!status.IsSuccess()) { + Cerr << CerrColors.Red() << "Failed to create binding '" << binding.name() << "', reason:" << CerrColors.Default() << Endl << status.ToString() << Endl; + return false; + } + } + + return true; + } + private: static bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status) { using EStatus = FederatedQuery::QueryMeta; @@ -70,7 +124,13 @@ class TFqRunner::TImpl { StartTime = TInstant::Now(); while (true) { - const TRequestResult status = FqSetup.DescribeQuery(StreamQueryId, ExecutionMeta); + TExecutionMeta meta; + const TRequestResult status = FqSetup.DescribeQuery(StreamQueryId, meta); + + if (meta.TransientIssues.Size() != ExecutionMeta.TransientIssues.Size() && VerboseLevel >= EVerbose::Info) { + Cerr << CerrColors.Red() << "Query transient issues updated:" << CerrColors.Default() << Endl << meta.TransientIssues.ToString() << Endl; + } + ExecutionMeta = meta; if (IsFinalStatus(ExecutionMeta.Status)) { break; @@ -84,13 +144,12 @@ class TFqRunner::TImpl { Sleep(REFRESH_PERIOD); } - Cout << CoutColors.Cyan() << "Query finished. Duration: " << TInstant::Now() - StartTime << CoutColors.Default() << Endl; + if (VerboseLevel >= EVerbose::Info) { + Cout << CoutColors.Cyan() << "Query finished. Duration: " << TInstant::Now() - StartTime << CoutColors.Default() << Endl; + } if (ExecutionMeta.Status != FederatedQuery::QueryMeta::COMPLETED) { Cerr << CerrColors.Red() << "Failed to execute query, invalid final status " << FederatedQuery::QueryMeta::ComputeStatus_Name(ExecutionMeta.Status) << ", issues:" << CerrColors.Default() << Endl << ExecutionMeta.Issues.ToString() << Endl; - if (ExecutionMeta.TransientIssues) { - Cerr << CerrColors.Red() << "Transient issues:" << CerrColors.Default() << Endl << ExecutionMeta.TransientIssues.ToString() << Endl; - } return false; } @@ -98,15 +157,12 @@ class TFqRunner::TImpl { Cerr << CerrColors.Red() << "Query finished with issues:" << CerrColors.Default() << Endl << ExecutionMeta.Issues.ToString() << Endl; } - if (ExecutionMeta.TransientIssues) { - Cerr << CerrColors.Red() << "Query finished with transient issues:" << CerrColors.Default() << Endl << ExecutionMeta.TransientIssues.ToString() << Endl; - } - return true; } private: const TRunnerOptions Options; + const EVerbose VerboseLevel; const TFqSetup FqSetup; const NColorizer::TColors CerrColors; const NColorizer::TColors CoutColors; @@ -115,6 +171,7 @@ class TFqRunner::TImpl { TInstant StartTime; TExecutionMeta ExecutionMeta; std::vector ResultSets; + std::unordered_map ConnectionNameToId; }; TFqRunner::TFqRunner(const TRunnerOptions& options) @@ -133,4 +190,12 @@ void TFqRunner::PrintQueryResults() const { Impl->PrintQueryResults(); } +bool TFqRunner::CreateConnections(const std::vector& connections) const { + return Impl->CreateConnections(connections); +} + +bool TFqRunner::CreateBindings(const std::vector& bindings) const { + return Impl->CreateBindings(bindings); +} + } // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_runner.h b/ydb/tests/tools/fqrun/src/fq_runner.h index 7b803648dcf9..b6e85db142bc 100644 --- a/ydb/tests/tools/fqrun/src/fq_runner.h +++ b/ydb/tests/tools/fqrun/src/fq_runner.h @@ -16,6 +16,10 @@ class TFqRunner { void PrintQueryResults() const; + bool CreateConnections(const std::vector& connections) const; + + bool CreateBindings(const std::vector& bindings) const; + private: class TImpl; std::shared_ptr Impl; diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp index 6b16b4f5af15..e28aec1a410c 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.cpp +++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp @@ -17,13 +17,15 @@ namespace NFqRun { namespace { -Ydb::StatusIds::StatusCode GetStatus(const NYql::TIssues& issues) { - return issues ? Ydb::StatusIds::BAD_REQUEST : Ydb::StatusIds::SUCCESS; +TRequestResult GetStatus(const NYql::TIssues& issues) { + return TRequestResult(issues ? Ydb::StatusIds::BAD_REQUEST : Ydb::StatusIds::SUCCESS, issues); } } // anonymous namespace class TFqSetup::TImpl { + using EVerbose = TFqSetupSettings::EVerbose; + private: TAutoPtr CreateLogBackend() const { if (Settings.LogOutputFile) { @@ -46,7 +48,7 @@ class TFqSetup::TImpl { NKikimr::Tests::TServerSettings serverSettings(PortManager.GetPort()); serverSettings.SetDomainName(Settings.DomainName); - serverSettings.SetVerbose(false); + serverSettings.SetVerbose(Settings.VerboseLevel >= EVerbose::InitLogs); NKikimrConfig::TAppConfig config; *config.MutableLogConfig() = Settings.LogConfig; @@ -83,7 +85,7 @@ class TFqSetup::TImpl { Client->InitRootScheme(); } - NFq::NConfig::TConfig GetFqProxyConfig(ui32 grpcPort, ui32 httpPort) const { + NFq::NConfig::TConfig GetFqProxyConfig(ui32 grpcPort) const { auto fqConfig = Settings.FqConfig; fqConfig.MutableControlPlaneStorage()->AddSuperUsers(BUILTIN_ACL_ROOT); @@ -109,14 +111,15 @@ class TFqSetup::TImpl { nodesMenagerConfig->SetPort(grpcPort); nodesMenagerConfig->SetHost("localhost"); - fqConfig.MutableCommon()->SetYdbMvpCloudEndpoint(TStringBuilder() << "http://localhost:" << httpPort << "/yql-mock/abc"); + if (Settings.EmulateS3) { + fqConfig.MutableCommon()->SetObjectStorageEndpoint("file://"); + } return fqConfig; } void InitializeFqProxy(ui32 grpcPort) { - const ui32 httpPort = PortManager.GetPort(); - const auto& fqConfig = GetFqProxyConfig(grpcPort, httpPort); + const auto& fqConfig = GetFqProxyConfig(grpcPort); const auto counters = GetRuntime()->GetAppData().Counters->GetSubgroup("counters", "yq"); YqSharedResources = NFq::CreateYqSharedResources(fqConfig, NKikimr::CreateYdbCredentialsProviderFactory, counters); @@ -131,10 +134,9 @@ class TFqSetup::TImpl { NFq::Init( fqConfig, GetRuntime()->GetNodeId(), actorRegistrator, &GetRuntime()->GetAppData(), - Settings.DomainName, nullptr, YqSharedResources, folderServiceFactory, 0, {} + Settings.DomainName, nullptr, YqSharedResources, folderServiceFactory, 0, {}, Settings.PqGateway ); - - NFq::InitTest(GetRuntime(), httpPort, grpcPort, YqSharedResources); + YqSharedResources->Init(GetRuntime()->GetActorSystem(0)); } public: @@ -145,11 +147,11 @@ class TFqSetup::TImpl { InitializeServer(grpcPort); InitializeFqProxy(grpcPort); - if (Settings.MonitoringEnabled) { + if (Settings.MonitoringEnabled && Settings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Monitoring port: " << CoutColors.Default() << GetRuntime()->GetMonPort() << Endl; } - if (Settings.GrpcEnabled) { + if (Settings.GrpcEnabled && Settings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Domain gRPC port: " << CoutColors.Default() << grpcPort << Endl; } } @@ -167,7 +169,7 @@ class TFqSetup::TImpl { auto& content = *request.mutable_content(); content.set_type(FederatedQuery::QueryContent::STREAMING); content.set_text(query.Query); - content.mutable_acl()->set_visibility(::FederatedQuery::Acl::SCOPE); + SetupAcl(content.mutable_acl()); return RunControlPlaneProxyRequest(request); } @@ -188,6 +190,20 @@ class TFqSetup::TImpl { return RunControlPlaneProxyRequest(request); } + NFq::TEvControlPlaneProxy::TEvCreateConnectionResponse::TPtr CreateConnection(const FederatedQuery::ConnectionContent& connection) const { + FederatedQuery::CreateConnectionRequest request; + *request.mutable_content() = connection; + + return RunControlPlaneProxyRequest(request); + } + + NFq::TEvControlPlaneProxy::TEvCreateBindingResponse::TPtr CreateBinding(const FederatedQuery::BindingContent& binding) const { + FederatedQuery::CreateBindingRequest request; + *request.mutable_content() = binding; + + return RunControlPlaneProxyRequest(request); + } + private: NActors::TTestActorRuntime* GetRuntime() const { return Server->GetRuntime(); @@ -195,7 +211,7 @@ class TFqSetup::TImpl { template typename TResponse::TPtr RunControlPlaneProxyRequest(const TProto& request) const { - auto event = std::make_unique("yandexcloud://kqprun", request, BUILTIN_ACL_ROOT, BUILTIN_ACL_ROOT, TVector{}); + auto event = std::make_unique("yandexcloud://fqrun", request, BUILTIN_ACL_ROOT, Settings.YqlToken ? Settings.YqlToken : "fqrun", TVector{}); return RunControlPlaneProxyRequest(std::move(event)); } @@ -228,8 +244,7 @@ TRequestResult TFqSetup::StreamRequest(const TRequestOptions& query, TString& qu queryId = response->Get()->Result.query_id(); - const auto& issues = response->Get()->Issues; - return TRequestResult(GetStatus(issues), issues); + return GetStatus(response->Get()->Issues); } TRequestResult TFqSetup::DescribeQuery(const TString& queryId, TExecutionMeta& meta) const { @@ -245,8 +260,7 @@ TRequestResult TFqSetup::DescribeQuery(const TString& queryId, TExecutionMeta& m meta.ResultSetSizes.emplace_back(resultMeta.rows_count()); } - const auto& issues = response->Get()->Issues; - return TRequestResult(GetStatus(issues), issues); + return GetStatus(response->Get()->Issues); } TRequestResult TFqSetup::FetchQueryResults(const TString& queryId, i32 resultSetId, Ydb::ResultSet& resultSet) const { @@ -254,8 +268,20 @@ TRequestResult TFqSetup::FetchQueryResults(const TString& queryId, i32 resultSet resultSet = response->Get()->Result.result_set(); - const auto& issues = response->Get()->Issues; - return TRequestResult(GetStatus(issues), issues); + return GetStatus(response->Get()->Issues); +} + +TRequestResult TFqSetup::CreateConnection(const FederatedQuery::ConnectionContent& connection, TString& connectionId) const { + const auto response = Impl->CreateConnection(connection); + + connectionId = response->Get()->Result.connection_id(); + + return GetStatus(response->Get()->Issues); +} + +TRequestResult TFqSetup::CreateBinding(const FederatedQuery::BindingContent& binding) const { + const auto response = Impl->CreateBinding(binding); + return GetStatus(response->Get()->Issues); } } // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_setup.h b/ydb/tests/tools/fqrun/src/fq_setup.h index c78fad43e2dd..2ef7733bf028 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.h +++ b/ydb/tests/tools/fqrun/src/fq_setup.h @@ -25,6 +25,10 @@ class TFqSetup { TRequestResult FetchQueryResults(const TString& queryId, i32 resultSetId, Ydb::ResultSet& resultSet) const; + TRequestResult CreateConnection(const FederatedQuery::ConnectionContent& connection, TString& connectionId) const; + + TRequestResult CreateBinding(const FederatedQuery::BindingContent& binding) const; + private: class TImpl; std::shared_ptr Impl; diff --git a/ydb/tests/tools/fqrun/src/ya.make b/ydb/tests/tools/fqrun/src/ya.make index 5173c30f4be2..6f44cf16c38d 100644 --- a/ydb/tests/tools/fqrun/src/ya.make +++ b/ydb/tests/tools/fqrun/src/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + common.cpp fq_runner.cpp fq_setup.cpp ) @@ -17,6 +18,7 @@ PEERDIR( ydb/library/folder_service/mock ydb/library/grpc/server/actors ydb/library/security + ydb/library/yql/providers/pq/provider ydb/tests/tools/kqprun/runlib ) diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index fa7212f0f8bc..bcaf2ace4cf4 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -1,18 +1,24 @@ -PROGRAM(fqprun) +PROGRAM(fqrun) SRCS( - fqprun.cpp + fqrun.cpp ) PEERDIR( library/cpp/colorizer + library/cpp/getopt util + ydb/library/yql/providers/pq/gateway/dummy ydb/tests/tools/fqrun/src ydb/tests/tools/kqprun/runlib yql/essentials/parser/pg_wrapper yql/essentials/sql/pg ) +PEERDIR( + yql/essentials/udfs/common/compress_base +) + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/tests/tools/kqprun/README.md b/ydb/tests/tools/kqprun/README.md new file mode 100644 index 000000000000..e5d82a018c56 --- /dev/null +++ b/ydb/tests/tools/kqprun/README.md @@ -0,0 +1,62 @@ +# KqpRun tool + +Tool can be used to execute queries by using kikimr provider. + +For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS`. + +## Examples + +### Queries + +* Run select 42: + ```(bash) + ./kqprun --sql "SELECT 42" + ``` + +* Queries shooting: + ```(bash) + ./kqprun --sql "SELECT 42" -C async --loop-count 0 --loop-delay 100 --inflight-limit 10 + ``` + +### Logs + +* Setup log settings (`-C query` for clear logs): + ```(bash) + ./kqprun --sql "SELECT 42" -C query --log-default=warn --log KQP_YQL=trace --log-file query.log + ``` + +* Trace opt: + ```(bash) + ./kqprun --sql "SELECT 42" -C query -T script + ``` + +* Runtime statistics: + ```(bash) + ./kqprun --sql "SELECT 42" --script-statistics stats.log --script-timeline-file timeline.svg + ``` + +### Cluster + +* Embedded UI: + ```(bash) + ./kqprun -M 32000 + ``` + + Monitoring endpoint: https://localhost:32000 + +* gRPC endpoint: + ```(bash) + ./kqprun -G 32000 + ``` + + Connect with ydb CLI: `ydb -e grpc://localhost:32000 -d /Root` + +* Static storage: + ```(bash) + ./kqprun -M 32000 --storage-path ./storage --storage-size 32 + ``` + +* Create serverless domain and execute query in this domain: + ```(bash) + ./kqprun -M 32000 --shared my-shared --serverless my-serverless --sql "SELECT 42" -D /Root/my-serverless + ``` diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index b736157d4ac1..e860615e9fd7 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -427,6 +427,8 @@ TIntrusivePtr CreateFunctionRegistr class TMain : public TMainBase { + using EVerbose = TYdbSetupSettings::EVerbose; + inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE); inline static IOutputStream* ProfileAllocationsOutput = nullptr; inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); @@ -476,6 +478,10 @@ class TMain : public TMainBase { ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal())); }); + options.AddLongOption("sql", "Script query SQL text to execute (typically DML query)") + .RequiredArgument("str") + .AppendTo(&ExecutionOptions.ScriptQueries); + options.AddLongOption("templates", "Enable templates for -s and -p queries, such as ${YQL_TOKEN} and ${QUERY_ID}") .NoArgument() .SetFlag(&ExecutionOptions.UseTemplates); @@ -663,11 +669,11 @@ class TMain : public TMainBase { .DefaultValue(0) .StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit); - options.AddLongOption("verbose", TStringBuilder() << "Common verbose level (max level " << static_cast(TYdbSetupSettings::EVerbose::Max) - 1 << ")") + options.AddLongOption("verbose", TStringBuilder() << "Common verbose level (max level " << static_cast(EVerbose::Max) - 1 << ")") .RequiredArgument("uint") - .DefaultValue(static_cast(TYdbSetupSettings::EVerbose::Info)) + .DefaultValue(static_cast(EVerbose::Info)) .StoreMappedResultT(&RunnerOptions.YdbSettings.VerboseLevel, [](ui8 value) { - return static_cast(std::min(value, static_cast(TYdbSetupSettings::EVerbose::Max))); + return static_cast(std::min(value, static_cast(EVerbose::Max))); }); TChoices verbose({ diff --git a/ydb/tests/tools/kqprun/runlib/application.h b/ydb/tests/tools/kqprun/runlib/application.h index 6004d30fcb7d..88eb2e289011 100644 --- a/ydb/tests/tools/kqprun/runlib/application.h +++ b/ydb/tests/tools/kqprun/runlib/application.h @@ -2,7 +2,7 @@ #include "settings.h" -#include +#include #include diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index f2378965461b..bbc432be16e4 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -162,7 +162,7 @@ class TKqpRunner::TImpl { if (Options_.ResultOutput) { Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Writing script query results..." << CoutColors_.Default() << Endl; for (size_t i = 0; i < ResultSets_.size(); ++i) { - if (ResultSets_.size() > 1 && Options_.YdbSettings.VerboseLevel >= EVerbose::Info) { + if (ResultSets_.size() > 1 && VerboseLevel_ >= EVerbose::Info) { *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; } PrintResultSet(Options_.ResultOutputFormat, *Options_.ResultOutput, ResultSets_[i]); @@ -244,7 +244,7 @@ class TKqpRunner::TImpl { void PrintSchemeQueryAst(const TString& ast) const { if (Options_.SchemeQueryAstOutput) { - if (Options_.YdbSettings.VerboseLevel >= EVerbose::Info) { + if (VerboseLevel_ >= EVerbose::Info) { Cout << CoutColors_.Cyan() << "Writing scheme query ast" << CoutColors_.Default() << Endl; } Options_.SchemeQueryAstOutput->Write(ast); @@ -253,7 +253,7 @@ class TKqpRunner::TImpl { void PrintScriptAst(size_t queryId, const TString& ast) const { if (const auto output = GetValue(queryId, Options_.ScriptQueryAstOutputs, nullptr)) { - if (Options_.YdbSettings.VerboseLevel >= EVerbose::Info) { + if (VerboseLevel_ >= EVerbose::Info) { Cout << CoutColors_.Cyan() << "Writing script query ast" << CoutColors_.Default() << Endl; } output->Write(ast); @@ -262,7 +262,7 @@ class TKqpRunner::TImpl { void PrintScriptPlan(size_t queryId, const TString& plan) const { if (const auto output = GetValue(queryId, Options_.ScriptQueryPlanOutputs, nullptr)) { - if (Options_.YdbSettings.VerboseLevel >= EVerbose::Info) { + if (VerboseLevel_ >= EVerbose::Info) { Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; } StatsPrinter_.PrintPlan(plan, *output); From 3fe0cbeb4cf31eeda92a0ee005c62b0c5ca78b0f Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 13 Feb 2025 20:06:25 +0000 Subject: [PATCH 2/4] Removed unused code --- ydb/tests/tools/fqrun/fqrun.cpp | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index c42fcec6e43c..587bc405b034 100644 --- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -216,17 +216,6 @@ class TMain : public TMainBase { FillTokens(gatewayConfig.mutable_ydb()); FillTokens(gatewayConfig.mutable_solomon()); - for (auto& gateway : *RunnerOptions.FqSettings.FqConfig.mutable_gateways()->mutable_pq()->mutable_clustermapping()) { - if (!gateway.GetToken()) { - gateway.SetToken(RunnerOptions.FqSettings.YqlToken); - } - } - for (auto& gateway : *RunnerOptions.FqSettings.FqConfig.mutable_gateways()->mutable_s3()->mutable_clustermapping()) { - if (!gateway.GetToken()) { - gateway.SetToken(RunnerOptions.FqSettings.YqlToken); - } - } - auto& logConfig = RunnerOptions.FqSettings.LogConfig; logConfig.SetDefaultLevel(NActors::NLog::EPriority::PRI_CRIT); FillLogConfig(logConfig); From d859b697e622a01f978a3e122444543f7f4cff97 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 15 Feb 2025 09:41:19 +0000 Subject: [PATCH 3/4] Fixed fq config --- .../tools/fqrun/configuration/fq_config.conf | 131 ++++++++++-------- ydb/tests/tools/fqrun/src/fq_setup.cpp | 4 + ydb/tests/tools/fqrun/ya.make | 1 + 3 files changed, 76 insertions(+), 60 deletions(-) diff --git a/ydb/tests/tools/fqrun/configuration/fq_config.conf b/ydb/tests/tools/fqrun/configuration/fq_config.conf index 0a6584bd0d13..433f1c60001f 100644 --- a/ydb/tests/tools/fqrun/configuration/fq_config.conf +++ b/ydb/tests/tools/fqrun/configuration/fq_config.conf @@ -4,6 +4,12 @@ EnableTaskCounters: true CheckpointCoordinator { Enabled: true + CheckpointingPeriodMillis: 30000 + MaxInflight: 1 + + CheckpointGarbageConfig { + Enabled: true + } Storage { TablePrefix: "yq/checkpoints" @@ -16,6 +22,7 @@ CheckpointCoordinator { Common { YdbMvpCloudEndpoint: "https://ydbc.ydb.cloud.yandex.net:8789/ydbc/cloud-prod" MdbGateway: "https://mdb.api.cloud.yandex.net:443" + MdbTransformHost: false ObjectStorageEndpoint: "https://storage.yandexcloud.net" IdsPrefix: "kr" QueryArtifactsCompressionMethod: "zstd_6" @@ -23,6 +30,14 @@ Common { KeepInternalErrors: true UseNativeProtocolForClickHouse: true ShowQueryTimeline: true + MaxTasksPerOperation: 400 + MaxTasksPerStage: 50 + PqReconnectPeriod: "30m" + + YdbDriverConfig { + ClientThreadsNum: 6 + NetworkThreadsNum: 6 + } } ControlPlaneProxy { @@ -32,6 +47,15 @@ ControlPlaneProxy { ControlPlaneStorage { Enabled: true StatsMode: STATS_MODE_PROFILE + DumpRawStatistics: true + TasksBatchSize: 100 + NumTasksProportion: 4 + AnalyticsRetryCounterLimit: 20 + StreamingRetryCounterLimit: 20 + AnalyticsRetryCounterUpdateTime: "1d" + StreamingRetryCounterUpdateTime: "1d" + TaskLeaseTtl: "30s" + DisableCurrentIam: false AvailableConnection: "OBJECT_STORAGE" AvailableConnection: "DATA_STREAMS" @@ -78,65 +102,13 @@ Gateways { Dq { DefaultSettings { - Name: "EnableComputeActor" + Name: "HashShuffleTasksRatio" Value: "1" } DefaultSettings { - Name: "ComputeActorType" - Value: "async" - } - DefaultSettings { - Name: "AnalyzeQuery" - Value: "true" - } - DefaultSettings { - Name: "MaxTasksPerStage" - Value: "200" - } - DefaultSettings { - Name: "MaxTasksPerOperation" - Value: "200" - } - DefaultSettings { - Name: "EnableInsert" - Value: "true" - } - DefaultSettings { - Name: "_EnablePrecompute" - Value: "true" - } - DefaultSettings { - Name: "UseAggPhases" - Value: "true" - } - DefaultSettings { - Name: "HashJoinMode" - Value: "grace" - } - DefaultSettings { - Name: "UseFastPickleTransport" - Value: "true" - } - DefaultSettings { - Name: "UseOOBTransport" + Name: "UseFinalizeByKey" Value: "true" } - DefaultSettings { - Name: "UseWideChannels" - Value: "true" - } - DefaultSettings { - Name: "_SkipRevisionCheck" - Value: "true" - } - DefaultSettings { - Name: "EnableDqReplicate" - Value: "true" - } - DefaultSettings { - Name: "_TableTimeout" - Value: "600000" - } } Generic { @@ -173,8 +145,8 @@ Gateways { Endpoint: "localhost:2135" Database: "local" ClusterType: CT_DATA_STREAMS - UseSsl: True - SharedReading: True + UseSsl: true + SharedReading: true ReadGroup: "fqrun" } } @@ -200,6 +172,15 @@ Gateways { MinDesiredDirectoriesOfFilesPerQuery: 1000 RegexpCacheSize: 100 + FormatSizeLimit { + Name: "parquet" + FileSizeLimit: 52428800 + } + FormatSizeLimit { + Name: "raw" + FileSizeLimit: 52428800 + } + DefaultSettings { Name: "AtomicUploadCommit" Value: "true" @@ -208,10 +189,6 @@ Gateways { Name: "UseBlocksSource" Value: "true" } - DefaultSettings { - Name: "UseRuntimeListing" - Value: "true" - } } YqlCore { @@ -224,6 +201,10 @@ Gateways { } } +Health { + Enabled: true +} + NodesManager { Enabled: true } @@ -234,6 +215,7 @@ PendingFetcher { PrivateApi { Enabled: true + Loopback: true } PrivateProxy { @@ -242,6 +224,23 @@ PrivateProxy { QuotasManager { Enabled: true + QuotaDescriptions { + SubjectType: "cloud" + MetricName: "yq.cpuPercent.count" + HardLimit: 7500 + DefaultLimit: 3500 + } + QuotaDescriptions { + SubjectType: "cloud" + MetricName: "yq.streamingQueryDurationMinutes.count" + DefaultLimit: 10080 + } + QuotaDescriptions { + SubjectType: "cloud" + MetricName: "yq.analyticsQueryDurationMinutes.count" + HardLimit: 1440 + DefaultLimit: 30 + } } RateLimiter { @@ -261,14 +260,26 @@ RateLimiter { } } +ReadActorsFactoryConfig { + PqReadActorFactoryConfig { + CookieCommitMode: false + } +} + ResourceManager { Enabled: true + MkqlInitialMemoryLimit: 16777216 + MkqlTotalMemoryLimit: 193273528320 + MkqlAllocSize: 16777216 + MkqlTaskHardMemoryLimit: 24696061952 } RowDispatcher { Enabled: true SendStatusPeriodSec: 10 TimeoutBeforeStartSessionSec: 10 + MaxSessionUsedMemory: 16000000 + WithoutConsumer: false CompileService { ParallelCompilationLimit: 20 diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp index e28aec1a410c..394bfc5c3e16 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.cpp +++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp @@ -111,6 +111,10 @@ class TFqSetup::TImpl { nodesMenagerConfig->SetPort(grpcPort); nodesMenagerConfig->SetHost("localhost"); + auto* healthConfig = fqConfig.MutableHealth(); + healthConfig->SetPort(grpcPort); + healthConfig->SetDatabase(database); + if (Settings.EmulateS3) { fqConfig.MutableCommon()->SetObjectStorageEndpoint("file://"); } diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index bcaf2ace4cf4..6553c9748b08 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -17,6 +17,7 @@ PEERDIR( PEERDIR( yql/essentials/udfs/common/compress_base + yql/essentials/udfs/common/re2 ) YQL_LAST_ABI_VERSION() From 44bacec6edf06ac62ace0fefe809507459cf4163 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 15 Feb 2025 16:23:39 +0000 Subject: [PATCH 4/4] Fixes for SLJ --- .../libs/actors/clusters_from_connections.cpp | 27 +++++++++++++++++-- .../tools/fqrun/configuration/fq_config.conf | 4 +-- .../kqprun/configuration/app_config.conf | 2 +- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp index 1249aeafb679..3438e85fd5ca 100644 --- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp +++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp @@ -88,6 +88,19 @@ std::pair ParseHttpEndpoint(const TString& endpoint) { return std::make_pair(ToString(host), scheme != "http"); } +std::pair ParseGrpcEndpoint(const TString& endpoint) { + TStringBuf scheme; + TStringBuf address; + TStringBuf uri; + NHttp::CrackURL(endpoint, scheme, address, uri); + + TString hostname; + TIpPort port; + NHttp::CrackAddress(TString(address), hostname, port); + + return {hostname, port}; +} + void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig, const TString& name, const TString& authToken, @@ -230,8 +243,18 @@ void AddClustersFromConnections( clusterCfg->SetKind(NYql::EGenericDataSourceKind::YDB); clusterCfg->SetProtocol(NYql::EGenericProtocol::NATIVE); clusterCfg->SetName(connectionName); - clusterCfg->SetDatabaseId(db.database_id()); - clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources()); + if (const auto& databaseId = db.database_id()) { + clusterCfg->SetDatabaseId(databaseId); + clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources()); + } else { + const auto& [host, port] = ParseGrpcEndpoint(db.endpoint()); + + auto& endpoint = *clusterCfg->MutableEndpoint(); + endpoint.set_host(host); + endpoint.set_port(port); + clusterCfg->SetUseSsl(db.secure()); + clusterCfg->SetDatabaseName(db.database()); + } FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures); clusters.emplace(connectionName, GenericProviderName); break; diff --git a/ydb/tests/tools/fqrun/configuration/fq_config.conf b/ydb/tests/tools/fqrun/configuration/fq_config.conf index 433f1c60001f..576b0d394bdb 100644 --- a/ydb/tests/tools/fqrun/configuration/fq_config.conf +++ b/ydb/tests/tools/fqrun/configuration/fq_config.conf @@ -119,7 +119,7 @@ Gateways { Endpoint { host: "localhost" - port: 50051 + port: 2130 } } @@ -277,7 +277,7 @@ ResourceManager { RowDispatcher { Enabled: true SendStatusPeriodSec: 10 - TimeoutBeforeStartSessionSec: 10 + TimeoutBeforeStartSessionSec: 0 MaxSessionUsedMemory: 16000000 WithoutConsumer: false diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 4c69371ab8fc..de923393b0e1 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -99,7 +99,7 @@ QueryServiceConfig { Endpoint { host: "localhost" - port: 50051 + port: 2130 } } }