Skip to content

Expose metrics endpoint #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f1abcdc
refactor: addition of root ctx to main
dimakis Aug 9, 2023
16ad276
refactor: addition of metrics address
dimakis Aug 9, 2023
3298bbf
refactor: edit of the deployment and service to expose metrics ports
dimakis Aug 9, 2023
4180e7a
refactor: edit of run to start controller and health + metrics concur…
dimakis Aug 9, 2023
e42175c
refactor: making sure that the health and metrics servers only start …
dimakis Aug 9, 2023
f35715d
refactor: update health and metric port defaults from strings to ints
dimakis Aug 11, 2023
a20d09d
refactor: addition of a generic server to use for health, metrics etc
dimakis Aug 11, 2023
3e98689
refactor: use errgroup and new generic-server
dimakis Aug 11, 2023
394f411
refactor: update metrics port in deployment to standard prom port
dimakis Aug 14, 2023
6cd08e7
docs: add license
dimakis Aug 14, 2023
fc7abaa
refactor: update metrics port to use conventional prom port
dimakis Aug 14, 2023
c1518e7
refactor: move metrics funcs to own file
dimakis Aug 14, 2023
7941a94
refactor: add better error and use custom timeout
dimakis Aug 22, 2023
935698a
refactor: remove not needed comments and context
dimakis Aug 22, 2023
4b6e864
refactor: start shutdown in their own goroutines and start metrics se…
dimakis Aug 22, 2023
19a5662
docs: remove duplicate copyright notice
dimakis Aug 22, 2023
0d92fca
refactor: addition of wait groups to ensure order of shutdown and oth…
dimakis Aug 23, 2023
2b5987c
refactor: simplify the concurrency logic
dimakis Aug 23, 2023
6a0e175
refactor: update after rebase
dimakis Sep 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions cmd/kar-controllers/app/generic-server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app

import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"time"

logger "k8s.io/klog/v2"
)

// ServerOption customizes a Server; NewServer applies each option to the
// Server it is building, after the default values have been set.
type ServerOption func(*Server)

// WithTimeout returns a ServerOption that overrides how long the server is
// given to shut down before its shutdown context expires.
func WithTimeout(timeout time.Duration) ServerOption {
	applyTimeout := func(srv *Server) {
		srv.shutdownTimeout = timeout
	}
	return applyTimeout
}

// Server bundles an HTTP server with the listener it serves on, the endpoint
// path it exposes, and the time budget used when shutting down.
type Server struct {
	// endpoint is the URL path the handler is registered under; it is also
	// used to identify the server in log and error messages.
	endpoint string
	// listener is the TCP listener the server accepts connections on.
	// A nil listener means serving is disabled.
	listener net.Listener
	// httpServer is the underlying HTTP server that serves on listener.
	httpServer http.Server
	// shutdownTimeout is the deadline applied to the shutdown context.
	shutdownTimeout time.Duration
}

// NewServer builds a Server that exposes handler under the endpoint path.
//
// A port of 0 disables serving: no listener is opened and the returned Server
// has a nil listener. For any other port a TCP listener is bound immediately,
// so a port that cannot be bound is reported here rather than when the server
// is started. The shutdown timeout defaults to 30 seconds; options are applied
// afterwards and may override it (nil options are ignored).
//
// An error is returned when endpoint is empty, handler is nil, or the
// listener cannot be created.
func NewServer(port int, endpoint string, handler http.Handler, options ...ServerOption) (*Server, error) {
	// Validate before binding the port: ServeMux.Handle panics on these
	// inputs, which would otherwise leave an already-opened listener behind.
	if endpoint == "" {
		return nil, fmt.Errorf("server endpoint must not be empty")
	}
	if handler == nil {
		return nil, fmt.Errorf("server handler for endpoint %s must not be nil", endpoint)
	}

	// "0" is the sentinel newListener interprets as "serving disabled".
	addr := "0"
	if port != 0 {
		addr = ":" + strconv.Itoa(port)
	}

	listener, err := newListener(addr)
	if err != nil {
		return nil, err
	}

	mux := http.NewServeMux()
	mux.Handle(endpoint, handler)

	s := &Server{
		endpoint: endpoint,
		listener: listener,
		httpServer: http.Server{
			Handler: mux,
			// Bound the time a client may take to send request headers so
			// that stalled connections cannot be held open indefinitely.
			ReadHeaderTimeout: 10 * time.Second,
		},
		shutdownTimeout: 30 * time.Second, // Default value
	}

	for _, opt := range options {
		if opt == nil {
			continue
		}
		opt(s)
	}

	return s, nil
}

