Skip to content

Commit f773909

Browse files
authored
Statistics proto refactoring + ANALYZE command (#7132)
1 parent 9d52924 commit f773909

16 files changed: +181 / -133 lines changed

ydb/core/protos/statistics.proto

Lines changed: 54 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -68,56 +68,85 @@ message TEvPropagateStatisticsResponse {
6868
message TEvStatisticsIsDisabled {
6969
}
7070

71-
message TEvScanTable {
72-
optional NKikimrProto.TPathID PathId = 1;
71+
enum EColumnStatisticType {
72+
TYPE_UNSPECIFIED = 0;
73+
TYPE_COUNT_MIN_SKETCH = 1;
74+
}
75+
76+
// table to gather statistics from
77+
message TTable {
78+
optional NKikimrProto.TPathID PathId = 1; // table path
79+
repeated uint32 ColumnTags = 2; // list of columns to gather statistics from. Empty means asking for every column.
7380
}
7481

75-
message TEvScanTableAccepted {
76-
optional uint64 OperationId = 1;
82+
// KQP -> SA
83+
message TEvAnalyze {
84+
optional uint64 Cookie = 1; // request cookie to match response item
85+
repeated TTable Tables = 2; // list of analyzed tables and columns
86+
repeated EColumnStatisticType Types = 3; // list of statistics types requested. Empty means asking for all available.
7787
}
7888

79-
message TEvScanTableResponse {
89+
// SA -> KQP
90+
message TEvAnalyzeResponse {
91+
optional uint64 Cookie = 1;
8092
}
8193

82-
message TEvGetScanStatus {
94+
// KQP -> SA
95+
message TEvAnalyzeStatus {
8396
optional NKikimrProto.TPathID PathId = 1;
8497
}
8598

86-
message TEvGetScanStatusResponse {
99+
// SA -> KQP
100+
message TEvAnalyzeStatusResponse {
101+
optional NKikimrProto.TPathID PathId = 1;
102+
87103
enum EStatus {
88-
NO_OPERATION = 0;
89-
ENQUEUED = 1;
90-
IN_PROGRESS = 2;
104+
STATUS_UNSPECIFIED = 0;
105+
STATUS_NO_OPERATION = 1;
106+
STATUS_ENQUEUED = 2;
107+
STATUS_IN_PROGRESS = 3;
91108
}
92-
optional EStatus Status = 1;
109+
optional EStatus Status = 2;
93110
}
94111

112+
// SA -> Shard
113+
message TEvAnalyzeTable {
114+
optional TTable Table = 1; // analyzed table
115+
repeated EColumnStatisticType Types = 2; // list of statistics types requested. Empty means asking for all available.
116+
}
117+
118+
// Shard -> SA
119+
message TEvAnalyzeTableResponse {
120+
optional NKikimrProto.TPathID PathId = 1;
121+
}
122+
123+
95124
message TEvStatisticsRequest {
96-
optional NKikimrDataEvents.TTableId TableId = 1;
125+
optional TTable Table = 1;
126+
97127
optional bytes StartKey = 2;
98-
// list of columns to gather statistics from. Empty means asking for every column.
99-
repeated uint32 ColumnTags = 3;
100-
// list of statistics types requested. Empty means asking for all available.
101-
repeated uint32 Types = 4;
128+
129+
repeated EColumnStatisticType Types = 3;
102130
}
103131

104132
message TStatistic {
105133
optional uint32 Type = 1;
106134
optional bytes Data = 2;
107135
}
108136

109-
message TColumn {
137+
message TColumnStatistics {
110138
optional uint32 Tag = 1;
111139
repeated TStatistic Statistics = 2;
112140
}
113141

114142
message TEvStatisticsResponse {
115-
repeated TColumn Columns = 1;
143+
repeated TColumnStatistics Columns = 1;
116144

117145
enum EStatus {
118-
SUCCESS = 1;
119-
ABORTED = 2;
120-
ERROR = 3;
146+
STATUS_UNSPECIFIED = 0;
147+
STATUS_SUCCESS = 1;
148+
STATUS_ABORTED = 2;
149+
STATUS_ERROR = 3;
121150
}
122151
optional EStatus Status = 2;
123152
optional fixed64 ShardTabletId = 3;
@@ -144,10 +173,11 @@ message TEvAggregateKeepAliveAck {
144173

145174
message TEvAggregateStatisticsResponse {
146175
optional uint64 Round = 1;
147-
repeated TColumn Columns = 2;
176+
repeated TColumnStatistics Columns = 2;
148177
enum EErrorType {
149-
UnavailableNode = 1;
150-
NonLocalTablet = 2;
178+
TYPE_UNSPECIFIED = 0;
179+
TYPE_UNAVAILABLE_NODE = 1;
180+
TYPE_NON_LOCAL_TABLET = 2;
151181
}
152182
message TFailedTablet {
153183
optional EErrorType Error = 1;

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -431,21 +431,21 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvStatTableCreationResponse::
431431
}
432432
}
433433

434-
void TStatisticsAggregator::Handle(TEvStatistics::TEvGetScanStatus::TPtr& ev) {
434+
void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
435435
auto& inRecord = ev->Get()->Record;
436436
auto pathId = PathIdFromPathId(inRecord.GetPathId());
437437

438-
auto response = std::make_unique<TEvStatistics::TEvGetScanStatusResponse>();
438+
auto response = std::make_unique<TEvStatistics::TEvAnalyzeStatusResponse>();
439439
auto& outRecord = response->Record;
440440

441441
if (ScanTableId.PathId == pathId) {
442-
outRecord.SetStatus(NKikimrStat::TEvGetScanStatusResponse::IN_PROGRESS);
442+
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
443443
} else {
444444
auto it = ScanOperationsByPathId.find(pathId);
445445
if (it != ScanOperationsByPathId.end()) {
446-
outRecord.SetStatus(NKikimrStat::TEvGetScanStatusResponse::ENQUEUED);
446+
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
447447
} else {
448-
outRecord.SetStatus(NKikimrStat::TEvGetScanStatusResponse::NO_OPERATION);
448+
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
449449
}
450450
}
451451
Send(ev->Sender, response.release(), 0, ev->Cookie);
@@ -518,8 +518,9 @@ void TStatisticsAggregator::NextRange() {
518518
auto& range = ShardRanges.front();
519519
auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
520520
auto& record = request->Record;
521-
record.MutableTableId()->SetOwnerId(ScanTableId.PathId.OwnerId);
522-
record.MutableTableId()->SetTableId(ScanTableId.PathId.LocalPathId);
521+
auto* path = record.MutableTable()->MutablePathId();
522+
path->SetOwnerId(ScanTableId.PathId.OwnerId);
523+
path->SetLocalId(ScanTableId.PathId.LocalPathId);
523524
record.SetStartKey(StartKey.GetBuffer());
524525

525526
Send(MakePipePerNodeCacheID(false),

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
120120
size_t PropagatePart(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds,
121121
size_t lastSSIndex, bool useSizeLimit);
122122

123-
void Handle(TEvStatistics::TEvScanTable::TPtr& ev);
123+
void Handle(TEvStatistics::TEvAnalyze::TPtr& ev);
124124
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
125125
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev);
126126
void Handle(NStat::TEvStatistics::TEvStatisticsResponse::TPtr& ev);
@@ -129,7 +129,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
129129
void Handle(TEvStatistics::TEvSaveStatisticsQueryResponse::TPtr& ev);
130130
void Handle(TEvStatistics::TEvDeleteStatisticsQueryResponse::TPtr& ev);
131131
void Handle(TEvPrivate::TEvScheduleScan::TPtr& ev);
132-
void Handle(TEvStatistics::TEvGetScanStatus::TPtr& ev);
132+
void Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev);
133133
void Handle(TEvHive::TEvResponseTabletDistribution::TPtr& ev);
134134
void Handle(TEvStatistics::TEvAggregateStatisticsResponse::TPtr& ev);
135135
void Handle(TEvPrivate::TEvResolve::TPtr& ev);
@@ -176,7 +176,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
176176
hFunc(TEvPrivate::TEvProcessUrgent, Handle);
177177
hFunc(TEvPrivate::TEvPropagateTimeout, Handle);
178178

179-
hFunc(TEvStatistics::TEvScanTable, Handle);
179+
hFunc(TEvStatistics::TEvAnalyze, Handle);
180180
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
181181
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
182182
hFunc(NStat::TEvStatistics::TEvStatisticsResponse, Handle);
@@ -185,7 +185,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
185185
hFunc(TEvStatistics::TEvSaveStatisticsQueryResponse, Handle);
186186
hFunc(TEvStatistics::TEvDeleteStatisticsQueryResponse, Handle);
187187
hFunc(TEvPrivate::TEvScheduleScan, Handle);
188-
hFunc(TEvStatistics::TEvGetScanStatus, Handle);
188+
hFunc(TEvStatistics::TEvAnalyzeStatus, Handle);
189189
hFunc(TEvHive::TEvResponseTabletDistribution, Handle);
190190
hFunc(TEvStatistics::TEvAggregateStatisticsResponse, Handle);
191191
hFunc(TEvPrivate::TEvResolve, Handle);

ydb/core/statistics/aggregator/tx_aggr_stat_response.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,16 @@ struct TStatisticsAggregator::TTxAggregateStatisticsResponse : public TTxBase {
6363
for (auto& tablet : Record.GetFailedTablets()) {
6464
auto error = tablet.GetError();
6565
switch (error) {
66-
case NKikimrStat::TEvAggregateStatisticsResponse::UnavailableNode:
66+
case NKikimrStat::TEvAggregateStatisticsResponse::TYPE_UNSPECIFIED:
67+
SA_LOG_CRIT("[" << Self->TabletID() << "] Unspecified TEvAggregateStatisticsResponse status");
68+
return false;
69+
70+
case NKikimrStat::TEvAggregateStatisticsResponse::TYPE_UNAVAILABLE_NODE:
6771
Self->TabletsForReqDistribution.insert(tablet.GetTabletId());
6872
Action = EAction::SendReqDistribution;
6973
break;
7074

71-
case NKikimrStat::TEvAggregateStatisticsResponse::NonLocalTablet:
75+
case NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET:
7276
auto nodeId = tablet.GetNodeId();
7377
if (nodeId == 0) {
7478
// we cannot reach this tablet

ydb/core/statistics/aggregator/tx_delete_query_response.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct TStatisticsAggregator::TTxDeleteQueryResponse : public TTxBase {
2828
SA_LOG_D("[" << Self->TabletID() << "] TTxDeleteQueryResponse::Complete");
2929

3030
for (auto& id : ReplyToActorIds) {
31-
ctx.Send(id, new TEvStatistics::TEvScanTableResponse);
31+
ctx.Send(id, new TEvStatistics::TEvAnalyzeResponse);
3232
}
3333
}
3434
};

ydb/core/statistics/aggregator/tx_save_query_response.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct TStatisticsAggregator::TTxSaveQueryResponse : public TTxBase {
2828
SA_LOG_D("[" << Self->TabletID() << "] TTxSaveQueryResponse::Complete");
2929

3030
for (auto& id : ReplyToActorIds) {
31-
ctx.Send(id, new TEvStatistics::TEvScanTableResponse);
31+
ctx.Send(id, new TEvStatistics::TEvAnalyzeResponse);
3232
}
3333
}
3434
};

ydb/core/statistics/aggregator/tx_scan_table.cpp

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
namespace NKikimr::NStat {
66

77
struct TStatisticsAggregator::TTxScanTable : public TTxBase {
8-
NKikimrStat::TEvScanTable Record;
8+
TPathId PathId;
99
TActorId ReplyToActorId;
1010
ui64 OperationId = 0;
1111

12-
TTxScanTable(TSelf* self, NKikimrStat::TEvScanTable&& record, TActorId replyToActorId)
12+
TTxScanTable(TSelf* self, const TPathId& pathId, TActorId replyToActorId)
1313
: TTxBase(self)
14-
, Record(std::move(record))
14+
, PathId(pathId)
1515
, ReplyToActorId(replyToActorId)
1616
{}
1717

@@ -24,9 +24,7 @@ struct TStatisticsAggregator::TTxScanTable : public TTxBase {
2424
return true;
2525
}
2626

27-
auto pathId = PathIdFromPathId(Record.GetPathId());
28-
29-
auto itOp = Self->ScanOperationsByPathId.find(pathId);
27+
auto itOp = Self->ScanOperationsByPathId.find(PathId);
3028
if (itOp != Self->ScanOperationsByPathId.end()) {
3129
itOp->second.ReplyToActorIds.insert(ReplyToActorId);
3230
OperationId = itOp->second.OperationId;
@@ -35,17 +33,17 @@ struct TStatisticsAggregator::TTxScanTable : public TTxBase {
3533

3634
NIceDb::TNiceDb db(txc.DB);
3735

38-
TScanOperation& operation = Self->ScanOperationsByPathId[pathId];
39-
operation.PathId = pathId;
36+
TScanOperation& operation = Self->ScanOperationsByPathId[PathId];
37+
operation.PathId = PathId;
4038
operation.OperationId = ++Self->LastScanOperationId;
4139
operation.ReplyToActorIds.insert(ReplyToActorId);
4240
Self->ScanOperations.PushBack(&operation);
4341

4442
Self->PersistLastScanOperationId(db);
4543

4644
db.Table<Schema::ScanOperations>().Key(operation.OperationId).Update(
47-
NIceDb::TUpdate<Schema::ScanOperations::OwnerId>(pathId.OwnerId),
48-
NIceDb::TUpdate<Schema::ScanOperations::LocalPathId>(pathId.LocalPathId));
45+
NIceDb::TUpdate<Schema::ScanOperations::OwnerId>(PathId.OwnerId),
46+
NIceDb::TUpdate<Schema::ScanOperations::LocalPathId>(PathId.LocalPathId));
4947

5048
OperationId = operation.OperationId;
5149

@@ -54,21 +52,17 @@ struct TStatisticsAggregator::TTxScanTable : public TTxBase {
5452

5553
void Complete(const TActorContext& ctx) override {
5654
SA_LOG_D("[" << Self->TabletID() << "] TTxScanTable::Complete");
55+
}
56+
};
5757

58-
if (!Self->EnableColumnStatistics) {
59-
return;
60-
}
58+
void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyze::TPtr& ev) {
59+
const auto& record = ev->Get()->Record;
6160

62-
auto accepted = std::make_unique<TEvStatistics::TEvScanTableAccepted>();
63-
accepted->Record.SetOperationId(OperationId);
64-
ctx.Send(ReplyToActorId, accepted.release());
61+
// TODO: replace by queue
62+
for (const auto& table : record.GetTables()) {
63+
Execute(new TTxScanTable(this, PathIdFromPathId(table.GetPathId()), ev->Sender), TActivationContext::AsActorContext());
6564
}
66-
};
6765

68-
void TStatisticsAggregator::Handle(TEvStatistics::TEvScanTable::TPtr& ev) {
69-
auto& record = ev->Get()->Record;
70-
Execute(new TTxScanTable(this, std::move(record), ev->Sender),
71-
TActivationContext::AsActorContext());
7266
}
7367

7468
} // NKikimr::NStat

ydb/core/statistics/aggregator/ut/ut_aggregator.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,18 +175,18 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
175175
runtime.SimulateSleep(TDuration::Seconds(5));
176176
initThread.join();
177177

178-
ui64 tabletId = 0;
178+
ui64 tabletId;
179179
auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &tabletId);
180180

181181
runtime.SimulateSleep(TDuration::Seconds(30));
182182

183-
auto ev = std::make_unique<TEvStatistics::TEvScanTable>();
183+
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
184184
auto& record = ev->Record;
185-
PathIdFromPathId(pathId, record.MutablePathId());
185+
PathIdFromPathId(pathId, record.AddTables()->MutablePathId());
186186

187187
auto sender = runtime.AllocateEdgeActor();
188188
runtime.SendToPipe(tabletId, sender, ev.release());
189-
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvScanTableResponse>(sender);
189+
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
190190

191191
ValidateCountMin(runtime, pathId);
192192
}
@@ -198,17 +198,30 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
198198
CreateUniformTable(env, "Database", "Table1");
199199
CreateUniformTable(env, "Database", "Table2");
200200
};
201+
// TODO remove thread
201202
std::thread initThread(init);
202203

203204
auto& runtime = *env.GetServer().GetRuntime();
204205
runtime.SimulateSleep(TDuration::Seconds(5));
205206
initThread.join();
206207

207-
runtime.SimulateSleep(TDuration::Seconds(60));
208+
// TODO remove sleep
209+
runtime.SimulateSleep(TDuration::Seconds(30));
208210

209-
auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1");
211+
ui64 tabletId1;
212+
auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1", nullptr, &tabletId1);
210213
auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2");
211214

215+
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
216+
auto& record = ev->Record;
217+
PathIdFromPathId(pathId1, record.AddTables()->MutablePathId());
218+
PathIdFromPathId(pathId2, record.AddTables()->MutablePathId());
219+
220+
auto sender = runtime.AllocateEdgeActor();
221+
runtime.SendToPipe(tabletId1, sender, ev.release());
222+
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
223+
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
224+
212225
ValidateCountMin(runtime, pathId1);
213226
ValidateCountMin(runtime, pathId2);
214227
}
@@ -333,9 +346,9 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
333346
runtime.SimulateSleep(TDuration::Seconds(5));
334347
init2Thread.join();
335348

336-
auto ev = std::make_unique<TEvStatistics::TEvScanTable>();
349+
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
337350
auto& record = ev->Record;
338-
PathIdFromPathId(pathId, record.MutablePathId());
351+
PathIdFromPathId(pathId, record.AddTables()->MutablePathId());
339352

340353
auto sender = runtime.AllocateEdgeActor();
341354
runtime.SendToPipe(tabletId, sender, ev.release());

0 commit comments

Comments
 (0)