Skip to content

Commit a9041ef

Browse files
punkwalkermjlshen
authored andcommitted
implements Metrics Middleware for AWS SDK Go V2
Signed-off-by: Pankaj Walke <[email protected]>
1 parent ba898fb commit a9041ef

File tree

2 files changed

+235
-61
lines changed

2 files changed

+235
-61
lines changed

pkg/cloud/metricsv2/metrics.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package metrics provides a way to capture request metrics.
18+
package metricsv2
19+
20+
import (
21+
"context"
22+
"errors"
23+
"fmt"
24+
"net/http"
25+
"strconv"
26+
"time"
27+
28+
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
29+
"github.com/aws/smithy-go"
30+
"github.com/aws/smithy-go/middleware"
31+
smithyhttp "github.com/aws/smithy-go/transport/http"
32+
"github.com/prometheus/client_golang/prometheus"
33+
"k8s.io/apimachinery/pkg/runtime"
34+
"sigs.k8s.io/controller-runtime/pkg/metrics"
35+
36+
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/record"
37+
"sigs.k8s.io/cluster-api-provider-aws/v2/version"
38+
)
39+
40+
const (
41+
metricAWSSubsystem = "aws"
42+
metricRequestCountKey = "api_requests_total_v2"
43+
metricRequestDurationKey = "api_request_duration_seconds_v2"
44+
metricAPICallRetries = "api_call_retries_v2"
45+
metricServiceLabel = "service"
46+
metricRegionLabel = "region"
47+
metricOperationLabel = "operation"
48+
metricControllerLabel = "controller"
49+
metricStatusCodeLabel = "status_code"
50+
metricErrorCodeLabel = "error_code"
51+
)
52+
53+
var (
54+
awsRequestCount = prometheus.NewCounterVec(prometheus.CounterOpts{
55+
Subsystem: metricAWSSubsystem,
56+
Name: metricRequestCountKey,
57+
Help: "Total number of AWS requests",
58+
}, []string{metricControllerLabel, metricServiceLabel, metricRegionLabel, metricOperationLabel, metricStatusCodeLabel, metricErrorCodeLabel})
59+
awsRequestDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
60+
Subsystem: metricAWSSubsystem,
61+
Name: metricRequestDurationKey,
62+
Help: "Latency of HTTP requests to AWS",
63+
}, []string{metricControllerLabel, metricServiceLabel, metricRegionLabel, metricOperationLabel})
64+
awsCallRetries = prometheus.NewHistogramVec(prometheus.HistogramOpts{
65+
Subsystem: metricAWSSubsystem,
66+
Name: metricAPICallRetries,
67+
Help: "Number of retries made against an AWS API",
68+
Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
69+
}, []string{metricControllerLabel, metricServiceLabel, metricRegionLabel, metricOperationLabel})
70+
getRawResponse = func(metadata middleware.Metadata) *http.Response {
71+
switch res := awsmiddleware.GetRawResponse(metadata).(type) {
72+
case *http.Response:
73+
return res
74+
default:
75+
return nil
76+
}
77+
}
78+
)
79+
80+
func init() {
81+
metrics.Registry.MustRegister(awsRequestCount)
82+
metrics.Registry.MustRegister(awsRequestDurationSeconds)
83+
metrics.Registry.MustRegister(awsCallRetries)
84+
}
85+
86+
type requestContextKey struct{}
87+
88+
type RequestData struct {
89+
RequestStartTime time.Time
90+
RequestEndTime time.Time
91+
StatusCode int
92+
ErrorCode string
93+
RequestCount int
94+
Service string
95+
OperationName string
96+
Region string
97+
UserAgent string
98+
Controller string
99+
Target runtime.Object
100+
Attempts int
101+
}
102+
103+
// Inspired by https://github.com/jonathan-innis/aws-sdk-go-prometheus/v2
104+
func WithMiddlewares(controller string, target runtime.Object) func(stack *middleware.Stack) error {
105+
return func(stack *middleware.Stack) error {
106+
if err := stack.Initialize.Add(getMetricCollectionMiddleware(controller, target), middleware.Before); err != nil {
107+
return err
108+
}
109+
if err := stack.Build.Add(getAddToUserAgentMiddleware(), middleware.Before); err != nil {
110+
return err
111+
}
112+
if err := stack.Finalize.Add(getRequestMetricContextMiddleware(), middleware.Before); err != nil {
113+
return err
114+
}
115+
if err := stack.Finalize.Insert(getAttemptContextMiddleware(), "Retry", middleware.After); err != nil {
116+
return err
117+
}
118+
if err := stack.Deserialize.Add(getRecordAWSPermissionsIssueMiddleware(target), middleware.After); err != nil {
119+
return err
120+
}
121+
return nil
122+
}
123+
}
124+
125+
func getMetricCollectionMiddleware(controller string, target runtime.Object) middleware.InitializeMiddleware {
126+
return middleware.InitializeMiddlewareFunc("capa/MetricCollectionMiddleware", func(ctx context.Context, input middleware.InitializeInput, handler middleware.InitializeHandler) (middleware.InitializeOutput, middleware.Metadata, error) {
127+
ctx = initRequestContext(ctx, controller, target)
128+
request := getContext(ctx)
129+
130+
request.RequestStartTime = time.Now().UTC()
131+
out, metadata, err := handler.HandleInitialize(ctx, input)
132+
request.RequestEndTime = time.Now().UTC()
133+
134+
request.CaptureRequestMetrics()
135+
136+
return out, metadata, err
137+
})
138+
}
139+
140+
func getRequestMetricContextMiddleware() middleware.FinalizeMiddleware {
141+
return middleware.FinalizeMiddlewareFunc("capa/RequestMetricContextMiddleware", func(ctx context.Context, input middleware.FinalizeInput, handler middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
142+
request := getContext(ctx)
143+
request.Service = awsmiddleware.GetServiceID(ctx)
144+
request.OperationName = awsmiddleware.GetOperationName(ctx)
145+
request.Region = awsmiddleware.GetRegion(ctx)
146+
147+
return handler.HandleFinalize(ctx, input)
148+
})
149+
}
150+
151+
// For capturing retry count and status codes
152+
func getAttemptContextMiddleware() middleware.FinalizeMiddleware {
153+
return middleware.FinalizeMiddlewareFunc("capa/AttemptMetricContextMiddleware", func(ctx context.Context, input middleware.FinalizeInput, handler middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
154+
request := getContext(ctx)
155+
request.Attempts++
156+
out, metadata, err := handler.HandleFinalize(ctx, input)
157+
response := getRawResponse(metadata)
158+
159+
// This will record only last attempts status code.
160+
// Can be further extended to capture status codes of all attempts
161+
if response != nil {
162+
request.StatusCode = response.StatusCode
163+
} else {
164+
request.StatusCode = -1
165+
}
166+
167+
return out, metadata, err
168+
})
169+
}
170+
171+
func getRecordAWSPermissionsIssueMiddleware(target runtime.Object) middleware.DeserializeMiddleware {
172+
return middleware.DeserializeMiddlewareFunc("capa/RecordAWSPermissionsIssueMiddleware", func(ctx context.Context, input middleware.DeserializeInput, handler middleware.DeserializeHandler) (middleware.DeserializeOutput, middleware.Metadata, error) {
173+
r, ok := input.Request.(*smithyhttp.ResponseError)
174+
if !ok {
175+
return middleware.DeserializeOutput{}, middleware.Metadata{}, fmt.Errorf("unknown transport type %T", input.Request)
176+
}
177+
178+
var ae smithy.APIError
179+
if errors.As(r.Err, &ae) {
180+
switch ae.ErrorCode() {
181+
case "AuthFailure", "UnauthorizedOperation", "NoCredentialProviders":
182+
record.Warnf(target, ae.ErrorCode(), "Operation %s failed with a credentials or permission issue", awsmiddleware.GetOperationName(ctx))
183+
}
184+
}
185+
return handler.HandleDeserialize(ctx, input)
186+
})
187+
}
188+
189+
func getAddToUserAgentMiddleware() middleware.BuildMiddleware {
190+
return middleware.BuildMiddlewareFunc("capa/AddUserAgentMiddleware", func(ctx context.Context, input middleware.BuildInput, handler middleware.BuildHandler) (middleware.BuildOutput, middleware.Metadata, error) {
191+
request := getContext(ctx)
192+
r, ok := input.Request.(*smithyhttp.Request)
193+
if !ok {
194+
return middleware.BuildOutput{}, middleware.Metadata{}, fmt.Errorf("unknown transport type %T", input.Request)
195+
}
196+
197+
if curUA := r.Header.Get("User-Agent"); curUA != "" {
198+
request.UserAgent = curUA + " " + request.UserAgent
199+
}
200+
r.Header.Set("User-Agent", request.UserAgent)
201+
202+
return handler.HandleBuild(ctx, input)
203+
})
204+
}
205+
206+
func initRequestContext(ctx context.Context, controller string, target runtime.Object) context.Context {
207+
if middleware.GetStackValue(ctx, requestContextKey{}) == nil {
208+
ctx = middleware.WithStackValue(ctx, requestContextKey{}, &RequestData{
209+
Controller: controller,
210+
Target: target,
211+
UserAgent: fmt.Sprintf("aws.cluster.x-k8s.io/%s", version.Get().String()),
212+
})
213+
}
214+
return ctx
215+
}
216+
217+
func getContext(ctx context.Context) *RequestData {
218+
rctx := middleware.GetStackValue(ctx, requestContextKey{})
219+
if rctx == nil {
220+
return nil
221+
}
222+
return rctx.(*RequestData)
223+
}
224+
225+
// CaptureRequestMetrics will monitor and capture request metrics.
226+
func (r *RequestData) CaptureRequestMetrics() {
227+
requestDuration := r.RequestStartTime.Sub(r.RequestEndTime)
228+
retryCount := r.Attempts - 1
229+
230+
awsRequestCount.WithLabelValues(r.Controller, r.Service, r.Region, r.OperationName, strconv.Itoa(r.StatusCode), r.ErrorCode).Inc()
231+
awsRequestDurationSeconds.WithLabelValues(r.Controller, r.Service, r.Region, r.OperationName).Observe(requestDuration.Seconds())
232+
awsCallRetries.WithLabelValues(r.Controller, r.Service, r.Region, r.OperationName).Observe(float64(retryCount))
233+
}

