Skip to content

Commit 46d539f

Browse files
csrwngJim Minter
authored and
Jim Minter
committed
oc cluster up: work around docker attach race condition
1 parent b61cf87 commit 46d539f

File tree

3 files changed

+55
-90
lines changed

3 files changed

+55
-90
lines changed

pkg/bootstrap/docker/openshift/helper.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var (
4949
RouterPorts = []int{80, 443}
5050
DefaultPorts = append(BasePorts, DefaultDNSPort)
5151
PortsWithAlternateDNS = append(BasePorts, AlternateDNSPort)
52+
AllPorts = append(append(RouterPorts, DefaultPorts...), AlternateDNSPort)
5253
SocatPidFile = filepath.Join(homedir.HomeDir(), cliconfig.OpenShiftConfigHomeDir, "socat-8443.pid")
5354
)
5455

@@ -100,14 +101,14 @@ func NewHelper(client *docker.Client, hostHelper *host.HostHelper, image, contai
100101
}
101102

102103
func (h *Helper) TestPorts(ports []int) error {
103-
portData, _, err := h.runHelper.New().Image(h.image).
104+
portData, _, _, err := h.runHelper.New().Image(h.image).
104105
DiscardContainer().
105106
Privileged().
106107
HostNetwork().
107108
HostPid().
108109
Entrypoint("/bin/bash").
109110
Command("-c", "cat /proc/net/tcp && ( [ -e /proc/net/tcp6 ] && cat /proc/net/tcp6 || true)").
110-
CombinedOutput()
111+
Output()
111112
if err != nil {
112113
return errors.NewError("Cannot get TCP port information from Kubernetes host").WithCause(err)
113114
}

pkg/bootstrap/docker/run/run.go

+35-72
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@ package run
33
import (
44
"bytes"
55
"fmt"
6-
"io"
7-
"io/ioutil"
8-
"sync"
96

107
docker "github.com/fsouza/go-dockerclient"
118
"github.com/golang/glog"
@@ -37,7 +34,6 @@ type Runner struct {
3734
client *docker.Client
3835
config *docker.Config
3936
hostConfig *docker.HostConfig
40-
input io.Reader
4137
removeContainer bool
4238
}
4339

@@ -119,14 +115,6 @@ func (h *Runner) DiscardContainer() *Runner {
119115
return h
120116
}
121117

122-
// Input sets an input stream for the Docker run command
123-
func (h *Runner) Input(reader io.Reader) *Runner {
124-
h.config.OpenStdin = true
125-
h.config.StdinOnce = true
126-
h.input = reader
127-
return h
128-
}
129-
130118
// Start starts the container as a daemon and returns
131119
func (h *Runner) Start() (string, error) {
132120
id, err := h.Create()
@@ -138,22 +126,13 @@ func (h *Runner) Start() (string, error) {
138126

139127
// Output starts the container, waits for it to finish and returns its output
140128
func (h *Runner) Output() (string, string, int, error) {
141-
stdOut, errOut := &bytes.Buffer{}, &bytes.Buffer{}
142-
rc, err := h.runWithOutput(h.input, stdOut, errOut)
143-
return stdOut.String(), errOut.String(), rc, err
144-
}
145-
146-
// CombinedOutput is the same as Output, except both output and error streams
147-
// are combined into one.
148-
func (h *Runner) CombinedOutput() (string, int, error) {
149-
out := &bytes.Buffer{}
150-
rc, err := h.runWithOutput(h.input, out, out)
151-
return out.String(), rc, err
129+
return h.runWithOutput()
152130
}
153131

154132
// Run executes the container and waits until it completes
155133
func (h *Runner) Run() (int, error) {
156-
return h.runWithOutput(h.input, ioutil.Discard, ioutil.Discard)
134+
_, _, rc, err := h.runWithOutput()
135+
return rc, err
157136
}
158137

159138
func (h *Runner) Create() (string, error) {
@@ -179,10 +158,10 @@ func (h *Runner) startContainer(id string) error {
179158
return nil
180159
}
181160

182-
func (h *Runner) runWithOutput(stdIn io.Reader, stdOut, stdErr io.Writer) (int, error) {
161+
func (h *Runner) runWithOutput() (string, string, int, error) {
183162
id, err := h.Create()
184163
if err != nil {
185-
return 0, err
164+
return "", "", 0, err
186165
}
187166
if h.removeContainer {
188167
defer func() {
@@ -192,64 +171,48 @@ func (h *Runner) runWithOutput(stdIn io.Reader, stdOut, stdErr io.Writer) (int,
192171
}
193172
}()
194173
}
195-
logOut, logErr := &bytes.Buffer{}, &bytes.Buffer{}
196-
outStream := io.MultiWriter(stdOut, logOut)
197-
errStream := io.MultiWriter(stdErr, logErr)
198-
attached := make(chan struct{})
199-
attachErr := make(chan error)
200-
streamingWG := &sync.WaitGroup{}
201-
streamingWG.Add(1)
202-
go func() {
203-
glog.V(5).Infof("Attaching to container %q", id)
204-
err = h.client.AttachToContainer(docker.AttachToContainerOptions{
205-
Container: id,
206-
Logs: true,
207-
Stream: true,
208-
Stdout: true,
209-
Stderr: true,
210-
Stdin: stdIn != nil,
211-
OutputStream: outStream,
212-
ErrorStream: errStream,
213-
InputStream: stdIn,
214-
Success: attached,
215-
})
216-
if err != nil {
217-
glog.V(2).Infof("Error occurred while attaching: %v", err)
218-
attachErr <- err
219-
}
220-
streamingWG.Done()
221-
glog.V(5).Infof("Done attaching to container %q", id)
222-
}()
223-
224-
select {
225-
case <-attached:
226-
glog.V(5).Infof("Attach is successful.")
227-
case err = <-attachErr:
228-
return 0, err
229-
}
174+
230175
glog.V(5).Infof("Starting container %q", id)
231176
err = h.startContainer(id)
232177
if err != nil {
233178
glog.V(2).Infof("Error occurred starting container %q: %v", id, err)
234-
return 0, err
179+
return "", "", 0, err
235180
}
236-
glog.V(5).Infof("signaling attached channel")
237-
attached <- struct{}{}
181+
238182
glog.V(5).Infof("Waiting for container %q", id)
239183
rc, err := h.client.WaitContainer(id)
240184
if err != nil {
241-
glog.V(2).Infof("Error occurred waiting for container %q: %v, rc = %d", id, err, rc)
185+
glog.V(2).Infof("Error occurred waiting for container %q: %v", id, err)
186+
return "", "", 0, err
187+
}
188+
glog.V(5).Infof("Done waiting for container %q, rc=%d", id, rc)
189+
190+
stdOut, stdErr := &bytes.Buffer{}, &bytes.Buffer{}
191+
192+
// changed to only reading logs after execution instead of streaming
193+
// stdout/stderr to avoid race condition in (at least) docker 1.10-1.14-dev:
194+
// https://github.com/docker/docker/issues/29285
195+
glog.V(5).Infof("Reading logs from container %q", id)
196+
err = h.client.Logs(docker.LogsOptions{
197+
Container: id,
198+
Stdout: true,
199+
Stderr: true,
200+
OutputStream: stdOut,
201+
ErrorStream: stdErr,
202+
})
203+
if err != nil {
204+
glog.V(2).Infof("Error occurred while reading logs: %v", err)
205+
return "", "", 0, err
242206
}
243-
glog.V(5).Infof("Done waiting for container %q, rc=%d. Waiting for attach routine", id, rc)
244-
streamingWG.Wait()
245-
glog.V(5).Infof("Done waiting for attach routine")
246-
glog.V(5).Infof("Stdout:\n%s", logOut.String())
247-
glog.V(5).Infof("Stderr:\n%s", logErr.String())
207+
glog.V(5).Infof("Done reading logs from container %q", id)
208+
209+
glog.V(5).Infof("Stdout:\n%s", stdOut.String())
210+
glog.V(5).Infof("Stderr:\n%s", stdErr.String())
248211
if rc != 0 || err != nil {
249-
return rc, newRunError(rc, err, logOut.Bytes(), logErr.Bytes(), h.config)
212+
return stdOut.String(), stdErr.String(), rc, newRunError(rc, err, stdOut.Bytes(), stdErr.Bytes(), h.config)
250213
}
251214
glog.V(4).Infof("Container run successful\n")
252-
return 0, nil
215+
return stdOut.String(), stdErr.String(), rc, nil
253216
}
254217

255218
// printConfig prints out the relevant parts of a container's Docker config

pkg/bootstrap/docker/up.go

+17-16
Original file line numberDiff line numberDiff line change
@@ -598,31 +598,32 @@ func (c *ClientStartConfig) EnsureDefaultRedirectURIs(out io.Writer) error {
598598

599599
// CheckAvailablePorts ensures that ports used by OpenShift are available on the Docker host
600600
func (c *ClientStartConfig) CheckAvailablePorts(out io.Writer) error {
601-
for _, port := range openshift.RouterPorts {
602-
err := c.OpenShiftHelper().TestPorts([]int{port})
603-
if err != nil {
604-
fmt.Fprintf(out, "WARNING: Port %d is already in use and may cause routing issues for applications.\n", port)
605-
}
606-
}
607-
err := c.OpenShiftHelper().TestPorts(openshift.DefaultPorts)
601+
c.DNSPort = openshift.DefaultDNSPort
602+
err := c.OpenShiftHelper().TestPorts(openshift.AllPorts)
608603
if err == nil {
609-
c.DNSPort = openshift.DefaultDNSPort
610604
return nil
611605
}
612606
if !openshift.IsPortsNotAvailableErr(err) {
613607
return err
614608
}
615-
conflicts := openshift.UnavailablePorts(err)
616-
if len(conflicts) == 1 && conflicts[0] == openshift.DefaultDNSPort {
617-
err = c.OpenShiftHelper().TestPorts(openshift.PortsWithAlternateDNS)
618-
if err == nil {
619-
c.DNSPort = openshift.AlternateDNSPort
620-
fmt.Fprintf(out, "WARNING: Binding DNS on port %d instead of 53, which may not be resolvable from all clients.\n", openshift.AlternateDNSPort)
621-
return nil
609+
unavailable := sets.NewInt(openshift.UnavailablePorts(err)...)
610+
if unavailable.HasAny(openshift.BasePorts...) {
611+
return errors.NewError("a port needed by OpenShift is not available").WithCause(err)
612+
}
613+
if unavailable.Has(openshift.DefaultDNSPort) {
614+
if unavailable.Has(openshift.AlternateDNSPort) {
615+
return errors.NewError("a port needed by OpenShift is not available").WithCause(err)
622616
}
617+
c.DNSPort = openshift.AlternateDNSPort
618+
fmt.Fprintf(out, "WARNING: Binding DNS on port %d instead of 53, which may not be resolvable from all clients.\n", openshift.AlternateDNSPort)
623619
}
624620

625-
return errors.NewError("a port needed by OpenShift is not available").WithCause(err)
621+
for _, port := range openshift.RouterPorts {
622+
if unavailable.Has(port) {
623+
fmt.Fprintf(out, "WARNING: Port %d is already in use and may cause routing issues for applications.\n", port)
624+
}
625+
}
626+
return nil
626627
}
627628

628629
// DetermineServerIP gets an appropriate IP address to communicate with the OpenShift server

0 commit comments

Comments
 (0)