Skip to content
This repository was archived by the owner on Jan 17, 2020. It is now read-only.

Commit 2909416

Browse files
authored
Merge pull request #48 from totherme/apiserver-startup-message
Change APIServer startup message
2 parents 2105dfc + 49018b7 commit 2909416

File tree

6 files changed

+265
-80
lines changed

6 files changed

+265
-80
lines changed

integration/apiserver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (s *APIServer) Start() error {
8787
return err
8888
}
8989

90-
s.processState.StartMessage = internal.GetAPIServerStartMessage(s.processState.URL)
90+
s.processState.HealthCheckEndpoint = "/healthz"
9191

9292
s.URL = &s.processState.URL
9393
s.CertDir = s.processState.Dir

integration/internal/apiserver.go

-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package internal
22

3-
import "net/url"
4-
53
var APIServerDefaultArgs = []string{
64
"--etcd-servers={{ if .EtcdURL }}{{ .EtcdURL.String }}{{ end }}",
75
"--cert-dir={{ .CertDir }}",
@@ -17,13 +15,3 @@ func DoAPIServerArgDefaulting(args []string) []string {
1715

1816
return APIServerDefaultArgs
1917
}
20-
21-
func GetAPIServerStartMessage(u url.URL) string {
22-
if isSecureScheme(u.Scheme) {
23-
// https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/staging/src/k8s.io/apiserver/pkg/server/serve.go#L89
24-
return "Serving securely on " + u.Host
25-
}
26-
27-
// https://github.com/kubernetes/kubernetes/blob/5337ff8009d02fad613440912e540bb41e3a88b1/pkg/kubeapiserver/server/insecure_handler.go#L121
28-
return "Serving insecurely on " + u.Host
29-
}
-25
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package internal_test
22

33
import (
4-
"net/url"
5-
64
. "github.com/kubernetes-sig-testing/frameworks/integration/internal"
75
. "github.com/onsi/ginkgo"
86
. "github.com/onsi/gomega"
@@ -23,26 +21,3 @@ var _ = Describe("Apiserver", func() {
2321
}))
2422
})
2523
})
26-
27-
var _ = Describe("GetAPIServerStartMessage()", func() {
28-
Context("when using a non tls URL", func() {
29-
It("generates valid start message", func() {
30-
url := url.URL{
31-
Scheme: "http",
32-
Host: "some.insecure.apiserver:1234",
33-
}
34-
message := GetAPIServerStartMessage(url)
35-
Expect(message).To(Equal("Serving insecurely on some.insecure.apiserver:1234"))
36-
})
37-
})
38-
Context("when using a tls URL", func() {
39-
It("generates valid start message", func() {
40-
url := url.URL{
41-
Scheme: "https",
42-
Host: "some.secure.apiserver:8443",
43-
}
44-
message := GetAPIServerStartMessage(url)
45-
Expect(message).To(Equal("Serving securely on some.secure.apiserver:8443"))
46-
})
47-
})
48-
})

integration/internal/integration_tests/integration_test.go

+38-2
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ var _ = Describe("The Testing Framework", func() {
3838
fmt.Sprintf("Expected Etcd to listen for clients on %s,", etcdClientURL.Host))
3939

4040
By("Ensuring APIServer is listening")
41-
Expect(isAPIServerListening()).To(BeTrue(),
42-
fmt.Sprintf("Expected APIServer to listen on %s", apiServerURL.Host))
41+
CheckAPIServerIsReady(controlPlane.KubeCtl())
4342

4443
By("getting a kubectl & run it against the control plane")
4544
kubeCtl := controlPlane.KubeCtl()
@@ -122,3 +121,40 @@ func isSomethingListeningOnPort(hostAndPort string) portChecker {
122121
return true
123122
}
124123
}
124+
125+
// CheckAPIServerIsReady checks if the APIServer is really ready and not only
126+
// listening.
127+
//
128+
// While porting some tests in k/k
129+
// (https://github.com/hoegaarden/kubernetes/blob/287fdef1bd98646bc521f4433c1009936d5cf7a2/hack/make-rules/test-cmd-util.sh#L1524-L1535)
130+
// we found, that the APIServer was
131+
// listening but not serving certain APIs yet.
132+
//
133+
// We changed the readiness detection in the PR at
134+
// https://github.com/kubernetes-sigs/testing_frameworks/pull/48. To confirm
135+
// this changed behaviour does what it should do, we used the same test as in
136+
// k/k's test-cmd (see link above) and test if certain well-known known APIs
137+
// are actually available.
138+
func CheckAPIServerIsReady(kubeCtl *integration.KubeCtl) {
139+
expectedAPIS := []string{
140+
"/api/v1/namespaces/default/pods 200 OK",
141+
"/api/v1/namespaces/default/replicationcontrollers 200 OK",
142+
"/api/v1/namespaces/default/services 200 OK",
143+
"/apis/apps/v1/namespaces/default/daemonsets 200 OK",
144+
"/apis/apps/v1/namespaces/default/deployments 200 OK",
145+
"/apis/apps/v1/namespaces/default/replicasets 200 OK",
146+
"/apis/apps/v1/namespaces/default/statefulsets 200 OK",
147+
"/apis/autoscaling/v1/namespaces/default/horizontalpodautoscalers 200",
148+
"/apis/batch/v1/namespaces/default/jobs 200 OK",
149+
}
150+
151+
_, output, err := kubeCtl.Run("--v=6", "--namespace", "default", "get", "all", "--chunk-size=0")
152+
ExpectWithOffset(1, err).NotTo(HaveOccurred())
153+
154+
stdoutBytes, err := ioutil.ReadAll(output)
155+
ExpectWithOffset(1, err).NotTo(HaveOccurred())
156+
157+
for _, api := range expectedAPIS {
158+
ExpectWithOffset(1, string(stdoutBytes)).To(ContainSubstring(api))
159+
}
160+
}