pkg/cloud/scope/clients.go

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ limitations under the License.
1717
package scope
1818

1919
import (
20-
"context"
21-
"fmt"
22-
23-
awsv2middleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
2420
"github.com/aws/aws-sdk-go-v2/service/s3"
2521
"github.com/aws/aws-sdk-go/aws"
2622
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -49,16 +45,13 @@ import (
4945
"github.com/aws/aws-sdk-go/service/ssm/ssmiface"
5046
"github.com/aws/aws-sdk-go/service/sts"
5147
"github.com/aws/aws-sdk-go/service/sts/stsiface"
52-
"github.com/aws/smithy-go"
53-
"github.com/aws/smithy-go/middleware"
54-
smithyhttp "github.com/aws/smithy-go/transport/http"
55-
"github.com/pkg/errors"
5648
"k8s.io/apimachinery/pkg/runtime"
5749

5850
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud"
5951
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/endpointsv2"
6052
awslogs "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/logs"
6153
awsmetrics "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/metrics"
54+
awsmetricsv2 "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/metricsv2"
6255
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger"
6356
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/record"
6457
"sigs.k8s.io/cluster-api-provider-aws/v2/version"
@@ -220,20 +213,8 @@ func NewS3Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger logge
220213
o.ClientLogMode = awslogs.GetAWSLogLevelV2(logger.GetLogger())
221214
o.EndpointResolverV2 = s3EndpointResolver
222215
},
223-
s3.WithAPIOptions(
224-
func(stack *middleware.Stack) error {
225-
return stack.Build.Add(getUserAgentHandlerV2(), middleware.Before)
226-
},
227-
func(stack *middleware.Stack) error {
228-
return stack.Deserialize.Add(recordAWSPermissionsIssueV2(target), middleware.After)
229-
},
230-
),
216+
s3.WithAPIOptions(awsmetricsv2.WithMiddlewares(scopeUser.ControllerName(), target)),
231217
}
232-
// TODO: https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/sdk-timing.html
233-
// cfg.APIOptions = append(cfg.APIOptions, func(stack *middleware.Stack) error {
234-
// return stack.Deserialize.Add(awsmetrics.CaptureRequestMetricsV2(scopeUser.ControllerName()), middleware.Before)
235-
// })
236-
237218
return s3.NewFromConfig(cfg, s3Opts...)
238219
}
239220

