Skip to content
This repository was archived by the owner on Jan 17, 2020. It is now read-only.

Change APIServer startup message #48

Merged
2 changes: 1 addition & 1 deletion integration/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *APIServer) Start() error {
return err
}

s.processState.StartMessage = internal.GetAPIServerStartMessage(s.processState.URL)
s.processState.HealthCheckEndpoint = "/healthz"

s.URL = &s.processState.URL
s.CertDir = s.processState.Dir
Expand Down
12 changes: 0 additions & 12 deletions integration/internal/apiserver.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package internal

import "net/url"

var APIServerDefaultArgs = []string{
"--etcd-servers={{ if .EtcdURL }}{{ .EtcdURL.String }}{{ end }}",
"--cert-dir={{ .CertDir }}",
Expand All @@ -17,13 +15,3 @@ func DoAPIServerArgDefaulting(args []string) []string {

return APIServerDefaultArgs
}

func GetAPIServerStartMessage(u url.URL) string {
if isSecureScheme(u.Scheme) {
// https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/staging/src/k8s.io/apiserver/pkg/server/serve.go#L89
return "Serving securely on " + u.Host
}

// https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/pkg/kubeapiserver/server/insecure_handler.go#L121
return "Serving insecurely on " + u.Host
}
25 changes: 0 additions & 25 deletions integration/internal/apiserver_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package internal_test

import (
"net/url"

. "github.com/kubernetes-sig-testing/frameworks/integration/internal"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -23,26 +21,3 @@ var _ = Describe("Apiserver", func() {
}))
})
})

var _ = Describe("GetAPIServerStartMessage()", func() {
Context("when using a non tls URL", func() {
It("generates valid start message", func() {
url := url.URL{
Scheme: "http",
Host: "some.insecure.apiserver:1234",
}
message := GetAPIServerStartMessage(url)
Expect(message).To(Equal("Serving insecurely on some.insecure.apiserver:1234"))
})
})
Context("when using a tls URL", func() {
It("generates valid start message", func() {
url := url.URL{
Scheme: "https",
Host: "some.secure.apiserver:8443",
}
message := GetAPIServerStartMessage(url)
Expect(message).To(Equal("Serving securely on some.secure.apiserver:8443"))
})
})
})
40 changes: 38 additions & 2 deletions integration/internal/integration_tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ var _ = Describe("The Testing Framework", func() {
fmt.Sprintf("Expected Etcd to listen for clients on %s,", etcdClientURL.Host))

By("Ensuring APIServer is listening")
Expect(isAPIServerListening()).To(BeTrue(),
fmt.Sprintf("Expected APIServer to listen on %s", apiServerURL.Host))
CheckAPIServerIsReady(controlPlane.KubeCtl())

By("getting a kubectl & run it against the control plane")
kubeCtl := controlPlane.KubeCtl()
Expand Down Expand Up @@ -122,3 +121,40 @@ func isSomethingListeningOnPort(hostAndPort string) portChecker {
return true
}
}

// CheckAPIServerIsReady checks if the APIServer is really ready and not only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

// listening.
//
// While porting some tests in k/k
// (https://github.com/hoegaarden/kubernetes/blob/287fdef1bd98646bc521f4433c1009936d5cf7a2/hack/make-rules/test-cmd-util.sh#L1524-L1535)
// we found, that the APIServer was
// listening but not serving certain APIs yet.
//
// We changed the readiness detection in the PR at
// https://github.com/kubernetes-sigs/testing_frameworks/pull/48. To confirm
// this changed behaviour does what it should do, we used the same test as in
// k/k's test-cmd (see link above) and test if certain well-known known APIs
// are actually available.
func CheckAPIServerIsReady(kubeCtl *integration.KubeCtl) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this check required? It seems more like validation of the underlying kubeapi implementation than of the fixture that is managing a kube-apiserver binary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are porting tests in k/k and had an issue there. We took the code from this test and used it in here. This should guard against the same issue re-appearing, even if we change the method by which we check that the APIServer is ready to go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider documenting your rationale in a comment so that this knowledge is available to future maintainers.

expectedAPIS := []string{
"/api/v1/namespaces/default/pods 200 OK",
"/api/v1/namespaces/default/replicationcontrollers 200 OK",
"/api/v1/namespaces/default/services 200 OK",
"/apis/apps/v1/namespaces/default/daemonsets 200 OK",
"/apis/apps/v1/namespaces/default/deployments 200 OK",
"/apis/apps/v1/namespaces/default/replicasets 200 OK",
"/apis/apps/v1/namespaces/default/statefulsets 200 OK",
"/apis/autoscaling/v1/namespaces/default/horizontalpodautoscalers 200",
"/apis/batch/v1/namespaces/default/jobs 200 OK",
}

