Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 487181b

Browse files
authoredApr 5, 2025
Merge 3adc0a4 into 64938b5
2 parents 64938b5 + 3adc0a4 commit 487181b

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed
 

‎ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -791,12 +791,13 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
791791
{
792792
using TBase = TControlPlaneStorageBase;
793793

794+
using TQueryQuotasMap = THashMap<TString, std::array<ui32, 3>>; // 3 = max(FederatedQuery::QueryContent::QueryType) + 1
795+
794796
::NFq::TYqSharedResources::TPtr YqSharedResources;
795797

796798
NKikimr::TYdbCredentialsProviderFactory CredProviderFactory;
797-
798799
// Query Quota
799-
THashMap<TString, ui32> QueryQuotas;
800+
TQueryQuotasMap QueryQuotas;
800801
THashMap<TString, TEvQuotaService::TQuotaUsageRequest::TPtr> QueryQuotaRequests;
801802
TInstant QuotasUpdatedAt = TInstant::Zero();
802803
bool QuotasUpdating = false;

‎ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp

+24-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
namespace NFq {
2323

24-
using TQuotaCountExecuter = TDbExecuter<THashMap<TString, ui32>>;
2524

2625
void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) {
2726

@@ -31,7 +30,15 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
3130
}
3231

3332
if (QuotasUpdatedAt + Config->QuotaTtl > Now()) {
34-
Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, ev->Get()->SubjectId, ev->Get()->MetricName, QueryQuotas.Value(ev->Get()->SubjectId, 0)));
33+
ui64 usage = 0;
34+
auto quotaIt = this->QueryQuotas.find(ev->Get()->SubjectId);
35+
if (quotaIt != this->QueryQuotas.end()) {
36+
auto queryType = ev->Get()->MetricName == QUOTA_ANALYTICS_COUNT_LIMIT
37+
? FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_ANALYTICS
38+
: FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_STREAMING;
39+
usage = quotaIt->second[queryType];
40+
}
41+
Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, ev->Get()->SubjectId, ev->Get()->MetricName, usage));
3542
}
3643

3744
QueryQuotaRequests[ev->Get()->SubjectId] = ev;
@@ -43,21 +50,24 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
4350
QuotasUpdating = true;
4451
QueryQuotas.clear();
4552

53+
using TQuotaCountExecuter = TDbExecuter<TQueryQuotasMap>;
54+
4655
TDbExecutable::TPtr executable;
4756
auto& executer = TQuotaCountExecuter::Create(executable, false, [](TQuotaCountExecuter& executer) { executer.State.clear(); } );
4857

4958
executer.Read(
5059
[=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) {
5160
builder.AddText(
52-
"SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n"
61+
"SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "` AS QUERY_TYPE, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n"
5362
"FROM `" PENDING_SMALL_TABLE_NAME "`\n"
54-
"GROUP BY `" SCOPE_COLUMN_NAME "`\n"
63+
"GROUP BY `" QUERY_TYPE_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`\n"
5564
);
5665
},
5766
[=](TQuotaCountExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) {
5867
TResultSetParser parser(resultSets.front());
5968
while (parser.TryNextRow()) {
6069
auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString();
70+
auto queryType = static_cast<FederatedQuery::QueryContent::QueryType>(parser.ColumnParser("QUERY_TYPE").GetOptionalInt64().value_or(1));
6171
auto count = parser.ColumnParser("PENDING_COUNT").GetUint64();
6272
executer.Read(
6373
[=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) {
@@ -73,7 +83,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
7383
if (parser.TryNextRow()) {
7484
FederatedQuery::Internal::QueryInternal internal;
7585
ParseProto(executer, internal, parser, INTERNAL_COLUMN_NAME);
76-
executer.State[internal.cloud_id()] += count;
86+
executer.State[internal.cloud_id()][queryType] += count;
7787
}
7888
},
7989
"GetScopeCloud_" + scope, true
@@ -87,8 +97,15 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
8797
this->QueryQuotas.swap(executer.State);
8898
for (auto& it : this->QueryQuotaRequests) {
8999
auto ev = it.second;
90-
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_ANALYTICS_COUNT_LIMIT, this->QueryQuotas.Value(it.first, 0)));
91-
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_STREAMING_COUNT_LIMIT, this->QueryQuotas.Value(it.first, 0)));
100+
ui64 analyticCount = 0;
101+
ui64 streamingCount = 0;
102+
auto quotaIt = this->QueryQuotas.find(it.first);
103+
if (quotaIt != this->QueryQuotas.end()) {
104+
analyticCount = quotaIt->second[FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_ANALYTICS];
105+
streamingCount = quotaIt->second[FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_STREAMING];
106+
}
107+
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_ANALYTICS_COUNT_LIMIT, analyticCount));
108+
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_STREAMING_COUNT_LIMIT, streamingCount));
92109
}
93110
this->QueryQuotaRequests.clear();
94111
this->QuotasUpdating = false;

0 commit comments

Comments
 (0)
Please sign in to comment.