Skip to content

Commit 0c68a45

Browse files
committed
UPSTREAM: <carry>: Don't retry storage calls with side effects.
The existing patch retried any etcd error returned from storage with the code "Unavailable". Writes can only be safely retried if the client can be absolutely sure that the initial attempt ended before persisting any changes. The "Unavailable" code includes errors like "timed out" that can't be safely retried for writes.
1 parent ff3bcb8 commit 0c68a45

File tree

3 files changed

+141
-27
lines changed

3 files changed

+141
-27
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go

+65-17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package etcd3retry
22

33
import (
44
"context"
5+
"fmt"
6+
"regexp"
7+
"strings"
58
"time"
69

710
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@@ -36,15 +39,15 @@ func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
3639
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
3740
// set to the read value from database.
3841
func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
39-
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
42+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
4043
return c.Interface.Create(ctx, key, obj, out, ttl)
4144
})
4245
}
4346

4447
// Delete removes the specified key and returns the value that existed at that spot.
4548
// If key didn't exist, it will return NotFound storage error.
4649
func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
47-
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
50+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
4851
return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject)
4952
})
5053
}
@@ -58,7 +61,7 @@ func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object
5861
// and send it in an "ADDED" event, before watch starts.
5962
func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
6063
var ret watch.Interface
61-
err := OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
64+
err := OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
6265
var innerErr error
6366
ret, innerErr = c.Interface.Watch(ctx, key, opts)
6467
return innerErr
@@ -72,7 +75,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp
7275
// The returned contents may be delayed, but it is guaranteed that they will
7376
// match 'opts.ResourceVersion' according to 'opts.ResourceVersionMatch'.
7477
func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
75-
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
78+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
7679
return c.Interface.Get(ctx, key, opts, objPtr)
7780
})
7881
}
@@ -84,7 +87,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio
8487
// The returned contents may be delayed, but it is guaranteed that they will
8588
// match 'opts.ResourceVersion' according to 'opts.ResourceVersionMatch'.
8689
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
87-
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
90+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
8891
return c.Interface.GetList(ctx, key, opts, listObj)
8992
})
9093
}
@@ -125,23 +128,65 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List
125128
// )
126129
func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
127130
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
128-
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
131+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
129132
return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
130133
})
131134
}
132135

