Skip to content

Commit 19cfbf1

Browse files
author
Harvey Lowndes
committed
Support request rate limiting
1 parent 95f288c commit 19cfbf1

File tree

7 files changed

+188
-6
lines changed

7 files changed

+188
-6
lines changed

pkg/oci/ccm.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
clientset "k8s.io/client-go/kubernetes"
3636
listersv1 "k8s.io/client-go/listers/core/v1"
3737
cache "k8s.io/client-go/tools/cache"
38+
"k8s.io/client-go/util/flowcontrol"
3839
cloudprovider "k8s.io/kubernetes/pkg/cloudprovider"
3940
controller "k8s.io/kubernetes/pkg/controller"
4041

@@ -43,6 +44,11 @@ import (
4344
"github.com/oracle/oci-cloud-controller-manager/pkg/oci/util"
4445
)
4546

47+
const (
48+
RateLimitQPSDefault = 1.0
49+
RateLimitBucketDefault = 5
50+
)
51+
4652
// ProviderName uniquely identifies the Oracle Bare Metal Cloud Services (OCI)
4753
// cloud-provider.
4854
func ProviderName() string {
@@ -73,7 +79,10 @@ func NewCloudProvider(config *Config) (cloudprovider.Interface, error) {
7379
if err != nil {
7480
return nil, err
7581
}
76-
c, err := client.New(cp)
82+
83+
rateLimiter := buildNewRateLimiter(config.RateLimiter)
84+
85+
c, err := client.New(cp, &rateLimiter)
7786
if err != nil {
7887
return nil, err
7988
}
@@ -218,3 +227,39 @@ func buildConfigurationProvider(config *Config) (common.ConfigurationProvider, e
218227
)
219228
return cp, nil
220229
}
230+
231+
// BuildRateLimiter ...
232+
func buildNewRateLimiter(config *RateLimiterConfig) client.RateLimiter {
233+
// Set to default values if configuration not declared
234+
if config.RateLimitQPSRead == 0 {
235+
config.RateLimitQPSRead = RateLimitQPSDefault
236+
}
237+
if config.RateLimitBucketRead == 0 {
238+
config.RateLimitBucketRead = RateLimitBucketDefault
239+
}
240+
if config.RateLimitQPSWrite == 0 {
241+
config.RateLimitQPSWrite = RateLimitQPSDefault
242+
}
243+
if config.RateLimitBucketWrite == 0 {
244+
config.RateLimitBucketWrite = RateLimitBucketDefault
245+
}
246+
247+
rateLimiter := client.RateLimiter{
248+
Reader: flowcontrol.NewTokenBucketRateLimiter(
249+
config.RateLimitQPSRead,
250+
config.RateLimitBucketRead),
251+
Writer: flowcontrol.NewTokenBucketRateLimiter(
252+
config.RateLimitQPSWrite,
253+
config.RateLimitBucketWrite),
254+
}
255+
256+
glog.V(2).Infof("OCI using read rate limit configuration: QPS=%g, bucket=%d",
257+
config.RateLimitQPSRead,
258+
config.RateLimitBucketRead)
259+
260+
glog.V(2).Infof("OCI using write rate limit configuration: QPS=%g, bucket=%d",
261+
config.RateLimitQPSWrite,
262+
config.RateLimitBucketWrite)
263+
264+
return rateLimiter
265+
}

pkg/oci/client/client.go

+44-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package client
1616

1717
import (
18+
"context"
1819
"crypto/tls"
1920
"crypto/x509"
2021
"io/ioutil"
@@ -25,6 +26,7 @@ import (
2526
"time"
2627

2728
"k8s.io/client-go/tools/cache"
29+
"k8s.io/client-go/util/flowcontrol"
2830

2931
"github.com/golang/glog"
3032
"github.com/oracle/oci-go-sdk/common"
@@ -40,16 +42,53 @@ type Interface interface {
4042
Networking() NetworkingInterface
4143
}
4244

45+
// RateLimiter reader and writer.
46+
type RateLimiter struct {
47+
Reader flowcontrol.RateLimiter
48+
Writer flowcontrol.RateLimiter
49+
}
50+
51+
type computeClient interface {
52+
GetInstance(ctx context.Context, request core.GetInstanceRequest) (response core.GetInstanceResponse, err error)
53+
ListInstances(ctx context.Context, request core.ListInstancesRequest) (response core.ListInstancesResponse, err error)
54+
ListVnicAttachments(ctx context.Context, request core.ListVnicAttachmentsRequest) (response core.ListVnicAttachmentsResponse, err error)
55+
}
56+
57+
type virtualNetworkClient interface {
58+
GetVnic(ctx context.Context, request core.GetVnicRequest) (response core.GetVnicResponse, err error)
59+
GetSubnet(ctx context.Context, request core.GetSubnetRequest) (response core.GetSubnetResponse, err error)
60+
GetSecurityList(ctx context.Context, request core.GetSecurityListRequest) (response core.GetSecurityListResponse, err error)
61+
UpdateSecurityList(ctx context.Context, request core.UpdateSecurityListRequest) (response core.UpdateSecurityListResponse, err error)
62+
}
63+
64+
type loadBalancerClient interface {
65+
GetLoadBalancer(ctx context.Context, request loadbalancer.GetLoadBalancerRequest) (response loadbalancer.GetLoadBalancerResponse, err error)
66+
ListLoadBalancers(ctx context.Context, request loadbalancer.ListLoadBalancersRequest) (response loadbalancer.ListLoadBalancersResponse, err error)
67+
CreateLoadBalancer(ctx context.Context, request loadbalancer.CreateLoadBalancerRequest) (response loadbalancer.CreateLoadBalancerResponse, err error)
68+
DeleteLoadBalancer(ctx context.Context, request loadbalancer.DeleteLoadBalancerRequest) (response loadbalancer.DeleteLoadBalancerResponse, err error)
69+
ListCertificates(ctx context.Context, request loadbalancer.ListCertificatesRequest) (response loadbalancer.ListCertificatesResponse, err error)
70+
CreateCertificate(ctx context.Context, request loadbalancer.CreateCertificateRequest) (response loadbalancer.CreateCertificateResponse, err error)
71+
GetWorkRequest(ctx context.Context, request loadbalancer.GetWorkRequestRequest) (response loadbalancer.GetWorkRequestResponse, err error)
72+
CreateBackendSet(ctx context.Context, request loadbalancer.CreateBackendSetRequest) (response loadbalancer.CreateBackendSetResponse, err error)
73+
UpdateBackendSet(ctx context.Context, request loadbalancer.UpdateBackendSetRequest) (response loadbalancer.UpdateBackendSetResponse, err error)
74+
DeleteBackendSet(ctx context.Context, request loadbalancer.DeleteBackendSetRequest) (response loadbalancer.DeleteBackendSetResponse, err error)
75+
CreateListener(ctx context.Context, request loadbalancer.CreateListenerRequest) (response loadbalancer.CreateListenerResponse, err error)
76+
UpdateListener(ctx context.Context, request loadbalancer.UpdateListenerRequest) (response loadbalancer.UpdateListenerResponse, err error)
77+
DeleteListener(ctx context.Context, request loadbalancer.DeleteListenerRequest) (response loadbalancer.DeleteListenerResponse, err error)
78+
}
79+
4380
type client struct {
44-
compute *core.ComputeClient
45-
network *core.VirtualNetworkClient
46-
loadbalancer *loadbalancer.LoadBalancerClient
81+
compute computeClient
82+
network virtualNetworkClient
83+
loadbalancer loadBalancerClient
84+
85+
rateLimiter RateLimiter
4786

4887
subnetCache cache.Store
4988
}
5089

5190
// New constructs an OCI API client.
52-
func New(cp common.ConfigurationProvider) (Interface, error) {
91+
func New(cp common.ConfigurationProvider, opRateLimiter *RateLimiter) (Interface, error) {
5392
compute, err := core.NewComputeClientWithConfigurationProvider(cp)
5493
if err != nil {
5594
return nil, errors.Wrap(err, "NewComputeClientWithConfigurationProvider")
@@ -84,6 +123,7 @@ func New(cp common.ConfigurationProvider) (Interface, error) {
84123
compute: &compute,
85124
network: &network,
86125
loadbalancer: &lb,
126+
rateLimiter: *opRateLimiter,
87127

88128
subnetCache: cache.NewTTLStore(subnetCacheKeyFn, time.Duration(24)*time.Hour),
89129
}

pkg/oci/client/compute.go

+11
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type ComputeInterface interface {
3535
}
3636

3737
func (c *client) GetInstance(ctx context.Context, id string) (*core.Instance, error) {
38+
if !c.rateLimiter.Reader.TryAccept() {
39+
return nil, RateLimitError(false, "GetInstance")
40+
}
41+
3842
resp, err := c.compute.GetInstance(ctx, core.GetInstanceRequest{
3943
InstanceId: &id,
4044
})
@@ -53,6 +57,9 @@ func (c *client) getInstanceByDisplayName(ctx context.Context, compartmentID, di
5357
instances []core.Instance
5458
)
5559
for {
60+
if !c.rateLimiter.Reader.TryAccept() {
61+
return nil, RateLimitError(false, "getInstanceByDisplayName")
62+
}
5663
resp, err := c.compute.ListInstances(ctx, core.ListInstancesRequest{
5764
CompartmentId: &compartmentID,
5865
DisplayName: &displayName,
@@ -81,6 +88,10 @@ func (c *client) getInstanceByDisplayName(ctx context.Context, compartmentID, di
8188
}
8289

8390
func (c *client) listVNICAttachments(ctx context.Context, req core.ListVnicAttachmentsRequest) (core.ListVnicAttachmentsResponse, error) {
91+
if !c.rateLimiter.Reader.TryAccept() {
92+
return core.ListVnicAttachmentsResponse{}, RateLimitError(false, "listVNICAttachments")
93+
}
94+
8495
resp, err := c.compute.ListVnicAttachments(ctx, req)
8596
incRequestCounter(err, listVerb, vnicAttachmentResource)
8697

pkg/oci/client/errors.go

+9
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,12 @@ func IsRetryable(err error) bool {
4949
serviceErr, ok := common.IsServiceError(err)
5050
return ok && serviceErr.GetHTTPStatusCode() == http.StatusTooManyRequests
5151
}
52+
53+
// RateLimitError produces an Errorf for rate limiting.
54+
func RateLimitError(isWrite bool, opName string) error {
55+
opType := "read"
56+
if isWrite {
57+
opType = "write"
58+
}
59+
return errors.Errorf("rate limited(%s) for operation: %s", opType, opName)
60+
}

pkg/oci/client/load_balancer.go

+53-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const workRequestPollInterval = 5 * time.Second
2929
// LoadBalancerInterface for consumed LB functionality.
3030
type LoadBalancerInterface interface {
3131
CreateLoadBalancer(ctx context.Context, details loadbalancer.CreateLoadBalancerDetails) (string, error)
32+
3233
GetLoadBalancer(ctx context.Context, id string) (*loadbalancer.LoadBalancer, error)
3334
GetLoadBalancerByName(ctx context.Context, compartmentID, name string) (*loadbalancer.LoadBalancer, error)
3435
DeleteLoadBalancer(ctx context.Context, id string) (string, error)
@@ -48,6 +49,10 @@ type LoadBalancerInterface interface {
4849
}
4950

5051
func (c *client) GetLoadBalancer(ctx context.Context, id string) (*loadbalancer.LoadBalancer, error) {
52+
if !c.rateLimiter.Reader.TryAccept() {
53+
return nil, RateLimitError(false, "GetLoadBalancer")
54+
}
55+
5156
resp, err := c.loadbalancer.GetLoadBalancer(ctx, loadbalancer.GetLoadBalancerRequest{
5257
LoadBalancerId: &id,
5358
})
@@ -63,6 +68,9 @@ func (c *client) GetLoadBalancer(ctx context.Context, id string) (*loadbalancer.
6368
func (c *client) GetLoadBalancerByName(ctx context.Context, compartmentID, name string) (*loadbalancer.LoadBalancer, error) {
6469
var page *string
6570
for {
71+
if !c.rateLimiter.Reader.TryAccept() {
72+
return nil, RateLimitError(false, "GetLoadBalancerByName")
73+
}
6674
resp, err := c.loadbalancer.ListLoadBalancers(ctx, loadbalancer.ListLoadBalancersRequest{
6775
CompartmentId: &compartmentID,
6876
DisplayName: &name,
@@ -87,6 +95,10 @@ func (c *client) GetLoadBalancerByName(ctx context.Context, compartmentID, name
8795
}
8896

8997
func (c *client) CreateLoadBalancer(ctx context.Context, details loadbalancer.CreateLoadBalancerDetails) (string, error) {
98+
if !c.rateLimiter.Writer.TryAccept() {
99+
return "", RateLimitError(true, "CreateLoadBalancer")
100+
}
101+
90102
resp, err := c.loadbalancer.CreateLoadBalancer(ctx, loadbalancer.CreateLoadBalancerRequest{
91103
CreateLoadBalancerDetails: details,
92104
})
@@ -100,6 +112,10 @@ func (c *client) CreateLoadBalancer(ctx context.Context, details loadbalancer.Cr
100112
}
101113

102114
func (c *client) DeleteLoadBalancer(ctx context.Context, id string) (string, error) {
115+
if !c.rateLimiter.Writer.TryAccept() {
116+
return "", RateLimitError(true, "DeleteLoadBalancer")
117+
}
118+
103119
resp, err := c.loadbalancer.DeleteLoadBalancer(ctx, loadbalancer.DeleteLoadBalancerRequest{
104120
LoadBalancerId: &id,
105121
})
@@ -113,6 +129,10 @@ func (c *client) DeleteLoadBalancer(ctx context.Context, id string) (string, err
113129
}
114130

115131
func (c *client) GetCertificateByName(ctx context.Context, lbID, name string) (*loadbalancer.Certificate, error) {
132+
if !c.rateLimiter.Reader.TryAccept() {
133+
return nil, RateLimitError(false, "GetCertificateByName")
134+
}
135+
116136
resp, err := c.loadbalancer.ListCertificates(ctx, loadbalancer.ListCertificatesRequest{
117137
LoadBalancerId: &lbID,
118138
})
@@ -131,6 +151,10 @@ func (c *client) GetCertificateByName(ctx context.Context, lbID, name string) (*
131151
}
132152

133153
func (c *client) CreateCertificate(ctx context.Context, lbID, certificate, key string) (string, error) {
154+
if !c.rateLimiter.Writer.TryAccept() {
155+
return "", RateLimitError(true, "CreateCertificate")
156+
}
157+
134158
// TODO(apryde): We currently don't have a mechanism for supplying
135159
// CreateCertificateDetails.CaCertificate.
136160
resp, err := c.loadbalancer.CreateCertificate(ctx, loadbalancer.CreateCertificateRequest{
@@ -150,6 +174,10 @@ func (c *client) CreateCertificate(ctx context.Context, lbID, certificate, key s
150174
}
151175

152176
func (c *client) GetWorkRequest(ctx context.Context, id string) (*loadbalancer.WorkRequest, error) {
177+
if !c.rateLimiter.Reader.TryAccept() {
178+
return nil, RateLimitError(false, "GetWorkRequest")
179+
}
180+
153181
resp, err := c.loadbalancer.GetWorkRequest(ctx, loadbalancer.GetWorkRequestRequest{
154182
WorkRequestId: &id,
155183
})
@@ -163,6 +191,10 @@ func (c *client) GetWorkRequest(ctx context.Context, id string) (*loadbalancer.W
163191
}
164192

165193
func (c *client) CreateBackendSet(ctx context.Context, lbID, name string, details loadbalancer.BackendSetDetails) (string, error) {
194+
if !c.rateLimiter.Writer.TryAccept() {
195+
return "", RateLimitError(true, "CreateBackendSet")
196+
}
197+
166198
resp, err := c.loadbalancer.CreateBackendSet(ctx, loadbalancer.CreateBackendSetRequest{
167199
LoadBalancerId: &lbID,
168200
CreateBackendSetDetails: loadbalancer.CreateBackendSetDetails{
@@ -184,6 +216,10 @@ func (c *client) CreateBackendSet(ctx context.Context, lbID, name string, detail
184216
}
185217

186218
func (c *client) UpdateBackendSet(ctx context.Context, lbID, name string, details loadbalancer.BackendSetDetails) (string, error) {
219+
if !c.rateLimiter.Writer.TryAccept() {
220+
return "", RateLimitError(true, "UpdateBackendSet")
221+
}
222+
187223
resp, err := c.loadbalancer.UpdateBackendSet(ctx, loadbalancer.UpdateBackendSetRequest{
188224
LoadBalancerId: &lbID,
189225
BackendSetName: &name,
@@ -205,6 +241,10 @@ func (c *client) UpdateBackendSet(ctx context.Context, lbID, name string, detail
205241
}
206242

207243
func (c *client) DeleteBackendSet(ctx context.Context, lbID, name string) (string, error) {
244+
if !c.rateLimiter.Writer.TryAccept() {
245+
return "", RateLimitError(true, "DeleteBackendSet")
246+
}
247+
208248
resp, err := c.loadbalancer.DeleteBackendSet(ctx, loadbalancer.DeleteBackendSetRequest{
209249
LoadBalancerId: &lbID,
210250
BackendSetName: &name,
@@ -219,6 +259,10 @@ func (c *client) DeleteBackendSet(ctx context.Context, lbID, name string) (strin
219259
}
220260

221261
func (c *client) CreateListener(ctx context.Context, lbID, name string, details loadbalancer.ListenerDetails) (string, error) {
262+
if !c.rateLimiter.Writer.TryAccept() {
263+
return "", RateLimitError(true, "CreateListener")
264+
}
265+
222266
resp, err := c.loadbalancer.CreateListener(ctx, loadbalancer.CreateListenerRequest{
223267
LoadBalancerId: &lbID,
224268
CreateListenerDetails: loadbalancer.CreateListenerDetails{
@@ -239,6 +283,10 @@ func (c *client) CreateListener(ctx context.Context, lbID, name string, details
239283
}
240284

241285
func (c *client) UpdateListener(ctx context.Context, lbID, name string, details loadbalancer.ListenerDetails) (string, error) {
286+
if !c.rateLimiter.Writer.TryAccept() {
287+
return "", RateLimitError(true, "UpdateListener")
288+
}
289+
242290
resp, err := c.loadbalancer.UpdateListener(ctx, loadbalancer.UpdateListenerRequest{
243291
LoadBalancerId: &lbID,
244292
ListenerName: &name,
@@ -273,14 +321,18 @@ func (c *client) AwaitWorkRequest(ctx context.Context, id string) (*loadbalancer
273321
wr = twr
274322
return true, nil
275323
case loadbalancer.WorkRequestLifecycleStateFailed:
276-
return false, errors.Errorf("WorkRequest %q failed: %s", id, twr.Message)
324+
return false, errors.Errorf("WorkRequest %q failed: %s", id, *twr.Message)
277325
}
278326
return false, nil
279327
}, ctx.Done())
280328
return wr, err
281329
}
282330

283331
func (c *client) DeleteListener(ctx context.Context, lbID, name string) (string, error) {
332+
if !c.rateLimiter.Writer.TryAccept() {
333+
return "", RateLimitError(true, "DeleteListener")
334+
}
335+
284336
resp, err := c.loadbalancer.DeleteListener(ctx, loadbalancer.DeleteListenerRequest{
285337
LoadBalancerId: &lbID,
286338
ListenerName: &name,

0 commit comments

Comments
 (0)