Skip to content

Commit 93b3a80

Browse files
authored
Merge f1acf46 into 752095e
2 parents 752095e + f1acf46 commit 93b3a80

20 files changed

79 additions and 42 deletions (+79 -42 lines changed)

ydb/core/external_sources/external_source_factory.cpp

3 additions, 2 deletions (+3 -2)
Original file line number | Diff line number | Diff line change
@@ -36,12 +36,13 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
3636
NActors::TActorSystem* actorSystem,
3737
size_t pathsLimit,
3838
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
39-
bool enableInfer) {
39+
bool enableInfer,
40+
bool allowLocalFiles) {
4041
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
4142
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
4243
{
4344
ToString(NYql::EDatabaseType::ObjectStorage),
44-
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
45+
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles)
4546
},
4647
{
4748
ToString(NYql::EDatabaseType::ClickHouse),

ydb/core/external_sources/external_source_factory.h

2 additions, 1 deletion (+2 -1)
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
1515
NActors::TActorSystem* actorSystem = nullptr,
1616
size_t pathsLimit = 50000,
1717
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
18-
bool enableInfer = false);
18+
bool enableInfer = false,
19+
bool allowLocalFiles = false);
1920

2021
}

ydb/core/external_sources/object_storage.cpp

8 additions, 4 deletions (+8 -4)
Original file line number | Diff line number | Diff line change
@@ -34,12 +34,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
3434
NActors::TActorSystem* actorSystem,
3535
size_t pathsLimit,
3636
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
37-
bool enableInfer)
37+
bool enableInfer,
38+
bool allowLocalFiles)
3839
: HostnamePatterns(hostnamePatterns)
3940
, PathsLimit(pathsLimit)
4041
, ActorSystem(actorSystem)
4142
, CredentialsFactory(std::move(credentialsFactory))
4243
, EnableInfer(enableInfer)
44+
, AllowLocalFiles(allowLocalFiles)
4345
{}
4446

4547
virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
@@ -320,7 +322,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
320322
.Url = meta->DataSourceLocation,
321323
.Credentials = credentials,
322324
.Pattern = effectiveFilePattern,
323-
}, Nothing(), false);
325+
}, Nothing(), AllowLocalFiles);
324326
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
325327
auto& listRes = listResFut.GetValue();
326328
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
@@ -613,6 +615,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
613615
NActors::TActorSystem* ActorSystem = nullptr;
614616
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
615617
const bool EnableInfer = false;
618+
const bool AllowLocalFiles;
616619
};
617620

618621
}
@@ -622,8 +625,9 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
622625
NActors::TActorSystem* actorSystem,
623626
size_t pathsLimit,
624627
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
625-
bool enableInfer) {
626-
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer);
628+
bool enableInfer,
629+
bool allowLocalFiles) {
630+
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
627631
}
628632

629633
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {

ydb/core/external_sources/object_storage.h

2 additions, 1 deletion (+2 -1)
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
1313
NActors::TActorSystem* actorSystem,
1414
size_t pathsLimit,
1515
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
16-
bool enableInfer);
16+
bool enableInfer,
17+
bool allowLocalFiles);
1718

1819
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
1920

ydb/core/external_sources/object_storage_ut.cpp

3 additions, 3 deletions (+3 -3)
Original file line number | Diff line number | Diff line change
@@ -8,22 +8,22 @@ namespace NKikimr {
88

99
Y_UNIT_TEST_SUITE(ObjectStorageTest) {
1010
Y_UNIT_TEST(SuccessValidation) {
11-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
11+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1212
NKikimrExternalSources::TSchema schema;
1313
NKikimrExternalSources::TGeneral general;
1414
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
1515
}
1616

1717
Y_UNIT_TEST(FailedCreate) {
18-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
18+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1919
NKikimrExternalSources::TSchema schema;
2020
NKikimrExternalSources::TGeneral general;
2121
general.mutable_attributes()->insert({"a", "b"});
2222
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
2323
}
2424

2525
Y_UNIT_TEST(FailedValidation) {
26-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
26+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
2727
NKikimrExternalSources::TSchema schema;
2828
NKikimrExternalSources::TGeneral general;
2929
general.mutable_attributes()->insert({"projection.h", "b"});

ydb/core/fq/libs/init/init.cpp

1 addition, 1 deletion (+1 -1)
Original file line number | Diff line number | Diff line change
@@ -216,7 +216,7 @@ void Init(
216216
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
217217

218218
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
219-
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
219+
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
220220
s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
221221
httpGateway, s3HttpRetryPolicy);
222222

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

1 addition, 1 deletion (+1 -1)
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
7878

7979
if (federatedQuerySetup) {
8080
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
81-
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig);
81+
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig, nullptr, federatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
8282
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
8383

8484
if (federatedQuerySetup->ConnectorClient) {

ydb/core/kqp/host/kqp_host.cpp

2 additions, 1 deletion (+2 -1)
Original file line number | Diff line number | Diff line change
@@ -1073,7 +1073,8 @@ class TKqpHost : public IKqpHost {
10731073
ActorSystem,
10741074
FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(),
10751075
FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr,
1076-
Config->FeatureFlags.GetEnableExternalSourceSchemaInference());
1076+
Config->FeatureFlags.GetEnableExternalSourceSchemaInference(),
1077+
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
10771078
}
10781079
}
10791080

ydb/core/tx/schemeshard/schemeshard_impl.cpp

4 additions, 1 deletion (+4 -1)
Original file line number | Diff line number | Diff line change
@@ -6978,7 +6978,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
69786978
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(
69796979
std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()),
69806980
nullptr,
6981-
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit()
6981+
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit(),
6982+
nullptr,
6983+
appConfig.GetFeatureFlags().GetEnableExternalSourceSchemaInference(),
6984+
appConfig.GetQueryServiceConfig().GetS3().GetAllowLocalFiles()
69826985
);
69836986
}
69846987

ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp

