Skip to content

Commit 0964dfd

Browse files
authored
YQ-2302: read_max_bytes bypasses file_size_limit (#4117)
1 parent 45c8bad commit 0964dfd

File tree

5 files changed

+88
-42
lines changed

5 files changed

+88
-42
lines changed

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

+39-22
Original file line numberDiff line numberDiff line change
@@ -160,20 +160,20 @@ struct TEvS3FileQueue {
160160
161161
EvEnd
162162
};
163-
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
163+
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
164164
"expect EvEnd < EventSpaceEnd(TEvents::ES_S3_FILE_QUEUE)");
165-
165+
166166
struct TEvUpdateConsumersCount :
167167
public TEventPB<TEvUpdateConsumersCount, NS3::FileQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {
168-
168+
169169
explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) {
170170
Record.SetConsumersCountDelta(consumersCountDelta);
171171
}
172172
};
173173

174174
struct TEvAck :
175175
public TEventPB<TEvAck, NS3::FileQueue::TEvAck, EvAck> {
176-
176+
177177
TEvAck() = default;
178178

179179
explicit TEvAck(const TMessageTransportMeta& transportMeta) {
@@ -388,6 +388,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
388388
TPathList paths,
389389
size_t prefetchSize,
390390
ui64 fileSizeLimit,
391+
ui64 readLimit,
391392
bool useRuntimeListing,
392393
ui64 consumersCount,
393394
ui64 batchSizeLimit,
@@ -401,6 +402,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
401402
: TxId(std::move(txId))
402403
, PrefetchSize(prefetchSize)
403404
, FileSizeLimit(fileSizeLimit)
405+
, ReadLimit(readLimit)
404406
, MaybeIssues(Nothing())
405407
, UseRuntimeListing(useRuntimeListing)
406408
, ConsumersCount(consumersCount)
@@ -513,7 +515,9 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
513515
// skip 'directories'
514516
continue;
515517
}
516-
if (object.Size > FileSizeLimit) {
518+
519+
const ui64 bytesUsed = std::min(object.Size, ReadLimit);
520+
if (bytesUsed > FileSizeLimit) {
517521
auto errorMessage = TStringBuilder()
518522
<< "Size of object " << object.Path << " = "
519523
<< object.Size
@@ -525,10 +529,10 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
525529
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path << " of size " << object.Size);
526530
TObjectPath objectPath;
527531
objectPath.SetPath(object.Path);
528-
objectPath.SetSize(object.Size);
532+
objectPath.SetSize(bytesUsed);
529533
objectPath.SetPathIndex(CurrentDirectoryPathIndex);
530534
Objects.emplace_back(std::move(objectPath));
531-
ObjectsTotalSize += object.Size;
535+
ObjectsTotalSize += bytesUsed;
532536
}
533537
return true;
534538
}
@@ -598,7 +602,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
598602
Send(ev->Sender, new TEvS3FileQueue::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta()));
599603
TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo());
600604
}
601-
605+
602606
void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) {
603607
if (!UpdatedConsumers.contains(ev->Sender)) {
604608
UpdatedConsumers.insert(ev->Sender);
@@ -653,7 +657,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
653657

654658
LOG_D("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer << ", " << ObjectsTotalSize << " bytes left");
655659
Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta));
656-
660+
657661
if (HasNoMoreItems()) {
658662
TryFinish(consumer, transportMeta.GetSeqNo());
659663
}
@@ -675,7 +679,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
675679
}
676680

677681
bool CanSendToConsumer(const TActorId& consumer) {
678-
return !UseRuntimeListing || RoundRobinStageFinished ||
682+
return !UseRuntimeListing || RoundRobinStageFinished ||
679683
(StartedConsumers.size() < ConsumersCount && !StartedConsumers.contains(consumer));
680684
}
681685

