Skip to content

Commit c3d51bf

Browse files
authored
fix stream lookup bytes calculation (#12026) (#12101)
1 parent 83cc4da commit c3d51bf

File tree

3 files changed

+330
-11
lines changed

3 files changed

+330
-11
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

+25-7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
4040
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
4141
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
4242
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
43+
, LookupStrategy(settings.GetLookupStrategy())
4344
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
4445
, Counters(counters)
4546
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
@@ -67,7 +68,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
6768
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
6869
}
6970

70-
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
71+
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats) override {
7172
if (last) {
7273
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
7374
for (auto& table : *stats->MutableTables()) {
@@ -81,9 +82,25 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
8182
tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
8283
}
8384

85+
ui64 rowsReadEstimate = ReadRowsCount;
86+
ui64 bytesReadEstimate = ReadBytesCount;
87+
88+
if (mstats) {
89+
switch(LookupStrategy) {
90+
case NKqpProto::EStreamLookupStrategy::LOOKUP: {
91+
// In the lookup case, the result we return is the actual data read from the datashard.
92+
rowsReadEstimate = mstats->Inputs[InputIndex]->RowsConsumed;
93+
bytesReadEstimate = mstats->Inputs[InputIndex]->BytesConsumed;
94+
break;
95+
}
96+
default:
97+
;
98+
}
99+
}
100+
84101
// TODO: use evread statistics after KIKIMR-16924
85-
tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
86-
tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
102+
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
103+
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
87104
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());
88105

89106
NKqpProto::TKqpTableExtraStats tableExtraStats;
@@ -148,7 +165,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
148165
};
149166

150167
struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
151-
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
168+
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
152169
: ReadId(readId)
153170
, LastSeqNo(lastSeqNo)
154171
, InstantStart(instantStart) {
@@ -259,7 +276,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
259276
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
260277
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
261278
if (ev->Get()->Request->ErrorCount > 0) {
262-
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
279+
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
263280
<< StreamLookupWorker->GetTablePath();
264281
LookupActorStateSpan.EndError(errorMsg);
265282

@@ -419,7 +436,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
419436
auto readIt = Reads.find(ev->Get()->ReadId);
420437
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
421438
auto& read = readIt->second;
422-
439+
423440
if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
424441
if (ev->Get()->InstantStart) {
425442
read.SetFinished();
@@ -566,7 +583,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
566583
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
567584

568585
Counters->IteratorsShardResolve->Inc();
569-
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
586+
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
570587
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
571588

572589
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
@@ -625,6 +642,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
625642
NActors::TActorId SchemeCacheRequestTimeoutTimer;
626643
TVector<NKikimrDataEvents::TLock> Locks;
627644
TVector<NKikimrDataEvents::TLock> BrokenLocks;
645+
NKqpProto::EStreamLookupStrategy LookupStrategy;
628646
std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker;
629647
ui64 ReadId = 0;
630648
size_t TotalRetryAttempts = 0;

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

+10-3
Original file line numberDiff line numberDiff line change
@@ -361,13 +361,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
361361
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
362362

363363
i64 rowSize = 0;
364+
i64 storageRowSize = 0;
364365
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
365366
const auto& column = Columns[colIndex];
366367
if (IsSystemColumn(column.Name)) {
367368
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
368369
rowSize += sizeof(NUdf::TUnboxedValue);
369370
} else {
370371
YQL_ENSURE(resultColIndex < resultRow.size());
372+
storageRowSize += resultRow[resultColIndex].Size();
371373
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
372374
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
373375
++resultColIndex;
@@ -382,10 +384,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
382384

383385
batch.push_back(std::move(row));
384386

387+
storageRowSize = std::max(storageRowSize, (i64)8);
388+
385389
resultStats.ReadRowsCount += 1;
386-
resultStats.ReadBytesCount += rowSize;
390+
resultStats.ReadBytesCount += storageRowSize;
387391
resultStats.ResultRowsCount += 1;
388-
resultStats.ResultBytesCount += rowSize;
392+
resultStats.ResultBytesCount += storageRowSize;
389393
}
390394

391395
if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
@@ -907,6 +911,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
907911
auto leftRowType = GetLeftRowType();
908912
YQL_ENSURE(leftRowType);
909913

914+
i64 storageReadBytes = 0;
915+
910916
for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
911917
auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
912918
leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
@@ -928,6 +934,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
928934
NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
929935
rightRowSize += sizeof(NUdf::TUnboxedValue);
930936
} else {
937+
storageReadBytes += rightRow[std::distance(ReadColumns.begin(), it)].Size();
931938
rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
932939
column.PType);
933940
rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
@@ -939,7 +946,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
939946

940947
rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0);
941948
// TODO: use datashard statistics KIKIMR-16924
942-
rowStats.ReadBytesCount += rightRowSize;
949+
rowStats.ReadBytesCount += storageReadBytes;
943950
rowStats.ResultRowsCount += 1;
944951
rowStats.ResultBytesCount += leftRowSize + rightRowSize;
945952

0 commit comments

Comments
 (0)