4 additions, 3 deletions (+4 -3)
Original file line number | Diff line number | Diff line change
@@ -64,16 +64,17 @@ namespace NYql::NDq {
6464
IHTTPGateway::TPtr gateway,
6565
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
6666
const TS3ReadActorFactoryConfig& cfg,
67-
::NMonitoring::TDynamicCounterPtr counters) override {
67+
::NMonitoring::TDynamicCounterPtr counters,
68+
bool allowLocalFiles) override {
6869

6970
#if defined(_linux_) || defined(_darwin_)
7071
NDB::registerFormats();
7172
factory.RegisterSource<NS3::TSource>("S3Source",
72-
[credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
73+
[credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
7374
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
7475
std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams,
7576
args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
76-
counters, args.TaskCounters, args.MemoryQuotaManager);
77+
counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles);
7778
});
7879
#else
7980
Y_UNUSED(factory);

ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp

10 additions, 4 deletions (+10 -4)
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
5959
NActors::TActorId fileQueueActor,
6060
ui64 fileQueueBatchSizeLimit,
6161
ui64 fileQueueBatchObjectCountLimit,
62-
ui64 fileQueueConsumersCountDelta)
62+
ui64 fileQueueConsumersCountDelta,
63+
bool allowLocalFiles)
6364
: ReadActorFactoryCfg(readActorFactoryCfg)
6465
, Gateway(std::move(gateway))
6566
, HolderFactory(holderFactory)
@@ -76,6 +77,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
7677
, FileQueueActor(fileQueueActor)
7778
, AddPathIndex(addPathIndex)
7879
, SizeLimit(sizeLimit)
80+
, AllowLocalFiles(allowLocalFiles)
7981
, Counters(counters)
8082
, TaskCounters(taskCounters)
8183
, FileSizeLimit(fileSizeLimit)
@@ -116,7 +118,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
116118
Credentials,
117119
Pattern,
118120
PatternVariant,
119-
NYql::NS3Lister::ES3PatternType::Wildcard));
121+
NYql::NS3Lister::ES3PatternType::Wildcard,
122+
AllowLocalFiles));
120123
}
121124

122125
LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex << ", FileQueue: " << FileQueueActor << (UseRuntimeListing ? " (remote)" : " (local"));
@@ -467,6 +470,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
467470
const bool AddPathIndex;
468471
const ui64 SizeLimit;
469472
TDuration CpuTime;
473+
const bool AllowLocalFiles;
470474

471475
std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
472476

@@ -521,7 +525,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
521525
NActors::TActorId fileQueueActor,
522526
ui64 fileQueueBatchSizeLimit,
523527
ui64 fileQueueBatchObjectCountLimit,
524-
ui64 fileQueueConsumersCountDelta
528+
ui64 fileQueueConsumersCountDelta,
529+
bool allowLocalFiles
525530
) {
526531
const auto actor = new TS3ReadActor(
527532
inputIndex,
@@ -547,7 +552,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
547552
fileQueueActor,
548553
fileQueueBatchSizeLimit,
549554
fileQueueBatchObjectCountLimit,
550-
fileQueueConsumersCountDelta
555+
fileQueueConsumersCountDelta,
556+
allowLocalFiles
551557
);
552558

553559
return {actor, actor};

ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h

2 additions, 1 deletion (+2 -1)
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
3636
NActors::TActorId fileQueueActor,
3737
ui64 fileQueueBatchSizeLimit,
3838
ui64 fileQueueBatchObjectCountLimit,
39-
ui64 fileQueueConsumersCountDelta
39+
ui64 fileQueueConsumersCountDelta,
40+
bool allowLocalFiles
4041
);
4142