@@ -753,7 +757,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
753757
}
754758
});
755759
}
756-
760+
757761
void ScheduleRequest(const TActorId& consumer, const TMessageTransportMeta& transportMeta) {
758762
PendingRequests[consumer].push_back(transportMeta);
759763
HasPendingRequests = true;
@@ -790,7 +794,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
790794
}
791795
}
792796
}
793-
797+
794798
void TryFinish(const TActorId& consumer, ui64 seqNo) {
795799
LOG_T("TS3FileQueueActor", "TryFinish from consumer " << consumer << ", " << FinishedConsumers.size() << " consumers already finished, seqNo=" << seqNo);
796800
if (FinishingConsumerToLastSeqNo.contains(consumer)) {
@@ -814,6 +818,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
814818

815819
size_t PrefetchSize;
816820
ui64 FileSizeLimit;
821+
ui64 ReadLimit;
817822
TMaybe<NS3Lister::IS3Lister::TPtr> MaybeLister = Nothing();
818823
TMaybe<NThreading::TFuture<NS3Lister::TListResult>> ListingFuture;
819824
size_t CurrentDirectoryPathIndex = 0;
@@ -838,7 +843,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
838843
const TString Pattern;
839844
const ES3PatternVariant PatternVariant;
840845
const ES3PatternType PatternType;
841-
846+
842847
static constexpr TDuration PoisonTimeout = TDuration::Hours(3);
843848
static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3);
844849
};
@@ -918,6 +923,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
918923
std::move(Paths),
919924
ReadActorFactoryCfg.MaxInflight * 2,
920925
FileSizeLimit,
926+
SizeLimit,
921927
false,
922928
1,
923929
FileQueueBatchSizeLimit,
@@ -1097,7 +1103,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
10971103
void HandleAck(TEvS3FileQueue::TEvAck::TPtr& ev) {
10981104
FileQueueEvents.OnEventReceived(ev);
10991105
}
1100-
1106+
11011107
static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) {
11021108
if (!result.Issues) {
11031109
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(result.Content), requestId, pathInd, path)));
@@ -1209,7 +1215,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
12091215
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error});
12101216
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
12111217
}
1212-
1218+
12131219
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
12141220
FileQueueEvents.Retry();
12151221
}
@@ -2088,7 +2094,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
20882094
if (isCancelled) {
20892095
LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " <<
20902096
QueueBufferCounter->DownloadedBytes << " bytes");
2091-
break;
2097+
break;
20922098
}
20932099
}
20942100
}
@@ -2538,6 +2544,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
25382544
::NMonitoring::TDynamicCounterPtr counters,
25392545
::NMonitoring::TDynamicCounterPtr taskCounters,
25402546
ui64 fileSizeLimit,
2547+
ui64 readLimit,
25412548
std::optional<ui64> rowsLimitHint,
25422549
IMemoryQuotaManager::TPtr memoryQuotaManager,
25432550
bool useRuntimeListing,
@@ -2564,6 +2571,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
25642571
, TaskCounters(std::move(taskCounters))
25652572
, FileQueueActor(fileQueueActor)
25662573
, FileSizeLimit(fileSizeLimit)
2574+
, ReadLimit(readLimit)
25672575
, MemoryQuotaManager(memoryQuotaManager)
25682576
, UseRuntimeListing(useRuntimeListing)
25692577
, FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
@@ -2622,6 +2630,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
26222630
std::move(Paths),
26232631
ReadActorFactoryCfg.MaxInflight * 2,
26242632
FileSizeLimit,
2633+
ReadLimit,
26252634
false,
26262635
1,
26272636
FileQueueBatchSizeLimit,
@@ -2784,7 +2793,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
27842793
void CommitState(const NDqProto::TCheckpoint&) final {}
27852794

27862795
ui64 GetInputIndex() const final {
2787-
return InputIndex;
2796+
return InputIndex;
27882797
}
27892798

