Skip to content

Commit 2a434d1

Browse files
committed
Wait for old pods to terminate before proceeding to Recreate
1 parent 6eefa2a commit 2a434d1

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

pkg/deploy/strategy/recreate/recreate.go

+46
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
kclient "k8s.io/kubernetes/pkg/client/unversioned"
1414
adapter "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset"
1515
"k8s.io/kubernetes/pkg/kubectl"
16+
"k8s.io/kubernetes/pkg/labels"
1617
"k8s.io/kubernetes/pkg/runtime"
18+
"k8s.io/kubernetes/pkg/util/wait"
19+
"k8s.io/kubernetes/pkg/watch"
1720

1821
"github.com/openshift/origin/pkg/client"
1922
deployapi "github.com/openshift/origin/pkg/deploy/api"
@@ -36,6 +39,8 @@ type RecreateDeploymentStrategy struct {
3639
until string
3740
// rcClient is a client to access replication controllers
3841
rcClient kcoreclient.ReplicationControllersGetter
42+
// podClient is used to list and watch pods.
43+
podClient kcoreclient.PodsGetter
3944
// eventClient is a client to access events
4045
eventClient kcoreclient.EventsGetter
4146
// getUpdateAcceptor returns an UpdateAcceptor to verify the first replica
@@ -80,13 +85,15 @@ func NewRecreateDeploymentStrategy(oldClient kclient.Interface, tagClient client
8085
events: events,
8186
until: until,
8287
rcClient: client.Core(),
88+
podClient: client.Core(),
8389
eventClient: client.Core(),
8490
getUpdateAcceptor: func(timeout time.Duration, minReadySeconds int32) strat.UpdateAcceptor {
8591
return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout, acceptorInterval, minReadySeconds)
8692
},
8793
scaler: scaler,
8894
decoder: decoder,
8995
hookExecutor: stratsupport.NewHookExecutor(client.Core(), tagClient, client.Core(), os.Stdout, decoder),
96+
// TODO: Should be config.Spec.Strategy.RecreateParams.TimeoutSeconds - (time.Now - deployerPodStartTime)
9097
retryTimeout: 120 * time.Second,
9198
retryPeriod: 1 * time.Second,
9299
}
@@ -140,6 +147,8 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
140147
if err != nil {
141148
return fmt.Errorf("couldn't scale %s to 0: %v", from.Name, err)
142149
}
150+
// Wait for pods to terminate.
151+
s.waitForTerminatedPods(from, time.Duration(*params.TimeoutSeconds)*time.Second)
143152
}
144153

145154
if s.until == "0%" {
@@ -221,3 +230,40 @@ func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationCo
221230

222231
return s.rcClient.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
223232
}
233+
234+
// waitForTerminatedPods waits until all pods for the provided replication controller are terminated.
235+
func (s *RecreateDeploymentStrategy) waitForTerminatedPods(from *kapi.ReplicationController, timeout time.Duration) {
236+
selector := labels.Set(from.Spec.Selector).AsSelector()
237+
options := kapi.ListOptions{LabelSelector: selector}
238+
podList, err := s.podClient.Pods(from.Namespace).List(options)
239+
if err != nil {
240+
fmt.Fprintf(s.out, "--> Cannot list pods: %v\nNew pods may be scaled up before old pods terminate\n", err)
241+
return
242+
}
243+
// If there are no pods left, we are done.
244+
if len(podList.Items) == 0 {
245+
return
246+
}
247+
// Watch from the resource version of the list and wait for all pods to be deleted
248+
// before proceeding with the Recreate strategy.
249+
options.ResourceVersion = podList.ResourceVersion
250+
w, err := s.podClient.Pods(from.Namespace).Watch(options)
251+
if err != nil {
252+
fmt.Fprintf(s.out, "--> Watch could not be established: %v\nNew pods may be scaled up before old pods terminate\n", err)
253+
return
254+
}
255+
defer w.Stop()
256+
// Observe as many deletions as the remaining pods and then return.
257+
deletionsNeeded := len(podList.Items)
258+
condition := func(event watch.Event) (bool, error) {
259+
if event.Type == watch.Deleted {
260+
deletionsNeeded--
261+
}
262+
return deletionsNeeded == 0, nil
263+
}
264+
// TODO: Timeout should be timeout - (time.Now - deployerPodStartTime)
265+
if _, err = watch.Until(timeout, w, condition); err != nil && err != wait.ErrWaitTimeout {
266+
fmt.Fprintf(s.out, "--> Watch failed: %v\nNew pods may be scaled up before old pods terminate\n", err)
267+
}
268+
return
269+
}

pkg/deploy/strategy/recreate/recreate_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ func TestRecreate_acceptorSuccess(t *testing.T) {
276276
out: &bytes.Buffer{},
277277
errOut: &bytes.Buffer{},
278278
eventClient: fake.NewSimpleClientset().Core(),
279+
podClient: fake.NewSimpleClientset().Core(),
279280
decoder: kapi.Codecs.UniversalDecoder(),
280281
retryTimeout: 1 * time.Second,
281282
retryPeriod: 1 * time.Millisecond,
@@ -326,6 +327,7 @@ func TestRecreate_acceptorFail(t *testing.T) {
326327
retryPeriod: 1 * time.Millisecond,
327328
scaler: scaler,
328329
eventClient: fake.NewSimpleClientset().Core(),
330+
podClient: fake.NewSimpleClientset().Core(),
329331
}
330332

331333
acceptor := &testAcceptor{

0 commit comments

Comments
 (0)