@@ -31,13 +31,20 @@ limitations under the License.
31
31
package app
32
32
33
33
import (
34
+ "context"
35
+ "fmt"
36
+ "net/http"
37
+ "time"
38
+
34
39
"k8s.io/client-go/rest"
35
40
"k8s.io/client-go/tools/clientcmd"
36
- "net/http"
37
41
38
42
"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
39
43
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob"
40
44
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health"
45
+ "github.com/prometheus/client_golang/prometheus"
46
+ "github.com/prometheus/client_golang/prometheus/collectors"
47
+ "github.com/prometheus/client_golang/prometheus/promhttp"
41
48
42
49
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
43
50
)
@@ -49,41 +56,113 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) {
49
56
return rest .InClusterConfig ()
50
57
}
51
58
52
- func Run (opt * options.ServerOption ) error {
59
+ func Run (ctx context. Context , opt * options.ServerOption ) error {
53
60
config , err := buildConfig (opt .Master , opt .Kubeconfig )
54
61
if err != nil {
55
62
return err
56
63
}
57
64
58
- neverStop := make (chan struct {})
59
-
60
65
config .QPS = 100.0
61
66
config .Burst = 200.0
62
67
63
68
jobctrl := queuejob .NewJobController (config , opt )
64
69
if jobctrl == nil {
65
- return nil
70
+ return fmt . Errorf ( "failed to create a job controller" )
66
71
}
67
- jobctrl .Run (neverStop )
68
72
69
- // This call is blocking (unless an error occurs) which equates to <-neverStop
70
- err = listenHealthProbe (opt )
73
+ stopCh := make (chan struct {})
74
+
75
+ go func () {
76
+ defer close (stopCh )
77
+ <- ctx .Done ()
78
+ }()
79
+
80
+ go jobctrl .Run (stopCh )
81
+
82
+ err = startHealthAndMetricsServers (ctx , opt )
71
83
if err != nil {
72
84
return err
73
85
}
74
86
87
+ <- ctx .Done ()
75
88
return nil
76
89
}
77
90
78
91
// Starts the health probe listener
79
- func listenHealthProbe (opt * options.ServerOption ) error {
80
- handler := http .NewServeMux ()
81
- handler .Handle ("/healthz" , & health.Handler {})
82
- err := http .ListenAndServe (opt .HealthProbeListenAddr , handler )
83
- if err != nil {
84
- return err
92
+ func startHealthAndMetricsServers (ctx context.Context , opt * options.ServerOption ) error {
93
+
94
+ // Create a new registry.
95
+ reg := prometheus .NewRegistry ()
96
+
97
+ // Add Go module build info.
98
+ reg .MustRegister (collectors .NewBuildInfoCollector ())
99
+ reg .MustRegister (collectors .NewGoCollector ())
100
+ reg .MustRegister (collectors .NewProcessCollector (collectors.ProcessCollectorOpts {}))
101
+
102
+ metricsHandler := http .NewServeMux ()
103
+
104
+ // Use the HTTPErrorOnError option for the Prometheus handler
105
+ handlerOpts := promhttp.HandlerOpts {
106
+ ErrorHandling : promhttp .HTTPErrorOnError ,
85
107
}
86
108
87
- return nil
109
+ metricsHandler .Handle ("/metrics" , promhttp .HandlerFor (prometheus .DefaultGatherer , handlerOpts ))
110
+
111
+ healthHandler := http .NewServeMux ()
112
+ healthHandler .Handle ("/healthz" , & health.Handler {})
113
+
114
+ metricsServer := & http.Server {
115
+ Addr : opt .MetricsListenAddr ,
116
+ Handler : metricsHandler ,
117
+ }
118
+
119
+ healthServer := & http.Server {
120
+ Addr : opt .HealthProbeListenAddr ,
121
+ Handler : healthHandler ,
122
+ }
123
+
124
+ // make a channel for errors for each server
125
+ metricsServerErrChan := make (chan error )
126
+ healthServerErrChan := make (chan error )
127
+
128
+ // start servers in their own goroutines
129
+ go func () {
130
+ defer close (metricsServerErrChan )
131
+ err := metricsServer .ListenAndServe ()
132
+ if err != nil && err != http .ErrServerClosed {
133
+ metricsServerErrChan <- err
134
+ }
135
+ }()
136
+
137
+ go func () {
138
+ defer close (healthServerErrChan )
139
+ err := healthServer .ListenAndServe ()
140
+ if err != nil && err != http .ErrServerClosed {
141
+ healthServerErrChan <- err
142
+ }
143
+ }()
144
+
145
+ // use select to wait for either a shutdown signal or an error
146
+ select {
147
+ case <- ctx .Done ():
148
+ // received an OS shutdown signal, shut down servers gracefully
149
+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
150
+ defer cancel ()
151
+
152
+ errM := metricsServer .Shutdown (ctx )
153
+ if errM != nil {
154
+ return fmt .Errorf ("metrics server shutdown error: %v" , errM )
155
+ }
156
+ errH := healthServer .Shutdown (ctx )
157
+ if errH != nil {
158
+ return fmt .Errorf ("health server shutdown error: %v" , errH )
159
+ }
160
+ case err := <- metricsServerErrChan :
161
+ return fmt .Errorf ("metrics server error: %v" , err )
162
+ case err := <- healthServerErrChan :
163
+ return fmt .Errorf ("health server error: %v" , err )
164
+ }
165
+
166
+ return nil
88
167
}
89
168
0 commit comments