Skip to content

Commit 283d9c1

Browse files
authored
Merge pull request #70 from verult/le-healthcheck
Add health checker to leader election library
2 parents d9b74ba + ef07e3e commit 283d9c1

File tree

3 files changed

+68
-34
lines changed

3 files changed

+68
-34
lines changed

Diff for: leaderelection/leader_election.go

+46
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"io/ioutil"
23+
"net/http"
2324
"os"
2425
"regexp"
2526
"strings"
@@ -39,6 +40,14 @@ const (
3940
defaultLeaseDuration = 15 * time.Second
4041
defaultRenewDeadline = 10 * time.Second
4142
defaultRetryPeriod = 5 * time.Second
43+
44+
DefaultHealthCheckTimeout = 20 * time.Second
45+
46+
// HealthCheckerAddress is the address at which the leader election health
47+
// checker reports status.
48+
// The caller sidecar should document this address in appropriate flag
49+
// descriptions.
50+
HealthCheckerAddress = "/healthz/leader-election"
4251
)
4352

4453
// leaderElection is a convenience wrapper around client-go's leader election library.
@@ -55,6 +64,9 @@ type leaderElection struct {
5564
// valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock,
5665
// and resourcelock.ConfigMapsResourceLock
5766
resourceLock string
67+
// healthCheck reports unhealthy if leader election fails to renew leadership
68+
// within a timeout period.
69+
healthCheck *leaderelection.HealthzAdaptor
5870

5971
leaseDuration time.Duration
6072
renewDeadline time.Duration
@@ -134,6 +146,27 @@ func (l *leaderElection) WithContext(ctx context.Context) {
134146
l.ctx = ctx
135147
}
136148

149+
// Server is the minimal HTTP registration surface required to expose the
// leader election health check endpoint. Any type providing a matching Handle
// method satisfies it.
type Server interface {
	// Handle registers handler to serve requests whose path matches pattern.
	Handle(pattern string, handler http.Handler)
}
154+
155+
// PrepareHealthCheck creates a health check for this leader election object
156+
// with the given healthCheckTimeout and registers its HTTP handler to the given
157+
// server at the path specified by the constant "healthCheckerAddress".
158+
// healthCheckTimeout determines the max duration beyond lease expiration
159+
// allowed before reporting unhealthy.
160+
// The caller sidecar should document the handler address in appropriate flag
161+
// descriptions.
162+
func (l *leaderElection) PrepareHealthCheck(
163+
s Server,
164+
healthCheckTimeout time.Duration) {
165+
166+
l.healthCheck = leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout)
167+
s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check))
168+
}
169+
137170
func (l *leaderElection) Run() error {
138171
if l.identity == "" {
139172
id, err := defaultLeaderElectionIdentity()
@@ -179,6 +212,7 @@ func (l *leaderElection) Run() error {
179212
klog.V(3).Infof("new leader detected, current leader: %s", identity)
180213
},
181214
},
215+
WatchDog: l.healthCheck,
182216
}
183217

184218
ctx := l.ctx
@@ -220,3 +254,15 @@ func inClusterNamespace() string {
220254

221255
return "default"
222256
}
257+
258+
// adaptCheckToHandler wraps the check function c in an http.HandlerFunc.
//
// For every request the handler calls c with that request. A nil result is
// answered with the body "ok"; a non-nil error is answered with status 500 and
// a body containing the error text.
func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if err := c(r); err != nil {
			http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError)
			return
		}
		fmt.Fprint(w, "ok")
	}
}

Diff for: metrics/metrics.go

+13-23
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"google.golang.org/grpc/codes"
2828
"google.golang.org/grpc/status"
2929
"k8s.io/component-base/metrics"
30-
"k8s.io/klog/v2"
3130
)
3231

