Skip to content

Commit e30c4cd

Browse files
authored
cpu time has been accounted into async decompressing (#6921)
1 parent 9564ab2 commit e30c4cd

File tree

3 files changed

+39
-6
lines changed

3 files changed

+39
-6
lines changed

ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp

+19-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
3333
private:
3434
bool nextImpl() final {
3535
while (!Coro->InputFinished || !Coro->Requests.empty()) {
36+
Coro->CpuTime += Coro->GetCpuTimeDelta();
3637
Coro->ProcessOneEvent();
38+
Coro->StartCycleCount = GetCycleCountFast();
3739
if (Coro->InputBuffer) {
3840
RawDataBuffer.swap(Coro->InputBuffer);
3941
Coro->InputBuffer.clear();
@@ -65,6 +67,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
6567
}
6668

6769
void Run() final {
70+
StartCycleCount = GetCycleCountFast();
71+
6872
try {
6973
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
7074
NDB::ReadBuffer* buffer = coroBuffer.get();
@@ -74,15 +78,15 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
7478
decompressorBuffer->nextIfAtEnd();
7579
TString data{decompressorBuffer->available(), ' '};
7680
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
77-
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
81+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data), TakeCpuTimeDelta()));
7882
}
7983
} catch (const TDtorException&) {
8084
// Stop any activity instantly
8185
return;
8286
} catch (...) {
83-
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
87+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception(), TakeCpuTimeDelta()));
8488
}
85-
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
89+
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish(TakeCpuTimeDelta()));
8690
}
8791

8892
void ProcessOneEvent() {
@@ -99,7 +103,19 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
99103
InputBuffer = std::move(event.Data);
100104
}
101105

106+
TDuration GetCpuTimeDelta() {
107+
return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
108+
}
109+
110+
TDuration TakeCpuTimeDelta() {
111+
auto currentCpuTime = CpuTime;
112+
CpuTime = TDuration::Zero();
113+
return currentCpuTime;
114+
}
115+
102116
private:
117+
TDuration CpuTime;
118+
ui64 StartCycleCount = 0;
103119
TString InputBuffer;
104120
TString Compression;
105121
TActorId Parent;

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -941,10 +941,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
941941
}
942942

943943
void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
944+
CpuTime += ev->Get()->CpuTime;
944945
DeferredDecompressedDataParts.push(std::move(ev->Release()));
946+
945947
}
946948

947-
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) {
949+
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) {
950+
CpuTime += ev->Get()->CpuTime;
948951
DecompressedInputFinished = true;
949952
}
950953

ydb/library/yql/providers/s3/events/events.h

+16-2
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,27 @@ struct TEvS3Provider {
207207
};
208208

209209
struct TEvDecompressDataResult : public NActors::TEventLocal<TEvDecompressDataResult, EvDecompressDataResult> {
210-
TEvDecompressDataResult(TString&& data) : Data(std::move(data)) {}
211-
TEvDecompressDataResult(std::exception_ptr exception) : Exception(exception) {}
210+
TEvDecompressDataResult(TString&& data, const TDuration& cpuTime)
211+
: Data(std::move(data))
212+
, CpuTime(cpuTime)
213+
{}
214+
215+
TEvDecompressDataResult(std::exception_ptr exception, const TDuration& cpuTime)
216+
: Exception(exception)
217+
, CpuTime(cpuTime)
218+
{}
219+
212220
TString Data;
213221
std::exception_ptr Exception;
222+
TDuration CpuTime;
214223
};
215224

216225
struct TEvDecompressDataFinish : public NActors::TEventLocal<TEvDecompressDataFinish, EvDecompressDataFinish> {
226+
TEvDecompressDataFinish(const TDuration& cpuTime)
227+
: CpuTime(cpuTime)
228+
{}
229+
230+
TDuration CpuTime;
217231
};
218232

219233
struct TReadRange {

0 commit comments

Comments
 (0)