@@ -2,6 +2,8 @@ package etcd3retry
2
2
3
3
import (
4
4
"context"
5
+ "k8s.io/apiserver/pkg/audit"
6
+ "strings"
5
7
"time"
6
8
7
9
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@@ -32,11 +34,18 @@ func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
32
34
return & retryClient {Interface : delegate }
33
35
}
34
36
37
+ func addEtcdAccessAuditAnnotation (ctx context.Context ) {
38
+ // add an audit annotation indicating we reached out to etcd. This allows our post-processing to exclude requests
39
+ // that don't attempt to access etcd from, "how reliably is etcd" calculations.
40
+ audit .AddAuditAnnotation (ctx , "apiserver.internal.openshift.io/etcd-access" , time .Now ().Format (time .RFC3339 ))
41
+ }
42
+
35
43
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
36
44
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
37
45
// set to the read value from database.
38
46
func (c * retryClient ) Create (ctx context.Context , key string , obj , out runtime.Object , ttl uint64 ) error {
39
47
return OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
48
+ addEtcdAccessAuditAnnotation (ctx )
40
49
return c .Interface .Create (ctx , key , obj , out , ttl )
41
50
})
42
51
}
@@ -45,6 +54,7 @@ func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.O
45
54
// If key didn't exist, it will return NotFound storage error.
46
55
func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
	// A single delete attempt: add the etcd-access audit annotation, then
	// forward every argument unchanged to the wrapped storage.
	attempt := func() error {
		addEtcdAccessAuditAnnotation(ctx)
		return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject)
	}
	// OnError runs the attempt using DefaultRetry and IsRetriableEtcdError.
	return OnError(ctx, DefaultRetry, IsRetriableEtcdError, attempt)
}
@@ -59,6 +69,7 @@ func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object
59
69
func (c * retryClient ) Watch (ctx context.Context , key string , opts storage.ListOptions ) (watch.Interface , error ) {
60
70
var ret watch.Interface
61
71
err := OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
72
+ addEtcdAccessAuditAnnotation (ctx )
62
73
var innerErr error
63
74
ret , innerErr = c .Interface .Watch (ctx , key , opts )
64
75
return innerErr
@@ -73,6 +84,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp
73
84
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
74
85
func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
	// A single read attempt: add the etcd-access audit annotation, then
	// forward the lookup to the wrapped storage.
	attempt := func() error {
		addEtcdAccessAuditAnnotation(ctx)
		return c.Interface.Get(ctx, key, opts, objPtr)
	}
	// OnError runs the attempt using DefaultRetry and IsRetriableEtcdError.
	return OnError(ctx, DefaultRetry, IsRetriableEtcdError, attempt)
}
@@ -85,6 +97,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio
85
97
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
86
98
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
	// A single list attempt: add the etcd-access audit annotation, then
	// forward the list call to the wrapped storage.
	attempt := func() error {
		addEtcdAccessAuditAnnotation(ctx)
		return c.Interface.GetList(ctx, key, opts, listObj)
	}
	// OnError runs the attempt using DefaultRetry and IsRetriableEtcdError.
	return OnError(ctx, DefaultRetry, IsRetriableEtcdError, attempt)
}
@@ -126,6 +139,7 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List
126
139
func (c * retryClient ) GuaranteedUpdate (ctx context.Context , key string , destination runtime.Object , ignoreNotFound bool ,
127
140
preconditions * storage.Preconditions , tryUpdate storage.UpdateFunc , cachedExistingObject runtime.Object ) error {
128
141
return OnError (ctx , DefaultRetry , IsRetriableEtcdError , func () error {
142
+ addEtcdAccessAuditAnnotation (ctx )
129
143
return c .Interface .GuaranteedUpdate (ctx , key , destination , ignoreNotFound , preconditions , tryUpdate , cachedExistingObject )
130
144
})
131
145
}
@@ -153,6 +167,8 @@ func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s
153
167
var retry bool
154
168
var retryCounter int
155
169
err := backoffWithRequestContext (ctx , backoff , func () (bool , error ) {
170
+ startTime := time .Now ()
171
+
156
172
err := fn ()
157
173
if retry {
158
174
klog .V (1 ).Infof ("etcd retry - counter: %v, lastErrLabel: %s lastError: %v, error: %v" , retryCounter , lastErrLabel , lastErr , err )
@@ -162,6 +178,12 @@ func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s
162
178
return true , nil
163
179
}
164
180
181
+ // add an audit annotation if we hit a no leader condition so we can track this failure in post-processing CI steps.
182
+ // We only mark the first time through. Hopefully there's enough traffic that it doesn't matter
183
+ if strings .Contains (err .Error (), "no leader" ) {
184
+ audit .AddAuditAnnotation (ctx , "apiserver.internal.openshift.io/no-leader" , startTime .Format (time .RFC3339 ))
185
+ }
186
+
165
187
lastErrLabel , retry = retriable (err )
166
188
if retry {
167
189
lastErr = err
0 commit comments