133-
// IsRetriableEtcdError returns true if a retry should be attempted, otherwise false.
134-
// errorLabel is set to a non-empty value that reflects the type of error encountered.
135-
func IsRetriableEtcdError(err error) (errorLabel string, retry bool) {
136-
if err != nil {
137-
if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok {
138-
if etcdError.Code() == codes.Unavailable {
139-
errorLabel = "Unavailable"
140-
retry = true
141-
}
142-
}
136+
// errorLabelsBySuffix maps the trailing text of a storage error message to the short label used
// to classify that error. Keys are compared against the end of the error text, so each key must
// reproduce the etcd server message verbatim; a single misspelled word makes the entry dead.
//
// These errors are coming back from the k8s.io/apiserver storage.Interface, not directly from an
// etcd client. Classifying them can be fragile since the storage methods may not return etcd client
// errors directly.
var errorLabelsBySuffix = map[string]string{
	"etcdserver: leader changed": "LeaderChanged",
	"etcdserver: no leader":      "NoLeader",
	"raft proposal dropped":      "ProposalDropped",

	"etcdserver: request timed out":                                              "Timeout",
	"etcdserver: request timed out, possibly due to previous leader failure":     "Timeout",
	"etcdserver: request timed out, possibly due to connection lost":             "Timeout",
	"etcdserver: request timed out, waiting for the applied index took too long": "Timeout",
	"etcdserver: server stopped":                                                 "Stopped",
}
150+
151+
// retriableWriteErrorSuffixes matches an error message that ends with one of the suffixes
// considered safe to retry after a failed write. The expression is compiled once at package
// initialization.
var retriableWriteErrorSuffixes = func() *regexp.Regexp {
	// This list should include only errors the caller is certain have no side effects.
	suffixes := []string{
		"etcdserver: leader changed",
		"etcdserver: no leader",
		"raft proposal dropped",
	}
	// Escape every suffix so it is always matched as literal text, even if a future entry
	// contains a regular expression metacharacter such as '.' or '('.
	quoted := make([]string, 0, len(suffixes))
	for _, suffix := range suffixes {
		quoted = append(quoted, regexp.QuoteMeta(suffix))
	}
	// The trailing '$' anchors the alternation to the end of the message.
	return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(quoted, `|`)))
}()
160+
161+
// IsRetriableErrorOnWrite returns true if and only if a retry should be attempted when the provided
162+
// error is returned from a write attempt. If the error is retriable, a non-empty string classifying
163+
// the error is also returned.
164+
func IsRetriableErrorOnWrite(err error) (string, bool) {
165+
if suffix := retriableWriteErrorSuffixes.FindString(err.Error()); suffix != "" {
166+
return errorLabelsBySuffix[suffix], true
167+
}
168+
return "", false
169+
}
170+
171+
var retriableReadErrorSuffixes = func() *regexp.Regexp {
172+
var suffixes []string
173+
for suffix := range errorLabelsBySuffix {
174+
suffixes = append(suffixes, suffix)
175+
}
176+
return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`)))
177+
}()
178+
179+
// IsRetriableErrorOnRead returns true if and only if a retry should be attempted when the provided
180+
// error is returned from a read attempt. If the error is retriable, a non-empty string classifying
181+
// the error is also returned.
182+
func IsRetriableErrorOnRead(err error) (string, bool) {
183+
if suffix := retriableReadErrorSuffixes.FindString(err.Error()); suffix != "" {
184+
return errorLabelsBySuffix[suffix], true
143185
}
144-
return
186+
if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok && etcdError.Code() == codes.Unavailable {
187+
return "Unavailable", true
188+
}
189+
return "", false
145190
}
146191

147192
// OnError allows the caller to retry fn in case the error returned by fn is retriable
@@ -163,6 +208,9 @@ func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s
163208
}
164209

165210
lastErrLabel, retry = retriable(err)
211+
if klog.V(6).Enabled() {
212+
klog.V(6).InfoS("observed storage error", "err", err, "retriable", retry)
213+
}
166214
if retry {
167215
lastErr = err
168216
retryCounter++

staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go

+74-8
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@ import (
1616
func TestOnError(t *testing.T) {
1717
tests := []struct {
1818
name string
19+
retriableFn func(error) (string, bool)
1920
returnedFnError func(retryCounter int) error
2021
expectedRetries int
2122
expectedFinalError error
2223
}{
2324
{
2425
name: "retry ErrLeaderChanged",
26+
retriableFn: IsRetriableErrorOnRead,
2527
returnedFnError: func(_ int) error { return etcdrpc.ErrLeaderChanged },
2628
expectedRetries: 5,
2729
expectedFinalError: etcdrpc.ErrLeaderChanged,
2830
},
2931
{
30-
name: "retry ErrLeaderChanged a few times",
32+
name: "retry ErrLeaderChanged a few times",
33+
retriableFn: IsRetriableErrorOnRead,
3134
returnedFnError: func(retryCounter int) error {
3235
if retryCounter == 3 {
3336
return nil
@@ -38,10 +41,12 @@ func TestOnError(t *testing.T) {
3841
},
3942
{
4043
name: "no retries",
44+
retriableFn: IsRetriableErrorOnRead,
4145
returnedFnError: func(_ int) error { return nil },
4246
},
4347
{
4448
name: "no retries for a random error",
49+
retriableFn: IsRetriableErrorOnRead,
4550
returnedFnError: func(_ int) error { return fmt.Errorf("random error") },
4651
expectedFinalError: fmt.Errorf("random error"),
4752
},
@@ -53,7 +58,7 @@ func TestOnError(t *testing.T) {
5358
// we set it to -1 to indicate that the first
5459
// execution is not a retry
5560
actualRetries := -1
56-
err := OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
61+
err := OnError(ctx, DefaultRetry, scenario.retriableFn, func() error {
5762
actualRetries++
5863
return scenario.returnedFnError(actualRetries)
5964
})
@@ -71,18 +76,67 @@ func TestOnError(t *testing.T) {
7176
}
7277
}
7378

74-
func TestIsRetriableEtcdError(t *testing.T) {
79+
func TestIsRetriableErrorOnRead(t *testing.T) {
7580
tests := []struct {
7681
name string
7782
etcdErr error
7883
errorLabelExpected string
7984
retryExpected bool
8085
}{
8186
{
82-
name: "error is nil",
87+
name: "generic storage error",
88+
etcdErr: storage.NewKeyNotFoundError("key", 0),
89+
errorLabelExpected: "",
90+
retryExpected: false,
91+
},
92+
{
93+
name: "connection refused error",
94+
etcdErr: &url.Error{Err: &net.OpError{Err: syscall.ECONNREFUSED}},
8395
errorLabelExpected: "",
8496
retryExpected: false,
8597
},
98+
{
99+
name: "etcd unavailable error",
100+
etcdErr: etcdrpc.ErrLeaderChanged,
101+
errorLabelExpected: "LeaderChanged",
102+
retryExpected: true,
103+
},
104+
{
105+
name: "should also inspect error message",
106+
etcdErr: fmt.Errorf("etcdserver: no leader"),
107+
errorLabelExpected: "NoLeader",
108+
retryExpected: true,
109+
},
110+
{
111+
name: "unavailable code with unrecognized suffix",
112+
etcdErr: etcdrpc.ErrGRPCUnhealthy,
113+
errorLabelExpected: "Unavailable",
114+
retryExpected: true,
115+
},
116+
}
117+
118+
for _, test := range tests {
119+
t.Run(test.name, func(t *testing.T) {
120+
errorCodeGot, retryGot := IsRetriableErrorOnRead(test.etcdErr)
121+
122+
if test.errorLabelExpected != errorCodeGot {
123+
t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot)
124+
}
125+
126+
if test.retryExpected != retryGot {
127+
t.Errorf("expected retry: %s but got: %s", strconv.FormatBool(test.retryExpected), strconv.FormatBool(retryGot))
128+
}
129+
})
130+
}
131+
}
132+
133+
func TestIsRetriableErrorOnWrite(t *testing.T) {
134+
tests := []struct {
135+
name string
136+
etcdErr error
137+
errorLabelExpected string
138+
retryExpected bool
139+
}{
86140
{
87141
name: "generic storage error",
88142
etcdErr: storage.NewKeyNotFoundError("key", 0),
@@ -98,20 +152,32 @@ func TestIsRetriableEtcdError(t *testing.T) {
98152
{
99153
name: "etcd unavailable error",
100154
etcdErr: etcdrpc.ErrLeaderChanged,
101-
errorLabelExpected: "Unavailable",
155+
errorLabelExpected: "LeaderChanged",
102156
retryExpected: true,
103157
},
104158
{
105159
name: "should also inspect error message",
106-
etcdErr: fmt.Errorf("etcdserver: leader changed"),
107-
errorLabelExpected: "Unavailable",
160+
etcdErr: fmt.Errorf("etcdserver: no leader"),
161+
errorLabelExpected: "NoLeader",
108162
retryExpected: true,
109163
},
164+
{
165+
name: "unavailable code with unrecognized suffix",
166+
etcdErr: etcdrpc.ErrGRPCUnhealthy,
167+
errorLabelExpected: "",
168+
retryExpected: false,
169+
},
170+
{
171+
name: "timeout not retried for writes",
172+
etcdErr: etcdrpc.ErrGRPCTimeout,
173+
errorLabelExpected: "",
174+
retryExpected: false,
175+
},
110176
}
111177

112178
for _, test := range tests {
113179
t.Run(test.name, func(t *testing.T) {
114-
errorCodeGot, retryGot := IsRetriableEtcdError(test.etcdErr)
180+
errorCodeGot, retryGot := IsRetriableErrorOnWrite(test.etcdErr)
115181

116182
if test.errorLabelExpected != errorCodeGot {
117183
t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot)

staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/retry_etcdprobemonitor.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ func newRetryingETCD3ProberMonitor(c storagebackend.Config) (*etcd3RetryingProbe
2626
}
2727

2828
func (t *etcd3RetryingProberMonitor) Probe(ctx context.Context) error {
29-
return etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, func() error {
29+
return etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableErrorOnRead, func() error {
3030
return t.delegate.Probe(ctx)
3131
})
3232
}
3333

3434
func (t *etcd3RetryingProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
3535
var ret metrics.StorageMetrics
36-
err := etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, func() error {
36+
err := etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableErrorOnRead, func() error {
3737
var innerErr error
3838
ret, innerErr = t.delegate.Monitor(ctx)
3939
return innerErr

0 commit comments

Comments
 (0)