Skip to content

Commit 1108e70

Browse files
authored
Add RowsCount/Rows and use it for stats purposes (#15629)
1 parent 4eb60f8 commit 1108e70

File tree

6 files changed

+59
-8
lines changed

6 files changed

+59
-8
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
112112

113113
TInputChannelState& inputChannel = InCh(channelId);
114114

115+
if (Y_UNLIKELY(channelData.Proto.GetData().GetRows() == 0 && channelData.Proto.GetData().GetChunks() > 0)) {
116+
// For backward compatibility, to support communication with old nodes during rollback/migration
117+
// Should be removed eventually (around mid-2025)
118+
channelData.Proto.MutableData()->SetRows(channelData.Proto.GetData().GetChunks());
119+
}
120+
115121
LOG_T("Received input for channelId: " << channelId
116122
<< ", seqNo: " << record.GetSeqNo()
117123
<< ", size: " << channelData.Proto.GetData().GetRaw().size()

ydb/library/yql/dq/common/dq_serialized_batch.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
6565

6666
ui32 transportVersion = batch.Proto.GetTransportVersion();
6767
ui32 chunkCount = batch.Proto.GetChunks();
68+
ui32 rowCount = batch.Proto.GetRows();
6869

6970
TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
7071

7172
AppendNumber(result, transportVersion);
7273
AppendNumber(result, chunkCount);
74+
AppendNumber(result, rowCount);
7375
AppendNumber(result, protoPayload.Size());
7476
result.Append(std::move(protoPayload));
7577
AppendNumber(result, batch.Payload.Size());
@@ -85,6 +87,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
8587
TDqSerializedBatch result;
8688
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
8789
result.Proto.SetChunks(ReadNumber<ui32>(source));
90+
result.Proto.SetRows(ReadNumber<ui32>(source));
8891

8992
size_t protoSize = ReadNumber<size_t>(source);
9093
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");

ydb/library/yql/dq/common/dq_serialized_batch.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ struct TDqSerializedBatch {
2929
}
3030

3131
ui32 RowCount() const {
32-
return Proto.GetChunks(); // FIXME with Rows
32+
return Proto.GetRows() ? Proto.GetRows() : Proto.GetChunks();
3333
}
3434

3535
void Clear() {

ydb/library/yql/dq/proto/dq_transport.proto

+1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ message TData {
1818
bytes Raw = 2;
1919
uint32 Chunks = 3;
2020
optional uint32 PayloadId = 4;
21+
uint32 Rows = 5;
2122
}

ydb/library/yql/dq/runtime/dq_output_channel.cpp

+45-7
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,12 @@ class TDqOutputChannel : public IDqOutputChannel {
9595
return;
9696
}
9797

98+
ui32 rows = Packer.IsBlock() ?
99+
NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
100+
: 1;
101+
98102
if (PushStats.CollectBasic()) {
99-
PushStats.Rows++;
103+
PushStats.Rows += rows;
100104
PushStats.Chunks++;
101105
PushStats.Resume();
102106
}
@@ -111,6 +115,7 @@ class TDqOutputChannel : public IDqOutputChannel {
111115
}
112116

113117
PackerCurrentChunkCount++;
118+
PackerCurrentRowCount += rows;
114119

115120
size_t packerSize = Packer.PackedSizeEstimate();
116121
if (packerSize >= MaxChunkBytes) {
@@ -121,8 +126,11 @@ class TDqOutputChannel : public IDqOutputChannel {
121126
}
122127
PackedDataSize += Data.back().Buffer.Size();
123128
PackedChunkCount += PackerCurrentChunkCount;
129+
PackedRowCount += PackerCurrentRowCount;
124130
Data.back().ChunkCount = PackerCurrentChunkCount;
131+
Data.back().RowCount = PackerCurrentRowCount;
125132
PackerCurrentChunkCount = 0;
133+
PackerCurrentRowCount = 0;
126134
packerSize = 0;
127135
}
128136

@@ -134,11 +142,13 @@ class TDqOutputChannel : public IDqOutputChannel {
134142
TDqSerializedBatch data;
135143
data.Proto.SetTransportVersion(TransportVersion);
136144
data.Proto.SetChunks(head.ChunkCount);
145+
data.Proto.SetRows(head.RowCount);
137146
data.SetPayload(std::move(head.Buffer));
138147
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
139148

140149
PackedDataSize -= bufSize;
141150
PackedChunkCount -= head.ChunkCount;
151+
PackedRowCount -= head.RowCount;
142152

143153
SpilledChunkCount += head.ChunkCount;
144154

@@ -199,22 +209,26 @@ class TDqOutputChannel : public IDqOutputChannel {
199209
} else if (!Data.empty()) {
200210
auto& packed = Data.front();
201211
PackedChunkCount -= packed.ChunkCount;
212+
PackedRowCount -= packed.RowCount;
202213
PackedDataSize -= packed.Buffer.Size();
203214
data.Proto.SetChunks(packed.ChunkCount);
215+
data.Proto.SetRows(packed.RowCount);
204216
data.SetPayload(std::move(packed.Buffer));
205217
Data.pop_front();
206218
} else {
207219
data.Proto.SetChunks(PackerCurrentChunkCount);
220+
data.Proto.SetRows(PackerCurrentRowCount);
208221
data.SetPayload(FinishPackAndCheckSize());
209222
PackerCurrentChunkCount = 0;
223+
PackerCurrentRowCount = 0;
210224
}
211225

212226
DLOG("Took " << data.RowCount() << " rows");
213227

214228
if (PopStats.CollectBasic()) {
215229
PopStats.Bytes += data.Size();
216230
PopStats.Rows += data.RowCount();
217-
PopStats.Chunks++;
231+
PopStats.Chunks++; // pop chunks do not match push chunks
218232
if (!IsFull() || FirstStoredId == NextStoredId) {
219233
PopStats.Resume();
220234
}
@@ -257,28 +271,43 @@ class TDqOutputChannel : public IDqOutputChannel {
257271
data.Proto.SetTransportVersion(TransportVersion);
258272
if (SpilledChunkCount == 0 && PackedChunkCount == 0) {
259273
data.Proto.SetChunks(PackerCurrentChunkCount);
274+
data.Proto.SetRows(PackerCurrentRowCount);
260275
data.SetPayload(FinishPackAndCheckSize());
276+
if (PushStats.CollectBasic()) {
277+
PushStats.Bytes += data.Payload.Size();
278+
}
261279
PackerCurrentChunkCount = 0;
280+
PackerCurrentRowCount = 0;
262281
return true;
263282
}
264283

265284
// Repack all - that's why PopAll should never be used
266285
if (PackerCurrentChunkCount) {
267286
Data.emplace_back();
268287
Data.back().Buffer = FinishPackAndCheckSize();
288+
if (PushStats.CollectBasic()) {
289+
PushStats.Bytes += Data.back().Buffer.Size();
290+
}
269291
PackedDataSize += Data.back().Buffer.Size();
270292
PackedChunkCount += PackerCurrentChunkCount;
293+
PackedRowCount += PackerCurrentRowCount;
271294
Data.back().ChunkCount = PackerCurrentChunkCount;
295+
Data.back().RowCount = PackerCurrentRowCount;
272296
PackerCurrentChunkCount = 0;
297+
PackerCurrentRowCount = 0;
273298
}
274299

275300
NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
301+
size_t repackedChunkCount = 0;
302+
size_t repackedRowCount = 0;
276303
for (;;) {
277-
TDqSerializedBatch chunk;
278-
if (!this->Pop(chunk)) {
304+
TDqSerializedBatch batch;
305+
if (!this->Pop(batch)) {
279306
break;
280307
}
281-
Packer.UnpackBatch(chunk.PullPayload(), HolderFactory, rows);
308+
repackedChunkCount += batch.ChunkCount();
309+
repackedRowCount += batch.RowCount();
310+
Packer.UnpackBatch(batch.PullPayload(), HolderFactory, rows);
282311
}
283312

284313
if (OutputType->IsMulti()) {
@@ -291,7 +320,8 @@ class TDqOutputChannel : public IDqOutputChannel {
291320
});
292321
}
293322

294-
data.Proto.SetChunks(rows.RowCount()); // 1 UVB "row" is Chunk
323+
data.Proto.SetChunks(repackedChunkCount);
324+
data.Proto.SetRows(repackedRowCount);
295325
data.SetPayload(FinishPackAndCheckSize());
296326
if (PopStats.CollectBasic()) {
297327
PopStats.Bytes += data.Size();
@@ -332,7 +362,12 @@ class TDqOutputChannel : public IDqOutputChannel {
332362
ui64 rows = GetValuesCount();
333363
Data.clear();
334364
Packer.Clear();
335-
SpilledChunkCount = PackedDataSize = PackedChunkCount = PackerCurrentChunkCount = 0;
365+
PackedDataSize = 0;
366+
PackedChunkCount = 0;
367+
PackedRowCount = 0;
368+
SpilledChunkCount = 0;
369+
PackerCurrentChunkCount = 0;
370+
PackerCurrentRowCount = 0;
336371
FirstStoredId = NextStoredId;
337372
return rows;
338373
}
@@ -359,6 +394,7 @@ class TDqOutputChannel : public IDqOutputChannel {
359394
struct TSerializedBatch {
360395
TChunkedBuffer Buffer;
361396
ui64 ChunkCount = 0;
397+
ui64 RowCount = 0;
362398
};
363399
std::deque<TSerializedBatch> Data;
364400

@@ -368,8 +404,10 @@ class TDqOutputChannel : public IDqOutputChannel {
368404

369405
size_t PackedDataSize = 0;
370406
size_t PackedChunkCount = 0;
407+
size_t PackedRowCount = 0;
371408

372409
size_t PackerCurrentChunkCount = 0;
410+
size_t PackerCurrentRowCount = 0;
373411

374412
bool Finished = false;
375413

ydb/library/yql/dq/runtime/dq_transport.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const
4444
TDqSerializedBatch result;
4545
result.Proto.SetTransportVersion(version);
4646
result.Proto.SetChunks(1);
47+
result.Proto.SetRows(1);
4748
result.SetPayload(std::move(packResult));
4849
return result;
4950
}
@@ -88,6 +89,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
8889
TDqSerializedBatch result;
8990
result.Proto.SetTransportVersion(version);
9091
result.Proto.SetChunks(buffer.RowCount());
92+
result.Proto.SetRows(buffer.RowCount()); // maybe incorrect for Arrow Blocks
9193
result.SetPayload(std::move(packResult));
9294
return result;
9395
}
@@ -177,6 +179,7 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
177179
data.SetTransportVersion(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
178180
data.SetRaw(packResult.data(), packResult.size());
179181
data.SetChunks(1);
182+
data.SetRows(1);
180183

181184
return data;
182185
}

0 commit comments

Comments
 (0)