Skip to content

Commit 0d12e54

Browse files
authored
Revert "Yq 3560 Add row dispatcher to dqrun (#9697)" (#10266)
1 parent 2b4c9f9 commit 0d12e54

File tree

8 files changed

+11
-109
lines changed

8 files changed

+11
-109
lines changed

ydb/core/fq/libs/control_plane_storage/ya.make

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ PEERDIR(
2020
library/cpp/lwtrace
2121
library/cpp/protobuf/interop
2222
ydb/core/base
23-
ydb/core/external_sources
2423
ydb/core/fq/libs/actors/logging
2524
ydb/core/fq/libs/common
2625
ydb/core/fq/libs/config
@@ -34,13 +33,13 @@ PEERDIR(
3433
ydb/core/fq/libs/shared_resources
3534
ydb/core/fq/libs/ydb
3635
ydb/core/mon
37-
ydb/library/db_pool
3836
ydb/library/security
39-
ydb/library/yql/providers/s3/path_generator
40-
ydb/library/yql/public/issue
4137
ydb/public/api/protos
4238
ydb/public/sdk/cpp/client/ydb_scheme
4339
ydb/public/sdk/cpp/client/ydb_table
40+
ydb/library/db_pool
41+
ydb/library/yql/providers/s3/path_generator
42+
ydb/library/yql/public/issue
4443
)
4544

4645
YQL_LAST_ABI_VERSION()

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ void Init(
207207
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());
208208

209209
RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
210-
210+
211211
NYql::TPqGatewayServices pqServices(
212212
yqSharedResources->UserSpaceYdbDriver,
213213
pqCmConnections,

ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ class TLocalServiceHolder {
3131
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
3232
IMetricsRegistryPtr metricsRegistry,
3333
const std::function<IActor*(void)>& metricsPusherFactory,
34-
bool withSpilling,
35-
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
34+
bool withSpilling)
3635
: MetricsRegistry(metricsRegistry
3736
? metricsRegistry
3837
: CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq))
@@ -90,9 +89,6 @@ class TLocalServiceHolder {
9089
NDq::MakeDqLocalFileSpillingServiceID(nodeId),
9190
TActorSetupCmd(spillingActor, TMailboxType::Simple, 0));
9291
}
93-
for (auto& [actorId, setupCmd] : additionalLocalServices) {
94-
ServiceNode->AddLocalService(actorId, std::move(setupCmd));
95-
}
9692

9793
auto statsCollector = CreateStatsCollector(1, *ServiceNode->GetSetup(), MetricsRegistry->GetSensors());
9894

@@ -252,8 +248,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
252248
NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
253249
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
254250
IMetricsRegistryPtr metricsRegistry,
255-
const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling,
256-
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
251+
const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling)
257252
{
258253
return MakeHolder<TLocalServiceHolder>(functionRegistry,
259254
compFactory,
@@ -265,17 +260,15 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
265260
threads,
266261
metricsRegistry,
267262
metricsPusherFactory,
268-
withSpilling,
269-
std::move(additionalLocalServices));
263+
withSpilling);
270264
}
271265

272266
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
273267
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
274268
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
275269
bool withSpilling, NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
276270
IMetricsRegistryPtr metricsRegistry,
277-
const std::function<IActor*(void)>& metricsPusherFactory,
278-
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
271+
const std::function<IActor*(void)>& metricsPusherFactory)
279272
{
280273
int startPort = 31337;
281274
TRangeWalker<int> portWalker(startPort, startPort+100);
@@ -294,8 +287,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
294287
threads,
295288
metricsRegistry,
296289
metricsPusherFactory,
297-
withSpilling,
298-
std::move(additionalLocalServices)),
290+
withSpilling),
299291
CreateDqGateway("[::1]", grpcPort.Addr.GetPort()));
300292
}
301293

ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#pragma once
22

3-
#include <ydb/library/actors/core/actorsystem.h>
43
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
54
#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
65
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
@@ -18,7 +17,6 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
1817
bool withSpilling,
1918
NDq::IDqAsyncIoFactory::TPtr = nullptr, int threads = 16,
2019
IMetricsRegistryPtr metricsRegistry = {},
21-
const std::function<NActors::IActor*(void)>& metricsPusherFactory = {},
22-
TVector<std::pair<NActors::TActorId, NActors::TActorSetupCmd>>&& additionalLocalServices = {});
20+
const std::function<NActors::IActor*(void)>& metricsPusherFactory = {});
2321

2422
} // namespace NYql

ydb/library/yql/tools/dqrun/dqrun.cpp

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,8 @@
8181
#include <ydb/library/yql/public/result_format/yql_result_format_data.h>
8282

8383
#include <ydb/core/fq/libs/actors/database_resolver.h>
84-
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
8584
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
8685
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
87-
#include <ydb/core/fq/libs/init/init.h>
88-
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>
89-
9086
#include <ydb/core/util/pb.h>
9187

9288
#include <yt/cpp/mapreduce/interface/init.h>
@@ -184,17 +180,6 @@ void ReadGatewaysConfig(const TString& configFile, TGatewaysConfig* config, THas
184180
}
185181
}
186182

