Skip to content

Commit 97ff008

Browse files
authored
Support cancel after in rate limiter (#9486)
1 parent 492d722 commit 97ff008

File tree

6 files changed

+83
-20
lines changed

6 files changed

+83
-20
lines changed

ydb/core/grpc_services/grpc_request_check_actor.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ class TGrpcRequestCheckActor
312312
SetTokenAndDie();
313313
break;
314314
case Ydb::StatusIds::TIMEOUT:
315+
case Ydb::StatusIds::CANCELLED:
315316
Counters_->IncDatabaseRateLimitedCounter();
316317
LOG_INFO(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Throughput limit exceeded");
317318
ReplyOverloadedAndDie(MakeIssue(NKikimrIssues::TIssuesIds::YDB_RESOURCE_USAGE_LIMITED, "Throughput limit exceeded"));
@@ -331,7 +332,8 @@ class TGrpcRequestCheckActor
331332
}
332333
};
333334

334-
req.mutable_operation_params()->mutable_operation_timeout()->set_nanos(200000000); // same as cloud-go serverless proxy
335+
req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(10);
336+
req.mutable_operation_params()->mutable_cancel_after()->set_nanos(200000000); // same as cloud-go serverless proxy
335337

336338
NKikimr::NRpcService::RateLimiterAcquireUseSameMailbox(
337339
std::move(req),

ydb/core/grpc_services/local_rate_limiter.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ TActorId RateLimiterAcquireUseSameMailbox(
2323
onSuccess();
2424
break;
2525
case Ydb::StatusIds::TIMEOUT:
26+
case Ydb::StatusIds::CANCELLED:
2627
onTimeout();
2728
break;
2829
default:
@@ -32,7 +33,8 @@ TActorId RateLimiterAcquireUseSameMailbox(
3233
};
3334

3435
Ydb::RateLimiter::AcquireResourceRequest request;
35-
SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout());
36+
SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout());
37+
SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after());
3638
request.set_coordination_node_path(fullPath.CoordinationNode);
3739
request.set_resource_path(fullPath.ResourcePath);
3840
request.set_required(required);
@@ -72,6 +74,7 @@ TActorId RateLimiterAcquireUseSameMailbox(
7274
onSuccess();
7375
break;
7476
case Ydb::StatusIds::TIMEOUT:
77+
case Ydb::StatusIds::CANCELLED:
7578
onTimeout();
7679
break;
7780
default:
@@ -82,7 +85,8 @@ TActorId RateLimiterAcquireUseSameMailbox(
8285

8386
const auto& rlPath = maybeRlPath.GetRef();
8487
Ydb::RateLimiter::AcquireResourceRequest request;
85-
SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout());
88+
SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout());
89+
SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after());
8690
request.set_coordination_node_path(rlPath.CoordinationNode);
8791
request.set_resource_path(rlPath.ResourcePath);
8892
request.set_required(required);

ydb/core/grpc_services/rpc_rate_limiter_api.cpp

+27-5
Original file line numberDiff line numberDiff line change
@@ -594,11 +594,18 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLi
594594
SendRequest();
595595
}
596596

597+
// There is always a race when the "cancel after" time is not set.
598+
// If "cancel after" is not set, the quoter service may spend the resource and report "OK" while this actor replies with TIMEOUT.
597599
void OnOperationTimeout(const TActorContext& ctx) {
598600
Send(MakeQuoterServiceID(), new TEvQuota::TEvRpcTimeout(GetProtoRequest()->coordination_node_path(), GetProtoRequest()->resource_path()), 0, 0);
599601
TBase::OnOperationTimeout(ctx);
600602
}
601603

604+
// Do nothing here, because the quoter service replies once the "cancel after" time has passed.
605+
void OnCancelOperation(const TActorContext& ctx) {
606+
Y_UNUSED(ctx);
607+
}
608+
602609
STFUNC(StateFunc) {
603610
switch (ev->GetTypeRewrite()) {
604611
hFunc(TEvQuota::TEvClearance, Handle);
@@ -637,22 +644,37 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLi
637644
true));
638645
}
639646

647+
StatusIds::StatusCode QuoterDeadlineStatusCode() {
648+
if (const TDuration cancelAfter = GetCancelAfter(); cancelAfter && cancelAfter < GetOperationTimeout()) {
649+
return StatusIds::CANCELLED;
650+
}
651+
return StatusIds::TIMEOUT;
652+
}
653+
640654
void SendLeaf(const TEvQuota::TResourceLeaf& leaf) {
655+
TDuration deadline = GetOperationTimeout();
656+
// CancelAfter is an intelligent way to tell the quoter service the maximum time we can wait.
657+
// After that time quoter service sends EResult::Deadline.
658+
// It says that the system lacks the resource.
659+
if (const TDuration cancelAfter = GetCancelAfter(); cancelAfter && cancelAfter < deadline) {
660+
deadline = cancelAfter;
661+
}
662+
641663
Send(MakeQuoterServiceID(),
642-
new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, GetOperationTimeout()), 0, 0);
664+
new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, deadline), 0, 0);
643665
}
644666

