Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3903 Fix quota count (yq.streamingQuery.count) (#16704) / to stable #16814

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,13 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
{
using TBase = TControlPlaneStorageBase;

using TQueryQuotasMap = THashMap<TString, std::array<ui32, 3>>; // 3 = max(FederatedQuery::QueryContent::QueryType) + 1

::NFq::TYqSharedResources::TPtr YqSharedResources;

NKikimr::TYdbCredentialsProviderFactory CredProviderFactory;

// Query Quota
THashMap<TString, ui32> QueryQuotas;
TQueryQuotasMap QueryQuotas;
THashMap<TString, TEvQuotaService::TQuotaUsageRequest::TPtr> QueryQuotaRequests;
TInstant QuotasUpdatedAt = TInstant::Zero();
bool QuotasUpdating = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

namespace NFq {

using TQuotaCountExecuter = TDbExecuter<THashMap<TString, ui32>>;

void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) {

Expand All @@ -31,7 +30,15 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
}

if (QuotasUpdatedAt + Config->QuotaTtl > Now()) {
Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, ev->Get()->SubjectId, ev->Get()->MetricName, QueryQuotas.Value(ev->Get()->SubjectId, 0)));
ui64 usage = 0;
auto quotaIt = this->QueryQuotas.find(ev->Get()->SubjectId);
if (quotaIt != this->QueryQuotas.end()) {
auto queryType = ev->Get()->MetricName == QUOTA_ANALYTICS_COUNT_LIMIT
? FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_ANALYTICS
: FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_STREAMING;
usage = quotaIt->second[queryType];
}
Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, ev->Get()->SubjectId, ev->Get()->MetricName, usage));
}

QueryQuotaRequests[ev->Get()->SubjectId] = ev;
Expand All @@ -43,21 +50,24 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
QuotasUpdating = true;
QueryQuotas.clear();

using TQuotaCountExecuter = TDbExecuter<TQueryQuotasMap>;

TDbExecutable::TPtr executable;
auto& executer = TQuotaCountExecuter::Create(executable, false, [](TQuotaCountExecuter& executer) { executer.State.clear(); } );

executer.Read(
[=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) {
builder.AddText(
"SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n"
"SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "` AS QUERY_TYPE, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n"
"FROM `" PENDING_SMALL_TABLE_NAME "`\n"
"GROUP BY `" SCOPE_COLUMN_NAME "`\n"
"GROUP BY `" QUERY_TYPE_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`\n"
);
},
[=](TQuotaCountExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) {
TResultSetParser parser(resultSets.front());
while (parser.TryNextRow()) {
auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString();
auto queryType = static_cast<FederatedQuery::QueryContent::QueryType>(parser.ColumnParser("QUERY_TYPE").GetOptionalInt64().GetOrElse(1));
auto count = parser.ColumnParser("PENDING_COUNT").GetUint64();
executer.Read(
[=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) {
Expand All @@ -73,7 +83,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
if (parser.TryNextRow()) {
FederatedQuery::Internal::QueryInternal internal;
ParseProto(executer, internal, parser, INTERNAL_COLUMN_NAME);
executer.State[internal.cloud_id()] += count;
executer.State[internal.cloud_id()][queryType] += count;
}
},
"GetScopeCloud_" + scope, true
Expand All @@ -87,8 +97,15 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
this->QueryQuotas.swap(executer.State);
for (auto& it : this->QueryQuotaRequests) {
auto ev = it.second;
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_ANALYTICS_COUNT_LIMIT, this->QueryQuotas.Value(it.first, 0)));
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_STREAMING_COUNT_LIMIT, this->QueryQuotas.Value(it.first, 0)));
ui64 analyticCount = 0;
ui64 streamingCount = 0;
auto quotaIt = this->QueryQuotas.find(it.first);
if (quotaIt != this->QueryQuotas.end()) {
analyticCount = quotaIt->second[FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_ANALYTICS];
streamingCount = quotaIt->second[FederatedQuery::QueryContent::QueryType::QueryContent_QueryType_STREAMING];
}
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_ANALYTICS_COUNT_LIMIT, analyticCount));
this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_STREAMING_COUNT_LIMIT, streamingCount));
}
this->QueryQuotaRequests.clear();
this->QuotasUpdating = false;
Expand Down
Loading