integration/internal/process.go

+67-8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"io"
66
"io/ioutil"
7+
"net/http"
78
"net/url"
89
"os"
910
"os/exec"
@@ -16,7 +17,23 @@ import (
1617

1718
type ProcessState struct {
1819
DefaultedProcessInput
19-
Session *gexec.Session
20+
Session *gexec.Session
21+
// Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we
22+
// assume the process is ready to operate. E.g. "/healthz". If this is set,
23+
// we ignore StartMessage.
24+
HealthCheckEndpoint string
25+
// HealthCheckPollInterval is the interval which will be used for polling the
26+
// HealthCheckEndpoint.
27+
// If left empty it will default to 100 Milliseconds.
28+
HealthCheckPollInterval time.Duration
29+
// StartMessage is the message to wait for on stderr. If we recieve this
30+
// message, we assume the process is ready to operate. Ignored if
31+
// HealthCheckEndpoint is specified.
32+
//
33+
// The usage of StartMessage is discouraged, favour HealthCheckEndpoint
34+
// instead!
35+
//
36+
// Deprecated: Use HealthCheckEndpoint in favour of StartMessage
2037
StartMessage string
2138
Args []string
2239
}
@@ -86,17 +103,24 @@ func DoDefaulting(
86103
return defaults, nil
87104
}
88105

106+
type stopChannel chan struct{}
107+
89108
func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
90109
command := exec.Command(ps.Path, ps.Args...)
91110

92-
startDetectStream := gbytes.NewBuffer()
93-
detectedStart := startDetectStream.Detect(ps.StartMessage)
111+
ready := make(chan bool)
94112
timedOut := time.After(ps.StartTimeout)
113+
var pollerStopCh stopChannel
95114

96-
if stderr == nil {
97-
stderr = startDetectStream
115+
if ps.HealthCheckEndpoint != "" {
116+
healthCheckURL := ps.URL
117+
healthCheckURL.Path = ps.HealthCheckEndpoint
118+
pollerStopCh = make(stopChannel)
119+
go pollURLUntilOK(healthCheckURL, ps.HealthCheckPollInterval, ready, pollerStopCh)
98120
} else {
99-
stderr = io.MultiWriter(startDetectStream, stderr)
121+
startDetectStream := gbytes.NewBuffer()
122+
ready = startDetectStream.Detect(ps.StartMessage)
123+
stderr = safeMultiWriter(stderr, startDetectStream)
100124
}
101125

102126
ps.Session, err = gexec.Start(command, stdout, stderr)
@@ -105,14 +129,49 @@ func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
105129
}
106130

107131
select {
108-
case <-detectedStart:
132+
case <-ready:
109133
return nil
110134
case <-timedOut:
111-
ps.Session.Terminate()
135+
if pollerStopCh != nil {
136+
close(pollerStopCh)
137+
}
138+
if ps.Session != nil {
139+
ps.Session.Terminate()
140+
}
112141
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
113142
}
114143
}
115144

145+
func safeMultiWriter(writers ...io.Writer) io.Writer {
146+
safeWriters := []io.Writer{}
147+
for _, w := range writers {
148+
if w != nil {
149+
safeWriters = append(safeWriters, w)
150+
}
151+
}
152+
return io.MultiWriter(safeWriters...)
153+
}
154+
155+
func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
156+
if interval <= 0 {
157+
interval = 100 * time.Millisecond
158+
}
159+
for {
160+
res, err := http.Get(url.String())
161+
if err == nil && res.StatusCode == http.StatusOK {
162+
ready <- true
163+
return
164+
}
165+
166+
select {
167+
case <-stopCh:
168+
return
169+
default:
170+
time.Sleep(interval)
171+
}
172+
}
173+
}
174+
116175
func (ps *ProcessState) Stop() error {
117176
if ps.Session == nil {
118177
return nil

0 commit comments

Comments
 (0)