diff --git a/integration/apiserver.go b/integration/apiserver.go index 2b8c575..4efe76a 100644 --- a/integration/apiserver.go +++ b/integration/apiserver.go @@ -87,7 +87,7 @@ func (s *APIServer) Start() error { return err } - s.processState.StartMessage = internal.GetAPIServerStartMessage(s.processState.URL) + s.processState.HealthCheckEndpoint = "/healthz" s.URL = &s.processState.URL s.CertDir = s.processState.Dir diff --git a/integration/internal/apiserver.go b/integration/internal/apiserver.go index 7c458cb..a5256ae 100644 --- a/integration/internal/apiserver.go +++ b/integration/internal/apiserver.go @@ -1,7 +1,5 @@ package internal -import "net/url" - var APIServerDefaultArgs = []string{ "--etcd-servers={{ if .EtcdURL }}{{ .EtcdURL.String }}{{ end }}", "--cert-dir={{ .CertDir }}", @@ -17,13 +15,3 @@ func DoAPIServerArgDefaulting(args []string) []string { return APIServerDefaultArgs } - -func GetAPIServerStartMessage(u url.URL) string { - if isSecureScheme(u.Scheme) { - // https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/staging/src/k8s.io/apiserver/pkg/server/serve.go#L89 - return "Serving securely on " + u.Host - } - - // https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/pkg/kubeapiserver/server/insecure_handler.go#L121 - return "Serving insecurely on " + u.Host -} diff --git a/integration/internal/apiserver_test.go b/integration/internal/apiserver_test.go index 13e4d95..248bc5c 100644 --- a/integration/internal/apiserver_test.go +++ b/integration/internal/apiserver_test.go @@ -1,8 +1,6 @@ package internal_test import ( - "net/url" - . "github.com/kubernetes-sig-testing/frameworks/integration/internal" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -23,26 +21,3 @@ var _ = Describe("Apiserver", func() { })) }) }) - -var _ = Describe("GetAPIServerStartMessage()", func() { - Context("when using a non tls URL", func() { - It("generates valid start message", func() { - url := url.URL{ - Scheme: "http", - Host: "some.insecure.apiserver:1234", - } - message := GetAPIServerStartMessage(url) - Expect(message).To(Equal("Serving insecurely on some.insecure.apiserver:1234")) - }) - }) - Context("when using a tls URL", func() { - It("generates valid start message", func() { - url := url.URL{ - Scheme: "https", - Host: "some.secure.apiserver:8443", - } - message := GetAPIServerStartMessage(url) - Expect(message).To(Equal("Serving securely on some.secure.apiserver:8443")) - }) - }) -}) diff --git a/integration/internal/integration_tests/integration_test.go b/integration/internal/integration_tests/integration_test.go index a190c16..fe077d5 100644 --- a/integration/internal/integration_tests/integration_test.go +++ b/integration/internal/integration_tests/integration_test.go @@ -38,8 +38,7 @@ var _ = Describe("The Testing Framework", func() { fmt.Sprintf("Expected Etcd to listen for clients on %s,", etcdClientURL.Host)) By("Ensuring APIServer is listening") - Expect(isAPIServerListening()).To(BeTrue(), - fmt.Sprintf("Expected APIServer to listen on %s", apiServerURL.Host)) + CheckAPIServerIsReady(controlPlane.KubeCtl()) By("getting a kubectl & run it against the control plane") kubeCtl := controlPlane.KubeCtl() @@ -122,3 +121,40 @@ func isSomethingListeningOnPort(hostAndPort string) portChecker { return true } } + +// CheckAPIServerIsReady checks if the APIServer is really ready and not only +// listening. +// +// While porting some tests in k/k +// (https://github.com/hoegaarden/kubernetes/blob/287fdef1bd98646bc521f4433c1009936d5cf7a2/hack/make-rules/test-cmd-util.sh#L1524-L1535) +// we found, that the APIServer was +// listening but not serving certain APIs yet. 
+// +// We changed the readiness detection in the PR at +// https://github.com/kubernetes-sigs/testing_frameworks/pull/48. To confirm +// this changed behaviour does what it should do, we used the same test as in +// k/k's test-cmd (see link above) and test if certain well-known known APIs +// are actually available. +func CheckAPIServerIsReady(kubeCtl *integration.KubeCtl) { + expectedAPIS := []string{ + "/api/v1/namespaces/default/pods 200 OK", + "/api/v1/namespaces/default/replicationcontrollers 200 OK", + "/api/v1/namespaces/default/services 200 OK", + "/apis/apps/v1/namespaces/default/daemonsets 200 OK", + "/apis/apps/v1/namespaces/default/deployments 200 OK", + "/apis/apps/v1/namespaces/default/replicasets 200 OK", + "/apis/apps/v1/namespaces/default/statefulsets 200 OK", + "/apis/autoscaling/v1/namespaces/default/horizontalpodautoscalers 200", + "/apis/batch/v1/namespaces/default/jobs 200 OK", + } + + _, output, err := kubeCtl.Run("--v=6", "--namespace", "default", "get", "all", "--chunk-size=0") + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + stdoutBytes, err := ioutil.ReadAll(output) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + for _, api := range expectedAPIS { + ExpectWithOffset(1, string(stdoutBytes)).To(ContainSubstring(api)) + } +} diff --git a/integration/internal/process.go b/integration/internal/process.go index 3df5027..620c6a9 100644 --- a/integration/internal/process.go +++ b/integration/internal/process.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "io/ioutil" + "net/http" "net/url" "os" "os/exec" @@ -16,7 +17,23 @@ import ( type ProcessState struct { DefaultedProcessInput - Session *gexec.Session + Session *gexec.Session + // Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we + // assume the process is ready to operate. E.g. "/healthz". If this is set, + // we ignore StartMessage. + HealthCheckEndpoint string + // HealthCheckPollInterval is the interval which will be used for polling the + // HealthCheckEndpoint. 
// stopChannel is closed by the owner of a background poller to tell it to
// give up.
type stopChannel chan struct{}

// safeMultiWriter returns a writer that duplicates every write to all non-nil
// writers passed in. Nil entries are dropped first, because io.MultiWriter
// would otherwise call Write on a nil interface value.
//
// Only untyped nil interface values are filtered; a typed nil pointer stored
// in an io.Writer is passed through unchanged.
func safeMultiWriter(writers ...io.Writer) io.Writer {
	safeWriters := make([]io.Writer, 0, len(writers))
	for _, w := range writers {
		if w != nil {
			safeWriters = append(safeWriters, w)
		}
	}
	return io.MultiWriter(safeWriters...)
}

// pollURLUntilOK repeatedly issues GET requests against url until one of them
// is answered with http.StatusOK, and then signals readiness by sending true
// on ready.
//
// interval is the pause between two attempts; a non-positive value falls back
// to 100 milliseconds. The poller returns as soon as stopCh is closed, whether
// it is waiting for the next attempt or waiting for somebody to receive from
// ready. A nil stopCh never stops the poller.
func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
	if interval <= 0 {
		interval = 100 * time.Millisecond
	}
	endpoint := url.String()

	for {
		// NOTE(review): http.Get goes through http.DefaultClient, which has no
		// timeout; a hanging endpoint delays noticing stopCh until the request
		// returns.
		res, err := http.Get(endpoint)
		if err == nil {
			status := res.StatusCode
			// The body is never inspected, but it has to be closed or every
			// poll leaks a connection. A close error carries no information
			// for a health probe, so it is deliberately ignored.
			_ = res.Body.Close()

			if status == http.StatusOK {
				// Do not block forever if the caller already gave up and is
				// no longer receiving from ready.
				select {
				case ready <- true:
				case <-stopCh:
				}
				return
			}
		}

		// Wait for the next attempt, but wake up immediately on stop so that
		// no further request is sent after the caller gave up.
		wait := time.NewTimer(interval)
		select {
		case <-stopCh:
			wait.Stop()
			return
		case <-wait.C:
		}
	}
}
Context("when the healthcheck returns ok", func() { + BeforeEach(func() { + server.RouteToHandler("GET", "/healthz", ghttp.RespondWith(http.StatusOK, "")) + }) - err := processState.Start(nil, nil) - Expect(err).To(MatchError(ContainSubstring("timeout"))) + It("hits the endpoint, and successfully starts", func() { + processState.HealthCheckEndpoint = "/healthz" + processState.StartTimeout = 100 * time.Millisecond + processState.URL = getServerURL(server) - Eventually(processState.Session.ExitCode).Should(Equal(143)) + err := processState.Start(nil, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(server.ReceivedRequests()).To(HaveLen(1)) + Consistently(processState.Session.ExitCode).Should(BeNumerically("==", -1)) + }) }) - }) - Context("when the command cannot be started", func() { - var ( - processState *ProcessState - ) - BeforeEach(func() { - processState = &ProcessState{} - processState.Path = "/nonexistent" + Context("when the healthcheck always returns failure", func() { + BeforeEach(func() { + server.RouteToHandler("GET", "/healthz", ghttp.RespondWith(http.StatusInternalServerError, "")) + }) + It("returns a timeout error and stops health API checker", func() { + processState.HealthCheckEndpoint = "/healthz" + processState.StartTimeout = 500 * time.Millisecond + processState.URL = getServerURL(server) + + err := processState.Start(nil, nil) + Expect(err).To(MatchError(ContainSubstring("timeout"))) + + nrReceivedRequests := len(server.ReceivedRequests()) + Expect(nrReceivedRequests).To(Equal(5)) + time.Sleep(200 * time.Millisecond) + Expect(nrReceivedRequests).To(Equal(5)) + }) }) - It("propagates the error", func() { - err := processState.Start(nil, nil) + Context("when the healthcheck isn't even listening", func() { + BeforeEach(func() { + server.Close() + }) - Expect(os.IsNotExist(err)).To(BeTrue()) - }) + It("returns a timeout error", func() { + processState.HealthCheckEndpoint = "/healthz" + processState.StartTimeout = 500 * time.Millisecond - 
Context("but Stop() is called on it", func() { - It("does not panic", func() { - processState.Start(nil, nil) + am := &AddressManager{} + port, host, err := am.Initialize() + Expect(err).NotTo(HaveOccurred()) - stoppingFailedProcess := func() { - Expect(processState.Stop()).To(Succeed()) + processState.URL = url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", host, port), } - Expect(stoppingFailedProcess).NotTo(Panic()) + err = processState.Start(nil, nil) + Expect(err).To(MatchError(ContainSubstring("timeout"))) + }) + }) + + Context("when the healthcheck fails initially but succeeds eventually", func() { + BeforeEach(func() { + server.AppendHandlers( + ghttp.RespondWith(http.StatusInternalServerError, ""), + ghttp.RespondWith(http.StatusInternalServerError, ""), + ghttp.RespondWith(http.StatusInternalServerError, ""), + ghttp.RespondWith(http.StatusOK, ""), + ) + }) + + It("hits the endpoint repeatedly, and successfully starts", func() { + processState.HealthCheckEndpoint = "/healthz" + processState.StartTimeout = 20 * time.Second + processState.URL = getServerURL(server) + + err := processState.Start(nil, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(server.ReceivedRequests()).To(HaveLen(4)) + Consistently(processState.Session.ExitCode).Should(BeNumerically("==", -1)) + }) + + Context("when the polling interval is not configured", func() { + It("uses the default interval for polling", func() { + processState.HealthCheckEndpoint = "/helathz" + processState.StartTimeout = 300 * time.Millisecond + processState.URL = getServerURL(server) + + Expect(processState.Start(nil, nil)).To(MatchError(ContainSubstring("timeout"))) + Expect(server.ReceivedRequests()).To(HaveLen(3)) + }) + }) + + Context("when the polling interval is configured", func() { + BeforeEach(func() { + processState.HealthCheckPollInterval = time.Millisecond * 20 + }) + + It("hits the endpoint in the configured interval", func() { + processState.HealthCheckEndpoint = "/healthz" + 
processState.StartTimeout = 3 * processState.HealthCheckPollInterval + processState.URL = getServerURL(server) + + Expect(processState.Start(nil, nil)).To(MatchError(ContainSubstring("timeout"))) + Expect(server.ReceivedRequests()).To(HaveLen(3)) + }) + }) + }) + }) + + Context("when a health check endpoint is not provided", func() { + + Context("when process takes too long to start", func() { + It("returns a timeout error", func() { + processState.StartTimeout = 200 * time.Millisecond + processState.StartMessage = "loop 5000" + + err := processState.Start(nil, nil) + Expect(err).To(MatchError(ContainSubstring("timeout"))) + + Eventually(processState.Session.ExitCode).Should(Equal(143)) + }) + }) + + Context("when the command cannot be started", func() { + BeforeEach(func() { + processState = &ProcessState{} + processState.Path = "/nonexistent" + }) + + It("propagates the error", func() { + err := processState.Start(nil, nil) + + Expect(os.IsNotExist(err)).To(BeTrue()) + }) + + Context("but Stop() is called on it", func() { + It("does not panic", func() { + processState.Start(nil, nil) + + stoppingFailedProcess := func() { + Expect(processState.Stop()).To(Succeed()) + } + + Expect(stoppingFailedProcess).NotTo(Panic()) + }) }) }) }) @@ -76,8 +199,6 @@ var _ = Describe("Start method", func() { stdout := &bytes.Buffer{} stderr := &bytes.Buffer{} - processState := &ProcessState{} - processState.Path = "bash" processState.Args = []string{ "-c", ` @@ -239,3 +360,9 @@ var simpleBashScript = []string{ func getSimpleCommand() *exec.Cmd { return exec.Command("bash", simpleBashScript...) } + +func getServerURL(server *ghttp.Server) url.URL { + url, err := url.Parse(server.URL()) + Expect(err).NotTo(HaveOccurred()) + return *url +}