_, output, err := kubeCtl.Run("--v=6", "--namespace", "default", "get", "all", "--chunk-size=0")
ExpectWithOffset(1, err).NotTo(HaveOccurred())

stdoutBytes, err := ioutil.ReadAll(output)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

for _, api := range expectedAPIS {
ExpectWithOffset(1, string(stdoutBytes)).To(ContainSubstring(api))
}
}
91 changes: 82 additions & 9 deletions integration/internal/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/exec"
Expand All @@ -16,7 +17,19 @@ import (

type ProcessState struct {
DefaultedProcessInput
Session *gexec.Session
Session *gexec.Session
// Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we
// assume the process is ready to operate. E.g. "/healthz". If this is set,
// we ignore StartMessage.
HealthCheckEndpoint string
// StartMessage is the message to wait for on stderr. If we recieve this
// message, we assume the process is ready to operate. Ignored if
// HealthCheckEndpoint is specified.
//
// The usage of StartMessage is discouraged, favour HealthCheckEndpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!

// instead!
//
// Deprecated: Use HealthCheckEndpoint in favour of StartMessage
StartMessage string
Args []string
}
Expand Down Expand Up @@ -89,14 +102,17 @@ func DoDefaulting(
func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
command := exec.Command(ps.Path, ps.Args...)

startDetectStream := gbytes.NewBuffer()
detectedStart := startDetectStream.Detect(ps.StartMessage)
timedOut := time.After(ps.StartTimeout)
ready := make(chan bool)
timedOut, timeOutPoller := teeTimer(time.After(ps.StartTimeout))

if stderr == nil {
stderr = startDetectStream
if ps.HealthCheckEndpoint != "" {
healthCheckURL := ps.URL
healthCheckURL.Path = ps.HealthCheckEndpoint
go pollURLUntilOK(healthCheckURL, ready, timeOutPoller)
} else {
stderr = io.MultiWriter(startDetectStream, stderr)
startDetectStream := gbytes.NewBuffer()
ready = startDetectStream.Detect(ps.StartMessage)
stderr = safeMultiWriter(stderr, startDetectStream)
}

ps.Session, err = gexec.Start(command, stdout, stderr)
Expand All @@ -105,14 +121,71 @@ func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
}

select {
case <-detectedStart:
case <-ready:
return nil
case <-timedOut:
ps.Session.Terminate()
if ps.Session != nil {
ps.Session.Terminate()
}
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
}
}

func teeTimer(src <-chan time.Time) (chan time.Time, chan time.Time) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this approach is a bit heavyweight if the goal is providing a stop channel to the goroutine. Consider the following example:

var stopCh chan struct{}
...
  stopCh = make(chan struct{})
  go pollURLUntilOK(healthCheckURL, ready, stopCh)
...
select {
...
case <-timedOut:
  if stopCh != nil {
    close(stopCh)
  }
}

destLeft := make(chan time.Time)
destRight := make(chan time.Time)

go func() {
for {
t, more := <-src
destLeft <- t
destRight <- t
if !more {
close(destLeft)
close(destRight)
}
}
}()

return destLeft, destRight
}

func safeMultiWriter(writers ...io.Writer) io.Writer {
safeWriters := []io.Writer{}
for _, w := range writers {
if w != nil {
safeWriters = append(safeWriters, w)
}
}
return io.MultiWriter(safeWriters...)
}

func pollURLUntilOK(url url.URL, ready chan bool, timedOut <-chan time.Time) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(No action required) The name timedOut presumes the reason this function should exit. Consider using a name that is more functional (e.g. stopCh is a common convention in k/k).

checker := func() bool {
res, err := http.Get(url.String())
if err == nil && res.StatusCode == http.StatusOK {
ready <- true
return true
}
return false
}

if isReady := checker(); isReady {
return
}

for {
select {
case <-time.Tick(time.Millisecond * 100):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following comment remains unresolved: #48 (comment)

I'm also not sure of the value of adding indirection to such a simple function. Consider simplifying:

for {
  res, err := http.Get(url.String())
  if err == nil && res.StatusCode == http.StatusOK {
    ready <- true
    return
  }
    
  select {
    case <-stopCh:
      return
    default:
      time.Sleep(pollInterval)
  }  
}

if isReady := checker(); isReady {
return
}
case <-timedOut:
return
}
}
}

func (ps *ProcessState) Stop() error {
if ps.Session == nil {
return nil
Expand Down
Loading