Skip to content

Commit e646c98

Browse files
p0lyn0mialbertinatto
authored andcommitted
UPSTREAM: <carry>: add etcd3RetryingProberMonitor for retrying etcd Unavailable errors for the etcd health checker client
UPSTREAM: <carry>: replace newETCD3ProberMonitor with etcd3RetryingProberMonitor
1 parent 712b94e commit e646c98

File tree

4 files changed

+197
-4
lines changed

4 files changed

+197
-4
lines changed

staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,13 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
156156
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
157157

158158
lock := sync.RWMutex{}
159-
var prober *etcd3ProberMonitor
159+
var prober *etcd3RetryingProberMonitor
160160
clientErr := fmt.Errorf("etcd client connection not yet established")
161161

162162
go wait.PollImmediateUntil(time.Second, func() (bool, error) {
163163
lock.Lock()
164164
defer lock.Unlock()
165-
newProber, err := newETCD3ProberMonitor(c)
165+
newProber, err := newRetryingETCD3ProberMonitor(c)
166166
// Ensure that server is already not shutting down.
167167
select {
168168
case <-stopCh:

staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func CreateProber(c storagebackend.Config) (Prober, error) {
6969
case storagebackend.StorageTypeETCD2:
7070
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
7171
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
72-
return newETCD3ProberMonitor(c)
72+
return newRetryingETCD3ProberMonitor(c)
7373
default:
7474
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
7575
}
@@ -80,7 +80,7 @@ func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) {
8080
case storagebackend.StorageTypeETCD2:
8181
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
8282
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
83-
return newETCD3ProberMonitor(c)
83+
return newRetryingETCD3ProberMonitor(c)
8484
default:
8585
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
8686
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package factory
2+
3+
import (
4+
"context"
5+
6+
"k8s.io/apiserver/pkg/storage/etcd3/etcd3retry"
7+
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
8+
"k8s.io/apiserver/pkg/storage/storagebackend"
9+
)
10+
11+
type proberMonitor interface {
12+
Prober
13+
metrics.Monitor
14+
}
15+
16+
type etcd3RetryingProberMonitor struct {
17+
delegate proberMonitor
18+
}
19+
20+
func newRetryingETCD3ProberMonitor(c storagebackend.Config) (*etcd3RetryingProberMonitor, error) {
21+
delegate, err := newETCD3ProberMonitor(c)
22+
if err != nil {
23+
return nil, err
24+
}
25+
return &etcd3RetryingProberMonitor{delegate: delegate}, nil
26+
}
27+
28+
func (t *etcd3RetryingProberMonitor) Probe(ctx context.Context) error {
29+
return etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, func() error {
30+
return t.delegate.Probe(ctx)
31+
})
32+
}
33+
34+
func (t *etcd3RetryingProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
35+
var ret metrics.StorageMetrics
36+
err := etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, func() error {
37+
var innerErr error
38+
ret, innerErr = t.delegate.Monitor(ctx)
39+
return innerErr
40+
})
41+
return ret, err
42+
}
43+
44+
func (t *etcd3RetryingProberMonitor) Close() error {
45+
return t.delegate.Close()
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package factory
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
9+
10+
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
11+
)
12+
13+
func getRetryScenarios() []struct {
14+
name string
15+
retryFnError func() error
16+
expectedRetries int
17+
expectedFinalError error
18+
} {
19+
return []struct {
20+
name string
21+
retryFnError func() error
22+
expectedRetries int
23+
expectedFinalError error
24+
}{
25+
{
26+
name: "retry ErrLeaderChanged",
27+
retryFnError: func() error {
28+
return etcdrpc.ErrLeaderChanged
29+
},
30+
expectedRetries: 5,
31+
expectedFinalError: etcdrpc.ErrLeaderChanged,
32+
},
33+
{
34+
name: "retry ErrLeaderChanged a few times",
35+
retryFnError: func() func() error {
36+
retryCounter := -1
37+
return func() error {
38+
retryCounter++
39+
if retryCounter == 3 {
40+
return nil
41+
}
42+
return etcdrpc.ErrLeaderChanged
43+
}
44+
}(),
45+
expectedRetries: 3,
46+
},
47+
{
48+
name: "no retries",
49+
retryFnError: func() error {
50+
return nil
51+
},
52+
},
53+
{
54+
name: "no retries for a random error",
55+
retryFnError: func() error {
56+
return fmt.Errorf("random error")
57+
},
58+
expectedFinalError: fmt.Errorf("random error"),
59+
},
60+
}
61+
}
62+
63+
func TestEtcd3RetryingProber(t *testing.T) {
64+
for _, scenario := range getRetryScenarios() {
65+
t.Run(scenario.name, func(t *testing.T) {
66+
ctx := context.TODO()
67+
targetDelegate := &fakeEtcd3RetryingProberMonitor{
68+
// we set it to -1 to indicate that the first
69+
// execution is not a retry
70+
actualRetries: -1,
71+
probeFn: scenario.retryFnError,
72+
}
73+
74+
target := &etcd3RetryingProberMonitor{delegate: targetDelegate}
75+
err := target.Probe(ctx)
76+
77+
if targetDelegate.actualRetries != scenario.expectedRetries {
78+
t.Errorf("Unexpected number of retries %v, expected %v", targetDelegate.actualRetries, scenario.expectedRetries)
79+
}
80+
if (err == nil && scenario.expectedFinalError != nil) || (err != nil && scenario.expectedFinalError == nil) {
81+
t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
82+
}
83+
if err != nil && scenario.expectedFinalError != nil && err.Error() != scenario.expectedFinalError.Error() {
84+
t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
85+
}
86+
})
87+
}
88+
}
89+
90+
func TestEtcd3RetryingMonitor(t *testing.T) {
91+
for _, scenario := range getRetryScenarios() {
92+
t.Run(scenario.name, func(t *testing.T) {
93+
ctx := context.TODO()
94+
expectedRetValue := int64(scenario.expectedRetries)
95+
targetDelegate := &fakeEtcd3RetryingProberMonitor{
96+
// we set it to -1 to indicate that the first
97+
// execution is not a retry
98+
actualRetries: -1,
99+
monitorFn: func() func() (metrics.StorageMetrics, error) {
100+
retryCounter := -1
101+
return func() (metrics.StorageMetrics, error) {
102+
retryCounter++
103+
err := scenario.retryFnError()
104+
ret := metrics.StorageMetrics{int64(retryCounter)}
105+
return ret, err
106+
}
107+
}(),
108+
}
109+
110+
target := &etcd3RetryingProberMonitor{delegate: targetDelegate}
111+
actualRetValue, err := target.Monitor(ctx)
112+
113+
if targetDelegate.actualRetries != scenario.expectedRetries {
114+
t.Errorf("Unexpected number of retries %v, expected %v", targetDelegate.actualRetries, scenario.expectedRetries)
115+
}
116+
if (err == nil && scenario.expectedFinalError != nil) || (err != nil && scenario.expectedFinalError == nil) {
117+
t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
118+
}
119+
if err != nil && scenario.expectedFinalError != nil && err.Error() != scenario.expectedFinalError.Error() {
120+
t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
121+
}
122+
if actualRetValue.Size != expectedRetValue {
123+
t.Errorf("Unexpected value returned actual %v, expected %v", actualRetValue.Size, expectedRetValue)
124+
}
125+
})
126+
}
127+
}
128+
129+
type fakeEtcd3RetryingProberMonitor struct {
130+
actualRetries int
131+
probeFn func() error
132+
monitorFn func() (metrics.StorageMetrics, error)
133+
}
134+
135+
func (f *fakeEtcd3RetryingProberMonitor) Probe(_ context.Context) error {
136+
f.actualRetries++
137+
return f.probeFn()
138+
}
139+
140+
func (f *fakeEtcd3RetryingProberMonitor) Monitor(_ context.Context) (metrics.StorageMetrics, error) {
141+
f.actualRetries++
142+
return f.monitorFn()
143+
}
144+
145+
func (f *fakeEtcd3RetryingProberMonitor) Close() error {
146+
panic("not implemented")
147+
}

0 commit comments

Comments
 (0)