diff --git a/ydb/core/protos/index_builder.proto b/ydb/core/protos/index_builder.proto index baea28bf6f40..c148577ddeeb 100644 --- a/ydb/core/protos/index_builder.proto +++ b/ydb/core/protos/index_builder.proto @@ -124,7 +124,6 @@ message TEvUploadSampleKResponse { optional Ydb.StatusIds.StatusCode UploadStatus = 2; repeated Ydb.Issue.IssueMessage Issues = 3; - optional uint64 RowsDelta = 4; - optional uint64 BytesDelta = 5; + optional uint64 UploadRows = 4; + optional uint64 UploadBytes = 5; } - diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 215e3010cda2..bf13ccf9caab 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1500,8 +1500,8 @@ message TEvSampleKResponse { optional NKikimrIndexBuilder.EBuildStatus Status = 4; repeated Ydb.Issue.IssueMessage Issues = 5; - optional uint64 RowsDelta = 6; - optional uint64 BytesDelta = 7; + optional uint64 ReadRows = 6; + optional uint64 ReadBytes = 7; optional uint64 RequestSeqNoGeneration = 8; optional uint64 RequestSeqNoRound = 9; @@ -1568,12 +1568,14 @@ message TEvLocalKMeansResponse { optional NKikimrIndexBuilder.EBuildStatus Status = 6; repeated Ydb.Issue.IssueMessage Issues = 7; - // TODO(mbkkt) implement slow-path (reliable-path) - // optional uint64 RowsDelta = 8; - // optional uint64 BytesDelta = 9; + optional uint64 UploadRows = 8; + optional uint64 UploadBytes = 9; + optional uint64 ReadRows = 10; + optional uint64 ReadBytes = 11; - // optional TEvLocalKMeansRequest.EState State = 10; - // optional uint32 DoneRounds = 11; + // TODO(mbkkt) implement slow-path (reliable-path) + // optional TEvLocalKMeansRequest.EState State + // optional uint32 DoneRounds } message TEvReshuffleKMeansRequest { @@ -1617,9 +1619,12 @@ message TEvReshuffleKMeansResponse { optional NKikimrIndexBuilder.EBuildStatus Status = 6; repeated Ydb.Issue.IssueMessage Issues = 7; + optional uint64 UploadRows = 8; + optional uint64 UploadBytes = 9; + optional uint64 ReadRows = 10; + optional uint64 ReadBytes = 11; + // TODO(mbkkt) implement slow-path (reliable-path) - // optional uint64 RowsDelta = 8; - // optional uint64 BytesDelta = 9; // optional last written primary key } diff --git a/ydb/core/tx/datashard/kmeans_helper.cpp b/ydb/core/tx/datashard/kmeans_helper.cpp index d25813e07551..e755d09c5ce3 100644 --- a/ydb/core/tx/datashard/kmeans_helper.cpp +++ b/ydb/core/tx/datashard/kmeans_helper.cpp @@ -56,7 +56,7 @@ void AddRowBuild2Build(TBufferData& buffer, ui32 parent, TArrayRef } void AddRowBuild2Posting(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row, - ui32 dataPos) + ui32 dataPos) { std::array cells; cells[0] = TCell::Make(parent); diff --git a/ydb/core/tx/datashard/kmeans_helper.h b/ydb/core/tx/datashard/kmeans_helper.h index 86164418b4cc..4d1ef5289b11 100644 --- a/ydb/core/tx/datashard/kmeans_helper.h +++ b/ydb/core/tx/datashard/kmeans_helper.h @@ -184,19 +184,12 @@ struct TCalculation: TMetric { } }; -struct TStats { - ui64 Rows = 0; - ui64 Bytes = 0; -}; - template ui32 FeedEmbedding(const TCalculation& calculation, std::span clusters, - const NTable::TRowState& row, NTable::TPos embeddingPos, TStats& stats) + const NTable::TRowState& row, NTable::TPos embeddingPos) { Y_ASSERT(embeddingPos < row.Size()); const auto embedding = row.Get(embeddingPos).AsRef(); - stats.Rows += 1; - stats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead? if (!calculation.IsExpectedSize(embedding)) { return std::numeric_limits::max(); } diff --git a/ydb/core/tx/datashard/local_kmeans.cpp b/ydb/core/tx/datashard/local_kmeans.cpp index ab497f22a416..5d95afcb36d6 100644 --- a/ydb/core/tx/datashard/local_kmeans.cpp +++ b/ydb/core/tx/datashard/local_kmeans.cpp @@ -60,10 +60,10 @@ class TLocalKMeansScanBase: public TActor, public NTable:: TLead Lead; - // Sample - TStats ReadStats; - // TODO(mbkkt) Sent or Upload stats? + ui64 ReadRows = 0; + ui64 ReadBytes = 0; + // Sample ui64 MaxProbability = std::numeric_limits::max(); TReallyFastRng32 Rng; @@ -101,6 +101,9 @@ class TLocalKMeansScanBase: public TActor, public NTable:: TUploadStatus UploadStatus; + ui64 UploadRows = 0; + ui64 UploadBytes = 0; + // Response TActorId ResponseActorId; TAutoPtr Response; @@ -163,6 +166,10 @@ class TLocalKMeansScanBase: public TActor, public NTable:: } auto& record = Response->Record; + record.SetReadRows(ReadRows); + record.SetReadBytes(ReadBytes); + record.SetUploadRows(UploadRows); + record.SetUploadBytes(UploadBytes); if (abort != EAbort::None) { record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED); } else if (UploadStatus.IsSuccess()) { @@ -243,6 +250,8 @@ class TLocalKMeansScanBase: public TActor, public NTable:: UploadStatus.StatusCode = ev->Get()->Status; UploadStatus.Issues = ev->Get()->Issues; if (UploadStatus.IsSuccess()) { + UploadRows += WriteBuf.GetRows(); + UploadBytes += WriteBuf.GetBytes(); WriteBuf.Clear(); if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) { ReadBuf.FlushTo(WriteBuf); @@ -366,6 +375,9 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< if (!InitAggregatedClusters()) { // We don't need to do anything, // because this datashard doesn't have valid embeddings for this parent + if (UploadStatus.IsNone()) { + UploadStatus.StatusCode = Ydb::StatusIds::SUCCESS; + } return EScan::Final; } ++Round; @@ -391,6 +403,8 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan Feed(TArrayRef key, const TRow& row) noexcept final { LOG_T("Feed " << Debug()); + ++ReadRows; + ReadBytes += CountBytes(key, row); switch (State) { case EState::SAMPLE: return FeedSample(row); @@ -467,8 +481,6 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< { Y_ASSERT(row.Size() == 1); const auto embedding = row.Get(0).AsRef(); - ReadStats.Rows += 1; - ReadStats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead? if (!this->IsExpectedSize(embedding)) { return EScan::Feed; } @@ -495,14 +507,14 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedKMeans(const TRow& row) noexcept { Y_ASSERT(row.Size() == 1); - const ui32 pos = FeedEmbedding(*this, Clusters, row, 0, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, 0); AggregateToCluster(pos, row.Get(0).Data()); return EScan::Feed; } EScan FeedUploadMain2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -512,7 +524,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedUploadMain2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -522,7 +534,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedUploadBuild2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -532,7 +544,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedUploadBuild2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } diff --git a/ydb/core/tx/datashard/reshuffle_kmeans.cpp b/ydb/core/tx/datashard/reshuffle_kmeans.cpp index 83e9d4d00462..ae2a74789972 100644 --- a/ydb/core/tx/datashard/reshuffle_kmeans.cpp +++ b/ydb/core/tx/datashard/reshuffle_kmeans.cpp @@ -38,8 +38,8 @@ class TReshuffleKMeansScanBase: public TActor, public TLead Lead; - TStats ReadStats; - // TODO(mbkkt) Sent or Upload stats? + ui64 ReadRows = 0; + ui64 ReadBytes = 0; std::vector Clusters; @@ -63,6 +63,9 @@ class TReshuffleKMeansScanBase: public TActor, public TUploadStatus UploadStatus; + ui64 UploadRows = 0; + ui64 UploadBytes = 0; + // Response TActorId ResponseActorId; TAutoPtr Response; @@ -138,6 +141,10 @@ class TReshuffleKMeansScanBase: public TActor, public } auto& record = Response->Record; + record.SetReadRows(ReadRows); + record.SetReadBytes(ReadBytes); + record.SetUploadRows(UploadRows); + record.SetUploadBytes(UploadBytes); if (abort != EAbort::None) { record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED); } else if (UploadStatus.IsSuccess()) { @@ -218,6 +225,8 @@ class TReshuffleKMeansScanBase: public TActor, public UploadStatus.StatusCode = ev->Get()->Status; UploadStatus.Issues = ev->Get()->Issues; if (UploadStatus.IsSuccess()) { + UploadRows += WriteBuf.GetRows(); + UploadBytes += WriteBuf.GetBytes(); WriteBuf.Clear(); if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) { ReadBuf.FlushTo(WriteBuf); @@ -282,6 +291,8 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan Feed(TArrayRef key, const TRow& row) noexcept final { LOG_T("Feed " << Debug()); + ++ReadRows; + ReadBytes += CountBytes(key, row); switch (UploadState) { case EState::UPLOAD_MAIN_TO_BUILD: return FeedUploadMain2Build(key, row); @@ -299,7 +310,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc private: EScan FeedUploadMain2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -309,7 +320,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan FeedUploadMain2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -319,7 +330,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan FeedUploadBuild2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -329,7 +340,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan FeedUploadBuild2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } diff --git a/ydb/core/tx/datashard/sample_k.cpp b/ydb/core/tx/datashard/sample_k.cpp index 5715d491becd..df372f899092 100644 --- a/ydb/core/tx/datashard/sample_k.cpp +++ b/ydb/core/tx/datashard/sample_k.cpp @@ -44,8 +44,8 @@ class TSampleKScan final: public TActor, public NTable::IScan { auto operator<=>(const TProbability&) const noexcept = default; }; - ui64 RowsCount = 0; - ui64 RowsBytes = 0; + ui64 ReadRows = 0; + ui64 ReadBytes = 0; // We are using binary heap, because we don't want to do batch processing here, // serialization is more expensive than compare @@ -116,27 +116,28 @@ class TSampleKScan final: public TActor, public NTable::IScan { EScan Feed(TArrayRef key, const TRow& row) noexcept final { LOG_T("Feed key " << DebugPrintPoint(KeyTypes, key, *AppData()->TypeRegistry) << " " << Debug()); - ++RowsCount; + ++ReadRows; + ReadBytes += CountBytes(key, row); const auto probability = GetProbability(); if (probability > MaxProbability) { - // TODO(mbkkt) it's not nice that we need to compute this, probably can be precomputed in TRow - RowsBytes += TSerializedCellVec::SerializedSize(*row); return EScan::Feed; } - auto serialized = TSerializedCellVec::Serialize(*row); - RowsBytes += serialized.size(); - if (DataRows.size() < K) { MaxRows.push_back({probability, DataRows.size()}); - DataRows.emplace_back(std::move(serialized)); + DataRows.emplace_back(TSerializedCellVec::Serialize(*row)); if (DataRows.size() == K) { std::make_heap(MaxRows.begin(), MaxRows.end()); MaxProbability = MaxRows.front().P; } } else { - ReplaceRow(std::move(serialized), probability); + // TODO(mbkkt) use tournament tree to make less compare and swaps + std::pop_heap(MaxRows.begin(), MaxRows.end()); + TSerializedCellVec::Serialize(DataRows[MaxRows.back().I], *row); + MaxRows.back().P = probability; + std::push_heap(MaxRows.begin(), MaxRows.end()); + MaxProbability = MaxRows.front().P; } if (MaxProbability == 0) { @@ -147,6 +148,8 @@ class TSampleKScan final: public TActor, public NTable::IScan { TAutoPtr Finish(EAbort abort) noexcept final { Y_ABORT_UNLESS(Response); + Response->Record.SetReadRows(ReadRows); + Response->Record.SetReadBytes(ReadBytes); if (abort == EAbort::None) { FillResponse(); } else { @@ -187,15 +190,6 @@ class TSampleKScan final: public TActor, public NTable::IScan { } } - void ReplaceRow(TString&& row, ui64 p) { - // TODO(mbkkt) use tournament tree to make less compare and swaps - std::pop_heap(MaxRows.begin(), MaxRows.end()); - DataRows[MaxRows.back().I] = std::move(row); - MaxRows.back().P = p; - std::push_heap(MaxRows.begin(), MaxRows.end()); - MaxProbability = MaxRows.front().P; - } - void FillResponse() { std::sort(MaxRows.begin(), MaxRows.end()); auto& record = Response->Record; @@ -203,8 +197,6 @@ class TSampleKScan final: public TActor, public NTable::IScan { record.AddProbabilities(p); record.AddRows(std::move(DataRows[i])); } - record.SetRowsDelta(RowsCount); - record.SetBytesDelta(RowsBytes); record.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE); } diff --git a/ydb/core/tx/datashard/scan_common.cpp b/ydb/core/tx/datashard/scan_common.cpp index 2645a7495d44..3612f61d88ab 100644 --- a/ydb/core/tx/datashard/scan_common.cpp +++ b/ydb/core/tx/datashard/scan_common.cpp @@ -29,4 +29,15 @@ TColumnsTypes GetAllTypes(const TUserTable& tableInfo) { return result; } +ui64 CountBytes(TArrayRef key, const NTable::TRowState& row) { + ui64 bytes = 0; + for (auto& cell : key) { + bytes += cell.Size(); + } + for (auto& cell : *row) { + bytes += cell.Size(); + } + return bytes; +} + } diff --git a/ydb/core/tx/datashard/scan_common.h b/ydb/core/tx/datashard/scan_common.h index 738cb135fe97..3aa1d5306473 100644 --- a/ydb/core/tx/datashard/scan_common.h +++ b/ydb/core/tx/datashard/scan_common.h @@ -80,4 +80,9 @@ using TColumnsTypes = THashMap; TColumnsTypes GetAllTypes(const TUserTable& tableInfo); +// TODO(mbkkt) unfortunately key can have same columns as row +// I can detect this but maybe better +// if IScan will provide for us "how much data did we read"? +ui64 CountBytes(TArrayRef key, const NTable::TRowState& row); + } diff --git a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp index 606415bad0c1..76278ac524f6 100644 --- a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp @@ -786,7 +786,6 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas TABLEH() {str << "DebugMessage";} TABLEH() {str << "SeqNo";} TABLEH() {str << "Processed";} - TABLEH() {str << "Billed";} } } for (auto item : info.Shards) { diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp index e79576fcc2e6..83cf89d80cb3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp @@ -199,11 +199,22 @@ void TSchemeShard::PersistBuildIndexUnlockTxId(NIceDb::TNiceDb& db, const TIndex NIceDb::TUpdate(indexInfo.UnlockTxId)); } -void TSchemeShard::PersistBuildIndexBilling(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { +void TSchemeShard::PersistBuildIndexProcessed(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { db.Table().Key(indexInfo.Id).Update( - NIceDb::TUpdate(indexInfo.Billed.GetRows()), - NIceDb::TUpdate(indexInfo.Billed.GetBytes()) - ); + NIceDb::TUpdate(indexInfo.Processed.GetUploadRows()), + NIceDb::TUpdate(indexInfo.Processed.GetUploadBytes()), + NIceDb::TUpdate(indexInfo.Processed.GetReadRows()), + NIceDb::TUpdate(indexInfo.Processed.GetReadBytes()) + ); +} + +void TSchemeShard::PersistBuildIndexBilled(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { + db.Table().Key(indexInfo.Id).Update( + NIceDb::TUpdate(indexInfo.Billed.GetUploadRows()), + NIceDb::TUpdate(indexInfo.Billed.GetUploadBytes()), + NIceDb::TUpdate(indexInfo.Billed.GetReadRows()), + NIceDb::TUpdate(indexInfo.Billed.GetReadBytes()) + ); } void TSchemeShard::PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, const TIndexBuildInfo::TShardStatus& shardStatus) { @@ -212,8 +223,10 @@ void TSchemeShard::PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBu NIceDb::TUpdate(shardStatus.Status), NIceDb::TUpdate(shardStatus.DebugMessage), NIceDb::TUpdate(shardStatus.UploadStatus), - NIceDb::TUpdate(shardStatus.Processed.GetRows()), - NIceDb::TUpdate(shardStatus.Processed.GetBytes()) + NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), + NIceDb::TUpdate(shardStatus.Processed.GetReadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetReadBytes()) ); } @@ -230,8 +243,13 @@ void TSchemeShard::PersistBuildIndexUploadInitiate(NIceDb::TNiceDb& db, TIndexBu void TSchemeShard::PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, TIndexBuildInfo::TShardStatus& shardStatus) { shardStatus.Status = NKikimrIndexBuilder::EBuildStatus::INVALID; + shardStatus.Processed = {}; db.Table().Key(buildId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update( - NIceDb::TUpdate(shardStatus.Status) + NIceDb::TUpdate(shardStatus.Status), + NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), + NIceDb::TUpdate(shardStatus.Processed.GetReadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetReadBytes()) ); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index e013bd6bd7bd..7ee23db86c08 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -215,8 +215,8 @@ class TUploadSampleK: public TActorBootstrapped { TAutoPtr response = new TEvIndexBuilder::TEvUploadSampleKResponse; response->Record.SetId(BuildIndexId); - response->Record.SetRowsDelta(Rows->size()); - response->Record.SetBytesDelta(RowsBytes); + response->Record.SetUploadRows(Rows->size()); + response->Record.SetUploadBytes(RowsBytes); UploadStatusToMessage(response->Record); @@ -698,15 +698,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil std::array typeInfos{NScheme::NTypeIds::Uint32}; auto range = ParentRange(buildInfo); for (const auto& [idx, status] : buildInfo.Shards) { + if (buildInfo.KMeans.Parent != 0 + && Intersect(typeInfos, range.ToTableRange(), status.Range.ToTableRange()).IsEmptyRange(typeInfos)) { + continue; + } switch (status.Status) { case NKikimrIndexBuilder::EBuildStatus::INVALID: - if (buildInfo.KMeans.Parent != 0) { - if (!Intersect(typeInfos, range.ToTableRange(), status.Range.ToTableRange()).IsEmptyRange(typeInfos)) { - buildInfo.ToUploadShards.emplace_back(idx); - } - break; - } - [[fallthrough]]; + buildInfo.ToUploadShards.emplace_back(idx); + break; case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: case NKikimrIndexBuilder::EBuildStatus::ABORTED: @@ -767,6 +766,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil Self->PersistBuildIndexUploadReset(db, BuildId, idx, status); } buildInfo.DoneShards.clear(); + Self->PersistBuildIndexProcessed(db, buildInfo); // TODO(mbkkt) persist buildInfo.KMeans changes if (!buildInfo.Sample.Rows.empty()) { if (!buildInfo.Sample.Sent) { @@ -921,6 +921,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil Self->PersistBuildIndexApplyTxStatus(db, buildInfo); Self->PersistBuildIndexApplyTxDone(db, buildInfo); Self->PersistBuildIndexUploadReset(db, buildInfo); + Self->PersistBuildIndexProcessed(db, buildInfo); ChangeState(BuildId, TIndexBuildInfo::EState::CreateBuild); Progress(BuildId); @@ -1287,11 +1288,9 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K); } - if (record.HasRowsDelta() || record.HasBytesDelta()) { - TBillingStats delta(record.GetRowsDelta(), record.GetBytesDelta()); - shardStatus.Processed += delta; - buildInfo.Processed += delta; - } + TBillingStats stats{0, 0, record.GetReadRows(), record.GetReadBytes()}; + shardStatus.Processed += stats; + buildInfo.Processed += stats; NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); @@ -1412,7 +1411,9 @@ struct TSchemeShard::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard return true; } - // TODO(mbkkt) add billing + TBillingStats stats{record.GetUploadRows(), record.GetUploadBytes(), record.GetReadRows(), record.GetReadBytes()}; + shardStatus.Processed += stats; + buildInfo.Processed += stats; NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); @@ -1510,15 +1511,17 @@ struct TSchemeShard::TIndexBuilder::TTxReplyUpload: public TSchemeShard::TIndexB switch (const auto state = buildInfo.State; state) { case TIndexBuildInfo::EState::Filling: { - if (record.HasRowsDelta() || record.HasBytesDelta()) { - TBillingStats delta(record.GetRowsDelta(), record.GetBytesDelta()); - buildInfo.Processed += delta; - } + NIceDb::TNiceDb db(txc.DB); + + TBillingStats stats{record.GetUploadRows(), record.GetUploadBytes(), 0, 0}; + buildInfo.Processed += stats; + // As long as we don't try to upload sample in parallel with requests to shards, + // it's okay to persist Processed not incrementally + Self->PersistBuildIndexProcessed(db, buildInfo); NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); - NIceDb::TNiceDb db(txc.DB); auto status = record.GetUploadStatus(); if (status == Ydb::StatusIds::SUCCESS) { ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Filling); @@ -1639,11 +1642,10 @@ struct TSchemeShard::TIndexBuilder::TTxReplyProgress: public TSchemeShard::TInde shardStatus.LastKeyAck = record.GetLastKeyAck(); } - if (record.HasRowsDelta() || record.HasBytesDelta()) { - TBillingStats delta(record.GetRowsDelta(), record.GetBytesDelta()); - shardStatus.Processed += delta; - buildInfo.Processed += delta; - } + // TODO(mbkkt) we should account uploads and reads separately + TBillingStats stats{record.GetRowsDelta(), record.GetBytesDelta(), record.GetRowsDelta(), record.GetBytesDelta()}; + shardStatus.Processed += stats; + buildInfo.Processed += stats; shardStatus.Status = record.GetStatus(); shardStatus.UploadStatus = record.GetUploadStatus(); diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index 022a6d272951..dec6877e3ff6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -52,8 +52,8 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplySchedule(const TActorContext& ct } ui64 TSchemeShard::TIndexBuilder::TTxBase::RequestUnits(const TBillingStats& stats) { - return TRUCalculator::ReadTable(stats.GetBytes()) - + TRUCalculator::BulkUpsert(stats.GetBytes(), stats.GetRows()); + return TRUCalculator::ReadTable(stats.GetReadBytes()) + + TRUCalculator::BulkUpsert(stats.GetUploadBytes(), stats.GetUploadRows()); } void TSchemeShard::TIndexBuilder::TTxBase::RoundPeriod(TInstant& start, TInstant& end) { @@ -93,8 +93,10 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTrans const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); Y_VERIFY_S(buildInfoPtr, "IndexBuilds has no " << buildId); auto& buildInfo = *buildInfoPtr->Get(); + auto& processed = buildInfo.Processed; + auto& billed = buildInfo.Billed; - TBillingStats toBill = buildInfo.Processed - buildInfo.Billed; + TBillingStats toBill = processed - billed; if (!toBill) { continue; } @@ -137,19 +139,21 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTrans continue; } + TString id = TStringBuilder() + << buildId << "-" + << buildInfo.TablePathId.OwnerId << "-" << buildInfo.TablePathId.LocalPathId << "-" + << billed.GetUploadRows() << "-" << billed.GetReadRows() << "-" + << billed.GetUploadBytes() << "-" << billed.GetReadBytes() << "-" + << processed.GetUploadRows() << "-" << processed.GetReadRows() << "-" + << processed.GetUploadBytes() << "-" << processed.GetReadBytes(); + NIceDb::TNiceDb db(txc.DB); - buildInfo.Billed += toBill; - Self->PersistBuildIndexBilling(db, buildInfo); + billed += toBill; + Self->PersistBuildIndexBilled(db, buildInfo); ui64 requestUnits = RequestUnits(toBill); - TString id = TStringBuilder() - << buildId << "-" - << buildInfo.TablePathId.OwnerId << "-" << buildInfo.TablePathId.LocalPathId << "-" - << buildInfo.Billed.GetRows() << "-" << buildInfo.Billed.GetBytes() << "-" - << buildInfo.Processed.GetRows() << "-" << buildInfo.Processed.GetBytes(); - const TString billRecord = TBillRecord() .Id(id) .CloudId(cloud_id) diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index fdc08090647e..740b0dd9c25d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1320,7 +1320,8 @@ class TSchemeShard void PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, const TIndexBuildInfo::TShardStatus& shardStatus); void PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, TIndexBuildInfo::TShardStatus& shardStatus); void PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuildInfo& info); - void PersistBuildIndexBilling(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); + void PersistBuildIndexProcessed(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); + void PersistBuildIndexBilled(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); void PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index e29ff7f4f7b1..4dbcc415f566 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -471,7 +471,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( if (!featureFlags.EnableParameterizedDecimal && decimalType != NScheme::TDecimalType::Default()){ errStr = Sprintf("Type '%s' specified for column '%s', but support for parametrized decimal is disabled (EnableParameterizedDecimal feature flag is off)", col.GetType().data(), colName.data()); return nullptr; - } + } break; } case NScheme::NTypeIds::Pg: @@ -479,7 +479,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( errStr = Sprintf("Type '%s' specified for column '%s', but support for pg types is disabled (EnableTablePgTypes feature flag is off)", col.GetType().data(), colName.data()); return nullptr; } - break; + break; default: break; } @@ -2332,98 +2332,39 @@ bool TTopicInfo::FillKeySchema(const TString& tabletConfig) { return FillKeySchema(proto, unused); } -TBillingStats::TBillingStats(ui64 rows, ui64 bytes) - : Rows(rows) - , Bytes(bytes) -{ -} - -TBillingStats::TBillingStats(const TBillingStats &other) - : Rows(other.Rows) - , Bytes(other.Bytes) +TBillingStats::TBillingStats(ui64 readRows, ui64 readBytes, ui64 uploadRows, ui64 uploadBytes) + : UploadRows{uploadRows} + , UploadBytes{uploadBytes} + , ReadRows{readRows} + , ReadBytes{readBytes} { } -TBillingStats &TBillingStats::operator =(const TBillingStats &other) { - if (this == &other) { - return *this; - } - - Rows = other.Rows; - Bytes = other.Bytes; - return *this; -} - TBillingStats TBillingStats::operator -(const TBillingStats &other) const { - Y_ABORT_UNLESS(Rows >= other.Rows); - Y_ABORT_UNLESS(Bytes >= other.Bytes); + Y_ABORT_UNLESS(UploadRows >= other.UploadRows); + Y_ABORT_UNLESS(UploadBytes >= other.UploadBytes); + Y_ABORT_UNLESS(ReadRows >= other.ReadRows); + Y_ABORT_UNLESS(ReadBytes >= other.ReadBytes); - return TBillingStats(Rows - other.Rows, Bytes - other.Bytes); -} - -TBillingStats &TBillingStats::operator -=(const TBillingStats &other) { - if (this == &other) { - Rows = 0; - Bytes = 0; - return *this; - } - - Y_ABORT_UNLESS(Rows >= other.Rows); - Y_ABORT_UNLESS(Bytes >= other.Bytes); - - Rows -= other.Rows; - Bytes -= other.Bytes; - return *this; + return {UploadRows - other.UploadRows, UploadBytes - other.UploadBytes, + ReadRows - other.ReadRows, ReadBytes - other.ReadBytes}; } TBillingStats TBillingStats::operator +(const TBillingStats &other) const { - return TBillingStats(Rows + other.Rows, Bytes + other.Bytes); -} - -TBillingStats &TBillingStats::operator +=(const TBillingStats &other) { - if (this == &other) { - Rows += Rows; - Bytes += Bytes; - return *this; - } - - Rows += other.Rows; - Bytes += other.Bytes; - return *this; -} - -bool TBillingStats::operator < (const TBillingStats &other) const { - return Rows < other.Rows && Bytes < other.Bytes; -} - -bool TBillingStats::operator <= (const TBillingStats &other) const { - return Rows <= other.Rows && Bytes <= other.Bytes; -} - -bool TBillingStats::operator ==(const TBillingStats &other) const { - return Rows == other.Rows && Bytes == other.Bytes; + return {UploadRows + other.UploadRows, UploadBytes + other.UploadBytes, + ReadRows + other.ReadRows, ReadBytes + other.ReadBytes}; } TString TBillingStats::ToString() const { return TStringBuilder() << "{" - << " rows: " << GetRows() - << " bytes: " << GetBytes() + << " upload rows: " << UploadRows + << ", upload bytes: " << UploadBytes + << ", read rows: " << ReadRows + << ", read bytes: " << ReadBytes << " }"; } -ui64 TBillingStats::GetRows() const { - return Rows; -} - -ui64 TBillingStats::GetBytes() const { - return Bytes; -} - -NKikimr::NSchemeShard::TBillingStats::operator bool() const { - return Rows || Bytes; -} - TSequenceInfo::TSequenceInfo( ui64 alterVersion, NKikimrSchemeOp::TSequenceDescription&& description, diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index cd790cd31f7d..163f1d59a076 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2865,31 +2865,48 @@ struct TImportInfo: public TSimpleRefCount { class TBillingStats { public: TBillingStats() = default; - TBillingStats(ui64 rows, ui64 bytes); - TBillingStats(const TBillingStats& other); + TBillingStats(const TBillingStats& other) = default; + TBillingStats& operator = (const TBillingStats& other) = default; - TBillingStats& operator = (const TBillingStats& other); + TBillingStats(ui64 uploadRows, ui64 uploadBytes, ui64 readRows, ui64 readBytes); TBillingStats operator - (const TBillingStats& other) const; - TBillingStats& operator -= (const TBillingStats& other); + TBillingStats& operator -= (const TBillingStats& other) { + return *this = *this - other; + } TBillingStats operator + (const TBillingStats& other) const; - TBillingStats& operator += (const TBillingStats& other); + TBillingStats& operator += (const TBillingStats& other) { + return *this = *this + other; + } - bool operator < (const TBillingStats& other) const; - bool operator <= (const TBillingStats& other) const; - bool operator == (const TBillingStats& other) const; + bool operator == (const TBillingStats& other) const = default; - operator bool () const; + explicit operator bool () const { + return *this != TBillingStats{}; + } TString ToString() const; - ui64 GetRows() const; - ui64 GetBytes() const; + ui64 GetUploadRows() const { + return UploadRows; + } + ui64 GetUploadBytes() const { + return UploadBytes; + } + + ui64 GetReadRows() const { + return ReadRows; + } + ui64 GetReadBytes() const { + return ReadBytes; + } private: - ui64 Rows = 0; - ui64 Bytes = 0; + ui64 UploadRows = 0; + ui64 UploadBytes = 0; + ui64 ReadRows = 0; + ui64 ReadBytes = 0; }; // TODO(mbkkt) separate it to 3 classes: TBuildColumnsInfo TBuildSecondaryInfo TBuildVectorInfo with single base TBuildInfo @@ -3394,9 +3411,24 @@ struct TIndexBuildInfo: public TSimpleRefCount { row.template GetValueOrDefault( indexInfo->AlterMainTableTxDone); - indexInfo->Billed = TBillingStats( + auto& billed = indexInfo->Billed; + billed = { row.template GetValueOrDefault(0), - row.template GetValueOrDefault(0)); + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + }; + if (billed.GetUploadRows() != 0 && billed.GetReadRows() == 0 && indexInfo->IsFillBuildIndex()) { + // old format: assign upload to read + billed += {0, 0, billed.GetUploadRows(), billed.GetUploadBytes()}; + } + + indexInfo->Processed = { + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + }; return indexInfo; } @@ -3428,19 +3460,28 @@ struct TIndexBuildInfo: public TSimpleRefCount { Schema::IndexBuildShardStatus::UploadStatus>( Ydb::StatusIds::STATUS_CODE_UNSPECIFIED); - shardStatus.Processed = TBillingStats( - row.template GetValueOrDefault< - Schema::IndexBuildShardStatus::RowsProcessed>(0), - row.template GetValueOrDefault< - Schema::IndexBuildShardStatus::BytesProcessed>(0)); - - Processed += shardStatus.Processed; + auto& processed = shardStatus.Processed; + processed = { + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + }; + if (processed.GetUploadRows() != 0 && processed.GetReadRows() == 0 && IsFillBuildIndex()) { + // old format: assign upload to read + processed += {0, 0, processed.GetUploadRows(), processed.GetUploadBytes()}; + } + Processed += processed; } bool IsCancellationRequested() const { return CancelRequested; } + bool IsFillBuildIndex() const { + return IsBuildSecondaryIndex() || IsBuildColumns(); + } + bool IsBuildSecondaryIndex() const { return BuildKind == EBuildKind::BuildSecondaryIndex; } diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index db5df8b9de83..0e8f3b31ead1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1316,8 +1316,8 @@ struct Schema : NIceDb::Schema { struct MaxRetries : Column<27, NScheme::NTypeIds::Uint32> {}; - struct RowsBilled : Column<28, NScheme::NTypeIds::Uint64> {}; - struct BytesBilled : Column<29, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ RowsBilled : Column<28, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ BytesBilled : Column<29, NScheme::NTypeIds::Uint64> {}; struct BuildKind : Column<30, NScheme::NTypeIds::Uint32> {}; @@ -1328,6 +1328,15 @@ struct Schema : NIceDb::Schema { // Serialized as string NKikimrSchemeOp::TIndexCreationConfig protobuf. struct CreationConfig : Column<34, NScheme::NTypeIds::String> { using Type = TString; }; + struct ReadRowsBilled : Column<35, NScheme::NTypeIds::Uint64> {}; + struct ReadBytesBilled : Column<36, NScheme::NTypeIds::Uint64> {}; + + struct UploadRowsProcessed : Column<37, NScheme::NTypeIds::Uint64> {}; + struct UploadBytesProcessed : Column<38, NScheme::NTypeIds::Uint64> {}; + + struct ReadRowsProcessed : Column<39, NScheme::NTypeIds::Uint64> {}; + struct ReadBytesProcessed : Column<40, NScheme::NTypeIds::Uint64> {}; + using TKey = TableKey; using TColumns = TableColumns< Id, @@ -1363,7 +1372,13 @@ struct Schema : NIceDb::Schema { AlterMainTableTxId, AlterMainTableTxStatus, AlterMainTableTxDone, - CreationConfig + CreationConfig, + ReadRowsBilled, + ReadBytesBilled, + UploadRowsProcessed, + UploadBytesProcessed, + ReadRowsProcessed, + ReadBytesProcessed >; }; @@ -1431,8 +1446,11 @@ struct Schema : NIceDb::Schema { struct Message : Column<7, NScheme::NTypeIds::Utf8> {}; struct UploadStatus : Column<8, NScheme::NTypeIds::Uint32> { using Type = Ydb::StatusIds::StatusCode; }; - struct RowsProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; - struct BytesProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ RowsProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ BytesProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; + + struct ReadRowsProcessed : Column<11, NScheme::NTypeIds::Uint64> {}; + struct ReadBytesProcessed : Column<12, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -1445,7 +1463,9 @@ struct Schema : NIceDb::Schema { Message, UploadStatus, RowsProcessed, - BytesProcessed + BytesProcessed, + ReadRowsProcessed, + ReadBytesProcessed >; }; diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp index 7ca8c653a3c0..39d2f3c98ef6 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp @@ -299,7 +299,7 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":0,"quantity":179,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-101-1818-101-1818","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; + const TString meteringData = R"({"usage":{"start":0,"quantity":179,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-101-101-1818-1818","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData + "\n"); diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index bc40d41d6d8b..eda961bbc0d9 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-404-3486-404-3486","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; + const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-404-1290-2686","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 5b08a08f41d9..3b29dea4b7bb 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -4805,6 +4805,36 @@ "ColumnId": 34, "ColumnName": "CreationConfig", "ColumnType": "String" + }, + { + "ColumnId": 35, + "ColumnName": "ReadRowsBilled", + "ColumnType": "Uint64" + }, + { + "ColumnId": 36, + "ColumnName": "ReadBytesBilled", + "ColumnType": "Uint64" + }, + { + "ColumnId": 37, + "ColumnName": "UploadRowsProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 38, + "ColumnName": "UploadBytesProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 39, + "ColumnName": "ReadRowsProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 40, + "ColumnName": "ReadBytesProcessed", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -4844,7 +4874,13 @@ 31, 32, 33, - 34 + 34, + 35, + 36, + 37, + 38, + 39, + 40 ], "RoomID": 0, "Codec": 0, @@ -5112,6 +5148,16 @@ "ColumnId": 10, "ColumnName": "BytesProcessed", "ColumnType": "Uint64" + }, + { + "ColumnId": 11, + "ColumnName": "ReadRowsProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 12, + "ColumnName": "ReadBytesProcessed", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -5127,7 +5173,9 @@ 7, 8, 9, - 10 + 10, + 11, + 12 ], "RoomID": 0, "Codec": 0,