-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[common-go] Add keyed gRPC rate limits #9547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -6,60 +6,201 @@ package grpc | |||||
|
||||||
import ( | ||||||
"context" | ||||||
"strings" | ||||||
"time" | ||||||
|
||||||
"golang.org/x/time/rate" | ||||||
"google.golang.org/grpc" | ||||||
"google.golang.org/grpc/codes" | ||||||
"google.golang.org/grpc/status" | ||||||
"google.golang.org/protobuf/proto" | ||||||
"google.golang.org/protobuf/reflect/protoreflect" | ||||||
|
||||||
"github.com/gitpod-io/gitpod/common-go/util" | ||||||
lru "github.com/hashicorp/golang-lru" | ||||||
"github.com/prometheus/client_golang/prometheus" | ||||||
) | ||||||
|
||||||
// keyFunc derives the rate limiting key from an incoming gRPC request.
// It returns an error when no key can be derived from the request.
type keyFunc func(req interface{}) (string, error) | ||||||
|
||||||
// RateLimit configures the rate limit for a function | ||||||
type RateLimit struct { | ||||||
// Block selects waiting on the limiter (true) instead of rejecting the
// call right away (false) once the limit is exceeded.
Block bool `json:"block"` | ||||||
// BucketSize is passed to rate.NewLimiter as the burst size.
BucketSize uint `json:"bucketSize"` | ||||||
// RefillInterval is the interval handed to rate.Every to derive the refill rate.
RefillInterval util.Duration `json:"refillInterval"` | ||||||
|
||||||
// KeyCacheSize is the size of the LRU cache holding per-key limiters.
// Keyed limiting is only set up when this is > 0 and Key is non-empty.
KeyCacheSize uint `json:"keyCacheSize,omitempty"` | ||||||
// Key is the dot-separated path of the string field in the request
// message whose value is used as the rate limiting key.
Key string `json:"key,omitempty"` | ||||||
} | ||||||
|
||||||
func (r RateLimit) Limiter() *rate.Limiter { | ||||||
return rate.NewLimiter(rate.Every(time.Duration(r.RefillInterval)), int(r.BucketSize)) | ||||||
} | ||||||
|
||||||
// NewRatelimitingInterceptor creates a new rate limiting interceptor | ||||||
func NewRatelimitingInterceptor(f map[string]RateLimit) RatelimitingInterceptor { | ||||||
funcs := make(map[string]ratelimitedFunction, len(f)) | ||||||
callCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ | ||||||
Namespace: "grpc", | ||||||
Subsystem: "server", | ||||||
Name: "rate_limiter_calls_total", | ||||||
}, []string{"grpc_method", "rate_limited"}) | ||||||
cacheHitCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ | ||||||
Namespace: "grpc", | ||||||
Subsystem: "server", | ||||||
Name: "rate_limiter_cache_hit_total", | ||||||
}, []string{"grpc_method"}) | ||||||
|
||||||
funcs := make(map[string]*ratelimitedFunction, len(f)) | ||||||
for name, fnc := range f { | ||||||
funcs[name] = ratelimitedFunction{ | ||||||
Block: fnc.Block, | ||||||
L: rate.NewLimiter(rate.Every(time.Duration(fnc.RefillInterval)), int(fnc.BucketSize)), | ||||||
var ( | ||||||
keyedLimit *lru.Cache | ||||||
key keyFunc | ||||||
) | ||||||
if fnc.Key != "" && fnc.KeyCacheSize > 0 { | ||||||
keyedLimit, _ = lru.New(int(fnc.KeyCacheSize)) | ||||||
key = fieldAccessKey(fnc.Key) | ||||||
} | ||||||
|
||||||
funcs[name] = &ratelimitedFunction{ | ||||||
RateLimit: fnc, | ||||||
GlobalLimit: fnc.Limiter(), | ||||||
Key: key, | ||||||
KeyedLimit: keyedLimit, | ||||||
RateLimitedTotal: callCounter.WithLabelValues(name, "true"), | ||||||
NotRateLimitedTotal: callCounter.WithLabelValues(name, "false"), | ||||||
CacheMissTotal: cacheHitCounter.WithLabelValues(name), | ||||||
} | ||||||
} | ||||||
return funcs | ||||||
return RatelimitingInterceptor{ | ||||||
functions: funcs, | ||||||
collectors: []prometheus.Collector{callCounter, cacheHitCounter}, | ||||||
} | ||||||
} | ||||||
|
||||||
func fieldAccessKey(key string) keyFunc { | ||||||
return func(req interface{}) (string, error) { | ||||||
msg, ok := req.(proto.Message) | ||||||
if !ok { | ||||||
return "", status.Errorf(codes.Internal, "request was not a protobuf message") | ||||||
} | ||||||
|
||||||
val, ok := getFieldValue(msg.ProtoReflect(), strings.Split(key, ".")) | ||||||
if !ok { | ||||||
return "", status.Errorf(codes.Internal, "Field %s does not exist in message. This is a rate limiting configuration error.", key) | ||||||
} | ||||||
return val, nil | ||||||
} | ||||||
} | ||||||
|
||||||
func getFieldValue(msg protoreflect.Message, path []string) (val string, ok bool) { | ||||||
if len(path) == 0 { | ||||||
return "", false | ||||||
} | ||||||
|
||||||
field := msg.Descriptor().Fields().ByName(protoreflect.Name(path[0])) | ||||||
if field == nil { | ||||||
return "", false | ||||||
} | ||||||
if len(path) > 1 { | ||||||
if field.Kind() != protoreflect.MessageKind { | ||||||
// we should go deeper but the field is not a message | ||||||
return "", false | ||||||
} | ||||||
child := msg.Get(field).Message() | ||||||
return getFieldValue(child, path[1:]) | ||||||
} | ||||||
|
||||||
if field.Kind() != protoreflect.StringKind { | ||||||
// we only support string fields | ||||||
return "", false | ||||||
} | ||||||
|
||||||
return msg.Get(field).String(), true | ||||||
} | ||||||
|
||||||
// RatelimitingInterceptor limits how often a gRPC function may be called. If the limit has been | ||||||
// exceeded, we'll return resource exhausted. | ||||||
type RatelimitingInterceptor map[string]ratelimitedFunction | ||||||
type RatelimitingInterceptor struct { | ||||||
functions map[string]*ratelimitedFunction | ||||||
collectors []prometheus.Collector | ||||||
} | ||||||
|
||||||
var _ prometheus.Collector = RatelimitingInterceptor{} | ||||||
|
||||||
func (r RatelimitingInterceptor) Describe(d chan<- *prometheus.Desc) { | ||||||
for _, c := range r.collectors { | ||||||
c.Describe(d) | ||||||
} | ||||||
} | ||||||
|
||||||
func (r RatelimitingInterceptor) Collect(m chan<- prometheus.Metric) { | ||||||
for _, c := range r.collectors { | ||||||
c.Collect(m) | ||||||
} | ||||||
} | ||||||
|
||||||
// counter is the increment-only interface through which ratelimitedFunction
// updates its metrics.
type counter interface { | ||||||
Inc() | ||||||
} | ||||||
|
||||||
type ratelimitedFunction struct { | ||||||
Block bool | ||||||
L *rate.Limiter | ||||||
RateLimit RateLimit | ||||||
|
||||||
GlobalLimit *rate.Limiter | ||||||
Key keyFunc | ||||||
KeyedLimit *lru.Cache | ||||||
|
||||||
RateLimitedTotal counter | ||||||
NotRateLimitedTotal counter | ||||||
CacheMissTotal counter | ||||||
} | ||||||
|
||||||
// UnaryInterceptor creates a unary interceptor that implements the rate limiting | ||||||
func (r RatelimitingInterceptor) UnaryInterceptor() grpc.UnaryServerInterceptor { | ||||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
We're not using the named returns.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
They indicate what's returned though :) |
||||||
f, ok := r[info.FullMethod] | ||||||
if ok { | ||||||
if f.Block { | ||||||
err := f.L.Wait(ctx) | ||||||
if err == context.Canceled { | ||||||
return nil, err | ||||||
} | ||||||
if err != nil { | ||||||
return nil, status.Error(codes.ResourceExhausted, err.Error()) | ||||||
} | ||||||
} else if !f.L.Allow() { | ||||||
return nil, status.Error(codes.ResourceExhausted, "too many requests") | ||||||
f, ok := r.functions[info.FullMethod] | ||||||
if !ok { | ||||||
return handler(ctx, req) | ||||||
} | ||||||
|
||||||
var limit *rate.Limiter | ||||||
if f.Key == nil { | ||||||
limit = f.GlobalLimit | ||||||
} else { | ||||||
key, err := f.Key(req) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
|
||||||
found, _ := f.KeyedLimit.ContainsOrAdd(key, f.RateLimit.Limiter()) | ||||||
if !found && f.CacheMissTotal != nil { | ||||||
f.CacheMissTotal.Inc() | ||||||
} | ||||||
v, _ := f.KeyedLimit.Get(key) | ||||||
limit = v.(*rate.Limiter) | ||||||
} | ||||||
|
||||||
var blocked bool | ||||||
defer func() { | ||||||
if blocked && f.RateLimitedTotal != nil { | ||||||
f.RateLimitedTotal.Inc() | ||||||
} else if !blocked && f.NotRateLimitedTotal != nil { | ||||||
f.NotRateLimitedTotal.Inc() | ||||||
} | ||||||
}() | ||||||
if f.RateLimit.Block { | ||||||
err := limit.Wait(ctx) | ||||||
if err == context.Canceled { | ||||||
blocked = true | ||||||
return nil, err | ||||||
} | ||||||
if err != nil { | ||||||
blocked = true | ||||||
return nil, status.Error(codes.ResourceExhausted, err.Error()) | ||||||
} | ||||||
} else if !limit.Allow() { | ||||||
blocked = true | ||||||
return nil, status.Error(codes.ResourceExhausted, "too many requests") | ||||||
} | ||||||
|
||||||
return handler(ctx, req) | ||||||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
// Copyright (c) 2022 Gitpod GmbH. All rights reserved. | ||
// Licensed under the GNU Affero General Public License (AGPL). | ||
// See License-AGPL.txt in the project root for license information. | ||
|
||
package grpc | ||
|
||
import ( | ||
"strings" | ||
"testing" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
"google.golang.org/protobuf/proto" | ||
"google.golang.org/protobuf/types/known/apipb" | ||
"google.golang.org/protobuf/types/known/sourcecontextpb" | ||
) | ||
|
||
func TestGetFieldValue(t *testing.T) { | ||
type Expectation struct { | ||
Found bool | ||
Val string | ||
} | ||
tests := []struct { | ||
Name string | ||
Message proto.Message | ||
Path string | ||
Expectation Expectation | ||
}{ | ||
{ | ||
Name: "direct access", | ||
Message: &apipb.Api{Name: "bar"}, | ||
Path: "name", | ||
Expectation: Expectation{Found: true, Val: "bar"}, | ||
}, | ||
{ | ||
Name: "empty field", | ||
Message: &apipb.Api{}, | ||
Path: "name", | ||
Expectation: Expectation{Found: true}, | ||
}, | ||
{ | ||
Name: "non-existent field", | ||
Message: &apipb.Api{}, | ||
Path: "does-not-exist", | ||
Expectation: Expectation{Found: false}, | ||
}, | ||
{ | ||
Name: "nest struct", | ||
Message: &apipb.Api{ | ||
SourceContext: &sourcecontextpb.SourceContext{ | ||
FileName: "bar", | ||
}, | ||
}, | ||
Path: "source_context.file_name", | ||
Expectation: Expectation{Found: true, Val: "bar"}, | ||
}, | ||
} | ||
for _, test := range tests { | ||
t.Run(test.Name, func(t *testing.T) { | ||
var act Expectation | ||
act.Val, act.Found = getFieldValue(test.Message.ProtoReflect(), strings.Split(test.Path, ".")) | ||
if diff := cmp.Diff(test.Expectation, act); diff != "" { | ||
t.Errorf("unexpected getFieldValue (-want +got):\n%s", diff) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func BenchmarkGetFieldValue(b *testing.B) { | ||
msg := apipb.Api{ | ||
SourceContext: &sourcecontextpb.SourceContext{ | ||
FileName: "bar", | ||
}, | ||
} | ||
msgr := msg.ProtoReflect() | ||
path := []string{"source_context", "file_name"} | ||
// run the Fib function b.N times | ||
for n := 0; n < b.N; n++ { | ||
getFieldValue(msgr, path) | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the metrics here also need to be registered to be picked up by Prometheus; otherwise they are just local counters which won't show up on the `/metrics` endpoint. See https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#Register
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, I'd actually hoist the counter definition outside of this instantiation and make it a package-private variable. You can use https://pkg.go.dev/github.com/prometheus/client_golang/prometheus/promauto to simplify the registration.
This will work because the metrics scraper also adds labels for the pod/service so we can differentiate the various metrics when querying.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It registers them here:
gitpod/components/ws-manager/cmd/run.go
Line 122 in 29ba28f
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, missed this. Normally, I'd expect the registration to happen in this file and automatically without requiring an extra call. The risk here is that a component uses the rate limiting interceptor but doesn't register the metric.
The other issue is that this would panic on startup, but CI doesn't catch this.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to address this would be to accept the metrics registry when constructing the interceptor. This would signal to the caller that they control the registry and need to register it.
It would also make it possible to inject a registry in tests which would allow us to test the metrics, if we wanted to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a while now we've adopted the pattern of making things implement the `prometheus.Collector` interface so that they can be registered. Testing is just as easy as passing in the registry, but it doesn't mandate that someone uses metrics.
The "registration error at runtime" behaviour is also the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I'd personally like the need for registration to be more explicit (and to use DI to pass the registry in) but given this is an existing pattern, better to stick to it than introduce yet another. Thanks for the info.