Skip to content

Fix vector index billing #11174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions ydb/core/protos/index_builder.proto
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ message TEvUploadSampleKResponse {
optional Ydb.StatusIds.StatusCode UploadStatus = 2;
repeated Ydb.Issue.IssueMessage Issues = 3;

optional uint64 RowsDelta = 4;
optional uint64 BytesDelta = 5;
optional uint64 UploadRows = 4;
optional uint64 UploadBytes = 5;
}

23 changes: 14 additions & 9 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1500,8 +1500,8 @@ message TEvSampleKResponse {
optional NKikimrIndexBuilder.EBuildStatus Status = 4;
repeated Ydb.Issue.IssueMessage Issues = 5;

optional uint64 RowsDelta = 6;
optional uint64 BytesDelta = 7;
optional uint64 ReadRows = 6;
optional uint64 ReadBytes = 7;

optional uint64 RequestSeqNoGeneration = 8;
optional uint64 RequestSeqNoRound = 9;
Expand Down Expand Up @@ -1568,12 +1568,14 @@ message TEvLocalKMeansResponse {
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
repeated Ydb.Issue.IssueMessage Issues = 7;

// TODO(mbkkt) implement slow-path (reliable-path)
// optional uint64 RowsDelta = 8;
// optional uint64 BytesDelta = 9;
optional uint64 UploadRows = 8;
optional uint64 UploadBytes = 9;
optional uint64 ReadRows = 10;
optional uint64 ReadBytes = 11;

// optional TEvLocalKMeansRequest.EState State = 10;
// optional uint32 DoneRounds = 11;
// TODO(mbkkt) implement slow-path (reliable-path)
// optional TEvLocalKMeansRequest.EState State
// optional uint32 DoneRounds
}

message TEvReshuffleKMeansRequest {
Expand Down Expand Up @@ -1617,9 +1619,12 @@ message TEvReshuffleKMeansResponse {
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
repeated Ydb.Issue.IssueMessage Issues = 7;

optional uint64 UploadRows = 8;
optional uint64 UploadBytes = 9;
optional uint64 ReadRows = 10;
optional uint64 ReadBytes = 11;

// TODO(mbkkt) implement slow-path (reliable-path)
// optional uint64 RowsDelta = 8;
// optional uint64 BytesDelta = 9;
// optional last written primary key
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/kmeans_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void AddRowBuild2Build(TBufferData& buffer, ui32 parent, TArrayRef<const TCell>
}

void AddRowBuild2Posting(TBufferData& buffer, ui32 parent, TArrayRef<const TCell> key, const NTable::TRowState& row,
ui32 dataPos)
ui32 dataPos)
{
std::array<TCell, 1> cells;
cells[0] = TCell::Make(parent);
Expand Down
9 changes: 1 addition & 8 deletions ydb/core/tx/datashard/kmeans_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,12 @@ struct TCalculation: TMetric {
}
};

// Pair of running counters passed by reference into FeedEmbedding,
// which bumps Rows by one and Bytes by the embedding size per call.
struct TStats {
// Number of rows accounted so far.
ui64 Rows = 0;
// Number of bytes accounted so far.
ui64 Bytes = 0;
};

template <typename TMetric>
ui32 FeedEmbedding(const TCalculation<TMetric>& calculation, std::span<const TString> clusters,
const NTable::TRowState& row, NTable::TPos embeddingPos, TStats& stats)
const NTable::TRowState& row, NTable::TPos embeddingPos)
{
Y_ASSERT(embeddingPos < row.Size());
const auto embedding = row.Get(embeddingPos).AsRef();
stats.Rows += 1;
stats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead?
if (!calculation.IsExpectedSize(embedding)) {
return std::numeric_limits<ui32>::max();
}
Expand Down
32 changes: 22 additions & 10 deletions ydb/core/tx/datashard/local_kmeans.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::

TLead Lead;

// Sample
TStats ReadStats;
// TODO(mbkkt) Sent or Upload stats?
ui64 ReadRows = 0;
ui64 ReadBytes = 0;

// Sample
ui64 MaxProbability = std::numeric_limits<ui64>::max();
TReallyFastRng32 Rng;

Expand Down Expand Up @@ -101,6 +101,9 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::

TUploadStatus UploadStatus;

ui64 UploadRows = 0;
ui64 UploadBytes = 0;

// Response
TActorId ResponseActorId;
TAutoPtr<TEvDataShard::TEvLocalKMeansResponse> Response;
Expand Down Expand Up @@ -163,6 +166,10 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::
}

auto& record = Response->Record;
record.SetReadRows(ReadRows);
record.SetReadBytes(ReadBytes);
record.SetUploadRows(UploadRows);
record.SetUploadBytes(UploadBytes);
if (abort != EAbort::None) {
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
} else if (UploadStatus.IsSuccess()) {
Expand Down Expand Up @@ -243,6 +250,8 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::
UploadStatus.StatusCode = ev->Get()->Status;
UploadStatus.Issues = ev->Get()->Issues;
if (UploadStatus.IsSuccess()) {
UploadRows += WriteBuf.GetRows();
UploadBytes += WriteBuf.GetBytes();
WriteBuf.Clear();
if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) {
ReadBuf.FlushTo(WriteBuf);
Expand Down Expand Up @@ -366,6 +375,9 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
if (!InitAggregatedClusters()) {
// We don't need to do anything,
// because this datashard doesn't have valid embeddings for this parent
if (UploadStatus.IsNone()) {
UploadStatus.StatusCode = Ydb::StatusIds::SUCCESS;
}
return EScan::Final;
}
++Round;
Expand All @@ -391,6 +403,8 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final
{
LOG_T("Feed " << Debug());
++ReadRows;
ReadBytes += CountBytes(key, row);
switch (State) {
case EState::SAMPLE:
return FeedSample(row);
Expand Down Expand Up @@ -467,8 +481,6 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
{
Y_ASSERT(row.Size() == 1);
const auto embedding = row.Get(0).AsRef();
ReadStats.Rows += 1;
ReadStats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead?
if (!this->IsExpectedSize(embedding)) {
return EScan::Feed;
}
Expand All @@ -495,14 +507,14 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
EScan FeedKMeans(const TRow& row) noexcept
{
Y_ASSERT(row.Size() == 1);
const ui32 pos = FeedEmbedding(*this, Clusters, row, 0, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, 0);
AggregateToCluster(pos, row.Get(0).Data());
return EScan::Feed;
}

EScan FeedUploadMain2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand All @@ -512,7 +524,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<

EScan FeedUploadMain2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand All @@ -522,7 +534,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<

EScan FeedUploadBuild2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand All @@ -532,7 +544,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<

EScan FeedUploadBuild2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand Down
23 changes: 17 additions & 6 deletions ydb/core/tx/datashard/reshuffle_kmeans.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public

TLead Lead;

TStats ReadStats;
// TODO(mbkkt) Sent or Upload stats?
ui64 ReadRows = 0;
ui64 ReadBytes = 0;

std::vector<TString> Clusters;

Expand All @@ -63,6 +63,9 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public

TUploadStatus UploadStatus;

ui64 UploadRows = 0;
ui64 UploadBytes = 0;

// Response
TActorId ResponseActorId;
TAutoPtr<TEvDataShard::TEvReshuffleKMeansResponse> Response;
Expand Down Expand Up @@ -138,6 +141,10 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public
}

auto& record = Response->Record;
record.SetReadRows(ReadRows);
record.SetReadBytes(ReadBytes);
record.SetUploadRows(UploadRows);
record.SetUploadBytes(UploadBytes);
if (abort != EAbort::None) {
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
} else if (UploadStatus.IsSuccess()) {
Expand Down Expand Up @@ -218,6 +225,8 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public
UploadStatus.StatusCode = ev->Get()->Status;
UploadStatus.Issues = ev->Get()->Issues;
if (UploadStatus.IsSuccess()) {
UploadRows += WriteBuf.GetRows();
UploadBytes += WriteBuf.GetBytes();
WriteBuf.Clear();
if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) {
ReadBuf.FlushTo(WriteBuf);
Expand Down Expand Up @@ -282,6 +291,8 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final
{
LOG_T("Feed " << Debug());
++ReadRows;
ReadBytes += CountBytes(key, row);
switch (UploadState) {
case EState::UPLOAD_MAIN_TO_BUILD:
return FeedUploadMain2Build(key, row);
Expand All @@ -299,7 +310,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc
private:
EScan FeedUploadMain2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand All @@ -309,7 +320,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc

EScan FeedUploadMain2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand All @@ -319,7 +330,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc

EScan FeedUploadBuild2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand All @@ -329,7 +340,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc

EScan FeedUploadBuild2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
{
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
if (pos > K) {
return EScan::Feed;
}
Expand Down
34 changes: 13 additions & 21 deletions ydb/core/tx/datashard/sample_k.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
auto operator<=>(const TProbability&) const noexcept = default;
};

ui64 RowsCount = 0;
ui64 RowsBytes = 0;
ui64 ReadRows = 0;
ui64 ReadBytes = 0;

// We are using binary heap, because we don't want to do batch processing here,
// serialization is more expensive than compare
Expand Down Expand Up @@ -116,27 +116,28 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {

EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final {
LOG_T("Feed key " << DebugPrintPoint(KeyTypes, key, *AppData()->TypeRegistry) << " " << Debug());
++RowsCount;
++ReadRows;
ReadBytes += CountBytes(key, row);

const auto probability = GetProbability();
if (probability > MaxProbability) {
// TODO(mbkkt) it's not nice that we need to compute this, probably can be precomputed in TRow
RowsBytes += TSerializedCellVec::SerializedSize(*row);
return EScan::Feed;
}

auto serialized = TSerializedCellVec::Serialize(*row);
RowsBytes += serialized.size();

if (DataRows.size() < K) {
MaxRows.push_back({probability, DataRows.size()});
DataRows.emplace_back(std::move(serialized));
DataRows.emplace_back(TSerializedCellVec::Serialize(*row));
if (DataRows.size() == K) {
std::make_heap(MaxRows.begin(), MaxRows.end());
MaxProbability = MaxRows.front().P;
}
} else {
ReplaceRow(std::move(serialized), probability);
// TODO(mbkkt) use tournament tree to make less compare and swaps
std::pop_heap(MaxRows.begin(), MaxRows.end());
TSerializedCellVec::Serialize(DataRows[MaxRows.back().I], *row);
MaxRows.back().P = probability;
std::push_heap(MaxRows.begin(), MaxRows.end());
MaxProbability = MaxRows.front().P;
}

if (MaxProbability == 0) {
Expand All @@ -147,6 +148,8 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {

TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final {
Y_ABORT_UNLESS(Response);
Response->Record.SetReadRows(ReadRows);
Response->Record.SetReadBytes(ReadBytes);
if (abort == EAbort::None) {
FillResponse();
} else {
Expand Down Expand Up @@ -187,24 +190,13 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
}
}

// Replaces the sampled row that currently sits at the top of the MaxRows heap
// with the given serialized row and its probability p, then refreshes MaxProbability.
void ReplaceRow(TString&& row, ui64 p) {
// TODO(mbkkt) use tournament tree to make less compare and swaps
// Move the heap's top element to the back so it can be overwritten in place.
std::pop_heap(MaxRows.begin(), MaxRows.end());
// Reuse the evicted entry's slot in DataRows for the new row.
DataRows[MaxRows.back().I] = std::move(row);
MaxRows.back().P = p;
// Restore the heap property with the updated entry included.
std::push_heap(MaxRows.begin(), MaxRows.end());
// The acceptance threshold follows the new top of the heap.
MaxProbability = MaxRows.front().P;
}

// Fills Response->Record with the sampled rows and their probabilities,
// the read counters and the DONE status.
void FillResponse() {
// Emit entries in the default ordering of MaxRows elements.
std::sort(MaxRows.begin(), MaxRows.end());
auto& record = Response->Record;
for (auto& [p, i] : MaxRows) {
record.AddProbabilities(p);
// The serialized row is moved out, so DataRows[i] must not be read afterwards.
record.AddRows(std::move(DataRows[i]));
}
record.SetRowsDelta(RowsCount);
record.SetBytesDelta(RowsBytes);
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE);
}

Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/datashard/scan_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ TColumnsTypes GetAllTypes(const TUserTable& tableInfo) {
return result;
}

// Returns the total of Size() over every cell of `key` plus every cell of `row`.
// No de-duplication is performed: a cell present in both ranges is counted twice.
ui64 CountBytes(TArrayRef<const TCell> key, const NTable::TRowState& row) {
    ui64 total = 0;
    // Shared accumulation step for both cell ranges.
    const auto addCells = [&total](const auto& cells) {
        for (const auto& item : cells) {
            total += item.Size();
        }
    };
    addCells(key);
    addCells(*row);
    return total;
}

}
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/scan_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ using TColumnsTypes = THashMap<TString, NScheme::TTypeInfo>;

TColumnsTypes GetAllTypes(const TUserTable& tableInfo);

// TODO(mbkkt) unfortunately the key can contain the same columns as the row.
// This could be detected here, but it may be better
// if IScan itself reported how much data was read.
ui64 CountBytes(TArrayRef<const TCell> key, const NTable::TRowState& row);

}
Loading
Loading