Skip to content

Commit 678ff3d

Browse files
author
OpenShift Bot
authored
Merge pull request #12910 from mfojtik/retry-rc-watch
Merged by openshift-bot
2 parents efab088 + d214e60 commit 678ff3d

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

Diff for: pkg/deploy/registry/rest.go

+19
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,20 @@ import (
66
"time"
77

88
kapi "k8s.io/kubernetes/pkg/api"
9+
"k8s.io/kubernetes/pkg/api/unversioned"
910
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
1011
"k8s.io/kubernetes/pkg/fields"
1112
"k8s.io/kubernetes/pkg/watch"
1213

14+
"github.com/golang/glog"
1315
"github.com/openshift/origin/pkg/deploy/api"
1416
deployutil "github.com/openshift/origin/pkg/deploy/util"
1517
)
1618

1719
var (
1820
// ErrUnknownDeploymentPhase is returned for WaitForRunningDeployment if an unknown phase is returned.
1921
ErrUnknownDeploymentPhase = errors.New("unknown deployment phase")
22+
ErrTooOldResourceVersion = errors.New("too old resource version")
2023
)
2124

2225
// WaitForRunningDeployment waits until the specified deployment is no longer New or Pending. Returns true if
@@ -33,6 +36,15 @@ func WaitForRunningDeployment(rn kcoreclient.ReplicationControllersGetter, obser
3336

3437
if _, err := watch.Until(timeout, w, func(e watch.Event) (bool, error) {
3538
if e.Type == watch.Error {
39+
// When we send too old resource version in observed replication controller to
40+
// watcher, restart the watch with latest available controller.
41+
switch t := e.Object.(type) {
42+
case *unversioned.Status:
43+
if t.Reason == unversioned.StatusReasonGone {
44+
glog.V(5).Infof("encountered error while watching for replication controller: %v (retrying)", t)
45+
return false, ErrTooOldResourceVersion
46+
}
47+
}
3648
return false, fmt.Errorf("encountered error while watching for replication controller: %v", e.Object)
3749
}
3850
obj, isController := e.Object.(*kapi.ReplicationController)
@@ -49,6 +61,13 @@ func WaitForRunningDeployment(rn kcoreclient.ReplicationControllersGetter, obser
4961
return false, ErrUnknownDeploymentPhase
5062
}
5163
}); err != nil {
64+
if err == ErrTooOldResourceVersion {
65+
latestRC, err := rn.ReplicationControllers(observed.Namespace).Get(observed.Name)
66+
if err != nil {
67+
return observed, false, err
68+
}
69+
return WaitForRunningDeployment(rn, latestRC, timeout)
70+
}
5271
return observed, false, err
5372
}
5473

Diff for: pkg/deploy/registry/rest_test.go

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package registry
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
kapi "k8s.io/kubernetes/pkg/api"
8+
"k8s.io/kubernetes/pkg/api/unversioned"
9+
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
10+
"k8s.io/kubernetes/pkg/client/testing/core"
11+
"k8s.io/kubernetes/pkg/runtime"
12+
"k8s.io/kubernetes/pkg/watch"
13+
14+
deployapi "github.com/openshift/origin/pkg/deploy/api"
15+
)
16+
17+
func TestWaitForRunningDeploymentSuccess(t *testing.T) {
18+
fakeController := &kapi.ReplicationController{}
19+
fakeController.Name = "test-1"
20+
fakeController.Namespace = "test"
21+
22+
kubeclient := fake.NewSimpleClientset([]runtime.Object{fakeController}...)
23+
fakeWatch := watch.NewFake()
24+
kubeclient.PrependWatchReactor("replicationcontrollers", core.DefaultWatchReactor(fakeWatch, nil))
25+
stopChan := make(chan struct{})
26+
27+
go func() {
28+
defer close(stopChan)
29+
rc, ok, err := WaitForRunningDeployment(kubeclient.Core(), fakeController, 10*time.Second)
30+
if err != nil {
31+
t.Fatalf("unexpected error: %v", err)
32+
}
33+
if !ok {
34+
t.Errorf("expected to return success")
35+
}
36+
if rc == nil {
37+
t.Errorf("expected returned replication controller to not be nil")
38+
}
39+
}()
40+
41+
fakeController.Annotations = map[string]string{deployapi.DeploymentStatusAnnotation: string(deployapi.DeploymentStatusRunning)}
42+
fakeWatch.Modify(fakeController)
43+
<-stopChan
44+
}
45+
46+
func TestWaitForRunningDeploymentRestartWatch(t *testing.T) {
47+
fakeController := &kapi.ReplicationController{}
48+
fakeController.Name = "test-1"
49+
fakeController.Namespace = "test"
50+
51+
kubeclient := fake.NewSimpleClientset([]runtime.Object{fakeController}...)
52+
fakeWatch := watch.NewFake()
53+
54+
watchCalledChan := make(chan struct{})
55+
kubeclient.PrependWatchReactor("replicationcontrollers", func(action core.Action) (bool, watch.Interface, error) {
56+
fakeWatch.Reset()
57+
watchCalledChan <- struct{}{}
58+
return core.DefaultWatchReactor(fakeWatch, nil)(action)
59+
})
60+
61+
getReceivedChan := make(chan struct{})
62+
kubeclient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (bool, runtime.Object, error) {
63+
close(getReceivedChan)
64+
return true, fakeController, nil
65+
})
66+
67+
stopChan := make(chan struct{})
68+
go func() {
69+
defer close(stopChan)
70+
rc, ok, err := WaitForRunningDeployment(kubeclient.Core(), fakeController, 10*time.Second)
71+
if err != nil {
72+
t.Fatalf("unexpected error: %v", err)
73+
}
74+
if !ok {
75+
t.Errorf("expected to return success")
76+
}
77+
if rc == nil {
78+
t.Errorf("expected returned replication controller to not be nil")
79+
}
80+
}()
81+
82+
select {
83+
case <-watchCalledChan:
84+
case <-time.After(time.Second * 5):
85+
t.Fatalf("timeout waiting for the watch to start")
86+
}
87+
88+
// Send the StatusReasonGone error to watcher which should trigger the watch restart.
89+
goneError := &unversioned.Status{Reason: unversioned.StatusReasonGone}
90+
fakeWatch.Error(goneError)
91+
92+
// Make sure we observed the "get" action on replication controller, so the watch gets
93+
// the latest resourceVersion.
94+
select {
95+
case <-getReceivedChan:
96+
case <-time.After(time.Second * 5):
97+
t.Fatalf("timeout waiting for get on replication controllers")
98+
}
99+
100+
// Wait for the watcher to restart and then transition the replication controller to
101+
// running state.
102+
select {
103+
case <-watchCalledChan:
104+
fakeController.Annotations = map[string]string{deployapi.DeploymentStatusAnnotation: string(deployapi.DeploymentStatusRunning)}
105+
fakeWatch.Modify(fakeController)
106+
<-stopChan
107+
case <-time.After(time.Second * 5):
108+
t.Fatalf("timeout waiting for the watch restart")
109+
}
110+
}

0 commit comments

Comments
 (0)