Skip to content

Commit 3311d10

Browse files
authored
create statistics aggregator tablet during migration (#1393)
1 parent ae12905 commit 3311d10

11 files changed

+221
-94
lines changed

ydb/core/protos/statistics.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,7 @@ message TEvPropagateStatistics {
5656
}
5757
repeated TStatsEntry Entries = 2;
5858
}
59+
60+
// SA -> nodes
61+
message TEvStatisticsIsDisabled {
62+
}

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,27 @@ void TStatisticsAggregator::DefaultSignalTabletActive(const TActorContext& ctx)
3535
Y_UNUSED(ctx);
3636
}
3737

38+
void TStatisticsAggregator::SubscribeForConfigChanges(const TActorContext& ctx) {
39+
ui32 configKind = (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem;
40+
ctx.Send(NConsole::MakeConfigsDispatcherID(ctx.SelfID.NodeId()),
41+
new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({configKind}));
42+
}
43+
44+
void TStatisticsAggregator::HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
45+
SA_LOG_I("[" << TabletID() << "] Subscribed for config changes");
46+
}
47+
48+
void TStatisticsAggregator::HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
49+
const auto& record = ev->Get()->Record;
50+
const auto& config = record.GetConfig();
51+
if (config.HasFeatureFlags()) {
52+
const auto& featureFlags = config.GetFeatureFlags();
53+
EnableStatistics = featureFlags.GetEnableStatistics();
54+
}
55+
auto response = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationResponse>(record);
56+
Send(ev->Sender, response.release(), 0, ev->Cookie);
57+
}
58+
3859
void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev) {
3960
auto pipeServerId = ev->Get()->ServerId;
4061

@@ -97,6 +118,12 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) {
97118
RequestedSchemeShards.insert(ssEntry.GetSchemeShardId());
98119
}
99120

121+
if (!EnableStatistics) {
122+
auto disabled = std::make_unique<TEvStatistics::TEvStatisticsIsDisabled>();
123+
Send(NStat::MakeStatServiceID(nodeId), disabled.release());
124+
return;
125+
}
126+
100127
if (!IsPropagateInFlight) {
101128
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
102129
IsPropagateInFlight = true;
@@ -124,6 +151,12 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) {
124151
<< ", node id = " << nodeId
125152
<< ", schemeshard count = " << record.NeedSchemeShardsSize());
126153

154+
if (!EnableStatistics) {
155+
auto disabled = std::make_unique<TEvStatistics::TEvStatisticsIsDisabled>();
156+
Send(NStat::MakeStatServiceID(nodeId), disabled.release());
157+
return;
158+
}
159+
127160
std::vector<TSSId> ssIds;
128161
ssIds.reserve(record.NeedSchemeShardsSize());
129162
for (const auto& ssId : record.GetNeedSchemeShards()) {
@@ -151,6 +184,10 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectSchemeShard::TPtr& e
151184
void TStatisticsAggregator::Handle(TEvPrivate::TEvFastPropagateCheck::TPtr&) {
152185
SA_LOG_D("[" << TabletID() << "] EvFastPropagateCheck");
153186

187+
if (!EnableStatistics) {
188+
return;
189+
}
190+
154191
PropagateFastStatistics();
155192

156193
FastCheckInFlight = false;
@@ -162,7 +199,9 @@ void TStatisticsAggregator::Handle(TEvPrivate::TEvFastPropagateCheck::TPtr&) {
162199
void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) {
163200
SA_LOG_D("[" << TabletID() << "] EvPropagate");
164201

165-
PropagateStatistics();
202+
if (EnableStatistics) {
203+
PropagateStatistics();
204+
}
166205

167206
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
168207
}
@@ -176,10 +215,10 @@ void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TS
176215
for (const auto& ssId : ssIds) {
177216
FastSchemeShards.insert(ssId);
178217
}
179-
if (!FastCheckInFlight) {
180-
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
181-
FastCheckInFlight = true;
182-
}
218+
}
219+
if (!FastCheckInFlight) {
220+
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
221+
FastCheckInFlight = true;
183222
}
184223
}
185224

@@ -247,6 +286,10 @@ void TStatisticsAggregator::PropagateFastStatistics() {
247286
void TStatisticsAggregator::PropagateStatisticsImpl(
248287
const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds)
249288
{
289+
if (nodeIds.empty() || ssIds.empty()) {
290+
return;
291+
}
292+
250293
TNodeId leadingNodeId = nodeIds[0];
251294

252295
for (size_t index = 0; index < ssIds.size(); ) {

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
#include <ydb/core/statistics/common.h>
1010
#include <ydb/core/statistics/events.h>
1111

12+
#include <ydb/core/cms/console/configs_dispatcher.h>
13+
#include <ydb/core/cms/console/console.h>
14+
1215
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
1316

1417
#include <random>
@@ -53,10 +56,14 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
5356
void OnActivateExecutor(const TActorContext& ctx) override;
5457
void DefaultSignalTabletActive(const TActorContext& ctx) override;
5558
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override;
59+
void SubscribeForConfigChanges(const TActorContext& ctx);
5660

5761
NTabletFlatExecutor::ITransaction* CreateTxInitSchema();
5862
NTabletFlatExecutor::ITransaction* CreateTxInit();
5963

64+
void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev);
65+
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev);
66+
6067
void Handle(TEvStatistics::TEvConfigureAggregator::TPtr& ev);
6168
void Handle(TEvStatistics::TEvSchemeShardStats::TPtr& ev);
6269
void Handle(TEvPrivate::TEvPropagate::TPtr& ev);
@@ -76,11 +83,13 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
7683
void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
7784

7885
STFUNC(StateInit) {
79-
StateInitImpl(ev,SelfId());
86+
StateInitImpl(ev, SelfId());
8087
}
8188

8289
STFUNC(StateWork) {
8390
switch(ev->GetTypeRewrite()) {
91+
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleConfig)
92+
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, HandleConfig)
8493
hFunc(TEvStatistics::TEvConfigureAggregator, Handle);
8594
hFunc(TEvStatistics::TEvSchemeShardStats, Handle);
8695
hFunc(TEvPrivate::TEvPropagate, Handle);
@@ -103,10 +112,12 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
103112

104113
std::mt19937_64 RandomGenerator;
105114

115+
bool EnableStatistics = false;
116+
106117
static constexpr size_t StatsOptimizeFirstNodesCount = 3; // optimize first nodes - fast propagation
107118
static constexpr size_t StatsSizeLimitBytes = 2 << 20; // limit for stats size in one message
108119

109-
TDuration PropagateInterval = TDuration::Minutes(3);
120+
TDuration PropagateInterval;
110121
bool IsPropagateInFlight = false; // is slow propagation started
111122

112123
std::unordered_map<TSSId, TString> BaseStats; // schemeshard id -> serialized stats for all paths

ydb/core/statistics/aggregator/tx_init.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include "aggregator_impl.h"
22

3+
#include <ydb/core/base/appdata_fwd.h>
4+
#include <ydb/core/base/feature_flags.h>
5+
36
namespace NKikimr::NStat {
47

58
struct TStatisticsAggregator::TTxInit : public TTxBase {
@@ -82,6 +85,10 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
8285
SA_LOG_D("[" << Self->TabletID() << "] TTxInit::Complete");
8386

8487
Self->SignalTabletActive(ctx);
88+
89+
Self->EnableStatistics = AppData(ctx)->FeatureFlags.GetEnableStatistics();
90+
Self->SubscribeForConfigChanges(ctx);
91+
8592
Self->Become(&TThis::StateWork);
8693
}
8794
};

ydb/core/statistics/events.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ struct TEvStatistics {
5454
EvConnectNode,
5555
EvRequestStats,
5656
EvPropagateStatistics,
57+
EvStatisticsIsDisabled,
5758

5859
EvEnd
5960
};
@@ -108,6 +109,12 @@ struct TEvStatistics {
108109
NKikimrStat::TEvPropagateStatistics,
109110
EvPropagateStatistics>
110111
{};
112+
113+
struct TEvStatisticsIsDisabled : public TEventPB<
114+
TEvStatisticsIsDisabled,
115+
NKikimrStat::TEvStatisticsIsDisabled,
116+
EvStatisticsIsDisabled>
117+
{};
111118
};
112119

113120
} // NStat

ydb/core/statistics/stat_service.cpp

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
4343
hFunc(TEvStatistics::TEvPropagateStatistics, Handle);
4444
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
4545
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
46+
hFunc(TEvStatistics::TEvStatisticsIsDisabled, Handle);
4647
cFunc(TEvents::TEvPoison::EventType, PassAway);
4748
default:
4849
LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
@@ -51,20 +52,21 @@ class TStatService : public TActorBootstrapped<TStatService> {
5152
}
5253

5354
private:
54-
bool IsSAUnavailable() {
55-
return ResolveSAStage == RSA_FINISHED && StatisticsAggregatorId == 0;
56-
}
57-
5855
void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
5956
LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
60-
"Subscribed for config changes");
57+
"Subscribed for config changes on node " << SelfId().NodeId());
6158
}
6259

6360
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
6461
const auto& record = ev->Get()->Record;
65-
const auto& featureFlags = record.GetConfig().GetFeatureFlags();
66-
EnableStatistics = featureFlags.GetEnableStatistics();
67-
62+
const auto& config = record.GetConfig();
63+
if (config.HasFeatureFlags()) {
64+
const auto& featureFlags = config.GetFeatureFlags();
65+
EnableStatistics = featureFlags.GetEnableStatistics();
66+
if (!EnableStatistics) {
67+
ReplyAllFailed();
68+
}
69+
}
6870
auto response = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationResponse>(record);
6971
Send(ev->Sender, response.release(), 0, ev->Cookie);
7072
}
@@ -77,7 +79,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
7779
request.EvCookie = ev->Cookie;
7880
request.StatRequests.swap(ev->Get()->StatRequests);
7981

80-
if (!EnableStatistics || IsSAUnavailable()) {
82+
if (!EnableStatistics) {
8183
ReplyFailed(requestId, true);
8284
return;
8385
}
@@ -106,12 +108,12 @@ class TStatService : public TActorBootstrapped<TStatService> {
106108
auto& entry = navigate->ResultSet.back();
107109
if (entry.Status != TNavigate::EStatus::Ok) {
108110
StatisticsAggregatorId = 0;
109-
} else {
111+
} else if (entry.DomainInfo->Params.HasStatisticsAggregator()) {
110112
StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator();
111113
}
112-
ResolveSAStage = RSA_FINISHED;
114+
ResolveSAStage = StatisticsAggregatorId ? RSA_FINISHED : RSA_INITIAL;
113115

114-
if (StatisticsAggregatorId != 0) {
116+
if (StatisticsAggregatorId) {
115117
ConnectToSA();
116118
SyncNode();
117119
} else {
@@ -127,15 +129,15 @@ class TStatService : public TActorBootstrapped<TStatService> {
127129
}
128130
auto& request = itRequest->second;
129131

130-
if (!EnableStatistics || IsSAUnavailable()) {
132+
if (!EnableStatistics) {
131133
ReplyFailed(requestId, true);
132134
return;
133135
}
134136

135137
std::unordered_set<ui64> ssIds;
136138
bool isServerless = false;
137139
ui64 aggregatorId = 0;
138-
TPathId resourcesDomainKey;
140+
TPathId domainKey, resourcesDomainKey;
139141
for (const auto& entry : navigate->ResultSet) {
140142
if (entry.Status != TNavigate::EStatus::Ok) {
141143
continue;
@@ -144,6 +146,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
144146
ssIds.insert(domainInfo->ExtractSchemeShard());
145147
aggregatorId = domainInfo->Params.GetStatisticsAggregator();
146148
isServerless = domainInfo->IsServerless();
149+
domainKey = domainInfo->DomainKey;
147150
resourcesDomainKey = domainInfo->ResourcesDomainKey;
148151
}
149152
if (ssIds.size() != 1) {
@@ -157,22 +160,31 @@ class TStatService : public TActorBootstrapped<TStatService> {
157160
return;
158161
}
159162

163+
auto navigateDomainKey = [this] (TPathId domainKey) {
164+
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
165+
auto navigate = std::make_unique<TNavigate>();
166+
auto& entry = navigate->ResultSet.emplace_back();
167+
entry.TableId = TTableId(domainKey.OwnerId, domainKey.LocalPathId);
168+
entry.Operation = TNavigate::EOp::OpPath;
169+
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
170+
entry.RedirectRequired = false;
171+
navigate->Cookie = ResolveSACookie;
172+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
173+
ResolveSAStage = RSA_IN_FLIGHT;
174+
};
175+
160176
switch (ResolveSAStage) {
161-
case RSA_NOT_RUN:
177+
case RSA_INITIAL:
162178
if (!isServerless) {
163-
StatisticsAggregatorId = aggregatorId;
164-
ResolveSAStage = RSA_FINISHED;
179+
if (aggregatorId) {
180+
StatisticsAggregatorId = aggregatorId;
181+
ResolveSAStage = RSA_FINISHED;
182+
} else {
183+
navigateDomainKey(domainKey);
184+
return;
185+
}
165186
} else {
166-
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
167-
auto navigate = std::make_unique<TNavigate>();
168-
auto& entry = navigate->ResultSet.emplace_back();
169-
entry.TableId = TTableId(resourcesDomainKey.OwnerId, resourcesDomainKey.LocalPathId);
170-
entry.Operation = TNavigate::EOp::OpPath;
171-
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
172-
entry.RedirectRequired = false;
173-
navigate->Cookie = ResolveSACookie;
174-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
175-
ResolveSAStage = RSA_IN_FLIGHT;
187+
navigateDomainKey(resourcesDomainKey);
176188
return;
177189
}
178190
break;
@@ -182,7 +194,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
182194
break;
183195
}
184196

185-
if (IsSAUnavailable()) {
197+
if (!StatisticsAggregatorId) {
186198
ReplyFailed(requestId, true);
187199
return;
188200
}
@@ -303,6 +315,10 @@ class TStatService : public TActorBootstrapped<TStatService> {
303315
SyncNode();
304316
}
305317

318+
void Handle(TEvStatistics::TEvStatisticsIsDisabled::TPtr&) {
319+
ReplyAllFailed();
320+
}
321+
306322
void ConnectToSA() {
307323
if (SAPipeClientId || !StatisticsAggregatorId) {
308324
return;
@@ -465,11 +481,11 @@ class TStatService : public TActorBootstrapped<TStatService> {
465481

466482
static const ui64 ResolveSACookie = std::numeric_limits<ui64>::max();
467483
enum EResolveSAStage {
468-
RSA_NOT_RUN,
484+
RSA_INITIAL,
469485
RSA_IN_FLIGHT,
470486
RSA_FINISHED
471487
};
472-
EResolveSAStage ResolveSAStage = RSA_NOT_RUN;
488+
EResolveSAStage ResolveSAStage = RSA_INITIAL;
473489
};
474490

475491
THolder<IActor> CreateStatService() {

0 commit comments

Comments
 (0)