Skip to content

Support cancel after in rate limiter #9486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/grpc_request_check_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class TGrpcRequestCheckActor
SetTokenAndDie();
break;
case Ydb::StatusIds::TIMEOUT:
case Ydb::StatusIds::CANCELLED:
Counters_->IncDatabaseRateLimitedCounter();
LOG_INFO(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Throughput limit exceeded");
ReplyOverloadedAndDie(MakeIssue(NKikimrIssues::TIssuesIds::YDB_RESOURCE_USAGE_LIMITED, "Throughput limit exceeded"));
Expand All @@ -331,7 +332,8 @@ class TGrpcRequestCheckActor
}
};

req.mutable_operation_params()->mutable_operation_timeout()->set_nanos(200000000); // same as cloud-go serverless proxy
req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(10);
req.mutable_operation_params()->mutable_cancel_after()->set_nanos(200000000); // same as cloud-go serverless proxy

NKikimr::NRpcService::RateLimiterAcquireUseSameMailbox(
std::move(req),
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/grpc_services/local_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ TActorId RateLimiterAcquireUseSameMailbox(
onSuccess();
break;
case Ydb::StatusIds::TIMEOUT:
case Ydb::StatusIds::CANCELLED:
onTimeout();
break;
default:
Expand All @@ -32,7 +33,8 @@ TActorId RateLimiterAcquireUseSameMailbox(
};

Ydb::RateLimiter::AcquireResourceRequest request;
SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after());
request.set_coordination_node_path(fullPath.CoordinationNode);
request.set_resource_path(fullPath.ResourcePath);
request.set_required(required);
Expand Down Expand Up @@ -72,6 +74,7 @@ TActorId RateLimiterAcquireUseSameMailbox(
onSuccess();
break;
case Ydb::StatusIds::TIMEOUT:
case Ydb::StatusIds::CANCELLED:
onTimeout();
break;
default:
Expand All @@ -82,7 +85,8 @@ TActorId RateLimiterAcquireUseSameMailbox(

const auto& rlPath = maybeRlPath.GetRef();
Ydb::RateLimiter::AcquireResourceRequest request;
SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after());
request.set_coordination_node_path(rlPath.CoordinationNode);
request.set_resource_path(rlPath.ResourcePath);
request.set_required(required);
Expand Down
32 changes: 27 additions & 5 deletions ydb/core/grpc_services/rpc_rate_limiter_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,18 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLi
SendRequest();
}

// Always race when "cancel after" time is not set.
// If "cancel after" is not set, quoter service can spend resource and say "OK", but we here reply with TIMEOUT.
void OnOperationTimeout(const TActorContext& ctx) {
Send(MakeQuoterServiceID(), new TEvQuota::TEvRpcTimeout(GetProtoRequest()->coordination_node_path(), GetProtoRequest()->resource_path()), 0, 0);
TBase::OnOperationTimeout(ctx);
}

// Do nothing here, because quoter service replies after "cancel after" time passes.
void OnCancelOperation(const TActorContext& ctx) {
Y_UNUSED(ctx);
}

STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvQuota::TEvClearance, Handle);
Expand Down Expand Up @@ -637,22 +644,37 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLi
true));
}

StatusIds::StatusCode QuoterDeadlineStatusCode() {
if (const TDuration cancelAfter = GetCancelAfter(); cancelAfter && cancelAfter < GetOperationTimeout()) {
return StatusIds::CANCELLED;
}
return StatusIds::TIMEOUT;
}

void SendLeaf(const TEvQuota::TResourceLeaf& leaf) {
TDuration deadline = GetOperationTimeout();
// CancelAfter is an intelligent way to say quoter service that we can wait maximum time.
// After that time quoter service sends EResult::Deadline.
// It says that the system lacks the resource.
if (const TDuration cancelAfter = GetCancelAfter(); cancelAfter && cancelAfter < deadline) {
deadline = cancelAfter;
}

Send(MakeQuoterServiceID(),
new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, GetOperationTimeout()), 0, 0);
new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, deadline), 0, 0);
}

