Skip to content

Commit 0787510

Browse files
authored
Add local kmeans actor scan (#8756)
1 parent 4d73159 commit 0787510

18 files changed

+1934
-149
lines changed

ydb/core/protos/out/out.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#include <ydb/public/api/protos/ydb_table.pb.h>
2-
31
#include <ydb/core/protos/blobstorage.pb.h>
42
#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
53
#include <ydb/core/protos/blobstorage_vdisk_config.pb.h>
@@ -254,6 +252,10 @@ Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value)
254252
stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
255253
}
256254

257-
Y_DECLARE_OUT_SPEC(, Ydb::Table::IndexBuildState_State, stream, value) {
258-
stream << IndexBuildState_State_Name(value);
255+
Y_DECLARE_OUT_SPEC(, NKikimrIndexBuilder::EBuildStatus, stream, value) {
256+
stream << NKikimrIndexBuilder::EBuildStatus_Name(value);
257+
}
258+
259+
Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvLocalKMeansRequest_EState, stream, value) {
260+
stream << NKikimrTxDataShard::TEvLocalKMeansRequest_EState_Name(value);
259261
}

ydb/core/protos/tx_datashard.proto

+67
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import "ydb/core/protos/subdomains.proto";
1616
import "ydb/core/protos/query_stats.proto";
1717
import "ydb/public/api/protos/ydb_issue_message.proto";
1818
import "ydb/public/api/protos/ydb_status_codes.proto";
19+
import "ydb/public/api/protos/ydb_table.proto";
1920
import "ydb/library/yql/dq/actors/protos/dq_events.proto";
2021
import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
2122
import "ydb/library/yql/dq/proto/dq_tasks.proto";
@@ -1485,6 +1486,72 @@ message TEvSampleKResponse {
14851486
repeated bytes Rows = 11;
14861487
}
14871488