4243
} // namespace NYql::NDq

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

11 additions, 6 deletions (+11 -6)
Original file line number | Diff line number | Diff line change
@@ -1292,7 +1292,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
12921292
ui64 fileQueueBatchObjectCountLimit,
12931293
ui64 fileQueueConsumersCountDelta,
12941294
bool asyncDecoding,
1295-
bool asyncDecompressing
1295+
bool asyncDecompressing,
1296+
bool allowLocalFiles
12961297
) : ReadActorFactoryCfg(readActorFactoryCfg)
12971298
, Gateway(std::move(gateway))
12981299
, HolderFactory(holderFactory)
@@ -1319,7 +1320,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
13191320
, FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit)
13201321
, FileQueueConsumersCountDelta(fileQueueConsumersCountDelta)
13211322
, AsyncDecoding(asyncDecoding)
1322-
, AsyncDecompressing(asyncDecompressing) {
1323+
, AsyncDecompressing(asyncDecompressing)
1324+
, AllowLocalFiles(allowLocalFiles) {
13231325
if (Counters) {
13241326
QueueDataSize = Counters->GetCounter("QueueDataSize");
13251327
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -1396,7 +1398,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
13961398
Credentials,
13971399
Pattern,
13981400
PatternVariant,
1399-
ES3PatternType::Wildcard));
1401+
ES3PatternType::Wildcard,
1402+
AllowLocalFiles));
14001403
}
14011404
FileQueueEvents.Init(TxId, SelfId(), SelfId());
14021405
FileQueueEvents.OnNewRecipientId(FileQueueActor);
@@ -1904,6 +1907,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
19041907
ui64 FileQueueConsumersCountDelta;
19051908
const bool AsyncDecoding;
19061909
const bool AsyncDecompressing;
1910+
const bool AllowLocalFiles;
19071911
bool IsCurrentBatchEmpty = false;
19081912
bool IsFileQueueEmpty = false;
19091913
bool IsWaitingFileQueueResponse = false;
@@ -2067,7 +2071,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
20672071
const TS3ReadActorFactoryConfig& cfg,
20682072
::NMonitoring::TDynamicCounterPtr counters,
20692073
::NMonitoring::TDynamicCounterPtr taskCounters,
2070-
IMemoryQuotaManager::TPtr memoryQuotaManager)
2074+
IMemoryQuotaManager::TPtr memoryQuotaManager,
2075+
bool allowLocalFiles)
20712076
{
20722077
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
20732078

@@ -2257,7 +2262,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
22572262
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
22582263
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
22592264
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
2260-
params.GetAsyncDecoding(), params.GetAsyncDecompressing());
2265+
params.GetAsyncDecoding(), params.GetAsyncDecompressing(), allowLocalFiles);
22612266

22622267
return {actor, actor};
22632268
} else {
@@ -2268,7 +2273,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
22682273
return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
22692274
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
22702275
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint,
2271-
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
2276+
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta, allowLocalFiles);
22722277
}
22732278
}
22742279

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h

4 additions, 2 deletions (+4 -2)
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,8 @@ NActors::IActor* CreateS3FileQueueActor(
2929
const TS3Credentials& credentials,
3030
TString pattern,
3131
NYql::NS3Lister::ES3PatternVariant patternVariant,
32-
NS3Lister::ES3PatternType patternType);
32+
NS3Lister::ES3PatternType patternType,
33+
bool allowLocalFiles);
3334

3435
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor(
3536
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
@@ -48,6 +49,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
4849
const TS3ReadActorFactoryConfig& cfg,
4950
::NMonitoring::TDynamicCounterPtr counters,
5051
::NMonitoring::TDynamicCounterPtr taskCounters,
51-
IMemoryQuotaManager::TPtr memoryQuotaManager);
52+
IMemoryQuotaManager::TPtr memoryQuotaManager,
53+
bool allowLocalFiles);
5254

5355
} // namespace NYql::NDq

ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h

3 additions, 2 deletions (+3 -2)
Original file line number | Diff line number | Diff line change
@@ -10,9 +10,10 @@ void RegisterS3ReadActorFactory(
1010
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
1111
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
1212
const TS3ReadActorFactoryConfig& factoryConfig = {},
13-
::NMonitoring::TDynamicCounterPtr counters = nullptr) {
13+
::NMonitoring::TDynamicCounterPtr counters = nullptr,
14+
bool allowLocalFiles = false) {
1415
CreateS3ActorsFactory()->RegisterS3ReadActorFactory(
15-
factory, credentialsFactory, gateway, retryPolicy, factoryConfig, counters
16+
factory, credentialsFactory, gateway, retryPolicy, factoryConfig, counters, allowLocalFiles
1617
);
1718
}
1819

0 commit comments

Comments
 (0)