645667
void Handle(TEvQuota::TEvClearance::TPtr& ev) {
646668
switch (ev->Get()->Result) {
647669
case TEvQuota::TEvClearance::EResult::Success:
648670
Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext());
649-
break;
671+
break;
650672
case TEvQuota::TEvClearance::EResult::UnknownResource:
651673
Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext());
652-
break;
674+
break;
653675
case TEvQuota::TEvClearance::EResult::Deadline:
654-
Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext());
655-
break;
676+
Reply(QuoterDeadlineStatusCode(), TActivationContext::AsActorContext());
677+
break;
656678
default:
657679
Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
658680
}

ydb/public/api/protos/ydb_rate_limiter.proto

+5
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ message DescribeResourceResult {
266266
//
267267

268268
message AcquireResourceRequest {
269+
// If cancel_after is set to a value greater than zero and less than operation_timeout
270+
// and resource is not ready after cancel_after time,
271+
// the result code of this operation will be CANCELLED and resource will not be spent.
272+
// It is recommended to specify both operation_timeout and cancel_after.
273+
// cancel_after should be non-zero and less than operation_timeout.
269274
Ydb.Operations.OperationParams operation_params = 1;
270275

271276
// Path of a coordination node.

ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h

+5
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ class TRateLimiterClient {
161161
TAsyncDescribeResourceResult DescribeResource(const TString& coordinationNodePath, const TString& resourcePath, const TDescribeResourceSettings& = {});
162162

163163
// Acquire resource's units inside a coordination node.
164+
// If CancelAfter is set to a value greater than zero and less than OperationTimeout
165+
// and resource is not ready after CancelAfter time,
166+
// the result code of this operation will be CANCELLED and resource will not be spent.
167+
// It is recommended to specify both OperationTimeout and CancelAfter.
168+
// CancelAfter should be less than OperationTimeout.
164169
TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& = {});
165170

166171
private:

ydb/services/rate_limiter/rate_limiter_ut.cpp

+37-12
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ class TTestSetupAcquireActor : public TTestSetup {
108108
request.set_resource_path(ResourcePath);
109109

110110
SetDuration(Settings.OperationTimeout_, *request.mutable_operation_params()->mutable_operation_timeout());
111+
if (Settings.CancelAfter_) {
112+
SetDuration(Settings.CancelAfter_, *request.mutable_operation_params()->mutable_cancel_after());
113+
}
111114

112115
if (Settings.IsUsedAmount_) {
113116
request.set_used(Settings.Amount_.GetRef());
@@ -317,50 +320,72 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) {
317320
return std::make_unique<TTestSetup>();
318321
}
319322

320-
void AcquireResourceManyRequired(bool useActorApi) {
323+
void AcquireResourceManyRequired(bool useActorApi, bool useCancelAfter) {
324+
const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200);
325+
const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set
326+
321327
using NYdb::NRateLimiter::TAcquireResourceSettings;
322328

323329
auto setup = MakeTestSetup(useActorApi);
324330

325331
ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
326332
TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));
327333

328-
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
334+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
329335

330336
for (int i = 0; i < 3; ++i) {
331-
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
332-
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
337+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT);
338+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
333339
}
334340
}
335341

336-
void AcquireResourceManyUsed(bool useActorApi) {
342+
void AcquireResourceManyUsed(bool useActorApi, bool useCancelAfter) {
343+
const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200);
344+
const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set
345+
337346
using NYdb::NRateLimiter::TAcquireResourceSettings;
338347

339348
auto setup = MakeTestSetup(useActorApi);
340349
ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
341350
TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));
342351

343-
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
352+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
344353
for (int i = 0; i < 3; ++i) {
345-
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
346-
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
354+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT);
355+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
347356
}
348357
}
349358

350359
Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApi) {
351-
AcquireResourceManyRequired(false);
360+
AcquireResourceManyRequired(false, false);
352361
}
353362

354363
Y_UNIT_TEST(AcquireResourceManyRequiredActorApi) {
355-
AcquireResourceManyRequired(true);
364+
AcquireResourceManyRequired(true, false);
365+
}
366+
367+
Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApiWithCancelAfter) {
368+
AcquireResourceManyRequired(false, true);
369+
}
370+
371+
Y_UNIT_TEST(AcquireResourceManyRequiredActorApiWithCancelAfter) {
372+
AcquireResourceManyRequired(true, true);
356373
}
357374

358375
Y_UNIT_TEST(AcquireResourceManyUsedGrpcApi) {
359-
AcquireResourceManyUsed(false);
376+
AcquireResourceManyUsed(false, false);
360377
}
361378

362379
Y_UNIT_TEST(AcquireResourceManyUsedActorApi) {
363-
AcquireResourceManyUsed(true);
380+
AcquireResourceManyUsed(true, false);
381+
}
382+
383+
Y_UNIT_TEST(AcquireResourceManyUsedGrpcApiWithCancelAfter) {
384+
AcquireResourceManyUsed(false, true);
385+
}
386+
387+
Y_UNIT_TEST(AcquireResourceManyUsedActorApiWithCancelAfter) {
388+
AcquireResourceManyUsed(true, true);
364389
}
365390
}
366391

0 commit comments

Comments
 (0)