Skip to content

Commit 87ae150

Browse files
authored
Fix vector index billing (#11174)
1 parent 6595c0e commit 87ae150

20 files changed

+315
-213
lines changed

ydb/core/protos/index_builder.proto

+2-3
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ message TEvUploadSampleKResponse {
124124
optional Ydb.StatusIds.StatusCode UploadStatus = 2;
125125
repeated Ydb.Issue.IssueMessage Issues = 3;
126126

127-
optional uint64 RowsDelta = 4;
128-
optional uint64 BytesDelta = 5;
127+
optional uint64 UploadRows = 4;
128+
optional uint64 UploadBytes = 5;
129129
}
130-

ydb/core/protos/tx_datashard.proto

+14-9
Original file line numberDiff line numberDiff line change
@@ -1500,8 +1500,8 @@ message TEvSampleKResponse {
15001500
optional NKikimrIndexBuilder.EBuildStatus Status = 4;
15011501
repeated Ydb.Issue.IssueMessage Issues = 5;
15021502

1503-
optional uint64 RowsDelta = 6;
1504-
optional uint64 BytesDelta = 7;
1503+
optional uint64 ReadRows = 6;
1504+
optional uint64 ReadBytes = 7;
15051505

15061506
optional uint64 RequestSeqNoGeneration = 8;
15071507
optional uint64 RequestSeqNoRound = 9;
@@ -1568,12 +1568,14 @@ message TEvLocalKMeansResponse {
15681568
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
15691569
repeated Ydb.Issue.IssueMessage Issues = 7;
15701570

1571-
// TODO(mbkkt) implement slow-path (reliable-path)
1572-
// optional uint64 RowsDelta = 8;
1573-
// optional uint64 BytesDelta = 9;
1571+
optional uint64 UploadRows = 8;
1572+
optional uint64 UploadBytes = 9;
1573+
optional uint64 ReadRows = 10;
1574+
optional uint64 ReadBytes = 11;
15741575

1575-
// optional TEvLocalKMeansRequest.EState State = 10;
1576-
// optional uint32 DoneRounds = 11;
1576+
// TODO(mbkkt) implement slow-path (reliable-path)
1577+
// optional TEvLocalKMeansRequest.EState State
1578+
// optional uint32 DoneRounds
15771579
}
15781580

15791581
message TEvReshuffleKMeansRequest {
@@ -1617,9 +1619,12 @@ message TEvReshuffleKMeansResponse {
16171619
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
16181620
repeated Ydb.Issue.IssueMessage Issues = 7;
16191621

1622+
optional uint64 UploadRows = 8;
1623+
optional uint64 UploadBytes = 9;
1624+
optional uint64 ReadRows = 10;
1625+
optional uint64 ReadBytes = 11;
1626+
16201627
// TODO(mbkkt) implement slow-path (reliable-path)
1621-
// optional uint64 RowsDelta = 8;
1622-
// optional uint64 BytesDelta = 9;
16231628
// optional last written primary key
16241629
}
16251630

ydb/core/tx/datashard/kmeans_helper.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void AddRowBuild2Build(TBufferData& buffer, ui32 parent, TArrayRef<const TCell>
5656
}
5757

5858
void AddRowBuild2Posting(TBufferData& buffer, ui32 parent, TArrayRef<const TCell> key, const NTable::TRowState& row,
59-
ui32 dataPos)
59+
ui32 dataPos)
6060
{
6161
std::array<TCell, 1> cells;
6262
cells[0] = TCell::Make(parent);

ydb/core/tx/datashard/kmeans_helper.h

+1-8
Original file line numberDiff line numberDiff line change
@@ -184,19 +184,12 @@ struct TCalculation: TMetric {
184184
}
185185
};
186186

187-
struct TStats {
188-
ui64 Rows = 0;
189-
ui64 Bytes = 0;
190-
};
191-
192187
template <typename TMetric>
193188
ui32 FeedEmbedding(const TCalculation<TMetric>& calculation, std::span<const TString> clusters,
194-
const NTable::TRowState& row, NTable::TPos embeddingPos, TStats& stats)
189+
const NTable::TRowState& row, NTable::TPos embeddingPos)
195190
{
196191
Y_ASSERT(embeddingPos < row.Size());
197192
const auto embedding = row.Get(embeddingPos).AsRef();
198-
stats.Rows += 1;
199-
stats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead?
200193
if (!calculation.IsExpectedSize(embedding)) {
201194
return std::numeric_limits<ui32>::max();
202195
}

ydb/core/tx/datashard/local_kmeans.cpp

+22-10
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::
6060

6161
TLead Lead;
6262

63-
// Sample
64-
TStats ReadStats;
65-
// TODO(mbkkt) Sent or Upload stats?
63+
ui64 ReadRows = 0;
64+
ui64 ReadBytes = 0;
6665

66+
// Sample
6767
ui64 MaxProbability = std::numeric_limits<ui64>::max();
6868
TReallyFastRng32 Rng;
6969

@@ -101,6 +101,9 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::
101101

102102
TUploadStatus UploadStatus;
103103

104+
ui64 UploadRows = 0;
105+
ui64 UploadBytes = 0;
106+
104107
// Response
105108
TActorId ResponseActorId;
106109
TAutoPtr<TEvDataShard::TEvLocalKMeansResponse> Response;
@@ -163,6 +166,10 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::
163166
}
164167

165168
auto& record = Response->Record;
169+
record.SetReadRows(ReadRows);
170+
record.SetReadBytes(ReadBytes);
171+
record.SetUploadRows(UploadRows);
172+
record.SetUploadBytes(UploadBytes);
166173
if (abort != EAbort::None) {
167174
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
168175
} else if (UploadStatus.IsSuccess()) {
@@ -243,6 +250,8 @@ class TLocalKMeansScanBase: public TActor<TLocalKMeansScanBase>, public NTable::
243250
UploadStatus.StatusCode = ev->Get()->Status;
244251
UploadStatus.Issues = ev->Get()->Issues;
245252
if (UploadStatus.IsSuccess()) {
253+
UploadRows += WriteBuf.GetRows();
254+
UploadBytes += WriteBuf.GetBytes();
246255
WriteBuf.Clear();
247256
if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) {
248257
ReadBuf.FlushTo(WriteBuf);
@@ -366,6 +375,9 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
366375
if (!InitAggregatedClusters()) {
367376
// We don't need to do anything,
368377
// because this datashard doesn't have valid embeddings for this parent
378+
if (UploadStatus.IsNone()) {
379+
UploadStatus.StatusCode = Ydb::StatusIds::SUCCESS;
380+
}
369381
return EScan::Final;
370382
}
371383
++Round;
@@ -391,6 +403,8 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
391403
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final
392404
{
393405
LOG_T("Feed " << Debug());
406+
++ReadRows;
407+
ReadBytes += CountBytes(key, row);
394408
switch (State) {
395409
case EState::SAMPLE:
396410
return FeedSample(row);
@@ -467,8 +481,6 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
467481
{
468482
Y_ASSERT(row.Size() == 1);
469483
const auto embedding = row.Get(0).AsRef();
470-
ReadStats.Rows += 1;
471-
ReadStats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead?
472484
if (!this->IsExpectedSize(embedding)) {
473485
return EScan::Feed;
474486
}
@@ -495,14 +507,14 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
495507
EScan FeedKMeans(const TRow& row) noexcept
496508
{
497509
Y_ASSERT(row.Size() == 1);
498-
const ui32 pos = FeedEmbedding(*this, Clusters, row, 0, ReadStats);
510+
const ui32 pos = FeedEmbedding(*this, Clusters, row, 0);
499511
AggregateToCluster(pos, row.Get(0).Data());
500512
return EScan::Feed;
501513
}
502514

503515
EScan FeedUploadMain2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
504516
{
505-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
517+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
506518
if (pos > K) {
507519
return EScan::Feed;
508520
}
@@ -512,7 +524,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
512524

513525
EScan FeedUploadMain2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
514526
{
515-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
527+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
516528
if (pos > K) {
517529
return EScan::Feed;
518530
}
@@ -522,7 +534,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
522534

523535
EScan FeedUploadBuild2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
524536
{
525-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
537+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
526538
if (pos > K) {
527539
return EScan::Feed;
528540
}
@@ -532,7 +544,7 @@ class TLocalKMeansScan final: public TLocalKMeansScanBase, private TCalculation<
532544

533545
EScan FeedUploadBuild2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
534546
{
535-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
547+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
536548
if (pos > K) {
537549
return EScan::Feed;
538550
}

ydb/core/tx/datashard/reshuffle_kmeans.cpp

+17-6
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public
3838

3939
TLead Lead;
4040

41-
TStats ReadStats;
42-
// TODO(mbkkt) Sent or Upload stats?
41+
ui64 ReadRows = 0;
42+
ui64 ReadBytes = 0;
4343

4444
std::vector<TString> Clusters;
4545

@@ -63,6 +63,9 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public
6363

6464
TUploadStatus UploadStatus;
6565

66+
ui64 UploadRows = 0;
67+
ui64 UploadBytes = 0;
68+
6669
// Response
6770
TActorId ResponseActorId;
6871
TAutoPtr<TEvDataShard::TEvReshuffleKMeansResponse> Response;
@@ -138,6 +141,10 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public
138141
}
139142

140143
auto& record = Response->Record;
144+
record.SetReadRows(ReadRows);
145+
record.SetReadBytes(ReadBytes);
146+
record.SetUploadRows(UploadRows);
147+
record.SetUploadBytes(UploadBytes);
141148
if (abort != EAbort::None) {
142149
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
143150
} else if (UploadStatus.IsSuccess()) {
@@ -218,6 +225,8 @@ class TReshuffleKMeansScanBase: public TActor<TReshuffleKMeansScanBase>, public
218225
UploadStatus.StatusCode = ev->Get()->Status;
219226
UploadStatus.Issues = ev->Get()->Issues;
220227
if (UploadStatus.IsSuccess()) {
228+
UploadRows += WriteBuf.GetRows();
229+
UploadBytes += WriteBuf.GetBytes();
221230
WriteBuf.Clear();
222231
if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) {
223232
ReadBuf.FlushTo(WriteBuf);
@@ -282,6 +291,8 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc
282291
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final
283292
{
284293
LOG_T("Feed " << Debug());
294+
++ReadRows;
295+
ReadBytes += CountBytes(key, row);
285296
switch (UploadState) {
286297
case EState::UPLOAD_MAIN_TO_BUILD:
287298
return FeedUploadMain2Build(key, row);
@@ -299,7 +310,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc
299310
private:
300311
EScan FeedUploadMain2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
301312
{
302-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
313+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
303314
if (pos > K) {
304315
return EScan::Feed;
305316
}
@@ -309,7 +320,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc
309320

310321
EScan FeedUploadMain2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
311322
{
312-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
323+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
313324
if (pos > K) {
314325
return EScan::Feed;
315326
}
@@ -319,7 +330,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc
319330

320331
EScan FeedUploadBuild2Build(TArrayRef<const TCell> key, const TRow& row) noexcept
321332
{
322-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
333+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
323334
if (pos > K) {
324335
return EScan::Feed;
325336
}
@@ -329,7 +340,7 @@ class TReshuffleKMeansScan final: public TReshuffleKMeansScanBase, private TCalc
329340

330341
EScan FeedUploadBuild2Posting(TArrayRef<const TCell> key, const TRow& row) noexcept
331342
{
332-
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos, ReadStats);
343+
const ui32 pos = FeedEmbedding(*this, Clusters, row, EmbeddingPos);
333344
if (pos > K) {
334345
return EScan::Feed;
335346
}

ydb/core/tx/datashard/sample_k.cpp

+13-21
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
4444
auto operator<=>(const TProbability&) const noexcept = default;
4545
};
4646

47-
ui64 RowsCount = 0;
48-
ui64 RowsBytes = 0;
47+
ui64 ReadRows = 0;
48+
ui64 ReadBytes = 0;
4949

5050
// We are using binary heap, because we don't want to do batch processing here,
5151
// serialization is more expensive than compare
@@ -116,27 +116,28 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
116116

117117
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final {
118118
LOG_T("Feed key " << DebugPrintPoint(KeyTypes, key, *AppData()->TypeRegistry) << " " << Debug());
119-
++RowsCount;
119+
++ReadRows;
120+
ReadBytes += CountBytes(key, row);
120121

121122
const auto probability = GetProbability();
122123
if (probability > MaxProbability) {
123-
// TODO(mbkkt) it's not nice that we need to compute this, probably can be precomputed in TRow
124-
RowsBytes += TSerializedCellVec::SerializedSize(*row);
125124
return EScan::Feed;
126125
}
127126

128-
auto serialized = TSerializedCellVec::Serialize(*row);
129-
RowsBytes += serialized.size();
130-
131127
if (DataRows.size() < K) {
132128
MaxRows.push_back({probability, DataRows.size()});
133-
DataRows.emplace_back(std::move(serialized));
129+
DataRows.emplace_back(TSerializedCellVec::Serialize(*row));
134130
if (DataRows.size() == K) {
135131
std::make_heap(MaxRows.begin(), MaxRows.end());
136132
MaxProbability = MaxRows.front().P;
137133
}
138134
} else {
139-
ReplaceRow(std::move(serialized), probability);
135+
// TODO(mbkkt) use tournament tree to make less compare and swaps
136+
std::pop_heap(MaxRows.begin(), MaxRows.end());
137+
TSerializedCellVec::Serialize(DataRows[MaxRows.back().I], *row);
138+
MaxRows.back().P = probability;
139+
std::push_heap(MaxRows.begin(), MaxRows.end());
140+
MaxProbability = MaxRows.front().P;
140141
}
141142

142143
if (MaxProbability == 0) {
@@ -147,6 +148,8 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
147148

148149
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final {
149150
Y_ABORT_UNLESS(Response);
151+
Response->Record.SetReadRows(ReadRows);
152+
Response->Record.SetReadBytes(ReadBytes);
150153
if (abort == EAbort::None) {
151154
FillResponse();
152155
} else {
@@ -187,24 +190,13 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
187190
}
188191
}
189192

190-
void ReplaceRow(TString&& row, ui64 p) {
191-
// TODO(mbkkt) use tournament tree to make less compare and swaps
192-
std::pop_heap(MaxRows.begin(), MaxRows.end());
193-
DataRows[MaxRows.back().I] = std::move(row);
194-
MaxRows.back().P = p;
195-
std::push_heap(MaxRows.begin(), MaxRows.end());
196-
MaxProbability = MaxRows.front().P;
197-
}
198-
199193
void FillResponse() {
200194
std::sort(MaxRows.begin(), MaxRows.end());
201195
auto& record = Response->Record;
202196
for (auto& [p, i] : MaxRows) {
203197
record.AddProbabilities(p);
204198
record.AddRows(std::move(DataRows[i]));
205199
}
206-
record.SetRowsDelta(RowsCount);
207-
record.SetBytesDelta(RowsBytes);
208200
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE);
209201
}
210202

ydb/core/tx/datashard/scan_common.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,15 @@ TColumnsTypes GetAllTypes(const TUserTable& tableInfo) {
2929
return result;
3030
}
3131

32+
// Returns the total of cell.Size() over every cell of `key` plus every cell of `*row`.
// The two ranges are summed independently with no de-duplication, so a column that
// appears in both the key and the row contributes to the result twice.
// NOTE(review): Size() is presumably the cell payload length in bytes — confirm against TCell.
ui64 CountBytes(TArrayRef<const TCell> key, const NTable::TRowState& row) {
33+
ui64 bytes = 0;
34+
for (auto& cell : key) {
35+
bytes += cell.Size();
36+
}
37+
for (auto& cell : *row) {
38+
bytes += cell.Size();
39+
}
40+
return bytes;
41+
}
42+
3243
}

ydb/core/tx/datashard/scan_common.h

+5
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,9 @@ using TColumnsTypes = THashMap<TString, NScheme::TTypeInfo>;
8080

8181
TColumnsTypes GetAllTypes(const TUserTable& tableInfo);
8282

83+
// TODO(mbkkt) unfortunately the key can contain the same columns as the row,
84+
// so the size of those cells is counted twice. This could be detected here,
85+
// but it may be better if IScan told us how much data was actually read.
86+
ui64 CountBytes(TArrayRef<const TCell> key, const NTable::TRowState& row);
87+
8388
}

0 commit comments

Comments
 (0)