Skip to content

Commit a98ff2b

Browse files
authored
use statistics aggregator for basic statistics KIKIMR-18323 (#623)
1 parent 06f9376 commit a98ff2b

21 files changed

+1002
-373
lines changed

ydb/core/protos/counters_statistics_aggregator.proto

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ option java_package = "ru.yandex.kikimr.proto";
77
option (TabletTypeName) = "StatisticsAggregator";
88

99
enum ETxTypes {
10-
TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}];
11-
TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}];
12-
TXTYPE_CONFIGURE = 2 [(TxTypeOpts) = {Name: "TxConfigure"}];
10+
TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}];
11+
TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}];
12+
TXTYPE_CONFIGURE = 2 [(TxTypeOpts) = {Name: "TxConfigure"}];
13+
TXTYPE_SCHEMESHARD_STATS = 3 [(TxTypeOpts) = {Name: "TxSchemeShardStats"}];
1314
}

ydb/core/protos/statistics.proto

+45-11
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,55 @@ package NKikimrStat;
44

55
option java_package = "ru.yandex.kikimr.proto";
66

7-
message TEvBroadcastStatistics {
8-
message TEntry {
9-
optional NKikimrProto.TPathID PathId = 1;
10-
optional uint64 RowCount = 2;
11-
optional uint64 BytesSize = 3;
7+
message TEvConfigureAggregator {
8+
optional string Database = 1;
9+
}
10+
11+
message TPathEntry {
12+
optional NKikimrProto.TPathID PathId = 1;
13+
optional uint64 RowCount = 2;
14+
optional uint64 BytesSize = 3;
15+
}
16+
17+
message TSchemeShardStats {
18+
repeated TPathEntry Entries = 1;
19+
}
20+
21+
// SS -> SA
22+
message TEvConnectSchemeShard {
23+
optional fixed64 SchemeShardId = 1;
24+
}
25+
26+
// SS -> SA
27+
message TEvSchemeShardStats {
28+
optional fixed64 SchemeShardId = 1;
29+
optional bytes Stats = 2; // serialized TSchemeShardStats
30+
}
31+
32+
// nodes -> SA
33+
message TEvConnectNode {
34+
optional uint32 NodeId = 1;
35+
repeated fixed64 NeedSchemeShards = 2;
36+
message THaveEntry {
37+
optional fixed64 SchemeShardId = 1;
38+
optional uint64 Timestamp = 2;
1239
}
13-
repeated uint32 NodeIds = 1;
14-
repeated TEntry Entries = 2;
40+
repeated THaveEntry HaveSchemeShards = 3;
1541
}
1642

17-
message TEvRegisterNode {
43+
// nodes -> SA
44+
message TEvRequestStats {
1845
optional uint32 NodeId = 1;
19-
optional bool HasStatistics = 2;
46+
repeated fixed64 NeedSchemeShards = 2;
2047
}
2148

22-
message TEvConfigureAggregator {
23-
optional string Database = 1;
49+
// SA -> nodes
50+
message TEvPropagateStatistics {
51+
repeated uint32 NodeIds = 1; // hierarchical propagation
52+
message TStatsEntry {
53+
optional fixed64 SchemeShardId = 1;
54+
optional bytes Stats = 2; // serialized TSchemeShardStats
55+
optional uint64 Timestamp = 3;
56+
}
57+
repeated TStatsEntry Entries = 2;
2458
}

ydb/core/statistics/aggregator/aggregator.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
namespace NKikimr::NStat {
66

77
IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info) {
8-
return new TStatisticsAggregator(tablet, info);
8+
return new TStatisticsAggregator(tablet, info, false);
9+
}
10+
11+
IActor* CreateStatisticsAggregatorForTests(const NActors::TActorId& tablet, TTabletStorageInfo* info) {
12+
return new TStatisticsAggregator(tablet, info, true);
913
}
1014

1115
} // NKikimr::NStat

ydb/core/statistics/aggregator/aggregator.h

+2
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ namespace NKikimr::NStat {
77

88
IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info);
99

10+
IActor* CreateStatisticsAggregatorForTests(const NActors::TActorId& tablet, TTabletStorageInfo* info);
11+
1012
} // NKikimr::NStat

ydb/core/statistics/aggregator/aggregator_impl.cpp

+243-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
#include "aggregator_impl.h"
22

33
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
4+
#include <ydb/core/statistics/stat_service.h>
45

56
#include <library/cpp/monlib/service/pages/templates.h>
67

78
namespace NKikimr::NStat {
89

9-
TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info)
10+
TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info, bool forTests)
1011
: TActor(&TThis::StateInit)
1112
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
12-
{}
13+
{
14+
PropagateInterval = forTests ? TDuration::Seconds(5) : TDuration::Minutes(3);
15+
16+
auto seed = std::random_device{}();
17+
RandomGenerator.seed(seed);
18+
}
1319

1420
void TStatisticsAggregator::OnDetach(const TActorContext& ctx) {
1521
Die(ctx);
@@ -29,8 +35,241 @@ void TStatisticsAggregator::DefaultSignalTabletActive(const TActorContext& ctx)
2935
Y_UNUSED(ctx);
3036
}
3137

32-
void TStatisticsAggregator::Handle(TEvPrivate::TEvProcess::TPtr&) {
33-
SA_LOG_D("[" << TabletID() << "] Handle TEvPrivate::TEvProcess");
38+
void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev) {
39+
auto pipeServerId = ev->Get()->ServerId;
40+
41+
SA_LOG_D("[" << TabletID() << "] EvServerConnected"
42+
<< ", pipe server id = " << pipeServerId);
43+
}
44+
45+
void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev) {
46+
auto pipeServerId = ev->Get()->ServerId;
47+
48+
SA_LOG_D("[" << TabletID() << "] EvServerDisconnected"
49+
<< ", pipe server id = " << pipeServerId);
50+
51+
auto itNodeServer = NodePipes.find(pipeServerId);
52+
if (itNodeServer != NodePipes.end()) {
53+
auto nodeId = itNodeServer->second;
54+
auto itNode = Nodes.find(nodeId);
55+
if (itNode != Nodes.end()) {
56+
--itNode->second;
57+
if (itNode->second == 0) {
58+
Nodes.erase(itNode);
59+
}
60+
}
61+
NodePipes.erase(itNodeServer);
62+
return;
63+
}
64+
65+
auto itShardServer = SchemeShardPipes.find(pipeServerId);
66+
if (itShardServer != SchemeShardPipes.end()) {
67+
auto ssId = itShardServer->second;
68+
auto itShard = SchemeShards.find(ssId);
69+
if (itShard != SchemeShards.end()) {
70+
--itShard->second;
71+
if (itShard->second == 0) {
72+
SchemeShards.erase(itShard);
73+
}
74+
}
75+
SchemeShardPipes.erase(itShardServer);
76+
return;
77+
}
78+
}
79+
80+
void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) {
81+
const auto& record = ev->Get()->Record;
82+
const TNodeId nodeId = record.GetNodeId();
83+
auto pipeServerId = ev->Recipient;
84+
85+
SA_LOG_D("[" << TabletID() << "] EvConnectNode"
86+
<< ", pipe server id = " << pipeServerId
87+
<< ", node id = " << nodeId
88+
<< ", have schemeshards count = " << record.HaveSchemeShardsSize()
89+
<< ", need schemeshards count = " << record.NeedSchemeShardsSize());
90+
91+
if (NodePipes.find(pipeServerId) == NodePipes.end()) {
92+
NodePipes[pipeServerId] = nodeId;
93+
++Nodes[nodeId];
94+
}
95+
96+
for (const auto& ssEntry : record.GetHaveSchemeShards()) {
97+
RequestedSchemeShards.insert(ssEntry.GetSchemeShardId());
98+
}
99+
100+
if (!IsPropagateInFlight) {
101+
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
102+
IsPropagateInFlight = true;
103+
}
104+
105+
std::vector<TSSId> ssIds;
106+
ssIds.reserve(record.NeedSchemeShardsSize());
107+
for (const auto& ssId : record.GetNeedSchemeShards()) {
108+
ssIds.push_back(ssId);
109+
RequestedSchemeShards.insert(ssId);
110+
}
111+
112+
ProcessRequests(nodeId, ssIds);
113+
}
114+
115+
void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) {
116+
const auto& record = ev->Get()->Record;
117+
const auto nodeId = record.GetNodeId();
118+
119+
SA_LOG_D("[" << TabletID() << "] EvRequestStats"
120+
<< ", node id = " << nodeId
121+
<< ", schemeshard count = " << record.NeedSchemeShardsSize());
122+
123+
std::vector<TSSId> ssIds;
124+
ssIds.reserve(record.NeedSchemeShardsSize());
125+
for (const auto& ssId : record.GetNeedSchemeShards()) {
126+
ssIds.push_back(ssId);
127+
}
128+
129+
ProcessRequests(nodeId, ssIds);
130+
}
131+
132+
void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectSchemeShard::TPtr& ev) {
133+
const auto& record = ev->Get()->Record;
134+
const TSSId schemeShardId = record.GetSchemeShardId();
135+
auto pipeServerId = ev->Recipient;
136+
137+
if (SchemeShardPipes.find(pipeServerId) == SchemeShardPipes.end()) {
138+
SchemeShardPipes[pipeServerId] = schemeShardId;
139+
++SchemeShards[schemeShardId];
140+
}
141+
142+
SA_LOG_D("[" << TabletID() << "] EvConnectSchemeShard"
143+
<< ", pipe server id = " << pipeServerId
144+
<< ", schemeshard id = " << schemeShardId);
145+
}
146+
147+
void TStatisticsAggregator::Handle(TEvPrivate::TEvFastPropagateCheck::TPtr&) {
148+
SA_LOG_D("[" << TabletID() << "] EvFastPropagateCheck");
149+
150+
PropagateFastStatistics();
151+
152+
FastCheckInFlight = false;
153+
FastCounter = StatsOptimizeFirstNodesCount;
154+
FastNodes.clear();
155+
FastSchemeShards.clear();
156+
}
157+
158+
void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) {
159+
SA_LOG_D("[" << TabletID() << "] EvPropagate");
160+
161+
PropagateStatistics();
162+
163+
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
164+
}
165+
166+
void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TSSId>& ssIds) {
167+
if (FastCounter > 0) {
168+
--FastCounter;
169+
SendStatisticsToNode(nodeId, ssIds);
170+
} else {
171+
FastNodes.insert(nodeId);
172+
for (const auto& ssId : ssIds) {
173+
FastSchemeShards.insert(ssId);
174+
}
175+
if (!FastCheckInFlight) {
176+
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
177+
FastCheckInFlight = true;
178+
}
179+
}
180+
}
181+
182+
void TStatisticsAggregator::SendStatisticsToNode(TNodeId nodeId, const std::vector<TSSId>& ssIds) {
183+
SA_LOG_D("[" << TabletID() << "] SendStatisticsToNode()"
184+
<< ", node id = " << nodeId
185+
<< ", schemeshard count = " << ssIds.size());
186+
187+
std::vector<TNodeId> nodeIds;
188+
nodeIds.push_back(nodeId);
189+
190+
PropagateStatisticsImpl(nodeIds, ssIds);
191+
}
192+
193+
void TStatisticsAggregator::PropagateStatistics() {
194+
SA_LOG_D("[" << TabletID() << "] PropagateStatistics()"
195+
<< ", node count = " << Nodes.size()
196+
<< ", schemeshard count = " << RequestedSchemeShards.size());
197+
198+
if (Nodes.empty() || RequestedSchemeShards.empty()) {
199+
return;
200+
}
201+
202+
std::vector<TNodeId> nodeIds;
203+
nodeIds.reserve(Nodes.size());
204+
for (const auto& [nodeId, _] : Nodes) {
205+
nodeIds.push_back(nodeId);
206+
}
207+
std::shuffle(std::begin(nodeIds), std::end(nodeIds), RandomGenerator);
208+
209+
std::vector<TSSId> ssIds;
210+
ssIds.reserve(RequestedSchemeShards.size());
211+
for (const auto& ssId : RequestedSchemeShards) {
212+
ssIds.push_back(ssId);
213+
}
214+
215+
PropagateStatisticsImpl(nodeIds, ssIds);
216+
}
217+
218+
void TStatisticsAggregator::PropagateFastStatistics() {
219+
SA_LOG_D("[" << TabletID() << "] PropagateFastStatistics()"
220+
<< ", node count = " << FastNodes.size()
221+
<< ", schemeshard count = " << FastSchemeShards.size());
222+
223+
if (FastNodes.empty() || FastSchemeShards.empty()) {
224+
return;
225+
}
226+
227+
std::vector<TNodeId> nodeIds;
228+
nodeIds.reserve(FastNodes.size());
229+
for (const auto& nodeId : FastNodes) {
230+
nodeIds.push_back(nodeId);
231+
}
232+
std::shuffle(std::begin(nodeIds), std::end(nodeIds), RandomGenerator);
233+
234+
std::vector<TSSId> ssIds;
235+
ssIds.reserve(FastSchemeShards.size());
236+
for (const auto& ssId : FastSchemeShards) {
237+
ssIds.push_back(ssId);
238+
}
239+
240+
PropagateStatisticsImpl(nodeIds, ssIds);
241+
}
242+
243+
void TStatisticsAggregator::PropagateStatisticsImpl(
244+
const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds)
245+
{
246+
TNodeId leadingNodeId = nodeIds[0];
247+
248+
for (size_t index = 0; index < ssIds.size(); ) {
249+
auto propagate = std::make_unique<TEvStatistics::TEvPropagateStatistics>();
250+
auto* record = propagate->MutableRecord();
251+
record->MutableNodeIds()->Reserve(nodeIds.size() - 1);
252+
for (size_t i = 1; i < nodeIds.size(); ++i) {
253+
record->AddNodeIds(nodeIds[i]);
254+
}
255+
for (size_t size = 0; index < ssIds.size(); ++index) {
256+
auto ssId = ssIds[index];
257+
auto* entry = record->AddEntries();
258+
entry->SetSchemeShardId(ssId);
259+
auto itStats = BaseStats.find(ssId);
260+
if (itStats != BaseStats.end()) {
261+
entry->SetStats(itStats->second);
262+
size += itStats->second.size();
263+
} else {
264+
entry->SetStats(TString()); // stats are not sent from SA yet
265+
}
266+
if (size >= StatsSizeLimitBytes) {
267+
++index;
268+
break;
269+
}
270+
}
271+
Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release());
272+
}
34273
}
35274

36275
void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {

0 commit comments

Comments
 (0)