@@ -36,8 +36,7 @@ import (
36
36
"testing"
37
37
"time"
38
38
39
- "k8s.io/klog/v2"
40
-
39
+ "github.com/google/go-cmp/cmp"
41
40
v1 "k8s.io/api/core/v1"
42
41
apiequality "k8s.io/apimachinery/pkg/api/equality"
43
42
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -52,8 +51,10 @@ import (
52
51
"k8s.io/apimachinery/pkg/watch"
53
52
"k8s.io/client-go/kubernetes/scheme"
54
53
restclientwatch "k8s.io/client-go/rest/watch"
54
+ "k8s.io/client-go/tools/metrics"
55
55
"k8s.io/client-go/util/flowcontrol"
56
56
utiltesting "k8s.io/client-go/util/testing"
57
+ "k8s.io/klog/v2"
57
58
testingclock "k8s.io/utils/clock/testing"
58
59
)
59
60
@@ -2755,6 +2756,34 @@ func TestRequestWatchWithRetry(t *testing.T) {
2755
2756
})
2756
2757
}
2757
2758
2759
+ func TestRequestDoRetryWithRateLimiterBackoffAndMetrics (t * testing.T ) {
2760
+ // both request.Do and request.DoRaw have the same behavior and expectations
2761
+ testRetryWithRateLimiterBackoffAndMetrics (t , "Do" , func (ctx context.Context , r * Request ) {
2762
+ r .DoRaw (ctx )
2763
+ })
2764
+ }
2765
+
2766
+ func TestRequestStreamRetryWithRateLimiterBackoffAndMetrics (t * testing.T ) {
2767
+ testRetryWithRateLimiterBackoffAndMetrics (t , "Stream" , func (ctx context.Context , r * Request ) {
2768
+ r .Stream (ctx )
2769
+ })
2770
+ }
2771
+
2772
+ func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics (t * testing.T ) {
2773
+ testRetryWithRateLimiterBackoffAndMetrics (t , "Watch" , func (ctx context.Context , r * Request ) {
2774
+ w , err := r .Watch (ctx )
2775
+ if err == nil {
2776
+ // in this test the the response body returned by the server is always empty,
2777
+ // this will cause StreamWatcher.receive() to:
2778
+ // - return an io.EOF to indicate that the watch closed normally and
2779
+ // - then close the io.Reader
2780
+ // since we assert on the number of times 'Close' has been called on the
2781
+ // body of the response object, we need to wait here to avoid race condition.
2782
+ <- w .ResultChan ()
2783
+ }
2784
+ })
2785
+ }
2786
+
2758
2787
func testRequestWithRetry (t * testing.T , key string , doFunc func (ctx context.Context , r * Request )) {
2759
2788
type expected struct {
2760
2789
attempts int
@@ -2914,6 +2943,231 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
2914
2943
}
2915
2944
}
2916
2945
2946
+ type retryTestKeyType int
2947
+
2948
+ const retryTestKey retryTestKeyType = iota
2949
+
2950
+ // fake flowcontrol.RateLimiter so we can tap into the Wait method of the rate limiter.
2951
+ // fake BackoffManager so we can tap into backoff calls
2952
+ // fake metrics.ResultMetric to tap into the metric calls
2953
+ // we use it to verify that RateLimiter, BackoffManager, and
2954
+ // metric calls are invoked appropriately in right order.
2955
+ type withRateLimiterBackoffManagerAndMetrics struct {
2956
+ flowcontrol.RateLimiter
2957
+ * NoBackoff
2958
+ metrics.ResultMetric
2959
+ backoffWaitSeconds int
2960
+
2961
+ invokeOrderGot []string
2962
+ sleepsGot []string
2963
+ statusCodesGot []string
2964
+ }
2965
+
2966
+ func (lb * withRateLimiterBackoffManagerAndMetrics ) Wait (ctx context.Context ) error {
2967
+ lb .invokeOrderGot = append (lb .invokeOrderGot , "RateLimiter.Wait" )
2968
+ return nil
2969
+ }
2970
+
2971
+ func (lb * withRateLimiterBackoffManagerAndMetrics ) CalculateBackoff (actualUrl * url.URL ) time.Duration {
2972
+ lb .invokeOrderGot = append (lb .invokeOrderGot , "BackoffManager.CalculateBackoff" )
2973
+
2974
+ // we simulate a sleep sequence of 0m, 2m, 4m, 6m, ...
2975
+ waitFor := time .Duration (lb .backoffWaitSeconds ) * time .Minute
2976
+ lb .backoffWaitSeconds += 2
2977
+ return waitFor
2978
+ }
2979
+
2980
+ func (lb * withRateLimiterBackoffManagerAndMetrics ) UpdateBackoff (actualUrl * url.URL , err error , responseCode int ) {
2981
+ lb .invokeOrderGot = append (lb .invokeOrderGot , "BackoffManager.UpdateBackoff" )
2982
+ }
2983
+
2984
+ func (lb * withRateLimiterBackoffManagerAndMetrics ) Sleep (d time.Duration ) {
2985
+ lb .invokeOrderGot = append (lb .invokeOrderGot , "BackoffManager.Sleep" )
2986
+ lb .sleepsGot = append (lb .sleepsGot , d .String ())
2987
+ }
2988
+
2989
+ func (lb * withRateLimiterBackoffManagerAndMetrics ) Increment (ctx context.Context , code , _ , _ string ) {
2990
+ // we are interested in the request context that is marked by this test
2991
+ if marked , ok := ctx .Value (retryTestKey ).(bool ); ok && marked {
2992
+ lb .invokeOrderGot = append (lb .invokeOrderGot , "RequestResult.Increment" )
2993
+ lb .statusCodesGot = append (lb .statusCodesGot , code )
2994
+ }
2995
+ }
2996
+
2997
+ func (lb * withRateLimiterBackoffManagerAndMetrics ) Do () {
2998
+ lb .invokeOrderGot = append (lb .invokeOrderGot , "Client.Do" )
2999
+ }
3000
+
3001
+ func testRetryWithRateLimiterBackoffAndMetrics (t * testing.T , key string , doFunc func (ctx context.Context , r * Request )) {
3002
+ type expected struct {
3003
+ attempts int
3004
+ order []string
3005
+ }
3006
+
3007
+ // we define the expected order of how the client invokes the
3008
+ // rate limiter, backoff, and metrics methods.
3009
+ // scenario:
3010
+ // - A: original request fails with a retryable response: (500, 'Retry-After: 1')
3011
+ // - B: retry 1: successful with a status code 200
3012
+ // so we have a total of 2 attempts
3013
+ invokeOrderWant := []string {
3014
+ // before we send the request to the server:
3015
+ // - we wait as dictated by the client rate lmiter
3016
+ // - we wait, as dictated by the backoff manager
3017
+ "RateLimiter.Wait" ,
3018
+ "BackoffManager.CalculateBackoff" ,
3019
+ "BackoffManager.Sleep" ,
3020
+
3021
+ // A: first attempt for which the server sends a retryable response
3022
+ "Client.Do" ,
3023
+
3024
+ // we got a response object, status code: 500, Retry-Afer: 1
3025
+ // - call metrics method with appropriate status code
3026
+ // - update backoff parameters with the status code returned
3027
+ // - sleep for N seconds from 'Retry-After: N' response header
3028
+ "RequestResult.Increment" ,
3029
+ "BackoffManager.UpdateBackoff" ,
3030
+ "BackoffManager.Sleep" ,
3031
+ // sleep for delay dictated by backoff parameters
3032
+ "BackoffManager.CalculateBackoff" ,
3033
+ "BackoffManager.Sleep" ,
3034
+ // wait as dictated by the client rate lmiter
3035
+ "RateLimiter.Wait" ,
3036
+
3037
+ // B: 2nd attempt: retry, and this should return a status code=200
3038
+ "Client.Do" ,
3039
+
3040
+ // it's a success, so do the following:
3041
+ // - call metrics and update backoff parameters
3042
+ "RequestResult.Increment" ,
3043
+ "BackoffManager.UpdateBackoff" ,
3044
+ }
3045
+ sleepWant := []string {
3046
+ // initial backoff.Sleep before we send the request to the server for the first time
3047
+ "0s" ,
3048
+ // from 'Retry-After: 1' response header (A)
3049
+ (1 * time .Second ).String (),
3050
+ // backoff.Sleep before retry 1 (B)
3051
+ (2 * time .Minute ).String (),
3052
+ }
3053
+ statusCodesWant := []string {
3054
+ "500" ,
3055
+ "200" ,
3056
+ }
3057
+
3058
+ tests := []struct {
3059
+ name string
3060
+ maxRetries int
3061
+ serverReturns []responseErr
3062
+ // expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
3063
+ expectations map [string ]expected
3064
+ }{
3065
+ {
3066
+ name : "success after one retry" ,
3067
+ maxRetries : 1 ,
3068
+ serverReturns : []responseErr {
3069
+ {response : retryAfterResponse (), err : nil },
3070
+ {response : & http.Response {StatusCode : http .StatusOK }, err : nil },
3071
+ },
3072
+ expectations : map [string ]expected {
3073
+ "Do" : {
3074
+ attempts : 2 ,
3075
+ order : invokeOrderWant ,
3076
+ },
3077
+ "Watch" : {
3078
+ attempts : 2 ,
3079
+ // Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
3080
+ order : invokeOrderWant [1 :],
3081
+ },
3082
+ "Stream" : {
3083
+ attempts : 2 ,
3084
+ order : invokeOrderWant ,
3085
+ },
3086
+ },
3087
+ },
3088
+ }
3089
+
3090
+ for _ , test := range tests {
3091
+ t .Run (test .name , func (t * testing.T ) {
3092
+ interceptor := & withRateLimiterBackoffManagerAndMetrics {
3093
+ RateLimiter : flowcontrol .NewFakeAlwaysRateLimiter (),
3094
+ NoBackoff : & NoBackoff {},
3095
+ }
3096
+
3097
+ // TODO: today this is the only site where a test overrides the
3098
+ // default metric interfaces, in future if we other tests want
3099
+ // to override as well, and we want tests to be able to run in
3100
+ // parallel then we will need to provide a way for tests to
3101
+ // register/deregister their own metric inerfaces.
3102
+ old := metrics .RequestResult
3103
+ metrics .RequestResult = interceptor
3104
+ defer func () {
3105
+ metrics .RequestResult = old
3106
+ }()
3107
+
3108
+ ctx , cancel := context .WithCancel (context .Background ())
3109
+ defer cancel ()
3110
+ // we are changing metrics.RequestResult (a global state) in
3111
+ // this test, to avoid interference from other tests running in
3112
+ // parallel we need to associate a key to the context so we
3113
+ // can identify the metric calls associated with this test.
3114
+ ctx = context .WithValue (ctx , retryTestKey , true )
3115
+
3116
+ var attempts int
3117
+ client := clientForFunc (func (req * http.Request ) (* http.Response , error ) {
3118
+ defer func () {
3119
+ attempts ++
3120
+ }()
3121
+
3122
+ interceptor .Do ()
3123
+ resp := test .serverReturns [attempts ].response
3124
+ if resp != nil {
3125
+ resp .Body = ioutil .NopCloser (bytes .NewReader ([]byte {}))
3126
+ }
3127
+ return resp , test .serverReturns [attempts ].err
3128
+ })
3129
+
3130
+ base , err := url .Parse ("http://foo.bar" )
3131
+ if err != nil {
3132
+ t .Fatalf ("Wrong test setup - did not find expected for: %s" , key )
3133
+ }
3134
+ req := & Request {
3135
+ verb : "GET" ,
3136
+ body : bytes .NewReader ([]byte {}),
3137
+ c : & RESTClient {
3138
+ base : base ,
3139
+ content : defaultContentConfig (),
3140
+ Client : client ,
3141
+ rateLimiter : interceptor ,
3142
+ },
3143
+ pathPrefix : "/api/v1" ,
3144
+ rateLimiter : interceptor ,
3145
+ backoff : interceptor ,
3146
+ retry : & withRetry {maxRetries : test .maxRetries },
3147
+ }
3148
+
3149
+ doFunc (ctx , req )
3150
+
3151
+ want , ok := test .expectations [key ]
3152
+ if ! ok {
3153
+ t .Fatalf ("Wrong test setup - did not find expected for: %s" , key )
3154
+ }
3155
+ if want .attempts != attempts {
3156
+ t .Errorf ("%s: Expected retries: %d, but got: %d" , key , want .attempts , attempts )
3157
+ }
3158
+ if ! cmp .Equal (want .order , interceptor .invokeOrderGot ) {
3159
+ t .Errorf ("%s: Expected invoke order to match, diff: %s" , key , cmp .Diff (want .order , interceptor .invokeOrderGot ))
3160
+ }
3161
+ if ! cmp .Equal (sleepWant , interceptor .sleepsGot ) {
3162
+ t .Errorf ("%s: Expected sleep sequence to match, diff: %s" , key , cmp .Diff (sleepWant , interceptor .sleepsGot ))
3163
+ }
3164
+ if ! cmp .Equal (statusCodesWant , interceptor .statusCodesGot ) {
3165
+ t .Errorf ("%s: Expected status codes to match, diff: %s" , key , cmp .Diff (statusCodesWant , interceptor .statusCodesGot ))
3166
+ }
3167
+ })
3168
+ }
3169
+ }
3170
+
2917
3171
func TestReuseRequest (t * testing.T ) {
2918
3172
var tests = []struct {
2919
3173
name string
0 commit comments