@@ -36,8 +36,7 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/klog/v2"
-
+	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -52,8 +51,10 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes/scheme"
 	restclientwatch "k8s.io/client-go/rest/watch"
+	"k8s.io/client-go/tools/metrics"
 	"k8s.io/client-go/util/flowcontrol"
 	utiltesting "k8s.io/client-go/util/testing"
+	"k8s.io/klog/v2"
 	testingclock "k8s.io/utils/clock/testing"
 )
 
@@ -2555,6 +2556,34 @@ func TestRequestWatchWithRetry(t *testing.T) {
 	})
 }
 
+func TestRequestDoRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
+	// both request.Do and request.DoRaw have the same behavior and expectations
+	testRetryWithRateLimiterBackoffAndMetrics(t, "Do", func(ctx context.Context, r *Request) {
+		r.DoRaw(ctx)
+	})
+}
+
+func TestRequestStreamRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
+	testRetryWithRateLimiterBackoffAndMetrics(t, "Stream", func(ctx context.Context, r *Request) {
+		r.Stream(ctx)
+	})
+}
+
+func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
+	testRetryWithRateLimiterBackoffAndMetrics(t, "Watch", func(ctx context.Context, r *Request) {
+		w, err := r.Watch(ctx)
+		if err == nil {
+			// in this test the response body returned by the server is always empty,
+			// this will cause StreamWatcher.receive() to:
+			//  - return an io.EOF to indicate that the watch closed normally and
+			//  - then close the io.Reader
+			// since we assert on the number of times 'Close' has been called on the
+			// body of the response object, we need to wait here to avoid a race condition.
+			<-w.ResultChan()
+		}
+	})
+}
+
 func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
 	type expected struct {
 		attempts int
@@ -2714,6 +2743,231 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
 	}
 }
 