// Start serves HTTP requests on the server's listener and blocks until
// serving stops.
//
// It returns nil when serving is disabled (nil listener) or when serving
// ended with http.ErrServerClosed. Any other serve error is returned wrapped,
// so callers can still inspect the underlying cause. A panic raised in the
// calling goroutine while serving is recovered and reported as an error.
func (s *Server) Start() (err error) {
	if s.listener == nil {
		logger.Infof("Serving endpoint %s is disabled", s.endpoint)
		return nil
	}

	// The named result lets the deferred function turn a panic into an error.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, r)
		}
	}()

	logger.Infof("Started serving endpoint %s at %s", s.endpoint, s.listener.Addr())
	if serveErr := s.httpServer.Serve(s.listener); serveErr != http.ErrServerClosed {
		return fmt.Errorf("serving endpoint %s failed: %w", s.endpoint, serveErr)
	}
	return nil
}

// Shutdown stops the server, giving it up to the configured shutdown timeout
// to finish gracefully.
//
// It is a no-op returning nil when serving is disabled (nil listener). If the
// graceful shutdown fails, for example because the timeout expired, the
// server is force-closed so that no connections are left open, and the
// graceful-shutdown error is returned wrapped.
func (s *Server) Shutdown() error {
	if s.listener == nil {
		return nil
	}

	logger.Infof("Shutting down endpoint %s at %s (gracefully waiting for %s)", s.endpoint, s.listener.Addr(), s.shutdownTimeout)

	shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
	defer cancel()

	// Try graceful shutdown first.
	if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
		// Graceful shutdown did not complete; fall back to closing the
		// listener and all remaining connections immediately.
		if closeErr := s.httpServer.Close(); closeErr != nil {
			logger.Errorf("Force-closing endpoint %s failed: %v", s.endpoint, closeErr)
		}
		return fmt.Errorf("failed to shutdown server gracefully: %w", err)
	}
	return nil
}

// newListener creates a new TCP listener bound to the given address.
//
// The special address "0" disables serving altogether: it returns a nil
// listener and a nil error. For any other address, a failure to listen is
// returned wrapped, so the underlying network error remains available to
// callers through errors.Is and errors.As.
func newListener(addr string) (net.Listener, error) {
	// "0" is the sentinel for "do not serve"; it is not a bindable address.
	if addr == "0" {
		return nil, nil
	}

	listener, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("failed to create listener: %w", err)
	}

	return listener, nil
}
26 changes: 26 additions & 0 deletions cmd/kar-controllers/app/metrics/prom-metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package metrics

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// globalPromRegistry is the package-level Prometheus registry. It is created
// with prometheus.NewRegistry rather than using the library's default
// registry; PrometheusHandler registers collectors on it and serves from it.
var globalPromRegistry = prometheus.NewRegistry()

// metricsHandler returns a http.Handler that serves the prometheus metrics
func PrometheusHandler() http.Handler {
// Add Go module build info.
globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector())
globalPromRegistry.MustRegister(collectors.NewGoCollector())
globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))

handlerOpts := promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
}

return promhttp.HandlerFor(globalPromRegistry, handlerOpts)
}
6 changes: 4 additions & 2 deletions cmd/kar-controllers/app/options/options.go
Original file line number Diff line number Diff line change
@@ -38,8 +38,9 @@ type ServerOption struct {
HeadOfLineHoldingTime int
QuotaEnabled bool // Controller is to evaluate quota per request
QuotaRestURL string
HealthProbeListenAddr string
HealthProbeListenPort int
DispatchResourceReservationTimeout int64
MetricsListenPort int
}

// NewServerOption creates a new CMServer with a default config.
@@ -64,7 +65,8 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&s.QuotaEnabled, "quotaEnabled", s.QuotaEnabled, "Enable quota policy evaluation. Default is false.")
fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.")
fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.")
fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'")
fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes. Defaults to ':8081'")
fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 9090, "Listen port for metrics. Defaults to ':9090'")
fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes")
}

78 changes: 53 additions & 25 deletions cmd/kar-controllers/app/server.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,36 +17,42 @@ limitations under the License.
package app

import (
"net/http"
"strings"
"context"
"fmt"
"net/http"
"sync"

_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/pointer"

"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/metrics"
"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health"

_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"k8s.io/utils/pointer"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config"
)