27902799
const TDqAsyncStats& GetIngressStats() const final {
@@ -3038,7 +3047,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
30383047
}
30393048
}
30403049
}
3041-
3050+
30423051
void Handle(TEvS3FileQueue::TEvAck::TPtr& ev) {
30433052
FileQueueEvents.OnEventReceived(ev);
30443053
}
@@ -3136,6 +3145,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
31363145
std::set<NActors::TActorId> CoroActors;
31373146
NActors::TActorId FileQueueActor;
31383147
const ui64 FileSizeLimit;
3148+
const ui64 ReadLimit;
31393149
bool Bootstrapped = false;
31403150
IMemoryQuotaManager::TPtr MemoryQuotaManager;
31413151
bool UseRuntimeListing;
@@ -3295,6 +3305,7 @@ IActor* CreateS3FileQueueActor(
32953305
TPathList paths,
32963306
size_t prefetchSize,
32973307
ui64 fileSizeLimit,
3308+
ui64 readLimit,
32983309
bool useRuntimeListing,
32993310
ui64 consumersCount,
33003311
ui64 batchSizeLimit,
@@ -3310,6 +3321,7 @@ IActor* CreateS3FileQueueActor(
33103321
paths,
33113322
prefetchSize,
33123323
fileSizeLimit,
3324+
readLimit,
33133325
useRuntimeListing,
33143326
consumersCount,
33153327
batchSizeLimit,
@@ -3394,15 +3406,15 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
33943406
if (params.GetRowsLimitHint() != 0) {
33953407
rowsLimitHint = params.GetRowsLimitHint();
33963408
}
3397-
3409+
33983410
TActorId fileQueueActor;
33993411
if (auto it = settings.find("fileQueueActor"); it != settings.cend()) {
34003412
NActorsProto::TActorId protoId;
34013413
TMemoryInput inputStream(it->second);
34023414
ParseFromTextFormat(inputStream, protoId);
34033415
fileQueueActor = ActorIdFromProto(protoId);
34043416
}
3405-
3417+
34063418
ui64 fileQueueBatchSizeLimit = 0;
34073419
if (auto it = settings.find("fileQueueBatchSizeLimit"); it != settings.cend()) {
34083420
fileQueueBatchSizeLimit = FromString<ui64>(it->second);
@@ -3412,7 +3424,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
34123424
if (auto it = settings.find("fileQueueBatchObjectCountLimit"); it != settings.cend()) {
34133425
fileQueueBatchObjectCountLimit = FromString<ui64>(it->second);
34143426
}
3415-
3427+
34163428
ui64 fileQueueConsumersCountDelta = 0;
34173429
if (readRanges.size() > 1) {
34183430
fileQueueConsumersCountDelta = readRanges.size() - 1;
@@ -3520,9 +3532,14 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
35203532

35213533
#undef SET_FLAG
35223534
#undef SUPPORTED_FLAGS
3535+
ui64 sizeLimit = std::numeric_limits<ui64>::max();
3536+
if (const auto it = settings.find("sizeLimit"); settings.cend() != it) {
3537+
sizeLimit = FromString<ui64>(it->second);
3538+
}
3539+
35233540
const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
35243541
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
3525-
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager,
3542+
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
35263543
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
35273544

35283545
return {actor, actor};

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ NActors::IActor* CreateS3FileQueueActor(
1818
NS3Details::TPathList paths,
1919
size_t prefetchSize,
2020
ui64 fileSizeLimit,
21+
ui64 readLimit,
2122
bool useRuntimeListing,
2223
ui64 consumersCount,
2324
ui64 batchSizeLimit,

ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp

+11-5
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
428428

429429
auto fileQueueBatchObjectCountLimit = State_->Configuration->FileQueueBatchObjectCountLimit.Get().GetOrElse(1000);
430430
srcDesc.MutableSettings()->insert({"fileQueueBatchObjectCountLimit", ToString(fileQueueBatchObjectCountLimit)});
431-
431+
432432
YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing;
433433

434434
if (useRuntimeListing) {
@@ -440,8 +440,8 @@ class TS3DqIntegration: public TDqIntegrationBase {
440440
packed.Data().Literal().Value(),
441441
FromString<bool>(packed.IsText().Literal().Value()),
442442
paths);
443-
paths.insert(paths.end(),
444-
std::make_move_iterator(pathsChunk.begin()),
443+
paths.insert(paths.end(),
444+
std::make_move_iterator(pathsChunk.begin()),
445445
std::make_move_iterator(pathsChunk.end()));
446446
}
447447

@@ -452,11 +452,11 @@ class TS3DqIntegration: public TDqIntegrationBase {
452452
builder.AddPath(f.Path, f.Size, f.IsDirectory);
453453
});
454454
builder.Save(&range);
455-
455+
456456
TVector<TString> serialized(1);
457457
TStringOutput out(serialized.front());
458458
range.Save(&out);
459-
459+
460460
paths.clear();
461461
ReadPathsList(srcDesc, {}, serialized, paths);
462462

@@ -503,12 +503,18 @@ class TS3DqIntegration: public TDqIntegrationBase {
503503

504504
YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount;
505505

506+
ui64 readLimit = std::numeric_limits<ui64>::max();
507+
if (const auto sizeLimitIter = srcDesc.MutableSettings()->find("sizeLimit"); sizeLimitIter != srcDesc.MutableSettings()->cend()) {
508+
readLimit = FromString<ui64>(sizeLimitIter->second);
509+
}
510+
506511
auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register(
507512
NDq::CreateS3FileQueueActor(
508513
0ul,
509514
std::move(paths),
510515
fileQueuePrefetchSize,
511516
fileSizeLimit,
517+
readLimit,
512518
useRuntimeListing,
513519
consumersCount,
514520
fileQueueBatchSizeLimit,

ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,15 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
233233
fileSizeLimit = it->second;
234234
}
235235
}
236+
237+
ui64 userSizeLimit = std::numeric_limits<ui64>::max();
236238
if (formatName == "parquet") {
237239
fileSizeLimit = State_->Configuration->BlockFileSizeLimit;
240+
} else if (formatName == "raw") {
241+
const auto sizeLimitParam = dqSource.Input().Cast<TS3SourceSettings>().SizeLimit().Maybe<TCoAtom>();
242+
if (sizeLimitParam.IsValid()) {
243+
userSizeLimit = FromString<ui64>(sizeLimitParam.Cast().StringValue());
244+
}
238245
}
239246

240247
for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) {
@@ -245,13 +252,14 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
245252
UnpackPathsList(packed, isTextEncoded, paths);
246253

247254
for (auto& entry : paths) {
248-
if (entry.Size > fileSizeLimit) {
255+
const ui64 bytesUsed = std::min(entry.Size, userSizeLimit);
256+
if (bytesUsed > fileSizeLimit) {
249257
ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()),
250258
TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName));
251259
hasErr = true;
252260
return false;
253261
}
254-
totalSize += entry.Size;
262+
totalSize += bytesUsed;
255263
++count;
256264
}
257265
}
@@ -673,7 +681,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
673681
.RowsLimitHint(count.Literal())
674682
.Build()
675683
.Build()
676-
.Done();
684+
.Done();
677685
}
678686

679687
TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {

0 commit comments

Comments
 (0)