3332
const (
@@ -90,10 +89,15 @@ type CSIMetricsManager interface {
9089
// driverName - Name of the CSI driver against which this operation was executed.
9190
SetDriverName(driverName string)
9291

93-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
94-
// for this metrics manager.
95-
// If the metricsAddress is an empty string, this will be a no op.
96-
StartMetricsEndpoint(metricsAddress, metricsPath string)
92+
// RegisterToServer registers an HTTP handler for this metrics manager with the
93+
// given server at the specified path.
94+
RegisterToServer(s Server, metricsPath string)
95+
}
96+
97+
// Server is the minimal HTTP registration surface required to expose the
// metrics endpoint. Any type providing a matching Handle method satisfies it.
type Server interface {
	// Handle registers handler to serve requests whose path matches pattern.
	Handle(pattern string, handler http.Handler)
}
98102

99103
// MetricsManagerOption is used to pass optional configuration to a
@@ -351,27 +355,13 @@ func (cmm *csiMetricsManager) SetDriverName(driverName string) {
351355
}
352356
}
353357

354-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
355-
// for this metrics manager on a new go routine.
356-
// If the metricsAddress is an empty string, this will be a no op.
357-
func (cmm *csiMetricsManager) StartMetricsEndpoint(metricsAddress, metricsPath string) {
358-
if metricsAddress == "" {
359-
klog.Warningf("metrics endpoint will not be started because `metrics-address` was not specified.")
360-
return
361-
}
362-
363-
http.Handle(metricsPath, metrics.HandlerFor(
358+
// RegisterToServer registers an HTTP handler for this metrics manager with the
359+
// given server at the specified path.
360+
func (cmm *csiMetricsManager) RegisterToServer(s Server, metricsPath string) {
361+
s.Handle(metricsPath, metrics.HandlerFor(
364362
cmm.GetRegistry(),
365363
metrics.HandlerOpts{
366364
ErrorHandling: metrics.ContinueOnError}))
367-
368-
// Spawn a new go routine to listen on specified endpoint
369-
go func() {
370-
err := http.ListenAndServe(metricsAddress, nil)
371-
if err != nil {
372-
klog.Fatalf("Failed to start prometheus metrics endpoint on specified address (%q) and path (%q): %s", metricsAddress, metricsPath, err)
373-
}
374-
}()
375365
}
376366

377367
// VerifyMetricsMatch is a helper function that verifies that the expected and

Diff for: metrics/metrics_test.go

+9-11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package metrics
1919
import (
2020
"io/ioutil"
2121
"net/http"
22+
"net/http/httptest"
2223
"strings"
2324
"testing"
2425
"time"
@@ -481,29 +482,26 @@ func TestRecordMetrics_Negative(t *testing.T) {
481482
}
482483
}
483484

484-
func TestStartMetricsEndPoint_Noop(t *testing.T) {
485+
func TestRegisterToServer_Noop(t *testing.T) {
485486
// Arrange
486487
cmm := NewCSIMetricsManagerForSidecar(
487488
"fake.csi.driver.io" /* driverName */)
488489
operationDuration, _ := time.ParseDuration("20s")
490+
mux := http.NewServeMux()
489491

490492
// Act
491-
cmm.StartMetricsEndpoint(":8080", "/metrics")
493+
cmm.RegisterToServer(mux, "/metrics")
492494
cmm.RecordMetrics(
493495
"/csi.v1.Controller/ControllerGetCapabilities", /* operationName */
494496
nil, /* operationErr */
495497
operationDuration /* operationDuration */)
496498

497499
// Assert
498-
request, err := http.NewRequest("GET", "http://localhost:8080/metrics", strings.NewReader(""))
499-
if err != nil {
500-
t.Fatalf("Creating request for metrics endpoint failed: %v", err)
501-
}
502-
client := &http.Client{}
503-
resp, err := client.Do(request)
504-
if err != nil {
505-
t.Fatalf("Failed to GET metrics. Error: %v", err)
506-
}
500+
request := httptest.NewRequest("GET", "/metrics", strings.NewReader(""))
501+
rec := httptest.NewRecorder()
502+
mux.ServeHTTP(rec, request)
503+
resp := rec.Result()
504+
507505
if resp.StatusCode != 200 {
508506
t.Fatalf("/metrics response status not 200. Response was: %+v", resp)
509507
}

0 commit comments

Comments
 (0)