Skip to content

Commit 37290b1

Browse files
authored
Rename RowCount => ChunkCount to align with semantics and make it less confusing (#15080)
1 parent 6964fca commit 37290b1

13 files changed

+65
-64
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
291291
size_t Size() const {
292292
return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size();
293293
}
294-
295-
ui32 RowCount() const {
296-
return Proto.GetChannelData().GetData().GetRows();
297-
}
298294
};
299295

300296
void HandleChannelData(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {

ydb/core/kqp/runtime/kqp_transport.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void TKqpProtoBuilder::BuildYdbResultSet(
7575
}
7676
NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion);
7777
for (auto& part : data) {
78-
if (part.RowCount()) {
78+
if (part.ChunkCount()) {
7979
TUnboxedValueBatch rows(mkqlSrcRowType);
8080
dataSerializer.Deserialize(std::move(part), mkqlSrcRowType, rows);
8181
rows.ForEachRow([&](const NUdf::TUnboxedValue& value) {

ydb/core/tx/datashard/datashard_kqp.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
523523
NDq::TDqSerializedBatch outputData;
524524
auto fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
525525
MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
526-
MKQL_ENSURE_S(outputData.Proto.GetRows() == 0);
526+
MKQL_ENSURE_S(outputData.Proto.GetChunks() == 0);
527527
}
528528
}
529529
}

ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
115115
LOG_T("Received input for channelId: " << channelId
116116
<< ", seqNo: " << record.GetSeqNo()
117117
<< ", size: " << channelData.Proto.GetData().GetRaw().size()
118-
<< ", rows: " << channelData.Proto.GetData().GetRows()
118+
<< ", chunks: " << channelData.Proto.GetData().GetChunks()
119119
<< ", watermark: " << channelData.Proto.HasWatermark()
120120
<< ", checkpoint: " << channelData.Proto.HasCheckpoint()
121121
<< ", finished: " << channelData.Proto.GetFinished()
@@ -177,7 +177,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
177177
if (record.GetFinish()) {
178178
auto it = outputChannel.InFlight.begin();
179179
while (it != outputChannel.InFlight.end()) {
180-
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
180+
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
181181
it = outputChannel.InFlight.erase(it);
182182
}
183183
outputChannel.RetryState.reset();
@@ -190,7 +190,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
190190
// remove all messages with seqNo <= ackSeqNo
191191
auto it = outputChannel.InFlight.begin();
192192
while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) {
193-
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
193+
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
194194
it = outputChannel.InFlight.erase(it);
195195
}
196196

@@ -549,14 +549,14 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
549549
YQL_ENSURE(!outputChannel.RetryState);
550550

551551
const ui64 seqNo = ++outputChannel.LastSentSeqNo;
552-
const ui32 chunkBytes = channelData.PayloadSize();
553-
const ui32 chunkRows = channelData.RowCount();
552+
const ui32 dataBytes = channelData.PayloadSize();
553+
const ui32 dataChunks = channelData.ChunkCount();
554554
const bool finished = channelData.Proto.GetFinished();
555555

556556
LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId()
557557
<< ", peer: " << *outputChannel.Peer
558-
<< ", rows: " << chunkRows
559-
<< ", bytes: " << chunkBytes
558+
<< ", chunks: " << dataChunks
559+
<< ", bytes: " << dataBytes
560560
<< ", watermark: " << channelData.Proto.HasWatermark()
561561
<< ", checkpoint: " << channelData.Proto.HasCheckpoint()
562562
<< ", seqNo: " << seqNo
@@ -588,7 +588,7 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
588588
dataEv->Record.SetNoAck(!needAck);
589589
Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId);
590590

591-
outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows);
591+
outputChannel.PeerState.AddInFlight(dataBytes, dataChunks);
592592
}
593593

594594
bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) {

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
5757
}
5858
}
5959

