From 100c4c88f841591aadb799c5797831a3433108a0 Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Fri, 1 Nov 2024 05:48:03 +0000 Subject: [PATCH 1/7] Fix billing in DataShards --- ydb/core/protos/tx_datashard.proto | 23 +++++++++------ ydb/core/tx/datashard/kmeans_helper.cpp | 2 +- ydb/core/tx/datashard/kmeans_helper.h | 9 +----- ydb/core/tx/datashard/local_kmeans.cpp | 32 +++++++++++++------- ydb/core/tx/datashard/reshuffle_kmeans.cpp | 23 +++++++++++---- ydb/core/tx/datashard/sample_k.cpp | 34 +++++++++------------- ydb/core/tx/datashard/scan_common.cpp | 11 +++++++ ydb/core/tx/datashard/scan_common.h | 5 ++++ 8 files changed, 84 insertions(+), 55 deletions(-) diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 215e3010cda2..bf13ccf9caab 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1500,8 +1500,8 @@ message TEvSampleKResponse { optional NKikimrIndexBuilder.EBuildStatus Status = 4; repeated Ydb.Issue.IssueMessage Issues = 5; - optional uint64 RowsDelta = 6; - optional uint64 BytesDelta = 7; + optional uint64 ReadRows = 6; + optional uint64 ReadBytes = 7; optional uint64 RequestSeqNoGeneration = 8; optional uint64 RequestSeqNoRound = 9; @@ -1568,12 +1568,14 @@ message TEvLocalKMeansResponse { optional NKikimrIndexBuilder.EBuildStatus Status = 6; repeated Ydb.Issue.IssueMessage Issues = 7; - // TODO(mbkkt) implement slow-path (reliable-path) - // optional uint64 RowsDelta = 8; - // optional uint64 BytesDelta = 9; + optional uint64 UploadRows = 8; + optional uint64 UploadBytes = 9; + optional uint64 ReadRows = 10; + optional uint64 ReadBytes = 11; - // optional TEvLocalKMeansRequest.EState State = 10; - // optional uint32 DoneRounds = 11; + // TODO(mbkkt) implement slow-path (reliable-path) + // optional TEvLocalKMeansRequest.EState State + // optional uint32 DoneRounds } message TEvReshuffleKMeansRequest { @@ -1617,9 +1619,12 @@ message TEvReshuffleKMeansResponse { optional NKikimrIndexBuilder.EBuildStatus Status = 6; repeated Ydb.Issue.IssueMessage Issues = 7; + optional uint64 UploadRows = 8; + optional uint64 UploadBytes = 9; + optional uint64 ReadRows = 10; + optional uint64 ReadBytes = 11; + // TODO(mbkkt) implement slow-path (reliable-path) - // optional uint64 RowsDelta = 8; - // optional uint64 BytesDelta = 9; // optional last written primary key } diff --git a/ydb/core/tx/datashard/kmeans_helper.cpp b/ydb/core/tx/datashard/kmeans_helper.cpp index d25813e07551..e755d09c5ce3 100644 --- a/ydb/core/tx/datashard/kmeans_helper.cpp +++ b/ydb/core/tx/datashard/kmeans_helper.cpp @@ -56,7 +56,7 @@ void AddRowBuild2Build(TBufferData& buffer, ui32 parent, TArrayRef } void AddRowBuild2Posting(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row, - ui32 dataPos) + ui32 dataPos) { std::array cells; cells[0] = TCell::Make(parent); diff --git a/ydb/core/tx/datashard/kmeans_helper.h b/ydb/core/tx/datashard/kmeans_helper.h index 86164418b4cc..4d1ef5289b11 100644 --- a/ydb/core/tx/datashard/kmeans_helper.h +++ b/ydb/core/tx/datashard/kmeans_helper.h @@ -184,19 +184,12 @@ struct TCalculation: TMetric { } }; -struct TStats { - ui64 Rows = 0; - ui64 Bytes = 0; -}; - template ui32 FeedEmbedding(const TCalculation& calculation, std::span clusters, - const NTable::TRowState& row, NTable::TPos embeddingPos, TStats& stats) + const NTable::TRowState& row, NTable::TPos embeddingPos) { Y_ASSERT(embeddingPos < row.Size()); const auto embedding = row.Get(embeddingPos).AsRef(); - stats.Rows += 1; - stats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead? if (!calculation.IsExpectedSize(embedding)) { return std::numeric_limits::max(); } diff --git a/ydb/core/tx/datashard/local_kmeans.cpp b/ydb/core/tx/datashard/local_kmeans.cpp index ab497f22a416..5d95afcb36d6 100644 --- a/ydb/core/tx/datashard/local_kmeans.cpp +++ b/ydb/core/tx/datashard/local_kmeans.cpp @@ -60,10 +60,10 @@ class TLocalKMeansScanBase: public TActor, public NTable:: TLead Lead; - // Sample - TStats ReadStats; - // TODO(mbkkt) Sent or Upload stats? + ui64 ReadRows = 0; + ui64 ReadBytes = 0; + // Sample ui64 MaxProbability = std::numeric_limits::max(); TReallyFastRng32 Rng; @@ -101,6 +101,9 @@ class TLocalKMeansScanBase: public TActor, public NTable:: TUploadStatus UploadStatus; + ui64 UploadRows = 0; + ui64 UploadBytes = 0; + // Response TActorId ResponseActorId; TAutoPtr Response; @@ -163,6 +166,10 @@ class TLocalKMeansScanBase: public TActor, public NTable:: } auto& record = Response->Record; + record.SetReadRows(ReadRows); + record.SetReadBytes(ReadBytes); + record.SetUploadRows(UploadRows); + record.SetUploadBytes(UploadBytes); if (abort != EAbort::None) { record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED); } else if (UploadStatus.IsSuccess()) { @@ -243,6 +250,8 @@ class TLocalKMeansScanBase: public TActor, public NTable:: UploadStatus.StatusCode = ev->Get()->Status; UploadStatus.Issues = ev->Get()->Issues; if (UploadStatus.IsSuccess()) { + UploadRows += WriteBuf.GetRows(); + UploadBytes += WriteBuf.GetBytes(); WriteBuf.Clear(); if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) { ReadBuf.FlushTo(WriteBuf); @@ -366,6 +375,9 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< if (!InitAggregatedClusters()) { // We don't need to do anything, // because this datashard doesn't have valid embeddings for this parent + if (UploadStatus.IsNone()) { + UploadStatus.StatusCode = Ydb::StatusIds::SUCCESS; + } return EScan::Final; } ++Round; @@ -391,6 +403,8 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan Feed(TArrayRef key, const TRow& row) noexcept final { LOG_T("Feed " << Debug()); + ++ReadRows; + ReadBytes += CountBytes(key, row); switch (State) { case EState::SAMPLE: return FeedSample(row); @@ -467,8 +481,6 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< { Y_ASSERT(row.Size() == 1); const auto embedding = row.Get(0).AsRef(); - ReadStats.Rows += 1; - ReadStats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead? if (!this->IsExpectedSize(embedding)) { return EScan::Feed; } @@ -495,14 +507,14 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedKMeans(const TRow& row) noexcept { Y_ASSERT(row.Size() == 1); - const ui32 pos = FeedEmbedding(*this, Clusters, row, 0, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, 0); AggregateToCluster(pos, row.Get(0).Data()); return EScan::Feed; } EScan FeedUploadMain2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -512,7 +524,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedUploadMain2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -522,7 +534,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedUploadBuild2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -532,7 +544,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation< EScan FeedUploadBuild2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } diff --git a/ydb/core/tx/datashard/reshuffle_kmeans.cpp b/ydb/core/tx/datashard/reshuffle_kmeans.cpp index 83e9d4d00462..ae2a74789972 100644 --- a/ydb/core/tx/datashard/reshuffle_kmeans.cpp +++ b/ydb/core/tx/datashard/reshuffle_kmeans.cpp @@ -38,8 +38,8 @@ class TReshuffleKMeansScanBase: public TActor, public TLead Lead; - TStats ReadStats; - // TODO(mbkkt) Sent or Upload stats? + ui64 ReadRows = 0; + ui64 ReadBytes = 0; std::vector Clusters; @@ -63,6 +63,9 @@ class TReshuffleKMeansScanBase: public TActor, public TUploadStatus UploadStatus; + ui64 UploadRows = 0; + ui64 UploadBytes = 0; + // Response TActorId ResponseActorId; TAutoPtr Response; @@ -138,6 +141,10 @@ class TReshuffleKMeansScanBase: public TActor, public } auto& record = Response->Record; + record.SetReadRows(ReadRows); + record.SetReadBytes(ReadBytes); + record.SetUploadRows(UploadRows); + record.SetUploadBytes(UploadBytes); if (abort != EAbort::None) { record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED); } else if (UploadStatus.IsSuccess()) { @@ -218,6 +225,8 @@ class TReshuffleKMeansScanBase: public TActor, public UploadStatus.StatusCode = ev->Get()->Status; UploadStatus.Issues = ev->Get()->Issues; if (UploadStatus.IsSuccess()) { + UploadRows += WriteBuf.GetRows(); + UploadBytes += WriteBuf.GetBytes(); WriteBuf.Clear(); if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) { ReadBuf.FlushTo(WriteBuf); @@ -282,6 +291,8 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan Feed(TArrayRef key, const TRow& row) noexcept final { LOG_T("Feed " << Debug()); + ++ReadRows; + ReadBytes += CountBytes(key, row); switch (UploadState) { case EState::UPLOAD_MAIN_TO_BUILD: return FeedUploadMain2Build(key, row); @@ -299,7 +310,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc private: EScan FeedUploadMain2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -309,7 +320,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan FeedUploadMain2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -319,7 +330,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan FeedUploadBuild2Build(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } @@ -329,7 +340,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc EScan FeedUploadBuild2Posting(TArrayRef key, const TRow& row) noexcept { - const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats); + const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos); if (pos > K) { return EScan::Feed; } diff --git a/ydb/core/tx/datashard/sample_k.cpp b/ydb/core/tx/datashard/sample_k.cpp index 5715d491becd..df372f899092 100644 --- a/ydb/core/tx/datashard/sample_k.cpp +++ b/ydb/core/tx/datashard/sample_k.cpp @@ -44,8 +44,8 @@ class TSampleKScan final: public TActor, public NTable::IScan { auto operator<=>(const TProbability&) const noexcept = default; }; - ui64 RowsCount = 0; - ui64 RowsBytes = 0; + ui64 ReadRows = 0; + ui64 ReadBytes = 0; // We are using binary heap, because we don't want to do batch processing here, // serialization is more expensive than compare @@ -116,27 +116,28 @@ class TSampleKScan final: public TActor, public NTable::IScan { EScan Feed(TArrayRef key, const TRow& row) noexcept final { LOG_T("Feed key " << DebugPrintPoint(KeyTypes, key, *AppData()->TypeRegistry) << " " << Debug()); - ++RowsCount; + ++ReadRows; + ReadBytes += CountBytes(key, row); const auto probability = GetProbability(); if (probability > MaxProbability) { - // TODO(mbkkt) it's not nice that we need to compute this, probably can be precomputed in TRow - RowsBytes += TSerializedCellVec::SerializedSize(*row); return EScan::Feed; } - auto serialized = TSerializedCellVec::Serialize(*row); - RowsBytes += serialized.size(); - if (DataRows.size() < K) { MaxRows.push_back({probability, DataRows.size()}); - DataRows.emplace_back(std::move(serialized)); + DataRows.emplace_back(TSerializedCellVec::Serialize(*row)); if (DataRows.size() == K) { std::make_heap(MaxRows.begin(), MaxRows.end()); MaxProbability = MaxRows.front().P; } } else { - ReplaceRow(std::move(serialized), probability); + // TODO(mbkkt) use tournament tree to make less compare and swaps + std::pop_heap(MaxRows.begin(), MaxRows.end()); + TSerializedCellVec::Serialize(DataRows[MaxRows.back().I], *row); + MaxRows.back().P = probability; + std::push_heap(MaxRows.begin(), MaxRows.end()); + MaxProbability = MaxRows.front().P; } if (MaxProbability == 0) { @@ -147,6 +148,8 @@ class TSampleKScan final: public TActor, public NTable::IScan { TAutoPtr Finish(EAbort abort) noexcept final { Y_ABORT_UNLESS(Response); + Response->Record.SetReadRows(ReadRows); + Response->Record.SetReadBytes(ReadBytes); if (abort == EAbort::None) { FillResponse(); } else { @@ -187,15 +190,6 @@ class TSampleKScan final: public TActor, public NTable::IScan { } } - void ReplaceRow(TString&& row, ui64 p) { - // TODO(mbkkt) use tournament tree to make less compare and swaps - std::pop_heap(MaxRows.begin(), MaxRows.end()); - DataRows[MaxRows.back().I] = std::move(row); - MaxRows.back().P = p; - std::push_heap(MaxRows.begin(), MaxRows.end()); - MaxProbability = MaxRows.front().P; - } - void FillResponse() { std::sort(MaxRows.begin(), MaxRows.end()); auto& record = Response->Record; @@ -203,8 +197,6 @@ class TSampleKScan final: public TActor, public NTable::IScan { record.AddProbabilities(p); record.AddRows(std::move(DataRows[i])); } - record.SetRowsDelta(RowsCount); - record.SetBytesDelta(RowsBytes); record.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE); } diff --git a/ydb/core/tx/datashard/scan_common.cpp b/ydb/core/tx/datashard/scan_common.cpp index 2645a7495d44..3612f61d88ab 100644 --- a/ydb/core/tx/datashard/scan_common.cpp +++ b/ydb/core/tx/datashard/scan_common.cpp @@ -29,4 +29,15 @@ TColumnsTypes GetAllTypes(const TUserTable& tableInfo) { return result; } +ui64 CountBytes(TArrayRef key, const NTable::TRowState& row) { + ui64 bytes = 0; + for (auto& cell : key) { + bytes += cell.Size(); + } + for (auto& cell : *row) { + bytes += cell.Size(); + } + return bytes; +} + } diff --git a/ydb/core/tx/datashard/scan_common.h b/ydb/core/tx/datashard/scan_common.h index 738cb135fe97..3aa1d5306473 100644 --- a/ydb/core/tx/datashard/scan_common.h +++ b/ydb/core/tx/datashard/scan_common.h @@ -80,4 +80,9 @@ using TColumnsTypes = THashMap; TColumnsTypes GetAllTypes(const TUserTable& tableInfo); +// TODO(mbkkt) unfortunately key can have same columns as row +// I can detect this but maybe better +// if IScan will provide for us "how much data did we read"? +ui64 CountBytes(TArrayRef key, const NTable::TRowState& row); + } From c047a118a474e7e9def95f358d5e10453410a43c Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Fri, 1 Nov 2024 05:57:31 +0000 Subject: [PATCH 2/7] Fix billing in SchemeShard --- ydb/core/protos/index_builder.proto | 5 +- .../schemeshard/schemeshard__monitoring.cpp | 1 - .../schemeshard/schemeshard_build_index.cpp | 32 ++++-- .../schemeshard_build_index__progress.cpp | 52 +++++----- .../schemeshard_build_index_tx_base.cpp | 16 +-- ydb/core/tx/schemeshard/schemeshard_impl.h | 3 +- .../tx/schemeshard/schemeshard_info_types.cpp | 97 ++++--------------- .../tx/schemeshard/schemeshard_info_types.h | 87 ++++++++++++----- ydb/core/tx/schemeshard/schemeshard_schema.h | 38 ++++++-- 9 files changed, 177 insertions(+), 154 deletions(-) diff --git a/ydb/core/protos/index_builder.proto b/ydb/core/protos/index_builder.proto index baea28bf6f40..c148577ddeeb 100644 --- a/ydb/core/protos/index_builder.proto +++ b/ydb/core/protos/index_builder.proto @@ -124,7 +124,6 @@ message TEvUploadSampleKResponse { optional Ydb.StatusIds.StatusCode UploadStatus = 2; repeated Ydb.Issue.IssueMessage Issues = 3; - optional uint64 RowsDelta = 4; - optional uint64 BytesDelta = 5; + optional uint64 UploadRows = 4; + optional uint64 UploadBytes = 5; } - diff --git a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp index 606415bad0c1..76278ac524f6 100644 --- a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp @@ -786,7 +786,6 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas TABLEH() {str << "DebugMessage";} TABLEH() {str << "SeqNo";} TABLEH() {str << "Processed";} - TABLEH() {str << "Billed";} } } for (auto item : info.Shards) { diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp index e79576fcc2e6..eaeb88e5799e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp @@ -199,11 +199,22 @@ void TSchemeShard::PersistBuildIndexUnlockTxId(NIceDb::TNiceDb& db, const TIndex NIceDb::TUpdate(indexInfo.UnlockTxId)); } -void TSchemeShard::PersistBuildIndexBilling(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { +void TSchemeShard::PersistBuildIndexProcessed(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { db.Table().Key(indexInfo.Id).Update( - NIceDb::TUpdate(indexInfo.Billed.GetRows()), - NIceDb::TUpdate(indexInfo.Billed.GetBytes()) - ); + NIceDb::TUpdate(indexInfo.Processed.GetUploadRows()), + NIceDb::TUpdate(indexInfo.Processed.GetUploadBytes()), + NIceDb::TUpdate(indexInfo.Processed.GetReadRows()), + NIceDb::TUpdate(indexInfo.Processed.GetReadBytes()) + ); +} + +void TSchemeShard::PersistBuildIndexBilled(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { + db.Table().Key(indexInfo.Id).Update( + NIceDb::TUpdate(indexInfo.Billed.GetUploadRows()), + NIceDb::TUpdate(indexInfo.Billed.GetUploadBytes()), + NIceDb::TUpdate(indexInfo.Billed.GetReadRows()), + NIceDb::TUpdate(indexInfo.Billed.GetReadBytes()) + ); } void TSchemeShard::PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, const TIndexBuildInfo::TShardStatus& shardStatus) { @@ -212,8 +223,10 @@ void TSchemeShard::PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBu NIceDb::TUpdate(shardStatus.Status), NIceDb::TUpdate(shardStatus.DebugMessage), NIceDb::TUpdate(shardStatus.UploadStatus), - NIceDb::TUpdate(shardStatus.Processed.GetRows()), - NIceDb::TUpdate(shardStatus.Processed.GetBytes()) + NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), + NIceDb::TUpdate(shardStatus.Processed.GetReadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetReadBytes()) ); } @@ -230,8 +243,13 @@ void TSchemeShard::PersistBuildIndexUploadInitiate(NIceDb::TNiceDb& db, TIndexBu void TSchemeShard::PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, TIndexBuildInfo::TShardStatus& shardStatus) { shardStatus.Status = NKikimrIndexBuilder::EBuildStatus::INVALID; + shardStatus.Processed = {}; db.Table().Key(buildId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update( - NIceDb::TUpdate(shardStatus.Status) + NIceDb::TUpdate(shardStatus.Status), + NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), + NIceDb::TUpdate(shardStatus.Processed.GetReadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetReadBytes()) ); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index e013bd6bd7bd..7ee23db86c08 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -215,8 +215,8 @@ class TUploadSampleK: public TActorBootstrapped { TAutoPtr response = new TEvIndexBuilder::TEvUploadSampleKResponse; response->Record.SetId(BuildIndexId); - response->Record.SetRowsDelta(Rows->size()); - response->Record.SetBytesDelta(RowsBytes); + response->Record.SetUploadRows(Rows->size()); + response->Record.SetUploadBytes(RowsBytes); UploadStatusToMessage(response->Record); @@ -698,15 +698,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil std::array typeInfos{NScheme::NTypeIds::Uint32}; auto range = ParentRange(buildInfo); for (const auto& [idx, status] : buildInfo.Shards) { + if (buildInfo.KMeans.Parent != 0 + && Intersect(typeInfos, range.ToTableRange(), status.Range.ToTableRange()).IsEmptyRange(typeInfos)) { + continue; + } switch (status.Status) { case NKikimrIndexBuilder::EBuildStatus::INVALID: - if (buildInfo.KMeans.Parent != 0) { - if (!Intersect(typeInfos, range.ToTableRange(), status.Range.ToTableRange()).IsEmptyRange(typeInfos)) { - buildInfo.ToUploadShards.emplace_back(idx); - } - break; - } - [[fallthrough]]; + buildInfo.ToUploadShards.emplace_back(idx); + break; case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: case NKikimrIndexBuilder::EBuildStatus::ABORTED: @@ -767,6 +766,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil Self->PersistBuildIndexUploadReset(db, BuildId, idx, status); } buildInfo.DoneShards.clear(); + Self->PersistBuildIndexProcessed(db, buildInfo); // TODO(mbkkt) persist buildInfo.KMeans changes if (!buildInfo.Sample.Rows.empty()) { if (!buildInfo.Sample.Sent) { @@ -921,6 +921,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil Self->PersistBuildIndexApplyTxStatus(db, buildInfo); Self->PersistBuildIndexApplyTxDone(db, buildInfo); Self->PersistBuildIndexUploadReset(db, buildInfo); + Self->PersistBuildIndexProcessed(db, buildInfo); ChangeState(BuildId, TIndexBuildInfo::EState::CreateBuild); Progress(BuildId); @@ -1287,11 +1288,9 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K); } - if (record.HasRowsDelta() || record.HasBytesDelta()) { - TBillingStats delta(record.GetRowsDelta(), record.GetBytesDelta()); - shardStatus.Processed += delta; - buildInfo.Processed += delta; - } + TBillingStats stats{0, 0, record.GetReadRows(), record.GetReadBytes()}; + shardStatus.Processed += stats; + buildInfo.Processed += stats; NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); @@ -1412,7 +1411,9 @@ struct TSchemeShard::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard return true; } - // TODO(mbkkt) add billing + TBillingStats stats{record.GetUploadRows(), record.GetUploadBytes(), record.GetReadRows(), record.GetReadBytes()}; + shardStatus.Processed += stats; + buildInfo.Processed += stats; NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); @@ -1510,15 +1511,17 @@ struct TSchemeShard::TIndexBuilder::TTxReplyUpload: public TSchemeShard::TIndexB switch (const auto state = buildInfo.State; state) { case TIndexBuildInfo::EState::Filling: { - if (record.HasRowsDelta() || record.HasBytesDelta()) { - TBillingStats delta(record.GetRowsDelta(), record.GetBytesDelta()); - buildInfo.Processed += delta; - } + NIceDb::TNiceDb db(txc.DB); + + TBillingStats stats{record.GetUploadRows(), record.GetUploadBytes(), 0, 0}; + buildInfo.Processed += stats; + // As long as we don't try to upload sample in parallel with requests to shards, + // it's okay to persist Processed not incrementally + Self->PersistBuildIndexProcessed(db, buildInfo); NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); - NIceDb::TNiceDb db(txc.DB); auto status = record.GetUploadStatus(); if (status == Ydb::StatusIds::SUCCESS) { ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Filling); @@ -1639,11 +1642,10 @@ struct TSchemeShard::TIndexBuilder::TTxReplyProgress: public TSchemeShard::TInde shardStatus.LastKeyAck = record.GetLastKeyAck(); } - if (record.HasRowsDelta() || record.HasBytesDelta()) { - TBillingStats delta(record.GetRowsDelta(), record.GetBytesDelta()); - shardStatus.Processed += delta; - buildInfo.Processed += delta; - } + // TODO(mbkkt) we should account uploads and reads separately + TBillingStats stats{record.GetRowsDelta(), record.GetBytesDelta(), record.GetRowsDelta(), record.GetBytesDelta()}; + shardStatus.Processed += stats; + buildInfo.Processed += stats; shardStatus.Status = record.GetStatus(); shardStatus.UploadStatus = record.GetUploadStatus(); diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index 022a6d272951..43a3bfacbb91 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -52,8 +52,8 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplySchedule(const TActorContext& ct } ui64 TSchemeShard::TIndexBuilder::TTxBase::RequestUnits(const TBillingStats& stats) { - return TRUCalculator::ReadTable(stats.GetBytes()) - + TRUCalculator::BulkUpsert(stats.GetBytes(), stats.GetRows()); + return TRUCalculator::ReadTable(stats.GetReadBytes()) + + TRUCalculator::BulkUpsert(stats.GetUploadBytes(), stats.GetUploadRows()); } void TSchemeShard::TIndexBuilder::TTxBase::RoundPeriod(TInstant& start, TInstant& end) { @@ -93,8 +93,10 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTrans const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); Y_VERIFY_S(buildInfoPtr, "IndexBuilds has no " << buildId); auto& buildInfo = *buildInfoPtr->Get(); + auto& processed = buildInfo.Processed; + auto& billed = buildInfo.Billed; - TBillingStats toBill = buildInfo.Processed - buildInfo.Billed; + TBillingStats toBill = processed - billed; if (!toBill) { continue; } @@ -139,16 +141,16 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTrans NIceDb::TNiceDb db(txc.DB); - buildInfo.Billed += toBill; - Self->PersistBuildIndexBilling(db, buildInfo); + billed += toBill; + Self->PersistBuildIndexBilled(db, buildInfo); ui64 requestUnits = RequestUnits(toBill); TString id = TStringBuilder() << buildId << "-" << buildInfo.TablePathId.OwnerId << "-" << buildInfo.TablePathId.LocalPathId << "-" - << buildInfo.Billed.GetRows() << "-" << buildInfo.Billed.GetBytes() << "-" - << buildInfo.Processed.GetRows() << "-" << buildInfo.Processed.GetBytes(); + << billed.GetUploadRows() + billed.GetReadRows() << "-" << billed.GetUploadBytes() + billed.GetReadBytes() << "-" + << processed.GetUploadRows() + processed.GetReadRows() << "-" << processed.GetUploadBytes() + processed.GetReadBytes(); const TString billRecord = TBillRecord() .Id(id) diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index fdc08090647e..740b0dd9c25d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1320,7 +1320,8 @@ class TSchemeShard void PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, const TIndexBuildInfo::TShardStatus& shardStatus); void PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuildId buildId, const TShardIdx& shardIdx, TIndexBuildInfo::TShardStatus& shardStatus); void PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuildInfo& info); - void PersistBuildIndexBilling(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); + void PersistBuildIndexProcessed(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); + void PersistBuildIndexBilled(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); void PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index e29ff7f4f7b1..4dbcc415f566 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -471,7 +471,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( if (!featureFlags.EnableParameterizedDecimal && decimalType != NScheme::TDecimalType::Default()){ errStr = Sprintf("Type '%s' specified for column '%s', but support for parametrized decimal is disabled (EnableParameterizedDecimal feature flag is off)", col.GetType().data(), colName.data()); return nullptr; - } + } break; } case NScheme::NTypeIds::Pg: @@ -479,7 +479,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( errStr = Sprintf("Type '%s' specified for column '%s', but support for pg types is disabled (EnableTablePgTypes feature flag is off)", col.GetType().data(), colName.data()); return nullptr; } - break; + break; default: break; } @@ -2332,98 +2332,39 @@ bool TTopicInfo::FillKeySchema(const TString& tabletConfig) { return FillKeySchema(proto, unused); } -TBillingStats::TBillingStats(ui64 rows, ui64 bytes) - : Rows(rows) - , Bytes(bytes) -{ -} - -TBillingStats::TBillingStats(const TBillingStats &other) - : Rows(other.Rows) - , Bytes(other.Bytes) +TBillingStats::TBillingStats(ui64 readRows, ui64 readBytes, ui64 uploadRows, ui64 uploadBytes) + : UploadRows{uploadRows} + , UploadBytes{uploadBytes} + , ReadRows{readRows} + , ReadBytes{readBytes} { } -TBillingStats &TBillingStats::operator =(const TBillingStats &other) { - if (this == &other) { - return *this; - } - - Rows = other.Rows; - Bytes = other.Bytes; - return *this; -} - TBillingStats TBillingStats::operator -(const TBillingStats &other) const { - Y_ABORT_UNLESS(Rows >= other.Rows); - Y_ABORT_UNLESS(Bytes >= other.Bytes); + Y_ABORT_UNLESS(UploadRows >= other.UploadRows); + Y_ABORT_UNLESS(UploadBytes >= other.UploadBytes); + Y_ABORT_UNLESS(ReadRows >= other.ReadRows); + Y_ABORT_UNLESS(ReadBytes >= other.ReadBytes); - return TBillingStats(Rows - other.Rows, Bytes - other.Bytes); -} - -TBillingStats &TBillingStats::operator -=(const TBillingStats &other) { - if (this == &other) { - Rows = 0; - Bytes = 0; - return *this; - } - - Y_ABORT_UNLESS(Rows >= other.Rows); - Y_ABORT_UNLESS(Bytes >= other.Bytes); - - Rows -= other.Rows; - Bytes -= other.Bytes; - return *this; + return {UploadRows - other.UploadRows, UploadBytes - other.UploadBytes, + ReadRows - other.ReadRows, ReadBytes - other.ReadBytes}; } TBillingStats TBillingStats::operator +(const TBillingStats &other) const { - return TBillingStats(Rows + other.Rows, Bytes + other.Bytes); -} - -TBillingStats &TBillingStats::operator +=(const TBillingStats &other) { - if (this == &other) { - Rows += Rows; - Bytes += Bytes; - return *this; - } - - Rows += other.Rows; - Bytes += other.Bytes; - return *this; -} - -bool TBillingStats::operator < (const TBillingStats &other) const { - return Rows < other.Rows && Bytes < other.Bytes; -} - -bool TBillingStats::operator <= (const TBillingStats &other) const { - return Rows <= other.Rows && Bytes <= other.Bytes; -} - -bool TBillingStats::operator ==(const TBillingStats &other) const { - return Rows == other.Rows && Bytes == other.Bytes; + return {UploadRows + other.UploadRows, UploadBytes + other.UploadBytes, + ReadRows + other.ReadRows, ReadBytes + other.ReadBytes}; } TString TBillingStats::ToString() const { return TStringBuilder() << "{" - << " rows: " << GetRows() - << " bytes: " << GetBytes() + << " upload rows: " << UploadRows + << ", upload bytes: " << UploadBytes + << ", read rows: " << ReadRows + << ", read bytes: " << ReadBytes << " }"; } -ui64 TBillingStats::GetRows() const { - return Rows; -} - -ui64 TBillingStats::GetBytes() const { - return Bytes; -} - -NKikimr::NSchemeShard::TBillingStats::operator bool() const { - return Rows || Bytes; -} - TSequenceInfo::TSequenceInfo( ui64 alterVersion, NKikimrSchemeOp::TSequenceDescription&& description, diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index cd790cd31f7d..e31f655d441f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2865,31 +2865,48 @@ struct TImportInfo: public TSimpleRefCount { class TBillingStats { public: TBillingStats() = default; - TBillingStats(ui64 rows, ui64 bytes); - TBillingStats(const TBillingStats& other); + TBillingStats(const TBillingStats& other) = default; + TBillingStats& operator = (const TBillingStats& other) = default; - TBillingStats& operator = (const TBillingStats& other); + TBillingStats(ui64 uploadRows, ui64 uploadBytes, ui64 readRows, ui64 readBytes); TBillingStats operator - (const TBillingStats& other) const; - TBillingStats& operator -= (const TBillingStats& other); + TBillingStats& operator -= (const TBillingStats& other) { + return *this = *this - other; + } TBillingStats operator + (const TBillingStats& other) const; - TBillingStats& operator += (const TBillingStats& other); + TBillingStats& operator += (const TBillingStats& other) { + return *this = *this + other; + } - bool operator < (const TBillingStats& other) const; - bool operator <= (const TBillingStats& other) const; - bool operator == (const TBillingStats& other) const; + bool operator == (const TBillingStats& other) const = default; - operator bool () const; + explicit operator bool () const { + return *this != TBillingStats{}; + } TString ToString() const; - ui64 GetRows() const; - ui64 GetBytes() const; + ui64 GetUploadRows() const { + return UploadRows; + } + ui64 GetUploadBytes() const { + return UploadBytes; + } + + ui64 GetReadRows() const { + return ReadRows; + } + ui64 GetReadBytes() const { + return ReadBytes; + } private: - ui64 Rows = 0; - ui64 Bytes = 0; + ui64 UploadRows = 0; + ui64 UploadBytes = 0; + ui64 ReadRows = 0; + ui64 ReadBytes = 0; }; // TODO(mbkkt) separate it to 3 classes: TBuildColumnsInfo TBuildSecondaryInfo TBuildVectorInfo with single base TBuildInfo @@ -3394,9 +3411,24 @@ struct TIndexBuildInfo: public TSimpleRefCount { row.template GetValueOrDefault( indexInfo->AlterMainTableTxDone); - indexInfo->Billed = TBillingStats( - row.template GetValueOrDefault(0), - row.template GetValueOrDefault(0)); + auto& billed = indexInfo->Billed; + billed = { + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + }; + if (billed.GetUploadRows() != 0 && billed.GetReadRows() == 0 && indexInfo->IsFillBuildIndex()) { + // old format: assign upload to read + billed += {0, 0, billed.GetUploadRows(), billed.GetUploadBytes()}; + } + + indexInfo->Processed = { + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + }; return indexInfo; } @@ -3428,19 +3460,28 @@ struct TIndexBuildInfo: public TSimpleRefCount { Schema::IndexBuildShardStatus::UploadStatus>( Ydb::StatusIds::STATUS_CODE_UNSPECIFIED); - shardStatus.Processed = TBillingStats( - row.template GetValueOrDefault< - Schema::IndexBuildShardStatus::RowsProcessed>(0), - row.template GetValueOrDefault< - Schema::IndexBuildShardStatus::BytesProcessed>(0)); - - Processed += shardStatus.Processed; + auto& processed = shardStatus.Processed; + processed = { + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + }; + if (processed.GetUploadRows() != 0 && processed.GetReadRows() == 0 && IsFillBuildIndex()) { + // old format: assign upload to read + processed += {0, 0, processed.GetUploadRows(), processed.GetUploadBytes()}; + } + Processed += processed; } bool IsCancellationRequested() const { return CancelRequested; } + bool IsFillBuildIndex() const { + return IsBuildSecondaryIndex() || IsBuildColumns(); + } + bool IsBuildSecondaryIndex() const { return BuildKind == EBuildKind::BuildSecondaryIndex; } diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index db5df8b9de83..e45c24a8bed7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1316,8 +1316,8 @@ struct Schema : NIceDb::Schema { struct MaxRetries : Column<27, NScheme::NTypeIds::Uint32> {}; - struct RowsBilled : Column<28, NScheme::NTypeIds::Uint64> {}; - struct BytesBilled : Column<29, NScheme::NTypeIds::Uint64> {}; + struct UploadRowsBilled : Column<28, NScheme::NTypeIds::Uint64> {}; + struct UploadBytesBilled : Column<29, NScheme::NTypeIds::Uint64> {}; struct BuildKind : Column<30, NScheme::NTypeIds::Uint32> {}; @@ -1328,6 +1328,15 @@ struct Schema : NIceDb::Schema { // Serialized as string NKikimrSchemeOp::TIndexCreationConfig protobuf. struct CreationConfig : Column<34, NScheme::NTypeIds::String> { using Type = TString; }; + struct ReadRowsBilled : Column<35, NScheme::NTypeIds::Uint64> {}; + struct ReadBytesBilled : Column<36, NScheme::NTypeIds::Uint64> {}; + + struct UploadRowsProcessed : Column<37, NScheme::NTypeIds::Uint64> {}; + struct UploadBytesProcessed : Column<38, NScheme::NTypeIds::Uint64> {}; + + struct ReadRowsProcessed : Column<39, NScheme::NTypeIds::Uint64> {}; + struct ReadBytesProcessed : Column<40, NScheme::NTypeIds::Uint64> {}; + using TKey = TableKey; using TColumns = TableColumns< Id, @@ -1357,13 +1366,19 @@ struct Schema : NIceDb::Schema { UnlockTxDone, CancelRequest, MaxRetries, - RowsBilled, - BytesBilled, + UploadRowsBilled, + UploadBytesBilled, BuildKind, AlterMainTableTxId, AlterMainTableTxStatus, AlterMainTableTxDone, - CreationConfig + CreationConfig, + ReadRowsBilled, + ReadBytesBilled, + UploadRowsProcessed, + UploadBytesProcessed, + ReadRowsProcessed, + ReadBytesProcessed >; }; @@ -1431,8 +1446,11 @@ struct Schema : NIceDb::Schema { struct Message : Column<7, NScheme::NTypeIds::Utf8> {}; struct UploadStatus : Column<8, NScheme::NTypeIds::Uint32> { using Type = Ydb::StatusIds::StatusCode; }; - struct RowsProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; - struct BytesProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; + struct UploadRowsProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; + struct UploadBytesProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; + + struct ReadRowsProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; + struct ReadBytesProcessed : Column<11, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -1444,8 +1462,10 @@ struct Schema : NIceDb::Schema { Status, Message, UploadStatus, - RowsProcessed, - BytesProcessed + UploadRowsProcessed, + UploadBytesProcessed, + ReadRowsProcessed, + ReadBytesProcessed >; }; From 32a5e06d5f33c1470485408cb0b8faa11a744490 Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Fri, 1 Nov 2024 07:37:10 +0000 Subject: [PATCH 3/7] Fix billing in SchemeShard --- ydb/core/tx/schemeshard/schemeshard_schema.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index e45c24a8bed7..566a634001f5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1449,8 +1449,8 @@ struct Schema : NIceDb::Schema { struct UploadRowsProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; struct UploadBytesProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; - struct ReadRowsProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; - struct ReadBytesProcessed : Column<11, NScheme::NTypeIds::Uint64> {}; + struct ReadRowsProcessed : Column<11, NScheme::NTypeIds::Uint64> {}; + struct ReadBytesProcessed : Column<12, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey; using TColumns = TableColumns< From fc0f10527e8aa5c00648b7833a8f81a585af3476 Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Fri, 1 Nov 2024 08:01:33 +0000 Subject: [PATCH 4/7] Better id string --- .../schemeshard/schemeshard_build_index_tx_base.cpp | 12 ++++++------ .../tx/schemeshard/ut_index_build/ut_index_build.cpp | 2 +- .../ut_index_build/ut_vector_index_build.cpp | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index 43a3bfacbb91..3b47b452705c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -139,6 +139,12 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTrans continue; } + TString id = TStringBuilder() + << buildId << "-" + << buildInfo.TablePathId.OwnerId << "-" << buildInfo.TablePathId.LocalPathId << "-" + << billed.GetUploadRows() + billed.GetReadRows() << "-" << billed.GetUploadBytes() + billed.GetReadBytes() << "-" + << processed.GetUploadRows() + processed.GetReadRows() << "-" << processed.GetUploadBytes() + processed.GetReadBytes(); + NIceDb::TNiceDb db(txc.DB); billed += toBill; @@ -146,12 +152,6 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTrans ui64 requestUnits = RequestUnits(toBill); - TString id = TStringBuilder() - << buildId << "-" - << buildInfo.TablePathId.OwnerId << "-" << buildInfo.TablePathId.LocalPathId << "-" - << billed.GetUploadRows() + billed.GetReadRows() << "-" << billed.GetUploadBytes() + billed.GetReadBytes() << "-" - << processed.GetUploadRows() + processed.GetReadRows() << "-" << processed.GetUploadBytes() + processed.GetReadBytes(); - const TString billRecord = TBillRecord() .Id(id) .CloudId(cloud_id) diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp index 7ca8c653a3c0..f52f25d20097 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp @@ -299,7 +299,7 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":0,"quantity":179,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-101-1818-101-1818","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; + const TString meteringData = R"({"usage":{"start":0,"quantity":179,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-202-3636","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData + "\n"); diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index bc40d41d6d8b..5ece3601879d 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-404-3486-404-3486","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; + const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-604-3976","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData); From 498896d1e1833920af988713834d0dc275b56dac Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Fri, 1 Nov 2024 09:56:57 +0000 Subject: [PATCH 5/7] Apply review suggestions --- .../tx/schemeshard/schemeshard_build_index.cpp | 12 ++++++------ .../schemeshard_build_index_tx_base.cpp | 6 ++++-- ydb/core/tx/schemeshard/schemeshard_info_types.h | 8 ++++---- ydb/core/tx/schemeshard/schemeshard_schema.h | 16 ++++++++-------- .../ut_index_build/ut_index_build.cpp | 2 +- .../ut_index_build/ut_vector_index_build.cpp | 2 +- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp index eaeb88e5799e..83cf89d80cb3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp @@ -210,8 +210,8 @@ void TSchemeShard::PersistBuildIndexProcessed(NIceDb::TNiceDb& db, const TIndexB void TSchemeShard::PersistBuildIndexBilled(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { db.Table().Key(indexInfo.Id).Update( - NIceDb::TUpdate(indexInfo.Billed.GetUploadRows()), - NIceDb::TUpdate(indexInfo.Billed.GetUploadBytes()), + NIceDb::TUpdate(indexInfo.Billed.GetUploadRows()), + NIceDb::TUpdate(indexInfo.Billed.GetUploadBytes()), NIceDb::TUpdate(indexInfo.Billed.GetReadRows()), NIceDb::TUpdate(indexInfo.Billed.GetReadBytes()) ); @@ -223,8 +223,8 @@ void TSchemeShard::PersistBuildIndexUploadProgress(NIceDb::TNiceDb& db, TIndexBu NIceDb::TUpdate(shardStatus.Status), NIceDb::TUpdate(shardStatus.DebugMessage), NIceDb::TUpdate(shardStatus.UploadStatus), - NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), - NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), NIceDb::TUpdate(shardStatus.Processed.GetReadRows()), NIceDb::TUpdate(shardStatus.Processed.GetReadBytes()) ); @@ -246,8 +246,8 @@ void TSchemeShard::PersistBuildIndexUploadReset(NIceDb::TNiceDb& db, TIndexBuild shardStatus.Processed = {}; db.Table().Key(buildId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update( NIceDb::TUpdate(shardStatus.Status), - NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), - NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadRows()), + NIceDb::TUpdate(shardStatus.Processed.GetUploadBytes()), NIceDb::TUpdate(shardStatus.Processed.GetReadRows()), NIceDb::TUpdate(shardStatus.Processed.GetReadBytes()) ); diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index 3b47b452705c..dec6877e3ff6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -142,8 +142,10 @@ void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTrans TString id = TStringBuilder() << buildId << "-" << buildInfo.TablePathId.OwnerId << "-" << buildInfo.TablePathId.LocalPathId << "-" - << billed.GetUploadRows() + billed.GetReadRows() << "-" << billed.GetUploadBytes() + billed.GetReadBytes() << "-" - << processed.GetUploadRows() + processed.GetReadRows() << "-" << processed.GetUploadBytes() + processed.GetReadBytes(); + << billed.GetUploadRows() << "-" << billed.GetReadRows() << "-" + << billed.GetUploadBytes() << "-" << billed.GetReadBytes() << "-" + << processed.GetUploadRows() << "-" << processed.GetReadRows() << "-" + << processed.GetUploadBytes() << "-" << processed.GetReadBytes(); NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index e31f655d441f..163f1d59a076 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -3413,8 +3413,8 @@ struct TIndexBuildInfo: public TSimpleRefCount { auto& billed = indexInfo->Billed; billed = { - row.template GetValueOrDefault(0), - row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), row.template GetValueOrDefault(0), row.template GetValueOrDefault(0), }; @@ -3462,8 +3462,8 @@ struct TIndexBuildInfo: public TSimpleRefCount { auto& processed = shardStatus.Processed; processed = { - row.template GetValueOrDefault(0), - row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), + row.template GetValueOrDefault(0), row.template GetValueOrDefault(0), row.template GetValueOrDefault(0), }; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 566a634001f5..0e8f3b31ead1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1316,8 +1316,8 @@ struct Schema : NIceDb::Schema { struct MaxRetries : Column<27, NScheme::NTypeIds::Uint32> {}; - struct UploadRowsBilled : Column<28, NScheme::NTypeIds::Uint64> {}; - struct UploadBytesBilled : Column<29, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ RowsBilled : Column<28, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ BytesBilled : Column<29, NScheme::NTypeIds::Uint64> {}; struct BuildKind : Column<30, NScheme::NTypeIds::Uint32> {}; @@ -1366,8 +1366,8 @@ struct Schema : NIceDb::Schema { UnlockTxDone, CancelRequest, MaxRetries, - UploadRowsBilled, - UploadBytesBilled, + RowsBilled, + BytesBilled, BuildKind, AlterMainTableTxId, AlterMainTableTxStatus, @@ -1446,8 +1446,8 @@ struct Schema : NIceDb::Schema { struct Message : Column<7, NScheme::NTypeIds::Utf8> {}; struct UploadStatus : Column<8, NScheme::NTypeIds::Uint32> { using Type = Ydb::StatusIds::StatusCode; }; - struct UploadRowsProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; - struct UploadBytesProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ RowsProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; + struct /*Upload*/ BytesProcessed : Column<10, NScheme::NTypeIds::Uint64> {}; struct ReadRowsProcessed : Column<11, NScheme::NTypeIds::Uint64> {}; struct ReadBytesProcessed : Column<12, NScheme::NTypeIds::Uint64> {}; @@ -1462,8 +1462,8 @@ struct Schema : NIceDb::Schema { Status, Message, UploadStatus, - UploadRowsProcessed, - UploadBytesProcessed, + RowsProcessed, + BytesProcessed, ReadRowsProcessed, ReadBytesProcessed >; diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp index f52f25d20097..39d2f3c98ef6 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp @@ -299,7 +299,7 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":0,"quantity":179,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-202-3636","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; + const TString meteringData = R"({"usage":{"start":0,"quantity":179,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-101-101-1818-1818","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData + "\n"); diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index 5ece3601879d..eda961bbc0d9 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-604-3976","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; + const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-404-1290-2686","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData); From a640494b641ba6a955718dd8e94b31d4924e057d Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Fri, 1 Nov 2024 13:23:28 +0000 Subject: [PATCH 6/7] Canonize tests --- .../flat_schemeshard.schema | 52 ++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 5b08a08f41d9..3b29dea4b7bb 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -4805,6 +4805,36 @@ "ColumnId": 34, "ColumnName": "CreationConfig", "ColumnType": "String" + }, + { + "ColumnId": 35, + "ColumnName": "ReadRowsBilled", + "ColumnType": "Uint64" + }, + { + "ColumnId": 36, + "ColumnName": "ReadBytesBilled", + "ColumnType": "Uint64" + }, + { + "ColumnId": 37, + "ColumnName": "UploadRowsProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 38, + "ColumnName": "UploadBytesProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 39, + "ColumnName": "ReadRowsProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 40, + "ColumnName": "ReadBytesProcessed", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -4844,7 +4874,13 @@ 31, 32, 33, - 34 + 34, + 35, + 36, + 37, + 38, + 39, + 40 ], "RoomID": 0, "Codec": 0, @@ -5112,6 +5148,16 @@ "ColumnId": 10, "ColumnName": "BytesProcessed", "ColumnType": "Uint64" + }, + { + "ColumnId": 11, + "ColumnName": "ReadRowsProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 12, + "ColumnName": "ReadBytesProcessed", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -5127,7 +5173,9 @@ 7, 8, 9, - 10 + 10, + 11, + 12 ], "RoomID": 0, "Codec": 0, From 6819f4780149849c780ef5405c825745fde88508 Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Fri, 1 Nov 2024 10:41:51 +0000 Subject: [PATCH 7/7] Add local kmeans --- .../schemeshard/schemeshard_build_index.cpp | 4 + .../schemeshard_build_index__progress.cpp | 185 +++++++++++++++++- ydb/core/tx/schemeshard/schemeshard_impl.cpp | 1 + ydb/core/tx/schemeshard/schemeshard_impl.h | 3 + .../tx/schemeshard/schemeshard_info_types.h | 2 +- .../ut_index_build/ut_vector_index_build.cpp | 2 +- 6 files changed, 188 insertions(+), 9 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp index 83cf89d80cb3..6b984e002ccb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp @@ -36,6 +36,10 @@ void TSchemeShard::Handle(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& ev, co Execute(CreateTxReply(ev), ctx); } +void TSchemeShard::Handle(TEvDataShard::TEvLocalKMeansResponse::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxReply(ev), ctx); +} + void TSchemeShard::Handle(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& ev, const TActorContext& ctx) { Execute(CreateTxReply(ev), ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 7ee23db86c08..f33fcf1e7f34 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -585,6 +585,46 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev)); } + void SendKMeansLocalRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) { + Y_ASSERT(buildInfo.IsBuildVectorIndex()); + auto ev = MakeHolder(); + ev->Record.SetId(ui64(BuildId)); + + auto path = TPath::Init(buildInfo.TablePathId, Self).Dive(buildInfo.IndexName); + if (buildInfo.KMeans.Level == 0) { + PathIdFromPathId(buildInfo.TablePathId, ev->Record.MutablePathId()); + } else { + PathIdFromPathId(path.Dive(buildInfo.KMeans.ReadFrom())->PathId, ev->Record.MutablePathId()); + path.Rise(); + } + *ev->Record.MutableSettings() = std::get( + buildInfo.SpecializedIndexDescription).GetSettings().settings(); + ev->Record.SetK(buildInfo.KMeans.K); + ev->Record.SetUpload(buildInfo.KMeans.GetUpload()); + ev->Record.SetState(NKikimrTxDataShard::TEvLocalKMeansRequest::SAMPLE); + + ev->Record.SetDoneRounds(0); + ev->Record.SetNeedsRounds(3); // TODO(mbkkt) should be configurable + + ev->Record.SetParent(buildInfo.KMeans.Parent); + ev->Record.SetChild(buildInfo.KMeans.ChildBegin); + + ev->Record.SetPostingName(path.Dive(buildInfo.KMeans.WriteTo()).PathString()); + path.Rise().Dive(NTableIndex::NTableVectorKmeansTreeIndex::LevelTable); + ev->Record.SetLevelName(path.PathString()); + + ev->Record.SetEmbeddingColumn(buildInfo.IndexColumns[0]); + *ev->Record.MutableDataColumns() = { + buildInfo.DataColumns.begin(), buildInfo.DataColumns.end() + }; + + auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo); + ev->Record.SetSeed(ui64(shardId)); + LOG_D("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString()); + + ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev)); + } + void SendBuildIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) { auto ev = MakeHolder(); ev->Record.SetBuildIndexId(ui64(BuildId)); @@ -719,10 +759,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil Y_ABORT("Unreachable"); } } - // TODO(mbkkt) enable detection of Local case - // if (buildInfo.ToUploadShards.size() == 1 && buildInfo.DoneShardsSize == 0) { - // buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local; - // } + if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.size() == 1) { + buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local; + } } bool SendKMeansSample(TIndexBuildInfo& buildInfo) { @@ -739,6 +778,10 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansReshuffleRequest(shardIdx, buildInfo); }); } + bool SendKMeansLocal(TIndexBuildInfo& buildInfo) { + return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendKMeansLocalRequest(shardIdx, buildInfo); }); + } + bool SendVectorIndex(TIndexBuildInfo& buildInfo) { switch (buildInfo.KMeans.State) { case TIndexBuildInfo::TKMeans::Sample: @@ -748,9 +791,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil // return SendKMeansRecompute(buildInfo); case TIndexBuildInfo::TKMeans::Reshuffle: return SendKMeansReshuffle(buildInfo); - // TODO(mbkkt) - // case TIndexBuildInfo::TKMeans::Local: - // return SendKMeansLocal(buildInfo); + case TIndexBuildInfo::TKMeans::Local: + return SendKMeansLocal(buildInfo); } return true; } @@ -1357,6 +1399,131 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex } }; +struct TSchemeShard::TIndexBuilder::TTxReplyLocalKMeans: public TSchemeShard::TIndexBuilder::TTxReply { +private: + TEvDataShard::TEvLocalKMeansResponse::TPtr Local; + +public: + explicit TTxReplyLocalKMeans(TSelf* self, TEvDataShard::TEvLocalKMeansResponse::TPtr& local) + : TTxReply(self) + , Local(local) + { + } + + bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + auto& record = Local->Get()->Record; + + LOG_I("TTxReply : TEvLocalKMeansResponse, Id# " << record.GetId()); + + const auto buildId = TIndexBuildId(record.GetId()); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + if (!buildInfoPtr) { + return true; + } + auto& buildInfo = *buildInfoPtr->Get(); + LOG_D("TTxReply : TEvLocalKMeansResponse" + << ", TIndexBuildInfo: " << buildInfo + << ", record: " << record.ShortDebugString()); + + TTabletId shardId = TTabletId(record.GetTabletId()); + if (!Self->TabletIdToShardIdx.contains(shardId)) { + return true; + } + + TShardIdx shardIdx = Self->TabletIdToShardIdx.at(shardId); + if (!buildInfo.Shards.contains(shardIdx)) { + return true; + } + + switch (const auto state = buildInfo.State; state) { + case TIndexBuildInfo::EState::Filling: + { + TIndexBuildInfo::TShardStatus& shardStatus = buildInfo.Shards.at(shardIdx); + + auto actualSeqNo = std::pair(Self->Generation(), shardStatus.SeqNoRound); + auto recordSeqNo = std::pair(record.GetRequestSeqNoGeneration(), record.GetRequestSeqNoRound()); + + if (actualSeqNo != recordSeqNo) { + LOG_D("TTxReply : TEvLocalKMeansResponse" + << " ignore progress message by seqNo" + << ", TIndexBuildInfo: " << buildInfo + << ", actual seqNo for the shard " << shardId << " (" << shardIdx << ") is: " << Self->Generation() << ":" << shardStatus.SeqNoRound + << ", record: " << record.ShortDebugString()); + Y_ABORT_UNLESS(actualSeqNo > recordSeqNo); + return true; + } + + TBillingStats stats{record.GetUploadRows(), record.GetUploadBytes(), record.GetReadRows(), record.GetReadBytes()}; + shardStatus.Processed += stats; + buildInfo.Processed += stats; + + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetIssues(), issues); + shardStatus.DebugMessage = issues.ToString(); + + NIceDb::TNiceDb db(txc.DB); + shardStatus.Status = record.GetStatus(); + + switch (shardStatus.Status) { + case NKikimrIndexBuilder::EBuildStatus::DONE: + if (buildInfo.InProgressShards.erase(shardIdx)) { + buildInfo.DoneShards.emplace_back(shardIdx); + } + break; + case NKikimrIndexBuilder::EBuildStatus::ABORTED: + // datashard gracefully rebooted, reschedule shard + if (buildInfo.InProgressShards.erase(shardIdx)) { + buildInfo.ToUploadShards.emplace_front(shardIdx); + } + break; + case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: + case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: + buildInfo.Issue += TStringBuilder() + << "One of the shards report "<< shardStatus.Status + << " at Filling stage, process has to be canceled" + << ", shardId: " << shardId + << ", shardIdx: " << shardIdx; + Self->PersistBuildIndexIssue(db, buildInfo); + ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); + + Progress(buildId); + return true; + case NKikimrIndexBuilder::EBuildStatus::INVALID: + case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: + case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: + Y_ABORT("Unreachable"); + } + Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(buildId, shardId, ctx); + Progress(buildId); + break; + } + case TIndexBuildInfo::EState::AlterMainTable: + case TIndexBuildInfo::EState::Invalid: + case TIndexBuildInfo::EState::Locking: + case TIndexBuildInfo::EState::GatheringStatistics: + case TIndexBuildInfo::EState::Initiating: + case TIndexBuildInfo::EState::DropBuild: + case TIndexBuildInfo::EState::CreateBuild: + case TIndexBuildInfo::EState::Applying: + case TIndexBuildInfo::EState::Unlocking: + case TIndexBuildInfo::EState::Done: + Y_FAIL_S("Unreachable " << Name(state)); + case TIndexBuildInfo::EState::Cancellation_Applying: + case TIndexBuildInfo::EState::Cancellation_Unlocking: + case TIndexBuildInfo::EState::Cancelled: + case TIndexBuildInfo::EState::Rejection_Applying: + case TIndexBuildInfo::EState::Rejection_Unlocking: + case TIndexBuildInfo::EState::Rejected: + LOG_D("TTxReply : TEvLocalKMeansResponse" + << " superfluous message " << record.ShortDebugString()); + break; + } + + return true; + } +}; + struct TSchemeShard::TIndexBuilder::TTxReplyReshuffleKMeans: public TSchemeShard::TIndexBuilder::TTxReply { private: TEvDataShard::TEvReshuffleKMeansResponse::TPtr Reshuffle; @@ -2112,6 +2279,10 @@ ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvReshuffleKMeansRespon return new TIndexBuilder::TTxReplyReshuffleKMeans(this, reshuffle); } +ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvLocalKMeansResponse::TPtr& local) { + return new TIndexBuilder::TTxReplyLocalKMeans(this, local); +} + ITransaction* TSchemeShard::CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload) { return new TIndexBuilder::TTxReplyUpload(this, upload); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index ae4044e9a907..d781619000f5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4758,6 +4758,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvPrivate::TEvIndexBuildingMakeABill, Handle); HFuncTraced(TEvDataShard::TEvSampleKResponse, Handle); HFuncTraced(TEvDataShard::TEvReshuffleKMeansResponse, Handle); + HFuncTraced(TEvDataShard::TEvLocalKMeansResponse, Handle); HFuncTraced(TEvIndexBuilder::TEvUploadSampleKResponse, Handle); // } // NIndexBuilder diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 740b0dd9c25d..b4e9df456bd5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1346,6 +1346,7 @@ class TSchemeShard struct TTxReplyRetry; struct TTxReplySampleK; struct TTxReplyReshuffleKMeans; + struct TTxReplyLocalKMeans; struct TTxReplyUpload; struct TTxPipeReset; @@ -1364,6 +1365,7 @@ class TSchemeShard NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& progress); NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvSampleKResponse::TPtr& sampleK); NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& reshuffle); + NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvLocalKMeansResponse::TPtr& local); NTabletFlatExecutor::ITransaction* CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload); NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId); NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev); @@ -1377,6 +1379,7 @@ class TSchemeShard void Handle(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvSampleKResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvReshuffleKMeansResponse::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvLocalKMeansResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 163f1d59a076..12ee6c637d38 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -3015,7 +3015,7 @@ struct TIndexBuildInfo: public TSimpleRefCount { Sample = 0, // Recompute, Reshuffle, - // Local, + Local, }; ui32 Level = 0; diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index eda961bbc0d9..b7c8b91585ad 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":2,"quantity":330,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-404-1290-2686","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; + const TString meteringData = R"({"usage":{"start":2,"quantity":128,"finish":2,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-0-0-0-0-200-0-1290-0","cloud_id":"CLOUD_ID_VAL","source_wt":2,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData);