Skip to content

Commit a536da4

Browse files
authored
YQ-3492 support resource pool classifiers objects saving (#7491)
1 parent 6e0b76c commit a536da4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1547
-118
lines changed

ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,18 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso
118118

119119
TPoolSettings resourcePoolSettings;
120120
auto& properties = *resourcePoolDescription.MutableProperties()->MutableProperties();
121-
for (const auto& [property, setting] : GetPropertiesMap(resourcePoolSettings, true)) {
121+
for (const auto& [property, setting] : resourcePoolSettings.GetPropertiesMap(true)) {
122122
if (std::optional<TString> value = featuresExtractor.Extract(property)) {
123123
try {
124-
std::visit(TSettingsParser{*value}, setting);
124+
std::visit(TPoolSettings::TParser{*value}, setting);
125125
} catch (...) {
126126
throw yexception() << "Failed to parse property " << property << ": " << CurrentExceptionMessage();
127127
}
128128
} else if (!featuresExtractor.ExtractResetFeature(property)) {
129129
continue;
130130
}
131131

132-
TString value = std::visit(TSettingsExtractor(), setting);
132+
const TString value = std::visit(TPoolSettings::TExtractor(), setting);
133133
properties.insert({property, value});
134134
}
135135

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#include "behaviour.h"
2+
#include "initializer.h"
3+
#include "manager.h"
4+
5+
6+
namespace NKikimr::NKqp {
7+
8+
TResourcePoolClassifierBehaviour::TFactory::TRegistrator<TResourcePoolClassifierBehaviour> TResourcePoolClassifierBehaviour::Registrator(TResourcePoolClassifierConfig::GetTypeId());
9+
10+
NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolClassifierBehaviour::ConstructInitializer() const {
11+
return std::make_shared<TResourcePoolClassifierInitializer>();
12+
}
13+
14+
NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolClassifierBehaviour::ConstructOperationsManager() const {
15+
return std::make_shared<TResourcePoolClassifierManager>();
16+
}
17+
18+
TString TResourcePoolClassifierBehaviour::GetInternalStorageTablePath() const {
19+
return "resource_pools/resource_pools_classifiers";
20+
}
21+
22+
TString TResourcePoolClassifierBehaviour::GetTypeId() const {
23+
return TResourcePoolClassifierConfig::GetTypeId();
24+
}
25+
26+
NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierBehaviour::GetInstance() {
27+
static std::shared_ptr<TResourcePoolClassifierBehaviour> result = std::make_shared<TResourcePoolClassifierBehaviour>();
28+
return result;
29+
}
30+
31+
} // namespace NKikimr::NKqp
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include "object.h"
4+
5+
#include <ydb/services/metadata/abstract/initialization.h>
6+
#include <ydb/services/metadata/abstract/kqp_common.h>
7+
8+
9+
namespace NKikimr::NKqp {
10+
11+
class TResourcePoolClassifierBehaviour : public NMetadata::TClassBehaviour<TResourcePoolClassifierConfig> {
12+
static TFactory::TRegistrator<TResourcePoolClassifierBehaviour> Registrator;
13+
14+
protected:
15+
virtual NMetadata::NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override;
16+
virtual NMetadata::NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override;
17+
virtual TString GetInternalStorageTablePath() const override;
18+
19+
public:
20+
virtual TString GetTypeId() const override;
21+
22+
static IClassBehaviour::TPtr GetInstance();
23+
};
24+
25+
} // namespace NKikimr::NKqp
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
#include "checker.h"
2+
3+
#include <ydb/core/base/path.h>
4+
#include <ydb/core/cms/console/configs_dispatcher.h>
5+
#include <ydb/core/kqp/workload_service/actors/actors.h>
6+
#include <ydb/core/kqp/workload_service/common/events.h>
7+
#include <ydb/core/protos/console_config.pb.h>
8+
#include <ydb/core/resource_pools/resource_pool_classifier_settings.h>
9+
10+
#include <ydb/library/query_actor/query_actor.h>
11+
12+
13+
namespace NKikimr::NKqp {
14+
15+
namespace {
16+
17+
using namespace NActors;
18+
using namespace NResourcePool;
19+
using namespace NWorkload;
20+
21+
22+
class TRanksCheckerActor : public NKikimr::TQueryBase {
23+
using TBase = NKikimr::TQueryBase;
24+
25+
public:
26+
TRanksCheckerActor(const TString& database, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
27+
: TBase(NKikimrServices::KQP_GATEWAY, sessionId)
28+
, Database(database)
29+
, RanksToCheck(ranksToCheck)
30+
{
31+
TxId = transactionId;
32+
SetOperationInfo(__func__, Database);
33+
}
34+
35+
void OnRunQuery() override {
36+
const auto& tablePath = TResourcePoolClassifierConfig::GetBehaviour()->GetStorageTablePath();
37+
38+
TStringBuilder sql = TStringBuilder() << R"(
39+
-- TRanksCheckerActor::OnRunQuery
40+
DECLARE $database AS Text;
41+
)";
42+
43+
NYdb::TParamsBuilder params;
44+
params
45+
.AddParam("$database")
46+
.Utf8(CanonizePath(Database))
47+
.Build();
48+
49+
if (!RanksToCheck.empty()) {
50+
sql << R"(
51+
DECLARE $ranks AS List<Int64>;
52+
PRAGMA AnsiInForEmptyOrNullableItemsCollections;
53+
54+
SELECT
55+
rank, name
56+
FROM `)" << tablePath << R"(`
57+
WHERE database = $database
58+
AND rank IN $ranks;
59+
)";
60+
61+
auto& param = params.AddParam("$ranks").BeginList();
62+
for (const auto& [rank, _] : RanksToCheck) {
63+
param.AddListItem().Int64(rank);
64+
}
65+
param.EndList().Build();
66+
67+
ExpectedResultSets++;
68+
}
69+
70+
sql << R"(
71+
SELECT
72+
MAX(rank) AS MaxRank,
73+
COUNT(*) AS NumberClassifiers
74+
FROM `)" << tablePath << R"(`
75+
WHERE database = $database;
76+
)";
77+
78+
RunDataQuery(sql, &params, TTxControl::ContinueTx());
79+
}
80+
81+
void OnQueryResult() override {
82+
if (ResultSets.size() != ExpectedResultSets) {
83+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
84+
return;
85+
}
86+
87+
ui64 resultSetId = 0;
88+
if (!RanksToCheck.empty()) {
89+
NYdb::TResultSetParser result(ResultSets[resultSetId++]);
90+
while (result.TryNextRow()) {
91+
TMaybe<i64> rank = result.ColumnParser("rank").GetOptionalInt64();
92+
if (!rank) {
93+
continue;
94+
}
95+
96+
TMaybe<TString> name = result.ColumnParser("name").GetOptionalUtf8();
97+
if (!name) {
98+
continue;
99+
}
100+
101+
if (auto it = RanksToCheck.find(*rank); it != RanksToCheck.end() && it->second != *name) {
102+
Finish(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Classifier with rank " << *rank << " already exists, its name " << *name);
103+
return;
104+
}
105+
}
106+
}
107+
108+
{ // Classifiers stats
109+
NYdb::TResultSetParser result(ResultSets[resultSetId++]);
110+
if (!result.TryNextRow()) {
111+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
112+
return;
113+
}
114+
115+
MaxRank = result.ColumnParser("MaxRank").GetOptionalInt64().GetOrElse(0);
116+
NumberClassifiers = result.ColumnParser("NumberClassifiers").GetUint64();
117+
}
118+
119+
Finish();
120+
}
121+
122+
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
123+
Send(Owner, new TEvPrivate::TEvRanksCheckerResponse(status, MaxRank, NumberClassifiers, std::move(issues)));
124+
}
125+
126+
private:
127+
const TString Database;
128+
const std::unordered_map<i64, TString> RanksToCheck;
129+
130+
ui64 ExpectedResultSets = 1;
131+
i64 MaxRank = 0;
132+
ui64 NumberClassifiers = 0;
133+
};
134+
135+
class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResourcePoolClassifierPreparationActor> {
136+
public:
137+
TResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext)
138+
: Context(context)
139+
, AlterContext(alterContext)
140+
, Controller(std::move(controller))
141+
, PatchedObjects(std::move(patchedObjects))
142+
{}
143+
144+
void Bootstrap() {
145+
Become(&TResourcePoolClassifierPreparationActor::StateFunc);
146+
ValidateRanks();
147+
GetDatabaseInfo();
148+
}
149+
150+
void Handle(TEvPrivate::TEvRanksCheckerResponse::TPtr& ev) {
151+
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
152+
FailAndPassAway("Resource pool classifier rank check failed", ev->Get()->Status, ev->Get()->Issues);
153+
return;
154+
}
155+
156+
if (ev->Get()->NumberClassifiers >= CLASSIFIER_COUNT_LIMIT) {
157+
FailAndPassAway(TStringBuilder() << "Number of resource pool classifiers reached limit in " << CLASSIFIER_COUNT_LIMIT);
158+
return;
159+
}
160+
161+
i64 maxRank = ev->Get()->MaxRank;
162+
for (auto& object : PatchedObjects) {
163+
if (object.GetRank() != -1) {
164+
continue;
165+
}
166+
if (maxRank > std::numeric_limits<i64>::max() - CLASSIFIER_RANK_OFFSET) {
167+
FailAndPassAway(TStringBuilder() << "The rank could not be set automatically, the maximum rank of the resource pool classifier is too high: " << ev->Get()->MaxRank);
168+
return;
169+
}
170+
171+
maxRank += CLASSIFIER_RANK_OFFSET;
172+
object.SetRank(maxRank);
173+
}
174+
175+
RanksChecked = true;
176+
TryFinish();
177+
}
178+
179+
void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
180+
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
181+
FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues);
182+
return;
183+
}
184+
185+
Serverless = ev->Get()->Serverless;
186+
187+
Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvGetConfigRequest(
188+
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
189+
), IEventHandle::FlagTrackDelivery);
190+
}
191+
192+
void Handle(TEvents::TEvUndelivered::TPtr& ev) {
193+
switch (ev->Get()->SourceType) {
194+
case NConsole::TEvConfigsDispatcher::EvGetConfigRequest:
195+
CheckFeatureFlag(AppData()->FeatureFlags);
196+
break;
197+
198+
default:
199+
break;
200+
}
201+
}
202+
203+
void Handle(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse::TPtr& ev) {
204+
CheckFeatureFlag(ev->Get()->Config->GetFeatureFlags());
205+
}
206+
207+
STRICT_STFUNC(StateFunc,
208+
hFunc(TEvPrivate::TEvRanksCheckerResponse, Handle);
209+
hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
210+
hFunc(TEvents::TEvUndelivered, Handle);
211+
hFunc(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse, Handle);
212+
)
213+
214+
private:
215+
void GetDatabaseInfo() const {
216+
const auto& externalContext = Context.GetExternalData();
217+
const auto userToken = externalContext.GetUserToken() ? MakeIntrusive<NACLib::TUserToken>(*externalContext.GetUserToken()) : nullptr;
218+
Register(CreateDatabaseFetcherActor(SelfId(), externalContext.GetDatabase(), userToken, NACLib::EAccessRights::GenericFull));
219+
}
220+
221+
void ValidateRanks() {
222+
if (Context.GetActivityType() == NMetadata::NModifications::IOperationsManager::EActivityType::Drop) {
223+
RanksChecked = true;
224+
TryFinish();
225+
return;
226+
}
227+
228+
std::unordered_map<i64, TString> ranksToNames;
229+
for (const auto& object : PatchedObjects) {
230+
const auto rank = object.GetRank();
231+
if (rank == -1) {
232+
continue;
233+
}
234+
if (!ranksToNames.insert({rank, object.GetName()}).second) {
235+
FailAndPassAway(TStringBuilder() << "Found duplicate rank " << rank);
236+
}
237+
}
238+
239+
Register(new TQueryRetryActor<TRanksCheckerActor, TEvPrivate::TEvRanksCheckerResponse, TString, TString, TString, std::unordered_map<i64, TString>>(
240+
SelfId(), Context.GetExternalData().GetDatabase(), AlterContext.SessionId, AlterContext.TransactionId, ranksToNames
241+
));
242+
}
243+
244+
void CheckFeatureFlag(const NKikimrConfig::TFeatureFlags& featureFlags) {
245+
if (Context.GetActivityType() == NMetadata::NModifications::IOperationsManager::EActivityType::Drop) {
246+
FeatureFlagChecked = true;
247+
TryFinish();
248+
return;
249+
}
250+
251+
if (!featureFlags.GetEnableResourcePools()) {
252+
FailAndPassAway("Resource pool classifiers are disabled. Please contact your system administrator to enable it");
253+
return;
254+
}
255+
if (Serverless && !featureFlags.GetEnableResourcePoolsOnServerless()) {
256+
FailAndPassAway("Resource pool classifiers are disabled for serverless domains. Please contact your system administrator to enable it");
257+
return;
258+
}
259+
260+
FeatureFlagChecked = true;
261+
TryFinish();
262+
}
263+
264+
void FailAndPassAway(const TString& message, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
265+
FailAndPassAway(TStringBuilder() << message << ", status: " << status << ", reason: " << issues.ToOneLineString());
266+
}
267+
268+
void FailAndPassAway(const TString& message) {
269+
Controller->OnPreparationProblem(message);
270+
PassAway();
271+
}
272+
273+
void TryFinish() {
274+
if (!FeatureFlagChecked || !RanksChecked) {
275+
return;
276+
}
277+
278+
Controller->OnPreparationFinished(std::move(PatchedObjects));
279+
PassAway();
280+
}
281+
282+
private:
283+
const NMetadata::NModifications::IOperationsManager::TInternalModificationContext Context;
284+
const NMetadata::NModifications::TAlterOperationContext AlterContext;
285+
286+
bool Serverless = false;
287+
bool FeatureFlagChecked = false;
288+
bool RanksChecked = false;
289+
290+
NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr Controller;
291+
std::vector<TResourcePoolClassifierConfig> PatchedObjects;
292+
};
293+
294+
} // anonymous namespace
295+
296+
IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext) {
297+
return new TResourcePoolClassifierPreparationActor(std::move(patchedObjects), std::move(controller), context, alterContext);
298+
}
299+
300+
} // namespace NKikimr::NKqp
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include "object.h"
4+
5+
#include <ydb/services/metadata/manager/generic_manager.h>
6+
7+
8+
namespace NKikimr::NKqp {
9+
10+
NActors::IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext);
11+
12+
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)