60-
bool DoHandleChannelsAfterFinishImpl() override final{
60+
bool DoHandleChannelsAfterFinishImpl() override final{
6161
Y_ABORT_UNLESS(this->Checkpoints);
6262

6363
if (this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
@@ -83,7 +83,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
8383

8484
auto channel = inputChannel->Channel;
8585

86-
if (channelData.RowCount()) {
86+
if (channelData.ChunkCount()) {
8787
TDqSerializedBatch batch;
8888
batch.Proto = std::move(*channelData.Proto.MutableData());
8989
batch.Payload = std::move(channelData.Payload);
@@ -211,7 +211,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
211211
if (!limits.OutputChunkMaxSize) {
212212
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
213213
}
214-
214+
215215
if (this->Task.GetEnableSpilling()) {
216216
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
217217
}

ydb/library/yql/dq/actors/dq.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ struct TChannelDataOOB {
8888
return Proto.GetData().GetRaw().size() + Payload.Size();
8989
}
9090

91-
ui32 RowCount() const {
92-
return Proto.GetData().GetRows();
91+
ui32 ChunkCount() const {
92+
return Proto.GetData().GetChunks();
9393
}
9494
};
9595

ydb/library/yql/dq/common/dq_serialized_batch.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ void TDqSerializedBatch::ConvertToNoOOB() {
6363
TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
6464
TChunkedBuffer result;
6565

66-
ui32 transportversion = batch.Proto.GetTransportVersion();
67-
ui32 rowCount = batch.Proto.GetRows();
66+
ui32 transportVersion = batch.Proto.GetTransportVersion();
67+
ui32 chunkCount = batch.Proto.GetChunks();
6868

6969
TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
7070

71-
AppendNumber(result, transportversion);
72-
AppendNumber(result, rowCount);
71+
AppendNumber(result, transportVersion);
72+
AppendNumber(result, chunkCount);
7373
AppendNumber(result, protoPayload.Size());
7474
result.Append(std::move(protoPayload));
7575
AppendNumber(result, batch.Payload.Size());
@@ -84,7 +84,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
8484
TStringBuf source(sharedBuf->Data(), sharedBuf->Size());
8585
TDqSerializedBatch result;
8686
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
87-
result.Proto.SetRows(ReadNumber<ui32>(source));
87+
result.Proto.SetChunks(ReadNumber<ui32>(source));
8888

8989
size_t protoSize = ReadNumber<size_t>(source);
9090
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");

ydb/library/yql/dq/common/dq_serialized_batch.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ struct TDqSerializedBatch {
2424
return Proto.GetRaw().size() + Payload.Size();
2525
}
2626

27+
ui32 ChunkCount() const {
28+
return Proto.GetChunks();
29+
}
30+
2731
ui32 RowCount() const {
28-
return Proto.GetRows();
32+
return Proto.GetChunks(); // FIXME with Rows
2933
}
3034

3135
void Clear() {

ydb/library/yql/dq/proto/dq_transport.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ enum EDataTransportVersion {
1616
message TData {
1717
uint32 TransportVersion = 1;
1818
bytes Raw = 2;
19-
uint32 Rows = 3;
19+
uint32 Chunks = 3;
2020
optional uint32 PayloadId = 4;
2121
}

ydb/library/yql/dq/runtime/dq_input_channel.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TDqInputChannel : public IDqInputChannel {
4747

4848
void PushImpl(TDqSerializedBatch&& data) {
4949
const i64 space = data.Size();
50-
const size_t rowCount = data.RowCount();
50+
const size_t chunkCount = data.ChunkCount();
5151
auto inputType = Impl.GetInputType();
5252
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
5353
if (Y_UNLIKELY(PushStats.CollectProfile())) {
@@ -58,7 +58,8 @@ class TDqInputChannel : public IDqInputChannel {
5858
DataSerializer.Deserialize(std::move(data), inputType, batch);
5959
}
6060

61-
YQL_ENSURE(batch.RowCount() == rowCount);
61+
// single batch row is chunk and may be Arrow block
62+
YQL_ENSURE(batch.RowCount() == chunkCount);
6263
Impl.AddBatch(std::move(batch), space);
6364
}
6465

@@ -123,7 +124,7 @@ class TDqInputChannel : public IDqInputChannel {
123124

124125
void Push(TDqSerializedBatch&& data) override {
125126
YQL_ENSURE(!Impl.IsFinished(), "input channel " << PushStats.ChannelId << " already finished");
126-
if (Y_UNLIKELY(data.Proto.GetRows() == 0)) {
127+
if (Y_UNLIKELY(data.Proto.GetChunks() == 0)) {
127128
return;
128129
}
129130
StoredSerializedBytes += data.Size();

ydb/library/yql/dq/runtime/dq_output_channel.cpp

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class TDqOutputChannel : public IDqOutputChannel {
5858
}
5959

6060
ui64 GetValuesCount() const override {
61-
return SpilledRowCount + PackedRowCount + ChunkRowCount;
61+
return SpilledChunkCount + PackedChunkCount + PackerCurrentChunkCount;
6262
}
6363

6464
const TDqOutputStats& GetPushStats() const override {
@@ -110,7 +110,7 @@ class TDqOutputChannel : public IDqOutputChannel {
110110
values[i] = {};
111111
}
112112

113-
ChunkRowCount++;
113+
PackerCurrentChunkCount++;
114114

115115
size_t packerSize = Packer.PackedSizeEstimate();
116116
if (packerSize >= MaxChunkBytes) {
@@ -120,9 +120,9 @@ class TDqOutputChannel : public IDqOutputChannel {
120120
PushStats.Bytes += Data.back().Buffer.Size();
121121
}
122122
PackedDataSize += Data.back().Buffer.Size();
123-
PackedRowCount += ChunkRowCount;
124-
Data.back().RowCount = ChunkRowCount;
125-
ChunkRowCount = 0;
123+
PackedChunkCount += PackerCurrentChunkCount;
124+
Data.back().ChunkCount = PackerCurrentChunkCount;
125+
PackerCurrentChunkCount = 0;
126126
packerSize = 0;
127127
}
128128

@@ -133,23 +133,23 @@ class TDqOutputChannel : public IDqOutputChannel {
133133

134134
TDqSerializedBatch data;
135135
data.Proto.SetTransportVersion(TransportVersion);
136-
data.Proto.SetRows(head.RowCount);
136+
data.Proto.SetChunks(head.ChunkCount);
137137
data.SetPayload(std::move(head.Buffer));
138138
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
139139

140140
PackedDataSize -= bufSize;
141-
PackedRowCount -= head.RowCount;
141+
PackedChunkCount -= head.ChunkCount;
142142

143-
SpilledRowCount += head.RowCount;
143+
SpilledChunkCount += head.ChunkCount;
144144

145145
if (PopStats.CollectFull()) {
146-
PopStats.SpilledRows += head.RowCount;
147-
PopStats.SpilledBytes += bufSize + sizeof(head.RowCount);
146+
PopStats.SpilledRows += head.ChunkCount; // FIXME with RowCount
147+
PopStats.SpilledBytes += bufSize + sizeof(head.ChunkCount);
148148
PopStats.SpilledBlobs++;
149149
}
150150

151151
Data.pop_front();
152-
LOG("Data spilled. Total rows spilled: " << SpilledRowCount << ", bytesInMemory: " << (PackedDataSize + packerSize));
152+
LOG("Data spilled. Total rows spilled: " << SpilledChunkCount << ", bytesInMemory: " << (PackedDataSize + packerSize)); // FIXME with RowCount
153153
}
154154

155155
if (IsFull() || FirstStoredId < NextStoredId) {
@@ -158,7 +158,7 @@ class TDqOutputChannel : public IDqOutputChannel {
158158

159159
if (PopStats.CollectFull()) {
160160
PopStats.MaxMemoryUsage = std::max(PopStats.MaxMemoryUsage, PackedDataSize + packerSize);
161-
PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedRowCount);
161+
PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedChunkCount);
162162
}
163163
}
164164

@@ -195,18 +195,18 @@ class TDqOutputChannel : public IDqOutputChannel {
195195
}
196196
++FirstStoredId;
197197
data = LoadSpilled(std::move(blob));
198-
SpilledRowCount -= data.RowCount();
198+
SpilledChunkCount -= data.ChunkCount();
199199
} else if (!Data.empty()) {
200200
auto& packed = Data.front();
201-
PackedRowCount -= packed.RowCount;
201+
PackedChunkCount -= packed.ChunkCount;
202202
PackedDataSize -= packed.Buffer.Size();
203-
data.Proto.SetRows(packed.RowCount);
203+
data.Proto.SetChunks(packed.ChunkCount);
204204
data.SetPayload(std::move(packed.Buffer));
205205
Data.pop_front();
206206
} else {
207-
data.Proto.SetRows(ChunkRowCount);
207+
data.Proto.SetChunks(PackerCurrentChunkCount);
208208
data.SetPayload(FinishPackAndCheckSize());
209-
ChunkRowCount = 0;
209+
PackerCurrentChunkCount = 0;
210210
}
211211

212212
DLOG("Took " << data.RowCount() << " rows");
@@ -255,21 +255,21 @@ class TDqOutputChannel : public IDqOutputChannel {
255255

256256
data.Clear();
257257
data.Proto.SetTransportVersion(TransportVersion);
258-
if (SpilledRowCount == 0 && PackedRowCount == 0) {
259-
data.Proto.SetRows(ChunkRowCount);
258+
if (SpilledChunkCount == 0 && PackedChunkCount == 0) {
259+
data.Proto.SetChunks(PackerCurrentChunkCount);
260260
data.SetPayload(FinishPackAndCheckSize());
261-
ChunkRowCount = 0;
261+
PackerCurrentChunkCount = 0;
262262
return true;
263263
}
264264

265265
// Repack all - thats why PopAll should never be used
266-
if (ChunkRowCount) {
266+
if (PackerCurrentChunkCount) {
267267
Data.emplace_back();
268268
Data.back().Buffer = FinishPackAndCheckSize();
269269
PackedDataSize += Data.back().Buffer.Size();
270-
PackedRowCount += ChunkRowCount;
271-
Data.back().RowCount = ChunkRowCount;
272-
ChunkRowCount = 0;
270+
PackedChunkCount += PackerCurrentChunkCount;
271+
Data.back().ChunkCount = PackerCurrentChunkCount;
272+
PackerCurrentChunkCount = 0;
273273
}
274274

275275
NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
@@ -291,7 +291,7 @@ class TDqOutputChannel : public IDqOutputChannel {
291291
});
292292
}
293293

294-
data.Proto.SetRows(rows.RowCount());
294+
data.Proto.SetChunks(rows.RowCount()); // 1 UVB "row" is Chunk
295295
data.SetPayload(FinishPackAndCheckSize());
296296
if (PopStats.CollectBasic()) {
297297
PopStats.Bytes += data.Size();
@@ -332,7 +332,7 @@ class TDqOutputChannel : public IDqOutputChannel {
332332
ui64 rows = GetValuesCount();
333333
Data.clear();
334334
Packer.Clear();
335-
SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
335+
SpilledChunkCount = PackedDataSize = PackedChunkCount = PackerCurrentChunkCount = 0;
336336
FirstStoredId = NextStoredId;
337337
return rows;
338338
}
@@ -358,18 +358,18 @@ class TDqOutputChannel : public IDqOutputChannel {
358358

359359
struct TSerializedBatch {
360360
TChunkedBuffer Buffer;
361-
ui64 RowCount = 0;
361+
ui64 ChunkCount = 0;
362362
};
363363
std::deque<TSerializedBatch> Data;
364364

365-
size_t SpilledRowCount = 0;
365+
size_t SpilledChunkCount = 0;
366366
ui64 FirstStoredId = 0;
367367
ui64 NextStoredId = 0;
368368

369369
size_t PackedDataSize = 0;
370-
size_t PackedRowCount = 0;
371-
372-
size_t ChunkRowCount = 0;
370+
size_t PackedChunkCount = 0;
371+
372+
size_t PackerCurrentChunkCount = 0;
373373

374374
bool Finished = false;
375375

ydb/library/yql/dq/runtime/dq_transport.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const
4343

4444
TDqSerializedBatch result;
4545
result.Proto.SetTransportVersion(version);
46-
result.Proto.SetRows(1);
46+
result.Proto.SetChunks(1);
4747
result.SetPayload(std::move(packResult));
4848
return result;
4949
}
@@ -87,7 +87,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
8787

8888
TDqSerializedBatch result;
8989
result.Proto.SetTransportVersion(version);
90-
result.Proto.SetRows(buffer.RowCount());
90+
result.Proto.SetChunks(buffer.RowCount());
9191
result.SetPayload(std::move(packResult));
9292
return result;
9393
}
@@ -176,7 +176,7 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
176176
NDqProto::TData data;
177177
data.SetTransportVersion(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
178178
data.SetRaw(packResult.data(), packResult.size());
179-
data.SetRows(1);
179+
data.SetChunks(1);
180180

181181
return data;
182182
}

ydb/library/yql/dq/runtime/dq_transport.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class TDqDataSerializer : private TNonCopyable {
8282
}
8383
TDqSerializedBatch result;
8484
result.Proto.SetTransportVersion(TransportVersion);
85-
result.Proto.SetRows(count);
85+
result.Proto.SetChunks(count);
8686
result.SetPayload(packer.Finish());
8787
return result;
8888
}

0 commit comments

Comments
 (0)