diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp index 738ba1f6fb7c..5d56383c7475 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp @@ -33,7 +33,9 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { private: bool nextImpl() final { while (!Coro->InputFinished || !Coro->Requests.empty()) { + Coro->CpuTime += Coro->GetCpuTimeDelta(); Coro->ProcessOneEvent(); + Coro->StartCycleCount = GetCycleCountFast(); if (Coro->InputBuffer) { RawDataBuffer.swap(Coro->InputBuffer); Coro->InputBuffer.clear(); @@ -65,6 +67,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { } void Run() final { + StartCycleCount = GetCycleCountFast(); + try { std::unique_ptr coroBuffer = std::make_unique(this); NDB::ReadBuffer* buffer = coroBuffer.get(); @@ -74,15 +78,15 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { decompressorBuffer->nextIfAtEnd(); TString data{decompressorBuffer->available(), ' '}; decompressorBuffer->read(&data.front(), decompressorBuffer->available()); - Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data))); + Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data), TakeCpuTimeDelta())); } } catch (const TDtorException&) { // Stop any activity instantly return; } catch (...) { - Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception())); + Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception(), TakeCpuTimeDelta())); } - Send(Parent, new TEvS3Provider::TEvDecompressDataFinish()); + Send(Parent, new TEvS3Provider::TEvDecompressDataFinish(TakeCpuTimeDelta())); } void ProcessOneEvent() { @@ -99,7 +103,19 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { InputBuffer = std::move(event.Data); } + TDuration GetCpuTimeDelta() { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount)); + } + + TDuration TakeCpuTimeDelta() { + auto currentCpuTime = CpuTime; + CpuTime = TDuration::Zero(); + return currentCpuTime; + } + private: + TDuration CpuTime; + ui64 StartCycleCount = 0; TString InputBuffer; TString Compression; TActorId Parent; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 6c37dcad4a47..de1fcc7a1b4d 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -941,10 +941,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl { } void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) { + CpuTime += ev->Get()->CpuTime; DeferredDecompressedDataParts.push(std::move(ev->Release())); + } - void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) { + void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) { + CpuTime += ev->Get()->CpuTime; DecompressedInputFinished = true; } diff --git a/ydb/library/yql/providers/s3/events/events.h b/ydb/library/yql/providers/s3/events/events.h index b6baf5133650..8101d54deb0d 100644 --- a/ydb/library/yql/providers/s3/events/events.h +++ b/ydb/library/yql/providers/s3/events/events.h @@ -207,13 +207,27 @@ struct TEvS3Provider { }; struct TEvDecompressDataResult : public NActors::TEventLocal { - TEvDecompressDataResult(TString&& data) : Data(std::move(data)) {} - TEvDecompressDataResult(std::exception_ptr exception) : Exception(exception) {} + TEvDecompressDataResult(TString&& data, const TDuration& cpuTime) + : Data(std::move(data)) + , CpuTime(cpuTime) + {} + + TEvDecompressDataResult(std::exception_ptr exception, const TDuration& cpuTime) + : Exception(exception) + , CpuTime(cpuTime) + {} + TString Data; std::exception_ptr Exception; + TDuration CpuTime; }; struct TEvDecompressDataFinish : public NActors::TEventLocal { + TEvDecompressDataFinish(const TDuration& cpuTime) + : CpuTime(cpuTime) + {} + + TDuration CpuTime; }; struct TReadRange {