Skip to content

Commit 02c6b58

Browse files
authored
fix stream lookup bytes calculation (#12026)
1 parent 97c9f43 commit 02c6b58

File tree

3 files changed

+329
-11
lines changed

3 files changed

+329
-11
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

+25-7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
4040
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
4141
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
4242
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
43+
, LookupStrategy(settings.GetLookupStrategy())
4344
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
4445
, Counters(counters)
4546
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
@@ -67,7 +68,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
6768
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
6869
}
6970

70-
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
71+
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats) override {
7172
if (last) {
7273
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
7374
for (auto& table : *stats->MutableTables()) {
@@ -81,9 +82,25 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
8182
tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
8283
}
8384

85+
ui64 rowsReadEstimate = ReadRowsCount;
86+
ui64 bytesReadEstimate = ReadBytesCount;
87+
88+
if (mstats) {
89+
switch(LookupStrategy) {
90+
case NKqpProto::EStreamLookupStrategy::LOOKUP: {
91+
// in lookup case we return as result actual data, that we read from the datashard.
92+
rowsReadEstimate = mstats->Inputs[InputIndex]->RowsConsumed;
93+
bytesReadEstimate = mstats->Inputs[InputIndex]->BytesConsumed;
94+
break;
95+
}
96+
default:
97+
;
98+
}
99+
}
100+
84101
// TODO: use evread statistics after KIKIMR-16924
85-
tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
86-
tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
102+
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
103+
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
87104
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());
88105

89106
NKqpProto::TKqpTableExtraStats tableExtraStats;
@@ -148,7 +165,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
148165
};
149166

150167
struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
151-
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
168+
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
152169
: ReadId(readId)
153170
, LastSeqNo(lastSeqNo)
154171
, InstantStart(instantStart) {
@@ -259,7 +276,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
259276
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
260277
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
261278
if (ev->Get()->Request->ErrorCount > 0) {
262-
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
279+
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
263280
<< StreamLookupWorker->GetTablePath();
264281
LookupActorStateSpan.EndError(errorMsg);
265282

@@ -419,7 +436,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
419436
auto readIt = Reads.find(ev->Get()->ReadId);
420437
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
421438
auto& read = readIt->second;
422-
439+
423440
if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
424441
if (ev->Get()->InstantStart) {
425442
read.SetFinished();
@@ -566,7 +583,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
566583
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
567584

568585
Counters->IteratorsShardResolve->Inc();
569-
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
586+
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
570587
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
571588

572589
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
@@ -625,6 +642,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
625642
NActors::TActorId SchemeCacheRequestTimeoutTimer;
626643
TVector<NKikimrDataEvents::TLock> Locks;
627644
TVector<NKikimrDataEvents::TLock> BrokenLocks;
645+
NKqpProto::EStreamLookupStrategy LookupStrategy;
628646
std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker;
629647
ui64 ReadId = 0;
630648
size_t TotalRetryAttempts = 0;

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

+10-3
Original file line numberDiff line numberDiff line change
@@ -349,13 +349,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
349349
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
350350

351351
i64 rowSize = 0;
352+
i64 storageRowSize = 0;
352353
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
353354
const auto& column = Columns[colIndex];
354355
if (IsSystemColumn(column.Name)) {
355356
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
356357
rowSize += sizeof(NUdf::TUnboxedValue);
357358
} else {
358359
YQL_ENSURE(resultColIndex < resultRow.size());
360+
storageRowSize += resultRow[resultColIndex].Size();
359361
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
360362
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
361363
++resultColIndex;
@@ -370,10 +372,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
370372

371373
batch.push_back(std::move(row));
372374

375+
storageRowSize = std::max(storageRowSize, (i64)8);
376+
373377
resultStats.ReadRowsCount += 1;
374-
resultStats.ReadBytesCount += rowSize;
378+
resultStats.ReadBytesCount += storageRowSize;
375379
resultStats.ResultRowsCount += 1;
376-
resultStats.ResultBytesCount += rowSize;
380+
resultStats.ResultBytesCount += storageRowSize;
377381
}
378382

379383
if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
@@ -895,6 +899,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
895899
auto leftRowType = GetLeftRowType();
896900
YQL_ENSURE(leftRowType);
897901

902+
i64 storageReadBytes = 0;
903+
898904
for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
899905
auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
900906
leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
@@ -916,6 +922,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
916922
NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
917923
rightRowSize += sizeof(NUdf::TUnboxedValue);
918924
} else {
925+
storageReadBytes += rightRow[std::distance(ReadColumns.begin(), it)].Size();
919926
rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
920927
column.PType);
921928
rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
@@ -927,7 +934,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
927934

928935
rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0);
929936
// TODO: use datashard statistics KIKIMR-16924
930-
rowStats.ReadBytesCount += rightRowSize;
937+
rowStats.ReadBytesCount += storageReadBytes;
931938
rowStats.ResultRowsCount += 1;
932939
rowStats.ResultBytesCount += leftRowSize + rightRowSize;
933940

0 commit comments

Comments
 (0)