void Handle(TEvQuota::TEvClearance::TPtr& ev) {
switch (ev->Get()->Result) {
case TEvQuota::TEvClearance::EResult::Success:
Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext());
break;
break;
case TEvQuota::TEvClearance::EResult::UnknownResource:
Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext());
break;
break;
case TEvQuota::TEvClearance::EResult::Deadline:
Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext());
break;
Reply(QuoterDeadlineStatusCode(), TActivationContext::AsActorContext());
break;
default:
Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/api/protos/ydb_rate_limiter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ message DescribeResourceResult {
//

message AcquireResourceRequest {
// If cancel_after is set greater than zero and less than operation_timeout
// and resource is not ready after cancel_after time,
// the result code of this operation will be CANCELLED and resource will not be spent.
// It is recommended to specify both operation_timeout and cancel_after.
// cancel_after should be less than operation_timeout and non zero.
Ydb.Operations.OperationParams operation_params = 1;

// Path of a coordination node.
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ class TRateLimiterClient {
TAsyncDescribeResourceResult DescribeResource(const TString& coordinationNodePath, const TString& resourcePath, const TDescribeResourceSettings& = {});

// Acquire resources's units inside a coordination node.
// If CancelAfter is set greater than zero and less than OperationTimeout
// and resource is not ready after CancelAfter time,
// the result code of this operation will be CANCELLED and resource will not be spent.
// It is recommended to specify both OperationTimeout and CancelAfter.
// CancelAfter should be less than OperationTimeout.
TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& = {});

private:
Expand Down
49 changes: 37 additions & 12 deletions ydb/services/rate_limiter/rate_limiter_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ class TTestSetupAcquireActor : public TTestSetup {
request.set_resource_path(ResourcePath);

SetDuration(Settings.OperationTimeout_, *request.mutable_operation_params()->mutable_operation_timeout());
if (Settings.CancelAfter_) {
SetDuration(Settings.CancelAfter_, *request.mutable_operation_params()->mutable_cancel_after());
}

if (Settings.IsUsedAmount_) {
request.set_used(Settings.Amount_.GetRef());
Expand Down Expand Up @@ -317,50 +320,72 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) {
return std::make_unique<TTestSetup>();
}

void AcquireResourceManyRequired(bool useActorApi) {
void AcquireResourceManyRequired(bool useActorApi, bool useCancelAfter) {
const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200);
const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set

using NYdb::NRateLimiter::TAcquireResourceSettings;

auto setup = MakeTestSetup(useActorApi);

ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));

setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);

for (int i = 0; i < 3; ++i) {
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
}
}

void AcquireResourceManyUsed(bool useActorApi) {
void AcquireResourceManyUsed(bool useActorApi, bool useCancelAfter) {
const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200);
const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set

using NYdb::NRateLimiter::TAcquireResourceSettings;

auto setup = MakeTestSetup(useActorApi);
ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));

setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
for (int i = 0; i < 3; ++i) {
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
}
}

Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApi) {
AcquireResourceManyRequired(false);
AcquireResourceManyRequired(false, false);
}

Y_UNIT_TEST(AcquireResourceManyRequiredActorApi) {
AcquireResourceManyRequired(true);
AcquireResourceManyRequired(true, false);
}

Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApiWithCancelAfter) {
AcquireResourceManyRequired(false, true);
}

Y_UNIT_TEST(AcquireResourceManyRequiredActorApiWithCancelAfter) {
AcquireResourceManyRequired(true, true);
}

Y_UNIT_TEST(AcquireResourceManyUsedGrpcApi) {
AcquireResourceManyUsed(false);
AcquireResourceManyUsed(false, false);
}

Y_UNIT_TEST(AcquireResourceManyUsedActorApi) {
AcquireResourceManyUsed(true);
AcquireResourceManyUsed(true, false);
}

Y_UNIT_TEST(AcquireResourceManyUsedGrpcApiWithCancelAfter) {
AcquireResourceManyUsed(false, true);
}

Y_UNIT_TEST(AcquireResourceManyUsedActorApiWithCancelAfter) {
AcquireResourceManyUsed(true, true);
}
}

Expand Down
Loading