+type retryTestKeyType int
+
+const retryTestKey retryTestKeyType = iota
+
+// fake flowcontrol.RateLimiter so we can tap into the Wait method of the rate limiter.
+// fake BackoffManager so we can tap into backoff calls
+// fake metrics.ResultMetric to tap into the metric calls
+// we use it to verify that RateLimiter, BackoffManager, and
+// metric calls are invoked appropriately in the right order.
+type withRateLimiterBackoffManagerAndMetrics struct {
+	flowcontrol.RateLimiter
+	*NoBackoff
+	metrics.ResultMetric
+	backoffWaitSeconds int
+
+	invokeOrderGot []string
+	sleepsGot      []string
+	statusCodesGot []string
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) error {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "RateLimiter.Wait")
+	return nil
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
+
+	// we simulate a sleep sequence of 0m, 2m, 4m, 6m, ...
+	waitFor := time.Duration(lb.backoffWaitSeconds) * time.Minute
+	lb.backoffWaitSeconds += 2
+	return waitFor
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.UpdateBackoff")
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Sleep(d time.Duration) {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.Sleep")
+	lb.sleepsGot = append(lb.sleepsGot, d.String())
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Increment(ctx context.Context, code, _, _ string) {
+	// we are interested in the request context that is marked by this test
+	if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
+		lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestResult.Increment")
+		lb.statusCodesGot = append(lb.statusCodesGot, code)
+	}
+}
+
+func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
+	lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do")
+}
+
+func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
+	type expected struct {
+		attempts int
+		order    []string
+	}
+
+	// we define the expected order in which the client invokes the
+	// rate limiter, backoff, and metrics methods.
+	// scenario:
+	//  - A: original request fails with a retryable response: (500, 'Retry-After: 1')
+	//  - B: retry 1: successful with a status code 200
+	// so we have a total of 2 attempts
+	invokeOrderWant := []string{
+		// before we send the request to the server:
+		//  - we wait as dictated by the client rate limiter
+		//  - we wait, as dictated by the backoff manager
+		"RateLimiter.Wait",
+		"BackoffManager.CalculateBackoff",
+		"BackoffManager.Sleep",
+
+		// A: first attempt for which the server sends a retryable response
+		"Client.Do",
+
+		// we got a response object, status code: 500, Retry-After: 1
+		//  - call metrics method with appropriate status code
+		//  - update backoff parameters with the status code returned
+		//  - sleep for N seconds from 'Retry-After: N' response header
+		"RequestResult.Increment",
+		"BackoffManager.UpdateBackoff",
+		"BackoffManager.Sleep",
+		// sleep for delay dictated by backoff parameters
+		"BackoffManager.CalculateBackoff",
+		"BackoffManager.Sleep",
+		// wait as dictated by the client rate limiter
+		"RateLimiter.Wait",
+
+		// B: 2nd attempt: retry, and this should return a status code=200
+		"Client.Do",
+
+		// it's a success, so do the following:
+		//  - call metrics and update backoff parameters
+		"RequestResult.Increment",
+		"BackoffManager.UpdateBackoff",
+	}
+	sleepWant := []string{
+		// initial backoff.Sleep before we send the request to the server for the first time
+		"0s",
+		// from 'Retry-After: 1' response header (A)
+		(1 * time.Second).String(),
+		// backoff.Sleep before retry 1 (B)
+		(2 * time.Minute).String(),
+	}
+	statusCodesWant := []string{
+		"500",
+		"200",
+	}
+
+	tests := []struct {
+		name          string
+		maxRetries    int
+		serverReturns []responseErr
+		// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
+		expectations map[string]expected
+	}{
+		{
+			name:       "success after one retry",
+			maxRetries: 1,
+			serverReturns: []responseErr{
+				{response: retryAfterResponse(), err: nil},
+				{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
+			},
+			expectations: map[string]expected{
+				"Do": {
+					attempts: 2,
+					order:    invokeOrderWant,
+				},
+				"Watch": {
+					attempts: 2,
+					// Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
+					order: invokeOrderWant[1:],
+				},
+				"Stream": {
+					attempts: 2,
+					order:    invokeOrderWant,
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			interceptor := &withRateLimiterBackoffManagerAndMetrics{
+				RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
+				NoBackoff:   &NoBackoff{},
+			}
+
+			// TODO: today this is the only site where a test overrides the
+			// default metric interfaces; in the future, if other tests want
+			// to override as well and we want tests to be able to run in
+			// parallel, then we will need to provide a way for tests to
+			// register/deregister their own metric interfaces.
+			old := metrics.RequestResult
+			metrics.RequestResult = interceptor
+			defer func() {
+				metrics.RequestResult = old
+			}()
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			// we are changing metrics.RequestResult (a global state) in
+			// this test; to avoid interference from other tests running in
+			// parallel, we need to associate a key with the context so we
+			// can identify the metric calls associated with this test.
+			ctx = context.WithValue(ctx, retryTestKey, true)
+
+			var attempts int
+			client := clientForFunc(func(req *http.Request) (*http.Response, error) {
+				defer func() {
+					attempts++
+				}()
+
+				interceptor.Do()
+				resp := test.serverReturns[attempts].response
+				if resp != nil {
+					resp.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
+				}
+				return resp, test.serverReturns[attempts].err
+			})
+
+			base, err := url.Parse("http://foo.bar")
+			if err != nil {
+				t.Fatalf("Wrong test setup - failed to parse the URL: %v", err)
+			}
+			req := &Request{
+				verb: "GET",
+				body: bytes.NewReader([]byte{}),
+				c: &RESTClient{
+					base:        base,
+					content:     defaultContentConfig(),
+					Client:      client,
+					rateLimiter: interceptor,
+				},
+				pathPrefix:  "/api/v1",
+				rateLimiter: interceptor,
+				backoff:     interceptor,
+				retry:       &withRetry{maxRetries: test.maxRetries},
+			}
+
+			doFunc(ctx, req)
+
+			want, ok := test.expectations[key]
+			if !ok {
+				t.Fatalf("Wrong test setup - did not find expected for: %s", key)
+			}
+			if want.attempts != attempts {
+				t.Errorf("%s: Expected retries: %d, but got: %d", key, want.attempts, attempts)
+			}
+			if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
+				t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
+			}
+			if !cmp.Equal(sleepWant, interceptor.sleepsGot) {
+				t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(sleepWant, interceptor.sleepsGot))
+			}
+			if !cmp.Equal(statusCodesWant, interceptor.statusCodesGot) {
+				t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(statusCodesWant, interceptor.statusCodesGot))
+			}
+		})
+	}
+}
+
 func TestReuseRequest(t *testing.T) {
 	var tests = []struct {
 		name string