Skip to content

cpu time has been accounted into async decompressing #6921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
private:
bool nextImpl() final {
while (!Coro->InputFinished || !Coro->Requests.empty()) {
Coro->CpuTime += Coro->GetCpuTimeDelta();
Coro->ProcessOneEvent();
Coro->StartCycleCount = GetCycleCountFast();
if (Coro->InputBuffer) {
RawDataBuffer.swap(Coro->InputBuffer);
Coro->InputBuffer.clear();
Expand Down Expand Up @@ -65,6 +67,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
}

void Run() final {
StartCycleCount = GetCycleCountFast();

try {
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
NDB::ReadBuffer* buffer = coroBuffer.get();
Expand All @@ -74,15 +78,15 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
decompressorBuffer->nextIfAtEnd();
TString data{decompressorBuffer->available(), ' '};
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data), TakeCpuTimeDelta()));
}
} catch (const TDtorException&) {
// Stop any activity instantly
return;
} catch (...) {
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception(), TakeCpuTimeDelta()));
}
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish(TakeCpuTimeDelta()));
}

void ProcessOneEvent() {
Expand All @@ -99,7 +103,19 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
InputBuffer = std::move(event.Data);
}

TDuration GetCpuTimeDelta() {
return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
}

TDuration TakeCpuTimeDelta() {
auto currentCpuTime = CpuTime;
CpuTime = TDuration::Zero();
return currentCpuTime;
}

private:
TDuration CpuTime;
ui64 StartCycleCount = 0;
TString InputBuffer;
TString Compression;
TActorId Parent;
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}

void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
CpuTime += ev->Get()->CpuTime;
DeferredDecompressedDataParts.push(std::move(ev->Release()));

}

void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) {
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) {
CpuTime += ev->Get()->CpuTime;
DecompressedInputFinished = true;
}

Expand Down
18 changes: 16 additions & 2 deletions ydb/library/yql/providers/s3/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,27 @@ struct TEvS3Provider {
};

struct TEvDecompressDataResult : public NActors::TEventLocal<TEvDecompressDataResult, EvDecompressDataResult> {
TEvDecompressDataResult(TString&& data) : Data(std::move(data)) {}
TEvDecompressDataResult(std::exception_ptr exception) : Exception(exception) {}
TEvDecompressDataResult(TString&& data, const TDuration& cpuTime)
: Data(std::move(data))
, CpuTime(cpuTime)
{}

TEvDecompressDataResult(std::exception_ptr exception, const TDuration& cpuTime)
: Exception(exception)
, CpuTime(cpuTime)
{}

TString Data;
std::exception_ptr Exception;
TDuration CpuTime;
};

struct TEvDecompressDataFinish : public NActors::TEventLocal<TEvDecompressDataFinish, EvDecompressDataFinish> {
TEvDecompressDataFinish(const TDuration& cpuTime)
: CpuTime(cpuTime)
{}

TDuration CpuTime;
};

struct TReadRange {
Expand Down
Loading