@@ -248,53 +229,13 @@ func recordAWSPermissionsIssue(target runtime.Object) func(r *request.Request) {
248229
}
249230
}
250231

251-
func recordAWSPermissionsIssueV2(target runtime.Object) middleware.DeserializeMiddleware {
252-
return middleware.DeserializeMiddlewareFunc("capa/aws-permission-issue", func(ctx context.Context, input middleware.DeserializeInput, handler middleware.DeserializeHandler) (middleware.DeserializeOutput, middleware.Metadata, error) {
253-
r, ok := input.Request.(*smithyhttp.ResponseError)
254-
if !ok {
255-
return middleware.DeserializeOutput{}, middleware.Metadata{}, fmt.Errorf("unknown transport type %T", input.Request)
256-
}
257-
258-
var ae smithy.APIError
259-
if errors.As(r.Err, &ae) {
260-
switch ae.ErrorCode() {
261-
case "AuthFailure", "UnauthorizedOperation", "NoCredentialProviders":
262-
record.Warnf(target, ae.ErrorCode(), "Operation %s failed with a credentials or permission issue", awsv2middleware.GetOperationName(ctx))
263-
}
264-
}
265-
return handler.HandleDeserialize(ctx, input)
266-
})
267-
}
268-
269232
func getUserAgentHandler() request.NamedHandler {
270233
return request.NamedHandler{
271234
Name: "capa/user-agent",
272235
Fn: request.MakeAddToUserAgentHandler("aws.cluster.x-k8s.io", version.Get().String()),
273236
}
274237
}
275238

