Skip to content

Fix Push stats (update it before deserialization) for InputChannels #16063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion ydb/library/yql/dq/runtime/dq_async_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,21 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
Pending = space != 0;
if (!batch.empty()) {
AddBatch(std::move(batch), space);
auto rows = AddBatch(std::move(batch), space);

if (PushStats.CollectBasic()) {
PushStats.Bytes += space;
PushStats.Rows += rows;
PushStats.Chunks++;
PushStats.Resume();
if (PushStats.CollectFull()) {
PushStats.MaxMemoryUsage = std::max(PushStats.MaxMemoryUsage, StoredBytes);
}
}

if (GetFreeSpace() < 0) {
PopStats.TryPause();
}
}
}

Expand Down
38 changes: 25 additions & 13 deletions ydb/library/yql/dq/runtime/dq_input_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ class TDqInputChannelImpl : public TDqInputImpl<TDqInputChannelImpl, IDqInputCha
using TBaseImpl = TDqInputImpl<TDqInputChannelImpl, IDqInputChannel>;

public:
using TBaseImpl::StoredBytes;

TDqInputChannelStats PushStats;
TDqInputStats PopStats;

TDqInputChannelImpl(ui64 channelId, ui32 srcStageId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, TCollectStatsLevel level,
const NKikimr::NMiniKQL::TTypeEnvironment&, const NKikimr::NMiniKQL::THolderFactory&,
NDqProto::EDataTransportVersion)
TDqInputChannelImpl(ui64 channelId, ui32 srcStageId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, TCollectStatsLevel level)
: TBaseImpl(inputType, maxBufferBytes)
{
PopStats.Level = level;
Expand Down Expand Up @@ -50,10 +50,10 @@ class TDqInputChannel : public IDqInputChannel {
const size_t chunkCount = data.ChunkCount();
auto inputType = Impl.GetInputType();
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
if (Y_UNLIKELY(PushStats.CollectProfile())) {
if (Y_UNLIKELY(Impl.PushStats.CollectProfile())) {
auto startTime = TInstant::Now();
DataSerializer.Deserialize(std::move(data), inputType, batch);
PushStats.DeserializationTime += (TInstant::Now() - startTime);
Impl.PushStats.DeserializationTime += (TInstant::Now() - startTime);
} else {
DataSerializer.Deserialize(std::move(data), inputType, batch);
}
Expand All @@ -72,26 +72,23 @@ class TDqInputChannel : public IDqInputChannel {
}

public:
TDqInputChannelStats PushStats;
TDqInputStats PopStats;

TDqInputChannel(ui64 channelId, ui32 srcStageId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, TCollectStatsLevel level,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
NDqProto::EDataTransportVersion transportVersion)
: Impl(channelId, srcStageId, inputType, maxBufferBytes, level, typeEnv, holderFactory, transportVersion)
: Impl(channelId, srcStageId, inputType, maxBufferBytes, level)
, DataSerializer(typeEnv, holderFactory, transportVersion) {
}

ui64 GetChannelId() const override {
return Impl.GetChannelId();
return Impl.PushStats.ChannelId;
}

const TDqInputChannelStats& GetPushStats() const override {
return Impl.GetPushStats();
return Impl.PushStats;
}

const TDqInputStats& GetPopStats() const override {
return Impl.GetPopStats();
return Impl.PopStats;
}

i64 GetFreeSpace() const override {
Expand Down Expand Up @@ -123,11 +120,26 @@ class TDqInputChannel : public IDqInputChannel {
}

void Push(TDqSerializedBatch&& data) override {
YQL_ENSURE(!Impl.IsFinished(), "input channel " << PushStats.ChannelId << " already finished");
YQL_ENSURE(!Impl.IsFinished(), "input channel " << Impl.PushStats.ChannelId << " already finished");
if (Y_UNLIKELY(data.Proto.GetChunks() == 0)) {
return;
}
StoredSerializedBytes += data.Size();

if (Impl.PushStats.CollectBasic()) {
Impl.PushStats.Bytes += data.Size();
Impl.PushStats.Rows += data.RowCount();
Impl.PushStats.Chunks++;
Impl.PushStats.Resume();
if (Impl.PushStats.CollectFull()) {
Impl.PushStats.MaxMemoryUsage = std::max(Impl.PushStats.MaxMemoryUsage, StoredSerializedBytes + Impl.StoredBytes);
}
}

if (GetFreeSpace() < 0) {
Impl.PopStats.TryPause();
}

DataForDeserialize.emplace_back(std::move(data));
}

Expand Down
20 changes: 6 additions & 14 deletions ydb/library/yql/dq/runtime/dq_input_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ enum TInputChannelFormat {
LEGACY_CH,
LEGACY_SIMPLE_BLOCK,
LEGACY_TUPLED_BLOCK
};
};

template <class TDerived, class IInputInterface>
class TDqInputImpl : public IInputInterface {
Expand Down Expand Up @@ -157,28 +157,20 @@ class TDqInputImpl : public IInputInterface {
}
}

void AddBatch(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) {
ui64 AddBatch(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) {
Y_ABORT_UNLESS(batch.Width() == GetWidth());

ui64 rows = GetRowsCount(batch);
StoredBytes += space;
StoredRows += batch.RowCount();
auto& pushStats = static_cast<TDerived*>(this)->PushStats;

if (pushStats.CollectBasic()) {
pushStats.Bytes += space;
pushStats.Rows += GetRowsCount(batch);
pushStats.Chunks++;
pushStats.Resume();
if (pushStats.CollectFull()) {
pushStats.MaxMemoryUsage = std::max(pushStats.MaxMemoryUsage, StoredBytes);
}
}
StoredRows += rows;

if (GetFreeSpace() < 0) {
static_cast<TDerived*>(this)->PopStats.TryPause();
}

Batches.emplace_back(std::move(batch));

return rows;
}

[[nodiscard]]
Expand Down
Loading