Skip to content
This repository was archived by the owner on Jan 17, 2020. It is now read-only.

Change APIServer startup message #48

Merged
2 changes: 1 addition & 1 deletion integration/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *APIServer) Start() error {
return err
}

s.processState.StartMessage = internal.GetAPIServerStartMessage(s.processState.URL)
s.processState.HealthCheckEndpoint = "/healthz"

s.URL = &s.processState.URL
s.CertDir = s.processState.Dir
Expand Down
12 changes: 0 additions & 12 deletions integration/internal/apiserver.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package internal

import "net/url"

var APIServerDefaultArgs = []string{
"--etcd-servers={{ if .EtcdURL }}{{ .EtcdURL.String }}{{ end }}",
"--cert-dir={{ .CertDir }}",
Expand All @@ -17,13 +15,3 @@ func DoAPIServerArgDefaulting(args []string) []string {

return APIServerDefaultArgs
}

func GetAPIServerStartMessage(u url.URL) string {
if isSecureScheme(u.Scheme) {
// https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/staging/src/k8s.io/apiserver/pkg/server/serve.go#L89
return "Serving securely on " + u.Host
}

// https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/pkg/kubeapiserver/server/insecure_handler.go#L121
return "Serving insecurely on " + u.Host
}
25 changes: 0 additions & 25 deletions integration/internal/apiserver_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package internal_test

import (
"net/url"

. "github.com/kubernetes-sig-testing/frameworks/integration/internal"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -23,26 +21,3 @@ var _ = Describe("Apiserver", func() {
}))
})
})

var _ = Describe("GetAPIServerStartMessage()", func() {
Context("when using a non tls URL", func() {
It("generates valid start message", func() {
url := url.URL{
Scheme: "http",
Host: "some.insecure.apiserver:1234",
}
message := GetAPIServerStartMessage(url)
Expect(message).To(Equal("Serving insecurely on some.insecure.apiserver:1234"))
})
})
Context("when using a tls URL", func() {
It("generates valid start message", func() {
url := url.URL{
Scheme: "https",
Host: "some.secure.apiserver:8443",
}
message := GetAPIServerStartMessage(url)
Expect(message).To(Equal("Serving securely on some.secure.apiserver:8443"))
})
})
})
40 changes: 38 additions & 2 deletions integration/internal/integration_tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ var _ = Describe("The Testing Framework", func() {
fmt.Sprintf("Expected Etcd to listen for clients on %s,", etcdClientURL.Host))

By("Ensuring APIServer is listening")
Expect(isAPIServerListening()).To(BeTrue(),
fmt.Sprintf("Expected APIServer to listen on %s", apiServerURL.Host))
CheckAPIServerIsReady(controlPlane.KubeCtl())

By("getting a kubectl & run it against the control plane")
kubeCtl := controlPlane.KubeCtl()
Expand Down Expand Up @@ -122,3 +121,40 @@ func isSomethingListeningOnPort(hostAndPort string) portChecker {
return true
}
}

// CheckAPIServerIsReady checks if the APIServer is really ready and not only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

