Skip to content

Commit f95917d

Browse files
p0lyn0mialbertinatto
authored andcommitted
UPSTREAM: <carry>: retry etcd Unavailable errors
This commit renews #327 What has changed compared to the original PR is: - The retryClient interface has been adapted to storage.Interface. - The isRetriableEtcdError method has been completely changed; it seems that previously the error we wanted to retry was not being retried. Even the unit tests were failing. Overall, I still think this is not the correct fix. The proper fix should be added to the etcd client. UPSTREAM: <carry>: retry etcd Unavailable errors This is the second commit for the retry logic. This commit adds unit tests and slightly improves the logging. During a rebase squash with the previous one. UPSTREAM: <carry>: retry_etcdclient: expose retry logic functionality during rebase merge with: UPSTREAM: <carry>: retry etcd Unavailable errors
1 parent c4d5d8f commit f95917d

File tree

4 files changed

+343
-1
lines changed

4 files changed

+343
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package etcd3retry
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
8+
"google.golang.org/grpc/codes"
9+
10+
"k8s.io/apimachinery/pkg/runtime"
11+
"k8s.io/apimachinery/pkg/util/wait"
12+
"k8s.io/apimachinery/pkg/watch"
13+
"k8s.io/apiserver/pkg/storage"
14+
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
15+
"k8s.io/klog/v2"
16+
)
17+
18+
var DefaultRetry = wait.Backoff{
19+
Duration: 300 * time.Millisecond,
20+
Factor: 2, // double the timeout for every failure
21+
Jitter: 0.1,
22+
Steps: 6, // .3 + .6 + 1.2 + 2.4 + 4.8 = 10ish this lets us smooth out short bumps but not long ones and keeps retry behavior closer.
23+
}
24+
25+
type retryClient struct {
26+
// embed because we only want to override a few states
27+
storage.Interface
28+
}
29+
30+
// New returns an etcd3 implementation of storage.Interface.
31+
func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
32+
return &retryClient{Interface: delegate}
33+
}
34+
35+
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
36+
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
37+
// set to the read value from database.
38+
func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
39+
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
40+
return c.Interface.Create(ctx, key, obj, out, ttl)
41+
})
42+
}
43+
44+
// Delete removes the specified key and returns the value that existed at that spot.
45+
// If key didn't exist, it will return NotFound storage error.
46+
func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
47+
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
48+
return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts)
49+
})
50+
}
51+
52+
// Watch begins watching the specified key. Events are decoded into API objects,
53+
// and any items selected by 'p' are sent down to returned watch.Interface.
54+
// resourceVersion may be used to specify what version to begin watching,
55+
// which should be the current resourceVersion, and no longer rv+1
56+
// (e.g. reconnecting without missing any updates).
57+
// If resource version is "0", this interface will get current object at given key
58+
// and send it in an "ADDED" event, before watch starts.
59+
func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
60+
var ret watch.Interface
61+
err := OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
62+
var innerErr error
63+
ret, innerErr = c.Interface.Watch(ctx, key, opts)
64+
return innerErr
65+
})
66+
return ret, err
67+
}
68+
69+
// Get unmarshals json found at key into objPtr. On a not found error, will either
70+
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
71+
// Treats empty responses and nil response nodes exactly like a not found error.
72+
// The returned contents may be delayed, but it is guaranteed that they will
73+
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
74+
func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
75+
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
76+
return c.Interface.Get(ctx, key, opts, objPtr)
77+
})
78+
}
79+
80+
// GetList unmarshalls objects found at key into a *List api object (an object
81+
// that satisfies runtime.IsList definition).
82+
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
83+
// is true, 'key' is used as a prefix.
84+
// The returned contents may be delayed, but it is guaranteed that they will
85+
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
86+
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
87+
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
88+
return c.Interface.GetList(ctx, key, opts, listObj)
89+
})
90+
}
91+
92+
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
93+
// retrying the update until success if there is index conflict.
94+
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
95+
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
96+
// the current contents of the object when deciding how the update object should look.
97+
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
98+
// else `destination` will be set to the zero value of it's type.
99+
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
100+
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
101+
// contents.
102+
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
103+
// current version of the object to avoid read operation from storage to get it.
104+
// However, the implementations have to retry in case suggestion is stale.
105+
//
106+
// Example:
107+
//
108+
// s := /* implementation of Interface */
109+
// err := s.GuaranteedUpdate(
110+
//
111+
// "myKey", &MyType{}, true, preconditions,
112+
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
113+
// // Before each invocation of the user defined function, "input" is reset to
114+
// // current contents for "myKey" in database.
115+
// curr := input.(*MyType) // Guaranteed to succeed.
116+
//
117+
// // Make the modification
118+
// curr.Counter++
119+
//
120+
// // Return the modified object - return an error to stop iterating. Return
121+
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
122+
// return cur, nil, nil
123+
// }, cachedExistingObject
124+
//
125+
// )
126+
func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
127+
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
128+
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
129+
return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
130+
})
131+
}
132+
133+
// IsRetriableEtcdError returns true if a retry should be attempted, otherwise false.
134+
// errorLabel is set to a non-empty value that reflects the type of error encountered.
135+
func IsRetriableEtcdError(err error) (errorLabel string, retry bool) {
136+
if err != nil {
137+
if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok {
138+
if etcdError.Code() == codes.Unavailable {
139+
errorLabel = "Unavailable"
140+
retry = true
141+
}
142+
}
143+
}
144+
return
145+
}
146+
147+
// OnError allows the caller to retry fn in case the error returned by fn is retriable
148+
// according to the provided function. backoff defines the maximum retries and the wait
149+
// interval between two retries.
150+
func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (string, bool), fn func() error) error {
151+
var lastErr error
152+
var lastErrLabel string
153+
var retry bool
154+
var retryCounter int
155+
err := backoffWithRequestContext(ctx, backoff, func() (bool, error) {
156+
err := fn()
157+
if retry {
158+
klog.V(1).Infof("etcd retry - counter: %v, lastErrLabel: %s lastError: %v, error: %v", retryCounter, lastErrLabel, lastErr, err)
159+
metrics.UpdateEtcdRequestRetry(lastErrLabel)
160+
}
161+
if err == nil {
162+
return true, nil
163+
}
164+
165+
lastErrLabel, retry = retriable(err)
166+
if retry {
167+
lastErr = err
168+
retryCounter++
169+
return false, nil
170+
}
171+
172+
return false, err
173+
})
174+
if err == wait.ErrWaitTimeout && lastErr != nil {
175+
err = lastErr
176+
}
177+
return err
178+
}
179+
180+
// backoffWithRequestContext works with a request context and a Backoff. It ensures that the retry wait never
181+
// exceeds the deadline specified by the request context.
182+
func backoffWithRequestContext(ctx context.Context, backoff wait.Backoff, condition wait.ConditionFunc) error {
183+
for backoff.Steps > 0 {
184+
if ok, err := condition(); err != nil || ok {
185+
return err
186+
}
187+
188+
if backoff.Steps == 1 {
189+
break
190+
}
191+
192+
waitBeforeRetry := backoff.Step()
193+
select {
194+
case <-ctx.Done():
195+
return ctx.Err()
196+
case <-time.After(waitBeforeRetry):
197+
}
198+
}
199+
200+
return wait.ErrWaitTimeout
201+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package etcd3retry
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"net/url"
8+
"strconv"
9+
"syscall"
10+
"testing"
11+
12+
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
13+
"k8s.io/apiserver/pkg/storage"
14+
)
15+
16+
func TestOnError(t *testing.T) {
17+
tests := []struct {
18+
name string
19+
returnedFnError func(retryCounter int) error
20+
expectedRetries int
21+
expectedFinalError error
22+
}{
23+
{
24+
name: "retry ErrLeaderChanged",
25+
returnedFnError: func(_ int) error { return etcdrpc.ErrLeaderChanged },
26+
expectedRetries: 5,
27+
expectedFinalError: etcdrpc.ErrLeaderChanged,
28+
},
29+
{
30+
name: "retry ErrLeaderChanged a few times",
31+
returnedFnError: func(retryCounter int) error {
32+
if retryCounter == 3 {
33+
return nil
34+
}
35+
return etcdrpc.ErrLeaderChanged
36+
},
37+
expectedRetries: 3,
38+
},
39+
{
40+
name: "no retries",
41+
returnedFnError: func(_ int) error { return nil },
42+
},
43+
{
44+
name: "no retries for a random error",
45+
returnedFnError: func(_ int) error { return fmt.Errorf("random error") },
46+
expectedFinalError: fmt.Errorf("random error"),
47+
},
48+
}
49+
50+
for _, scenario := range tests {
51+
t.Run(scenario.name, func(t *testing.T) {
52+
ctx := context.TODO()
53+
// we set it to -1 to indicate that the first
54+
// execution is not a retry
55+
actualRetries := -1
56+
err := OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
57+
actualRetries++
58+
return scenario.returnedFnError(actualRetries)
59+
})
60+
61+
if actualRetries != scenario.expectedRetries {
62+
t.Errorf("Unexpected number of retries %v, expected %v", actualRetries, scenario.expectedRetries)
63+
}
64+
if (err == nil && scenario.expectedFinalError != nil) || (err != nil && scenario.expectedFinalError == nil) {
65+
t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
66+
}
67+
if err != nil && scenario.expectedFinalError != nil && err.Error() != scenario.expectedFinalError.Error() {
68+
t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
69+
}
70+
})
71+
}
72+
}
73+
74+
func TestIsRetriableEtcdError(t *testing.T) {
75+
tests := []struct {
76+
name string
77+
etcdErr error
78+
errorLabelExpected string
79+
retryExpected bool
80+
}{
81+
{
82+
name: "error is nil",
83+
errorLabelExpected: "",
84+
retryExpected: false,
85+
},
86+
{
87+
name: "generic storage error",
88+
etcdErr: storage.NewKeyNotFoundError("key", 0),
89+
errorLabelExpected: "",
90+
retryExpected: false,
91+
},
92+
{
93+
name: "connection refused error",
94+
etcdErr: &url.Error{Err: &net.OpError{Err: syscall.ECONNREFUSED}},
95+
errorLabelExpected: "",
96+
retryExpected: false,
97+
},
98+
{
99+
name: "etcd unavailable error",
100+
etcdErr: etcdrpc.ErrLeaderChanged,
101+
errorLabelExpected: "Unavailable",
102+
retryExpected: true,
103+
},
104+
{
105+
name: "should also inspect error message",
106+
etcdErr: fmt.Errorf("etcdserver: leader changed"),
107+
errorLabelExpected: "Unavailable",
108+
retryExpected: true,
109+
},
110+
}
111+
112+
for _, test := range tests {
113+
t.Run(test.name, func(t *testing.T) {
114+
errorCodeGot, retryGot := IsRetriableEtcdError(test.etcdErr)
115+
116+
if test.errorLabelExpected != errorCodeGot {
117+
t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot)
118+
}
119+
120+
if test.retryExpected != retryGot {
121+
t.Errorf("expected retry: %s but got: %s", strconv.FormatBool(test.retryExpected), strconv.FormatBool(retryGot))
122+
}
123+
})
124+
}
125+
}

staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go

+14
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,14 @@ var (
153153
},
154154
[]string{"resource"},
155155
)
156+
etcdRequestRetry = compbasemetrics.NewCounterVec(
157+
&compbasemetrics.CounterOpts{
158+
Name: "etcd_request_retry_total",
159+
Help: "Etcd request retry total",
160+
StabilityLevel: compbasemetrics.ALPHA,
161+
},
162+
[]string{"error"},
163+
)
156164
)
157165

158166
var registerMetrics sync.Once
@@ -175,6 +183,7 @@ func Register() {
175183
legacyregistry.MustRegister(listStorageNumSelectorEvals)
176184
legacyregistry.MustRegister(listStorageNumReturned)
177185
legacyregistry.MustRegister(decodeErrorCounts)
186+
legacyregistry.MustRegister(etcdRequestRetry)
178187
})
179188
}
180189

@@ -239,6 +248,11 @@ func UpdateLeaseObjectCount(count int64) {
239248
etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count))
240249
}
241250

251+
// UpdateEtcdRequestRetry sets the etcd_request_retry_total metric.
252+
func UpdateEtcdRequestRetry(errorCode string) {
253+
etcdRequestRetry.WithLabelValues(errorCode).Inc()
254+
}
255+
242256
// RecordListEtcd3Metrics notes various metrics of the cost to serve a LIST request
243257
func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned int) {
244258
listStorageCount.WithLabelValues(resource).Inc()

staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"go.uber.org/zap/zapcore"
4040
"golang.org/x/time/rate"
4141
"google.golang.org/grpc"
42+
"k8s.io/apiserver/pkg/storage/etcd3/etcd3retry"
4243
"k8s.io/klog/v2"
4344

4445
"k8s.io/apimachinery/pkg/runtime"
@@ -464,8 +465,9 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
464465

465466
versioner := storage.APIObjectVersioner{}
466467
decoder := etcd3.NewDefaultDecoder(c.Codec, versioner)
467-
store := etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner)
468+
store := etcd3retry.NewRetryingEtcdStorage(etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner))
468469
return store, destroyFunc, nil
470+
469471
}
470472

471473
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

0 commit comments

Comments
 (0)