Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

oc cluster up: work around docker attach race condition #12223

Merged
merged 1 commit into from
Dec 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/bootstrap/docker/openshift/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
RouterPorts = []int{80, 443}
DefaultPorts = append(BasePorts, DefaultDNSPort)
PortsWithAlternateDNS = append(BasePorts, AlternateDNSPort)
AllPorts = append(append(RouterPorts, DefaultPorts...), AlternateDNSPort)
SocatPidFile = filepath.Join(homedir.HomeDir(), cliconfig.OpenShiftConfigHomeDir, "socat-8443.pid")
)

Expand Down Expand Up @@ -100,14 +101,14 @@ func NewHelper(client *docker.Client, hostHelper *host.HostHelper, image, contai
}

func (h *Helper) TestPorts(ports []int) error {
portData, _, err := h.runHelper.New().Image(h.image).
portData, _, _, err := h.runHelper.New().Image(h.image).
DiscardContainer().
Privileged().
HostNetwork().
HostPid().
Entrypoint("/bin/bash").
Command("-c", "cat /proc/net/tcp && ( [ -e /proc/net/tcp6 ] && cat /proc/net/tcp6 || true)").
CombinedOutput()
Output()
if err != nil {
return errors.NewError("Cannot get TCP port information from Kubernetes host").WithCause(err)
}
Expand Down
107 changes: 35 additions & 72 deletions pkg/bootstrap/docker/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package run
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"sync"

docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
Expand Down Expand Up @@ -37,7 +34,6 @@ type Runner struct {
client *docker.Client
config *docker.Config
hostConfig *docker.HostConfig
input io.Reader
removeContainer bool
}

Expand Down Expand Up @@ -119,14 +115,6 @@ func (h *Runner) DiscardContainer() *Runner {
return h
}

// Input sets an input stream for the Docker run command
func (h *Runner) Input(reader io.Reader) *Runner {
h.config.OpenStdin = true
h.config.StdinOnce = true
h.input = reader
return h
}

// Start starts the container as a daemon and returns
func (h *Runner) Start() (string, error) {
id, err := h.Create()
Expand All @@ -138,22 +126,13 @@ func (h *Runner) Start() (string, error) {

// Output starts the container, waits for it to finish and returns its output
func (h *Runner) Output() (string, string, int, error) {
stdOut, errOut := &bytes.Buffer{}, &bytes.Buffer{}
rc, err := h.runWithOutput(h.input, stdOut, errOut)
return stdOut.String(), errOut.String(), rc, err
}

// CombinedOutput is the same as Output, except both output and error streams
// are combined into one.
func (h *Runner) CombinedOutput() (string, int, error) {
out := &bytes.Buffer{}
rc, err := h.runWithOutput(h.input, out, out)
return out.String(), rc, err
return h.runWithOutput()
}

// Run executes the container and waits until it completes
func (h *Runner) Run() (int, error) {
return h.runWithOutput(h.input, ioutil.Discard, ioutil.Discard)
_, _, rc, err := h.runWithOutput()
return rc, err
}

func (h *Runner) Create() (string, error) {
Expand All @@ -179,10 +158,10 @@ func (h *Runner) startContainer(id string) error {
return nil
}

func (h *Runner) runWithOutput(stdIn io.Reader, stdOut, stdErr io.Writer) (int, error) {
func (h *Runner) runWithOutput() (string, string, int, error) {
id, err := h.Create()
if err != nil {
return 0, err
return "", "", 0, err
}
if h.removeContainer {
defer func() {
Expand All @@ -192,64 +171,48 @@ func (h *Runner) runWithOutput(stdIn io.Reader, stdOut, stdErr io.Writer) (int,
}
}()
}
logOut, logErr := &bytes.Buffer{}, &bytes.Buffer{}
outStream := io.MultiWriter(stdOut, logOut)
errStream := io.MultiWriter(stdErr, logErr)
attached := make(chan struct{})
attachErr := make(chan error)
streamingWG := &sync.WaitGroup{}
streamingWG.Add(1)
go func() {
glog.V(5).Infof("Attaching to container %q", id)
err = h.client.AttachToContainer(docker.AttachToContainerOptions{
Container: id,
Logs: true,
Stream: true,
Stdout: true,
Stderr: true,
Stdin: stdIn != nil,
OutputStream: outStream,
ErrorStream: errStream,
InputStream: stdIn,
Success: attached,
})
if err != nil {
glog.V(2).Infof("Error occurred while attaching: %v", err)
attachErr <- err
}
streamingWG.Done()
glog.V(5).Infof("Done attaching to container %q", id)
}()

select {
case <-attached:
glog.V(5).Infof("Attach is successful.")
case err = <-attachErr:
return 0, err
}

glog.V(5).Infof("Starting container %q", id)
err = h.startContainer(id)
if err != nil {
glog.V(2).Infof("Error occurred starting container %q: %v", id, err)
return 0, err
return "", "", 0, err
}
glog.V(5).Infof("signaling attached channel")
attached <- struct{}{}

glog.V(5).Infof("Waiting for container %q", id)
rc, err := h.client.WaitContainer(id)
if err != nil {
glog.V(2).Infof("Error occurred waiting for container %q: %v, rc = %d", id, err, rc)
glog.V(2).Infof("Error occurred waiting for container %q: %v", id, err)
return "", "", 0, err
}
glog.V(5).Infof("Done waiting for container %q, rc=%d", id, rc)

stdOut, stdErr := &bytes.Buffer{}, &bytes.Buffer{}

// changed to only reading logs after execution instead of streaming
// stdout/stderr to avoid race condition in (at least) docker 1.10-1.14-dev:
// https://github.com/docker/docker/issues/29285
glog.V(5).Infof("Reading logs from container %q", id)
err = h.client.Logs(docker.LogsOptions{
Container: id,
Stdout: true,
Stderr: true,
OutputStream: stdOut,
ErrorStream: stdErr,
})
if err != nil {
glog.V(2).Infof("Error occurred while reading logs: %v", err)
return "", "", 0, err
}
glog.V(5).Infof("Done waiting for container %q, rc=%d. Waiting for attach routine", id, rc)
streamingWG.Wait()
glog.V(5).Infof("Done waiting for attach routine")
glog.V(5).Infof("Stdout:\n%s", logOut.String())
glog.V(5).Infof("Stderr:\n%s", logErr.String())
glog.V(5).Infof("Done reading logs from container %q", id)

glog.V(5).Infof("Stdout:\n%s", stdOut.String())
glog.V(5).Infof("Stderr:\n%s", stdErr.String())
if rc != 0 || err != nil {
return rc, newRunError(rc, err, logOut.Bytes(), logErr.Bytes(), h.config)
return stdOut.String(), stdErr.String(), rc, newRunError(rc, err, stdOut.Bytes(), stdErr.Bytes(), h.config)
}
glog.V(4).Infof("Container run successful\n")
return 0, nil
return stdOut.String(), stdErr.String(), rc, nil
}

// printConfig prints out the relevant parts of a container's Docker config
Expand Down
33 changes: 17 additions & 16 deletions pkg/bootstrap/docker/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,31 +598,32 @@ func (c *ClientStartConfig) EnsureDefaultRedirectURIs(out io.Writer) error {

// CheckAvailablePorts ensures that ports used by OpenShift are available on the Docker host
func (c *ClientStartConfig) CheckAvailablePorts(out io.Writer) error {
for _, port := range openshift.RouterPorts {
err := c.OpenShiftHelper().TestPorts([]int{port})
if err != nil {
fmt.Fprintf(out, "WARNING: Port %d is already in use and may cause routing issues for applications.\n", port)
}
}
err := c.OpenShiftHelper().TestPorts(openshift.DefaultPorts)
c.DNSPort = openshift.DefaultDNSPort
err := c.OpenShiftHelper().TestPorts(openshift.AllPorts)
if err == nil {
c.DNSPort = openshift.DefaultDNSPort
return nil
}
if !openshift.IsPortsNotAvailableErr(err) {
return err
}
conflicts := openshift.UnavailablePorts(err)
if len(conflicts) == 1 && conflicts[0] == openshift.DefaultDNSPort {
err = c.OpenShiftHelper().TestPorts(openshift.PortsWithAlternateDNS)
if err == nil {
c.DNSPort = openshift.AlternateDNSPort
fmt.Fprintf(out, "WARNING: Binding DNS on port %d instead of 53, which may not be resolvable from all clients.\n", openshift.AlternateDNSPort)
return nil
unavailable := sets.NewInt(openshift.UnavailablePorts(err)...)
if unavailable.HasAny(openshift.BasePorts...) {
return errors.NewError("a port needed by OpenShift is not available").WithCause(err)
}
if unavailable.Has(openshift.DefaultDNSPort) {
if unavailable.Has(openshift.AlternateDNSPort) {
return errors.NewError("a port needed by OpenShift is not available").WithCause(err)
}
c.DNSPort = openshift.AlternateDNSPort
fmt.Fprintf(out, "WARNING: Binding DNS on port %d instead of 53, which may not be resolvable from all clients.\n", openshift.AlternateDNSPort)
}

return errors.NewError("a port needed by OpenShift is not available").WithCause(err)
for _, port := range openshift.RouterPorts {
if unavailable.Has(port) {
fmt.Fprintf(out, "WARNING: Port %d is already in use and may cause routing issues for applications.\n", port)
}
}
return nil
}

// DetermineServerIP gets an appropriate IP address to communicate with the OpenShift server
Expand Down