1489+
message TEvLocalKMeansRequest {
1490+
optional uint64 Id = 1;
1491+
1492+
optional uint64 TabletId = 2;
1493+
optional NKikimrProto.TPathID PathId = 3;
1494+
1495+
optional uint64 SnapshotTxId = 4;
1496+
optional uint64 SnapshotStep = 5;
1497+
1498+
optional uint64 SeqNoGeneration = 6;
1499+
optional uint64 SeqNoRound = 7;
1500+
1501+
optional Ydb.Table.VectorIndexSettings Settings = 8;
1502+
1503+
optional uint64 Seed = 9;
1504+
optional uint32 K = 10;
1505+
1506+
enum EState {
1507+
UNSPECIFIED = 0;
1508+
SAMPLE = 1;
1509+
KMEANS = 2;
1510+
UPLOAD_MAIN_TO_TMP = 3;
1511+
UPLOAD_MAIN_TO_POSTING = 4;
1512+
UPLOAD_TMP_TO_TMP = 5;
1513+
UPLOAD_TMP_TO_POSTING = 6;
1514+
DONE = 7;
1515+
};
1516+
optional EState Upload = 11;
1517+
// State != DONE
1518+
optional EState State = 12;
1519+
// State != KMEANS || DoneRounds < NeedsRounds
1520+
optional uint32 DoneRounds = 13;
1521+
optional uint32 NeedsRounds = 14;
1522+
1523+
// id of parent cluster
1524+
optional uint32 Parent = 15;
1525+
// [Child ... Child + K] ids reserved for our clusters
1526+
optional uint32 Child = 16;
1527+
1528+
optional string LevelName = 17;
1529+
optional string PostingName = 18;
1530+
1531+
optional string EmbeddingColumn = 19;
1532+
repeated string DataColumns = 20;
1533+
}
1534+
1535+
message TEvLocalKMeansProgressResponse {
1536+
optional uint64 Id = 1;
1537+
1538+
optional uint64 TabletId = 2;
1539+
optional NKikimrProto.TPathID PathId = 3;
1540+
1541+
optional uint64 RequestSeqNoGeneration = 4;
1542+
optional uint64 RequestSeqNoRound = 5;
1543+
1544+
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
1545+
repeated Ydb.Issue.IssueMessage Issues = 7;
1546+
1547+
// TODO(mbkkt) implement slow-path (reliable-path)
1548+
// optional uint64 RowsDelta = 8;
1549+
// optional uint64 BytesDelta = 9;
1550+
1551+
// optional TEvLocalKMeansRequest.EState State = 10;
1552+
// optional uint32 DoneRounds = 11;
1553+
}
1554+
14881555
message TEvCdcStreamScanRequest {
14891556
message TLimits {
14901557
optional uint32 BatchMaxBytes = 1 [default = 512000];

ydb/core/tablet_flat/flat_scan_lead.h

+12-3
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ namespace NTable {
99

1010
struct TLead {
1111
void To(TTagsRef tags, TArrayRef<const TCell> key, ESeek seek)
12+
{
13+
To(key, seek);
14+
SetTags(tags);
15+
}
16+
17+
void To(TArrayRef<const TCell> key, ESeek seek)
1218
{
1319
Valid = true;
14-
Tags.assign(tags.begin(), tags.end());
1520
Relation = seek;
1621
Key = TSerializedCellVec(key);
1722
StopKey = { };
@@ -24,6 +29,10 @@ namespace NTable {
2429
StopKeyInclusive = inclusive;
2530
}
2631

32+
void SetTags(TTagsRef tags) {
33+
Tags.assign(tags.begin(), tags.end());
34+
}
35+
2736
explicit operator bool() const noexcept
2837
{
2938
return Valid;
@@ -34,12 +43,12 @@ namespace NTable {
3443
Valid = false;
3544
}
3645

37-
bool Valid = false;
3846
ESeek Relation = ESeek::Exact;
47+
bool Valid = false;
48+
bool StopKeyInclusive = true;
3949
TVector<ui32> Tags;
4050
TSerializedCellVec Key;
4151
TSerializedCellVec StopKey;
42-
bool StopKeyInclusive = true;
4352
};
4453

4554
}

ydb/core/tx/datashard/buffer_data.h

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#include "ydb/core/scheme/scheme_tablecell.h"
2+
#include "ydb/core/tx/datashard/upload_stats.h"
3+
#include "ydb/core/tx/tx_proxy/upload_rows.h"
4+
5+
namespace NKikimr::NDataShard {
6+
7+
class TBufferData: public IStatHolder, public TNonCopyable {
8+
public:
9+
TBufferData()
10+
: Rows{std::make_shared<NTxProxy::TUploadRows>()} {
11+
}
12+
13+
ui64 GetRows() const override final {
14+
return Rows->size();
15+
}
16+
17+
auto GetRowsData() const {
18+
return Rows;
19+
}
20+
21+
ui64 GetBytes() const override final {
22+
return ByteSize;
23+
}
24+
25+
void FlushTo(TBufferData& other) {
26+
Y_ABORT_UNLESS(this != &other);
27+
Y_ABORT_UNLESS(other.IsEmpty());
28+
other.Rows.swap(Rows);
29+
other.ByteSize = std::exchange(ByteSize, 0);
30+
other.LastKey = std::exchange(LastKey, {});
31+
}
32+
33+
void Clear() {
34+
Rows->clear();
35+
ByteSize = 0;
36+
LastKey = {};
37+
}
38+
39+
void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
40+
Rows->emplace_back(std::move(targetPk), std::move(targetValue));
41+
ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
42+
LastKey = std::move(key);
43+
}
44+
45+
bool IsEmpty() const {
46+
return Rows->empty();
47+
}
48+
49+
size_t Size() const {
50+
return Rows->size();
51+
}
52+
53+
bool IsReachLimits(const TUploadLimits& Limits) {
54+
// TODO(mbkkt) why [0..BatchRowsLimit) but [0..BatchBytesLimit]
55+
return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
56+
}
57+
58+
auto&& ExtractLastKey() {
59+
return std::move(LastKey);
60+
}
61+
62+
const auto& GetLastKey() const {
63+
return LastKey;
64+
}
65+
66+
private:
67+
std::shared_ptr<NTxProxy::TUploadRows> Rows;
68+
ui64 ByteSize = 0;
69+
TSerializedCellVec LastKey;
70+
};
71+
72+
}

0 commit comments

Comments
 (0)