187-
void ReadFqConfig(const TString& fqCfgFile, NFq::NConfig::TConfig* fqConfig) {
188-
if (fqCfgFile.empty()) {
189-
return;
190-
}
191-
auto configData = TFileInput(fqCfgFile).ReadAll();
192-
using ::google::protobuf::TextFormat;
193-
if (!TextFormat::ParseFromString(configData, fqConfig)) {
194-
ythrow yexception() << "Bad format of fq configuration";
195-
}
196-
}
197-
198183
void PatchGatewaysConfig(TGatewaysConfig* config, const TString& mrJobBin, const TString& mrJobUdfsDir,
199184
size_t numThreads, bool keepTemp)
200185
{
@@ -498,35 +483,9 @@ int RunProgram(TProgramPtr program, const TRunOptions& options, const THashMap<T
498483
return 0;
499484
}
500485

501-
void InitFq(const NFq::NConfig::TConfig& fqConfig, TVector<std::pair<TActorId, TActorSetupCmd>>& additionalLocalServices) {
502-
if (fqConfig.HasRowDispatcher() && fqConfig.GetRowDispatcher().GetEnabled()) {
503-
NFq::IYqSharedResources::TPtr iSharedResources = NFq::CreateYqSharedResources(
504-
fqConfig,
505-
NKikimr::CreateYdbCredentialsProviderFactory,
506-
MakeIntrusive<NMonitoring::TDynamicCounters>());
507-
NFq::TYqSharedResources::TPtr yqSharedResources = NFq::TYqSharedResources::Cast(iSharedResources);
508-
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory;
509-
510-
NFq::NConfig::TCommonConfig commonConfig;
511-
auto rowDispatcher = NFq::NewRowDispatcherService(
512-
fqConfig.GetRowDispatcher(),
513-
commonConfig,
514-
NKikimr::CreateYdbCredentialsProviderFactory,
515-
yqSharedResources,
516-
credentialsFactory,
517-
"/tenant",
518-
MakeIntrusive<NMonitoring::TDynamicCounters>());
519-
520-
additionalLocalServices.emplace_back(
521-
NFq::RowDispatcherServiceActorId(),
522-
TActorSetupCmd(rowDispatcher.release(), TMailboxType::Simple, 0));
523-
}
524-
}
525-
526486
int RunMain(int argc, const char* argv[])
527487
{
528488
TString gatewaysCfgFile;
529-
TString fqCfgFile;
530489
TString progFile;
531490
TVector<TString> tablesMappingList;
532491
THashMap<TString, TString> tablesMapping;
@@ -622,10 +581,6 @@ int RunMain(int argc, const char* argv[])
622581
.Optional()
623582
.RequiredArgument("FILE")
624583
.StoreResult(&gatewaysCfgFile);
625-
opts.AddLongOption("fq-cfg", "federated query configuration file")
626-
.Optional()
627-
.RequiredArgument("FILE")
628-
.StoreResult(&fqCfgFile);
629584
opts.AddLongOption("fs-cfg", "Path to file storage config")
630585
.Optional()
631586
.StoreResult(&fileStorageCfg);
@@ -919,9 +874,6 @@ int RunMain(int argc, const char* argv[])
919874
setting->SetValue("1");
920875
}
921876

922-
NFq::NConfig::TConfig fqConfig;
923-
ReadFqConfig(fqCfgFile, &fqConfig);
924-
925877
if (res.Has("enable-spilling")) {
926878
auto* setting = gatewaysConfig.MutableDq()->AddDefaultSettings();
927879
setting->SetName("SpillingEngine");
@@ -1088,8 +1040,6 @@ int RunMain(int argc, const char* argv[])
10881040
clusters.emplace(to_lower(cluster.GetName()), TString{NYql::SolomonProviderName});
10891041
}
10901042
}
1091-
TVector<std::pair<TActorId, TActorSetupCmd>> additionalLocalServices;
1092-
InitFq(fqConfig, additionalLocalServices);
10931043

10941044
std::function<NActors::IActor*(void)> metricsPusherFactory = {};
10951045

@@ -1114,7 +1064,7 @@ int RunMain(int argc, const char* argv[])
11141064
bool enableSpilling = res.Has("enable-spilling");
11151065
dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, enableSpilling,
11161066
CreateAsyncIoFactory(driver, httpGateway, ytFileServices, genericClient, credentialsFactory, *funcRegistry, requestTimeout, maxRetries, pqGateway), threads,
1117-
metricsRegistry, metricsPusherFactory, std::move(additionalLocalServices));
1067+
metricsRegistry, metricsPusherFactory);
11181068
}
11191069

11201070
dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));

ydb/library/yql/tools/dqrun/examples/fq.conf

Lines changed: 0 additions & 20 deletions
This file was deleted.

ydb/library/yql/tools/dqrun/examples/gateways.conf

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ Dq {
6868
Name: "EnableDqReplicate"
6969
Value: "true"
7070
}
71-
DefaultSettings {
72-
Name: "_TableTimeout"
73-
Value: "600000"
74-
}
7571
}
7672

7773
Generic {
@@ -137,14 +133,3 @@ SqlCore {
137133
TranslationFlags: ["FlexibleTypes", "DisableAnsiOptionalAs", "EmitAggApply"]
138134
}
139135

140-
Pq {
141-
ClusterMapping {
142-
Name: "pq"
143-
Endpoint: "localhost:2135"
144-
Database: "local"
145-
ClusterType: CT_DATA_STREAMS
146-
UseSsl: True
147-
SharedReading:False
148-
}
149-
}
150-

ydb/library/yql/tools/dqrun/ya.make

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ ENDIF()
9595
ydb/library/yql/utils/actor_system
9696
ydb/core/fq/libs/actors
9797
ydb/core/fq/libs/db_id_async_resolver_impl
98-
ydb/core/fq/libs/init
99-
ydb/core/external_sources
10098

10199
ydb/library/yql/udfs/common/clickhouse/client
102100
)

0 commit comments

Comments
 (0)