// listening.
//
// While porting some tests in k/k
// (https://github.com/hoegaarden/kubernetes/blob/287fdef1bd98646bc521f4433c1009936d5cf7a2/hack/make-rules/test-cmd-util.sh#L1524-L1535)
// we found, that the APIServer was
// listening but not serving certain APIs yet.
//
// We changed the readiness detection in the PR at
// https://github.com/kubernetes-sigs/testing_frameworks/pull/48. To confirm
// this changed behaviour does what it should do, we used the same test as in
// k/k's test-cmd (see link above) and test if certain well-known known APIs
// are actually available.
func CheckAPIServerIsReady(kubeCtl *integration.KubeCtl) {
expectedAPIS := []string{
"/api/v1/namespaces/default/pods 200 OK",
"/api/v1/namespaces/default/replicationcontrollers 200 OK",
"/api/v1/namespaces/default/services 200 OK",
"/apis/apps/v1/namespaces/default/daemonsets 200 OK",
"/apis/apps/v1/namespaces/default/deployments 200 OK",
"/apis/apps/v1/namespaces/default/replicasets 200 OK",
"/apis/apps/v1/namespaces/default/statefulsets 200 OK",
"/apis/autoscaling/v1/namespaces/default/horizontalpodautoscalers 200",
"/apis/batch/v1/namespaces/default/jobs 200 OK",
}

_, output, err := kubeCtl.Run("--v=6", "--namespace", "default", "get", "all", "--chunk-size=0")
ExpectWithOffset(1, err).NotTo(HaveOccurred())

stdoutBytes, err := ioutil.ReadAll(output)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

for _, api := range expectedAPIS {
ExpectWithOffset(1, string(stdoutBytes)).To(ContainSubstring(api))
}
}
75 changes: 67 additions & 8 deletions integration/internal/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/exec"
Expand All @@ -16,7 +17,23 @@ import (

type ProcessState struct {
DefaultedProcessInput
Session *gexec.Session
Session *gexec.Session
// Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we
// assume the process is ready to operate. E.g. "/healthz". If this is set,
// we ignore StartMessage.
HealthCheckEndpoint string
// HealthCheckPollInterval is the interval which will be used for polling the
// HealthCheckEndpoint.
// If left empty it will default to 100 Milliseconds.
HealthCheckPollInterval time.Duration
// StartMessage is the message to wait for on stderr. If we recieve this
// message, we assume the process is ready to operate. Ignored if
// HealthCheckEndpoint is specified.
//
// The usage of StartMessage is discouraged, favour HealthCheckEndpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!

// instead!
//
// Deprecated: Use HealthCheckEndpoint in favour of StartMessage
StartMessage string
Args []string
}
Expand Down Expand Up @@ -86,17 +103,24 @@ func DoDefaulting(
return defaults, nil
}

type stopChannel chan struct{}

func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
command := exec.Command(ps.Path, ps.Args...)

startDetectStream := gbytes.NewBuffer()
detectedStart := startDetectStream.Detect(ps.StartMessage)
ready := make(chan bool)
timedOut := time.After(ps.StartTimeout)
var pollerStopCh stopChannel

if stderr == nil {
stderr = startDetectStream
if ps.HealthCheckEndpoint != "" {
healthCheckURL := ps.URL
healthCheckURL.Path = ps.HealthCheckEndpoint
pollerStopCh = make(stopChannel)
go pollURLUntilOK(healthCheckURL, ps.HealthCheckPollInterval, ready, pollerStopCh)
} else {
stderr = io.MultiWriter(startDetectStream, stderr)
startDetectStream := gbytes.NewBuffer()
ready = startDetectStream.Detect(ps.StartMessage)
stderr = safeMultiWriter(stderr, startDetectStream)
}

ps.Session, err = gexec.Start(command, stdout, stderr)
Expand All @@ -105,14 +129,49 @@ func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
}

select {
case <-detectedStart:
case <-ready:
return nil
case <-timedOut:
ps.Session.Terminate()
if pollerStopCh != nil {
close(pollerStopCh)
}
if ps.Session != nil {
ps.Session.Terminate()
}
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
}
}

func safeMultiWriter(writers ...io.Writer) io.Writer {
safeWriters := []io.Writer{}
for _, w := range writers {
if w != nil {
safeWriters = append(safeWriters, w)
}
}
return io.MultiWriter(safeWriters...)
}

func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
if interval <= 0 {
interval = 100 * time.Millisecond
}
for {
res, err := http.Get(url.String())
if err == nil && res.StatusCode == http.StatusOK {
ready <- true
return
}

select {
case <-stopCh:
return
default:
time.Sleep(interval)
}
}
}

func (ps *ProcessState) Stop() error {
if ps.Session == nil {
return nil
Expand Down
Loading