Skip to content

Commit fec9769

Browse files
GrigoriyPAOleg Doronin
authored and
Oleg Doronin
committed
YQ kqprun pass allow local files into runtime listing (ydb-platform#7844)
1 parent a805964 commit fec9769

20 files changed

+81
-44
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
3636
NActors::TActorSystem* actorSystem,
3737
size_t pathsLimit,
3838
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
39-
bool enableInfer) {
39+
bool enableInfer,
40+
bool allowLocalFiles) {
4041
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
4142
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
4243
{
4344
ToString(NYql::EDatabaseType::ObjectStorage),
44-
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
45+
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles)
4546
},
4647
{
4748
ToString(NYql::EDatabaseType::ClickHouse),

ydb/core/external_sources/external_source_factory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
1515
NActors::TActorSystem* actorSystem = nullptr,
1616
size_t pathsLimit = 50000,
1717
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
18-
bool enableInfer = false);
18+
bool enableInfer = false,
19+
bool allowLocalFiles = false);
1920

2021
}

ydb/core/external_sources/object_storage.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
3434
NActors::TActorSystem* actorSystem,
3535
size_t pathsLimit,
3636
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
37-
bool enableInfer)
37+
bool enableInfer,
38+
bool allowLocalFiles)
3839
: HostnamePatterns(hostnamePatterns)
3940
, PathsLimit(pathsLimit)
4041
, ActorSystem(actorSystem)
4142
, CredentialsFactory(std::move(credentialsFactory))
4243
, EnableInfer(enableInfer)
44+
, AllowLocalFiles(allowLocalFiles)
4345
{}
4446

4547
virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
@@ -327,9 +329,9 @@ struct TObjectStorageExternalSource : public IExternalSource {
327329
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
328330
.Url = meta->DataSourceLocation,
329331
.Credentials = credentials,
330-
.Pattern = meta->TableLocation,
331-
}, Nothing(), false);
332-
auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
332+
.Pattern = effectiveFilePattern,
333+
}, Nothing(), AllowLocalFiles);
334+
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
333335
auto& listRes = listResFut.GetValue();
334336
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
335337
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
@@ -617,6 +619,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
617619
NActors::TActorSystem* ActorSystem = nullptr;
618620
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
619621
const bool EnableInfer = false;
622+
const bool AllowLocalFiles;
620623
};
621624

622625
}
@@ -626,8 +629,9 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
626629
NActors::TActorSystem* actorSystem,
627630
size_t pathsLimit,
628631
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
629-
bool enableInfer) {
630-
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer);
632+
bool enableInfer,
633+
bool allowLocalFiles) {
634+
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
631635
}
632636

633637
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location) {

ydb/core/external_sources/object_storage.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
1313
NActors::TActorSystem* actorSystem,
1414
size_t pathsLimit,
1515
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
16-
bool enableInfer);
16+
bool enableInfer,
17+
bool allowLocalFiles);
1718

