@@ -2,6 +2,9 @@ package etcd3retry
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
6
+ "regexp"
7
+ "strings"
5
8
"time"
6
9
7
10
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@@ -36,15 +39,15 @@ func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
36
39
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
37
40
// set to the read value from database.
38
41
func (c * retryClient ) Create (ctx context.Context , key string , obj , out runtime.Object , ttl uint64 ) error {
39
- return OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
42
+ return OnError (ctx , DefaultRetry , IsRetriableErrorOnWrite , func () error {
40
43
return c .Interface .Create (ctx , key , obj , out , ttl )
41
44
})
42
45
}
43
46
44
47
// Delete removes the specified key and returns the value that existed at that spot.
45
48
// If key didn't exist, it will return NotFound storage error.
46
49
func (c * retryClient ) Delete (ctx context.Context , key string , out runtime.Object , preconditions * storage.Preconditions , validateDeletion storage.ValidateObjectFunc , cachedExistingObject runtime.Object ) error {
47
- return OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
50
+ return OnError (ctx , DefaultRetry , IsRetriableErrorOnWrite , func () error {
48
51
return c .Interface .Delete (ctx , key , out , preconditions , validateDeletion , cachedExistingObject )
49
52
})
50
53
}
@@ -58,7 +61,7 @@ func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object
58
61
// and send it in an "ADDED" event, before watch starts.
59
62
func (c * retryClient ) Watch (ctx context.Context , key string , opts storage.ListOptions ) (watch.Interface , error ) {
60
63
var ret watch.Interface
61
- err := OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
64
+ err := OnError (ctx , DefaultRetry , IsRetriableErrorOnRead , func () error {
62
65
var innerErr error
63
66
ret , innerErr = c .Interface .Watch (ctx , key , opts )
64
67
return innerErr
@@ -72,7 +75,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp
72
75
// The returned contents may be delayed, but it is guaranteed that they will
73
76
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
74
77
func (c * retryClient ) Get (ctx context.Context , key string , opts storage.GetOptions , objPtr runtime.Object ) error {
75
- return OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
78
+ return OnError (ctx , DefaultRetry , IsRetriableErrorOnRead , func () error {
76
79
return c .Interface .Get (ctx , key , opts , objPtr )
77
80
})
78
81
}
@@ -84,7 +87,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio
84
87
// The returned contents may be delayed, but it is guaranteed that they will
85
88
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
86
89
func (c * retryClient ) GetList (ctx context.Context , key string , opts storage.ListOptions , listObj runtime.Object ) error {
87
- return OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
90
+ return OnError (ctx , DefaultRetry , IsRetriableErrorOnRead , func () error {
88
91
return c .Interface .GetList (ctx , key , opts , listObj )
89
92
})
90
93
}
@@ -125,23 +128,65 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List
125
128
// )
126
129
func (c * retryClient ) GuaranteedUpdate (ctx context.Context , key string , destination runtime.Object , ignoreNotFound bool ,
127
130
preconditions * storage.Preconditions , tryUpdate storage.UpdateFunc , cachedExistingObject runtime.Object ) error {
128
- return OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
131
+ return OnError (ctx , DefaultRetry , IsRetriableErrorOnWrite , func () error {
129
132
return c .Interface .GuaranteedUpdate (ctx , key , destination , ignoreNotFound , preconditions , tryUpdate , cachedExistingObject )
130
133
})
131
134
}
132
135
133
- // IsRetriableEtcdError returns true if a retry should be attempted, otherwise false.
134
- // errorLabel is set to a non-empty value that reflects the type of error encountered.
135
- func IsRetriableEtcdError (err error ) (errorLabel string , retry bool ) {
136
- if err != nil {
137
- if etcdError , ok := etcdrpc .Error (err ).(etcdrpc.EtcdError ); ok {
138
- if etcdError .Code () == codes .Unavailable {
139
- errorLabel = "Unavailable"
140
- retry = true
141
- }
142
- }
136
+ // These errors are coming back from the k8s.io/apiserver storage.Interface, not directly from an
137
+ // etcd client. Classifying them can be fragile since the storage methods may not return etcd client
138
+ // errors directly.
139
+ var errorLabelsBySuffix = map [string ]string {
140
+ "etcdserver: leader changed" : "LeaderChanged" ,
141
+ "etcdserver: no leader" : "NoLeader" ,
142
+ "raft proposal dropped" : "ProposalDropped" ,
143
+
144
+ "etcdserver: request timed out" : "Timeout" ,
145
+ "etcdserver: request timed out, possibly due to previous leader failure" : "Timeout" ,
146
+ "etcdserver: request timed out, possible due to connection lost" : "Timeout" ,
147
+ "etcdserver: request timed out, waiting for the applied index took too long" : "Timeout" ,
148
+ "etcdserver: server stopped" : "Stopped" ,
149
+ }
150
+
151
+ var retriableWriteErrorSuffixes = func () * regexp.Regexp {
152
+ // This list should include only errors the caller is certain have no side effects.
153
+ suffixes := []string {
154
+ "etcdserver: leader changed" ,
155
+ "etcdserver: no leader" ,
156
+ "raft proposal dropped" ,
157
+ }
158
+ return regexp .MustCompile (fmt .Sprintf (`(%s)$` , strings .Join (suffixes , `|` )))
159
+ }()
160
+
161
+ // IsRetriableErrorOnWrite returns true if and only if a retry should be attempted when the provided
162
+ // error is returned from a write attempt. If the error is retriable, a non-empty string classifying
163
+ // the error is also returned.
164
+ func IsRetriableErrorOnWrite (err error ) (string , bool ) {
165
+ if suffix := retriableWriteErrorSuffixes .FindString (err .Error ()); suffix != "" {
166
+ return errorLabelsBySuffix [suffix ], true
167
+ }
168
+ return "" , false
169
+ }
170
+
171
+ var retriableReadErrorSuffixes = func () * regexp.Regexp {
172
+ var suffixes []string
173
+ for suffix := range errorLabelsBySuffix {
174
+ suffixes = append (suffixes , suffix )
175
+ }
176
+ return regexp .MustCompile (fmt .Sprintf (`(%s)$` , strings .Join (suffixes , `|` )))
177
+ }()
178
+
179
+ // IsRetriableErrorOnRead returns true if and only if a retry should be attempted when the provided
180
+ // error is returned from a read attempt. If the error is retriable, a non-empty string classifying
181
+ // the error is also returned.
182
+ func IsRetriableErrorOnRead (err error ) (string , bool ) {
183
+ if suffix := retriableReadErrorSuffixes .FindString (err .Error ()); suffix != "" {
184
+ return errorLabelsBySuffix [suffix ], true
143
185
}
144
- return
186
+ if etcdError , ok := etcdrpc .Error (err ).(etcdrpc.EtcdError ); ok && etcdError .Code () == codes .Unavailable {
187
+ return "Unavailable" , true
188
+ }
189
+ return "" , false
145
190
}
146
191
147
192
// OnError allows the caller to retry fn in case the error returned by fn is retriable
@@ -163,6 +208,9 @@ func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s
163
208
}
164
209
165
210
lastErrLabel , retry = retriable (err )
211
+ if klog .V (6 ).Enabled () {
212
+ klog .V (6 ).InfoS ("observed storage error" , "err" , err , "retriable" , retry )
213
+ }
166
214
if retry {
167
215
lastErr = err
168
216
retryCounter ++
0 commit comments