// buildConfig returns the REST client configuration for the controller.
// When neither a master URL nor a kubeconfig path is supplied it uses the
// in-cluster configuration; otherwise the configuration is built from the
// given flags.
func buildConfig(master, kubeconfig string) (*rest.Config, error) {
	noExplicitTarget := master == "" && kubeconfig == ""
	if noExplicitTarget {
		return rest.InClusterConfig()
	}
	return clientcmd.BuildConfigFromFlags(master, kubeconfig)
}

func Run(opt *options.ServerOption) error {
func Run(ctx context.Context, opt *options.ServerOption) error {
restConfig, err := buildConfig(opt.Master, opt.Kubeconfig)
if err != nil {
return err
}

neverStop := make(chan struct{})

restConfig.QPS = 100.0
restConfig.Burst = 200.0

@@ -62,29 +68,51 @@ func Run(opt *options.ServerOption) error {
AgentConfigs: strings.Split(opt.AgentConfigs, ","),
}

jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig)
if jobctrl == nil {
return nil
}
jobctrl.Run(neverStop)

// This call is blocking (unless an error occurs) which equates to <-neverStop
err = listenHealthProbe(opt)
g, gCtx := errgroup.WithContext(ctx)
Copy link
Contributor

@astefanutti astefanutti Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the creation of the metrics server should be moved before jobctrl.Run, as it can fail and the context would not be cancelled, so the queue controller would not gracefully shutdown in case of errors.


// metrics server
metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler())
if err != nil {
return err
}

return nil
}

// Starts the health probe listener
func listenHealthProbe(opt *options.ServerOption) error {
handler := http.NewServeMux()
handler.Handle("/healthz", &health.Handler{})
err := http.ListenAndServe(opt.HealthProbeListenAddr, handler)
healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler())
if err != nil {
return err
}

return nil
jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig)
if jobctrl == nil {
return fmt.Errorf("failed to create a job controller")
}

wg := &sync.WaitGroup{}
wg.Add(1)
g.Go(func() error {
defer wg.Done()
jobctrl.Run(gCtx.Done())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimakis,
The jobctrl.Run function is asynchronous: it fires off a few daemon goroutines and returns, thus triggering the deferred call and effectively shutting down the process (the container ends up in a constant CrashLoopBackOff). I would suggest making jobctrl.Run synchronous or revisiting this implementation.

cc: @astefanutti

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I see when running the controller:

Screenshot 2023-09-19 at 09 52 46

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@z103cb thanks, very good catch. That goroutine should block until the group context's Done channel is closed.

return nil
})

g.Go(metricsServer.Start)
g.Go(healthServer.Start)

g.Go(func() error {
wg.Wait()
return metricsServer.Shutdown()
})

g.Go(func() error {
wg.Wait()
return healthServer.Shutdown()
})

return g.Wait()
}

// healthHandler returns an http.Handler that routes "/healthz" to the
// project's health.Handler.
func healthHandler() http.Handler {
	mux := http.NewServeMux()
	mux.Handle("/healthz", &health.Handler{})
	return mux
}
22 changes: 6 additions & 16 deletions cmd/kar-controllers/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,4 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors.
Licensed under the Apache License, Version 2.0 (the "License");
@@ -35,6 +20,7 @@ import (
"fmt"
"os"

"k8s.io/apiserver/pkg/server"
"k8s.io/klog/v2"

"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app"
@@ -49,8 +35,12 @@ func main() {
s.AddFlags(flagSet)
flag.Parse()

if err := app.Run(s); err != nil {
ctx := server.SetupSignalContext()

// Run the server
if err := app.Run(ctx, s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}

}
18 changes: 18 additions & 0 deletions deployment/mcad-controller/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -11,9 +11,25 @@ spec:
- name: http
port: 80
targetPort: 8080
- name: metrics
port: 9090
targetPort: 9090
selector:
app: custom-metrics-apiserver
---
apiVersion: v1
kind: Service
metadata:
name: metrics
namespace: kube-system
spec:
ports:
- name: metrics
port: 9090
targetPort: 9090
selector:
app: metrics
---
#{{ if .Values.configMap.quotaRestUrl }}
apiVersion: v1
kind: Service
@@ -260,6 +276,8 @@ spec:
name: https
- containerPort: 8080
name: http
- containerPort: 9090
name: metrics
volumeMounts:
- mountPath: /tmp
name: temp-vol