1819
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location);
1920

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,22 @@ namespace NKikimr {
88

99
Y_UNIT_TEST_SUITE(ObjectStorageTest) {
1010
Y_UNIT_TEST(SuccessValidation) {
11-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
11+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1212
NKikimrExternalSources::TSchema schema;
1313
NKikimrExternalSources::TGeneral general;
1414
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
1515
}
1616

1717
Y_UNIT_TEST(FailedCreate) {
18-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
18+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1919
NKikimrExternalSources::TSchema schema;
2020
NKikimrExternalSources::TGeneral general;
2121
general.mutable_attributes()->insert({"a", "b"});
2222
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
2323
}
2424

2525
Y_UNIT_TEST(FailedValidation) {
26-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
26+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
2727
NKikimrExternalSources::TSchema schema;
2828
NKikimrExternalSources::TGeneral general;
2929
general.mutable_attributes()->insert({"projection.h", "b"});

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void Init(
216216
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
217217

218218
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
219-
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
219+
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
220220
s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
221221
httpGateway, s3HttpRetryPolicy);
222222

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
7878

7979
if (federatedQuerySetup) {
8080
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
81-
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig);
81+
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig, nullptr, federatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
8282
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
8383

8484
if (federatedQuerySetup->ConnectorClient) {

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1072,7 +1072,8 @@ class TKqpHost : public IKqpHost {
10721072
ActorSystem,
10731073
FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(),
10741074
FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr,
1075-
Config->FeatureFlags.GetEnableExternalSourceSchemaInference());
1075+
Config->FeatureFlags.GetEnableExternalSourceSchemaInference(),
1076+
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
10761077
}
10771078
}
10781079

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6966,7 +6966,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
69666966
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(
69676967
std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()),
69686968
nullptr,
6969-
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit()
6969+
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit(),
6970+
nullptr,
6971+
appConfig.GetFeatureFlags().GetEnableExternalSourceSchemaInference(),
6972+
appConfig.GetQueryServiceConfig().GetS3().GetAllowLocalFiles()
69706973
);
69716974
}
69726975

ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,17 @@ namespace NYql::NDq {
6464
IHTTPGateway::TPtr gateway,
6565
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
6666
const TS3ReadActorFactoryConfig& cfg,
67-
::NMonitoring::TDynamicCounterPtr counters) override {
67+
::NMonitoring::TDynamicCounterPtr counters,
68+
bool allowLocalFiles) override {
6869

6970
#if defined(_linux_) || defined(_darwin_)
7071
NDB::registerFormats();
7172
factory.RegisterSource<NS3::TSource>("S3Source",
72-
[credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
73+
[credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
7374
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
7475
std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams,
7576
args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
76-
counters, args.TaskCounters, args.MemoryQuotaManager);
77+
counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles);
7778
});
7879
#else
7980
Y_UNUSED(factory);

ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
5959
NActors::TActorId fileQueueActor,
6060
ui64 fileQueueBatchSizeLimit,
6161
ui64 fileQueueBatchObjectCountLimit,
62-
ui64 fileQueueConsumersCountDelta)
62+
ui64 fileQueueConsumersCountDelta,
63+
bool allowLocalFiles)
6364
: ReadActorFactoryCfg(readActorFactoryCfg)
6465
, Gateway(std::move(gateway))
6566
, HolderFactory(holderFactory)
@@ -76,6 +77,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
7677
, FileQueueActor(fileQueueActor)
7778
, AddPathIndex(addPathIndex)
7879
, SizeLimit(sizeLimit)
80+
, AllowLocalFiles(allowLocalFiles)
7981
, Counters(counters)
8082
, TaskCounters(taskCounters)
8183
, FileSizeLimit(fileSizeLimit)
@@ -116,7 +118,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
116118
Credentials,
117119
Pattern,
118120
PatternVariant,
119-
NYql::NS3Lister::ES3PatternType::Wildcard));
121+
NYql::NS3Lister::ES3PatternType::Wildcard,
122+
AllowLocalFiles));
120123
}
121124

122125
LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex << ", FileQueue: " << FileQueueActor << (UseRuntimeListing ? " (remote)" : " (local"));
@@ -467,6 +470,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
467470
const bool AddPathIndex;
468471
const ui64 SizeLimit;
469472
TDuration CpuTime;
473+
const bool AllowLocalFiles;
470474

471475
std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
472476

@@ -521,7 +525,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
521525
NActors::TActorId fileQueueActor,
522526
ui64 fileQueueBatchSizeLimit,
523527
ui64 fileQueueBatchObjectCountLimit,
524-
ui64 fileQueueConsumersCountDelta
528+
ui64 fileQueueConsumersCountDelta,
529+
bool allowLocalFiles
525530
) {
526531
const auto actor = new TS3ReadActor(
527532
inputIndex,
@@ -547,7 +552,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
547552
fileQueueActor,
548553
fileQueueBatchSizeLimit,
549554
fileQueueBatchObjectCountLimit,
550-
fileQueueConsumersCountDelta
555+
fileQueueConsumersCountDelta,
556+
allowLocalFiles
551557
);
552558

553559
return {actor, actor};

ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
3636
NActors::TActorId fileQueueActor,
3737
ui64 fileQueueBatchSizeLimit,
3838
ui64 fileQueueBatchObjectCountLimit,
39-
ui64 fileQueueConsumersCountDelta
39+
ui64 fileQueueConsumersCountDelta,
40+
bool allowLocalFiles
4041
);
4142