276-
func getUserAgentHandlerV2() middleware.BuildMiddleware {
277-
capaUserAgent := fmt.Sprintf("aws.cluster.x-k8s.io/%s", version.Get().String())
278-
return middleware.BuildMiddlewareFunc("capa/user-agent", makeAddToUserAgentHandler(capaUserAgent))
279-
}
280-
281-
// aws-sdk-go-v2 version of https://pkg.go.dev/github.com/aws/aws-sdk-go/aws/[email protected]#AddToUserAgent
282-
func makeAddToUserAgentHandler(s string) func(context.Context, middleware.BuildInput, middleware.BuildHandler) (middleware.BuildOutput, middleware.Metadata, error) {
283-
return func(ctx context.Context, input middleware.BuildInput, handler middleware.BuildHandler) (middleware.BuildOutput, middleware.Metadata, error) {
284-
r, ok := input.Request.(*smithyhttp.Request)
285-
if !ok {
286-
return middleware.BuildOutput{}, middleware.Metadata{}, fmt.Errorf("unknown transport type %T", input.Request)
287-
}
288-
289-
if curUA := r.Header.Get("User-Agent"); curUA != "" {
290-
s = curUA + " " + s
291-
}
292-
r.Header.Set("User-Agent", s)
293-
294-
return handler.HandleBuild(ctx, input)
295-
}
296-
}
297-
298239
// AWSClients contains all the aws clients used by the scopes.
299240
type AWSClients struct {
300241
ASG autoscalingiface.AutoScalingAPI

0 commit comments

Comments
 (0)