4243
} // namespace NYql::NDq

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,7 +1292,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
12921292
ui64 fileQueueBatchObjectCountLimit,
12931293
ui64 fileQueueConsumersCountDelta,
12941294
bool asyncDecoding,
1295-
bool asyncDecompressing
1295+
bool asyncDecompressing,
1296+
bool allowLocalFiles
12961297
) : ReadActorFactoryCfg(readActorFactoryCfg)
12971298
, Gateway(std::move(gateway))
12981299
, HolderFactory(holderFactory)
@@ -1319,7 +1320,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
13191320
, FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit)
13201321
, FileQueueConsumersCountDelta(fileQueueConsumersCountDelta)
13211322
, AsyncDecoding(asyncDecoding)
1322-
, AsyncDecompressing(asyncDecompressing) {
1323+
, AsyncDecompressing(asyncDecompressing)
1324+
, AllowLocalFiles(allowLocalFiles) {
13231325
if (Counters) {
13241326
QueueDataSize = Counters->GetCounter("QueueDataSize");
13251327
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -1396,7 +1398,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
13961398
Credentials,
13971399
Pattern,
13981400
PatternVariant,
1399-
ES3PatternType::Wildcard));
1401+
ES3PatternType::Wildcard,
1402+
AllowLocalFiles));
14001403
}
14011404
FileQueueEvents.Init(TxId, SelfId(), SelfId());
14021405
FileQueueEvents.OnNewRecipientId(FileQueueActor);
@@ -1904,6 +1907,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
19041907
ui64 FileQueueConsumersCountDelta;
19051908
const bool AsyncDecoding;
19061909
const bool AsyncDecompressing;
1910+
const bool AllowLocalFiles;
19071911
bool IsCurrentBatchEmpty = false;
19081912
bool IsFileQueueEmpty = false;
19091913
bool IsWaitingFileQueueResponse = false;
@@ -2067,7 +2071,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
20672071
const TS3ReadActorFactoryConfig& cfg,
20682072
::NMonitoring::TDynamicCounterPtr counters,
20692073
::NMonitoring::TDynamicCounterPtr taskCounters,
2070-
IMemoryQuotaManager::TPtr memoryQuotaManager)
2074+
IMemoryQuotaManager::TPtr memoryQuotaManager,
2075+
bool allowLocalFiles)
20712076
{
20722077
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
20732078

@@ -2257,7 +2262,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
22572262
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
22582263
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
22592264
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
2260-
params.GetAsyncDecoding(), params.GetAsyncDecompressing());
2265+
params.GetAsyncDecoding(), params.GetAsyncDecompressing(), allowLocalFiles);
22612266

22622267
return {actor, actor};
22632268
} else {
@@ -2268,7 +2273,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
22682273
return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
22692274
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
22702275
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint,
2271-
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
2276+
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta, allowLocalFiles);
22722277
}
22732278
}
22742279

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ NActors::IActor* CreateS3FileQueueActor(
2929
const TS3Credentials& credentials,
3030
TString pattern,
3131
NYql::NS3Lister::ES3PatternVariant patternVariant,
32-
NS3Lister::ES3PatternType patternType);
32+
NS3Lister::ES3PatternType patternType,
33+
bool allowLocalFiles);
3334

3435
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor(
3536
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
@@ -48,6 +49,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
4849
const TS3ReadActorFactoryConfig& cfg,
4950
::NMonitoring::TDynamicCounterPtr counters,
5051
::NMonitoring::TDynamicCounterPtr taskCounters,
51-
IMemoryQuotaManager::TPtr memoryQuotaManager);
52+
IMemoryQuotaManager::TPtr memoryQuotaManager,
53+
bool allowLocalFiles);
5254

5355
} // namespace NYql::NDq

0 commit comments

Comments
 (0)