Commit f064629
feat: support for scraping multiple elasticsearch instance
1 parent d13c555 commit f064629
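This commit moves collector construction out of the exporter's startup path and into a new /probe HTTP handler, so a single exporter process can scrape multiple Elasticsearch instances. Each request to /probe builds its own prometheus.Registry and collectors for the instance named by the target query parameter; an optional auth_module parameter selects credentials from ES_<module>_USERNAME and ES_<module>_PASSWORD environment variables, falling back to ES_USERNAME / ES_PASSWORD. Cluster-info retrievers are cached in clusterRetrieverMap, keyed by target, so each cluster's retriever is started only once. For illustration only (the host name and the PROD module are hypothetical, and the default :9114 listen address is assumed), a probe request looks like:

    http://localhost:9114/probe?target=http://es-cluster-1:9200&auth_module=PROD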

2 files changed (+167 −121 lines)

main.go (+154 −108)
@@ -143,117 +143,13 @@ func main() {
 		esURL.User = url.UserPassword(esUsername, esPassword)
 	}
 
-	// returns nil if not provided and falls back to simple TCP.
-	tlsConfig := createTLSConfig(*esCA, *esClientCert, *esClientPrivateKey, *esInsecureSkipVerify)
-
-	var httpTransport http.RoundTripper
-
-	httpTransport = &http.Transport{
-		TLSClientConfig: tlsConfig,
-		Proxy:           http.ProxyFromEnvironment,
-	}
-
-	esAPIKey := os.Getenv("ES_API_KEY")
-
-	if esAPIKey != "" {
-		httpTransport = &transportWithAPIKey{
-			underlyingTransport: httpTransport,
-			apiKey:              esAPIKey,
-		}
-	}
-
-	httpClient := &http.Client{
-		Timeout:   *esTimeout,
-		Transport: httpTransport,
-	}
-
-	if *awsRegion != "" {
-		httpClient.Transport, err = roundtripper.NewAWSSigningTransport(httpTransport, *awsRegion, *awsRoleArn, logger)
-		if err != nil {
-			level.Error(logger).Log("msg", "failed to create AWS transport", "err", err)
-			os.Exit(1)
-		}
-	}
-
-	// version metric
-	prometheus.MustRegister(version.NewCollector(name))
-
-	// create the exporter
-	exporter, err := collector.NewElasticsearchCollector(
-		logger,
-		[]string{},
-		collector.WithElasticsearchURL(esURL),
-		collector.WithHTTPClient(httpClient),
-	)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
-		os.Exit(1)
-	}
-	prometheus.MustRegister(exporter)
-
-	// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors.
-	// cluster info retriever
-	clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
-
-	prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
-	prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
-
-	if *esExportIndices || *esExportShards {
-		sC := collector.NewShards(logger, httpClient, esURL)
-		prometheus.MustRegister(sC)
-		iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
-		prometheus.MustRegister(iC)
-		if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
-			level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
-			os.Exit(1)
-		}
-		if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
-			level.Error(logger).Log("msg", "failed to register shards collector in cluster info")
-			os.Exit(1)
-		}
-	}
-
-	if *esExportSLM {
-		prometheus.MustRegister(collector.NewSLM(logger, httpClient, esURL))
-	}
-
-	if *esExportDataStream {
-		prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
-	}
-
-	if *esExportIndicesSettings {
-		prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
-	}
-
-	if *esExportIndicesMappings {
-		prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
-	}
-
-	if *esExportILM {
-		prometheus.MustRegister(collector.NewIlmStatus(logger, httpClient, esURL))
-		prometheus.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL))
-	}
+	clusterRetrieverMap := make(map[string]*clusterinfo.Retriever)
 
 	// Create a context that is cancelled on SIGKILL or SIGINT.
 	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
 	defer cancel()
 
-	// start the cluster info retriever
-	switch runErr := clusterInfoRetriever.Run(ctx); runErr {
-	case nil:
-		level.Info(logger).Log(
-			"msg", "started cluster info retriever",
-			"interval", (*esClusterInfoInterval).String(),
-		)
-	case clusterinfo.ErrInitialCallTimeout:
-		level.Info(logger).Log("msg", "initial cluster info call timed out")
-	default:
-		level.Error(logger).Log("msg", "failed to run cluster info retriever", "err", err)
-		os.Exit(1)
-	}
-
-	// register cluster info retriever as prometheus collector
-	prometheus.MustRegister(clusterInfoRetriever)
+	probePath := "/probe"
 
 	http.Handle(*metricsPath, promhttp.Handler())
 	if *metricsPath != "/" && *metricsPath != "" {
@@ -263,8 +159,14 @@ func main() {
 			Version: version.Info(),
 			Links: []web.LandingLinks{
 				{
-					Address: *metricsPath,
-					Text:    "Metrics",
+					Address:     *metricsPath,
+					Text:        "Metrics",
+					Description: "Metrics endpoint exposing elasticsearch-exporter metrics in the Prometheus exposition format.",
+				},
+				{
+					Address:     probePath,
+					Text:        "Probe",
+					Description: "Probe endpoint for testing the exporter against a specific Elasticsearch instance.",
 				},
 			},
 		}
@@ -276,6 +178,150 @@ func main() {
 		http.Handle("/", landingPage)
 	}
 
+	http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) {
+		params := r.URL.Query()
+		target := params.Get("target")
+
+		if target != "" {
+			targetURL, err := url.Parse(target)
+			if err != nil {
+				http.Error(w, "invalid target", http.StatusBadRequest)
+				return
+			}
+
+			targetUsername := os.Getenv("ES_USERNAME")
+			targetPassword := os.Getenv("ES_PASSWORD")
+
+			authModule := params.Get("auth_module")
+			if authModule != "" {
+				targetUsername = os.Getenv(fmt.Sprintf("ES_%s_USERNAME", authModule))
+				targetPassword = os.Getenv(fmt.Sprintf("ES_%s_PASSWORD", authModule))
+			}
+
+			if targetUsername != "" && targetPassword != "" {
+				targetURL.User = url.UserPassword(targetUsername, targetPassword)
+			}
+
+			esURL = targetURL
+		}
+
+		registry := prometheus.NewRegistry()
+		// returns nil if not provided and falls back to simple TCP.
+		tlsConfig := createTLSConfig(*esCA, *esClientCert, *esClientPrivateKey, *esInsecureSkipVerify)
+
+		var httpTransport http.RoundTripper
+
+		httpTransport = &http.Transport{
+			TLSClientConfig: tlsConfig,
+			Proxy:           http.ProxyFromEnvironment,
+		}
+
+		esAPIKey := os.Getenv("ES_API_KEY")
+
+		if esAPIKey != "" {
+			httpTransport = &transportWithAPIKey{
+				underlyingTransport: httpTransport,
+				apiKey:              esAPIKey,
+			}
+		}
+
+		httpClient := &http.Client{
+			Timeout:   *esTimeout,
+			Transport: httpTransport,
+		}
+
+		if *awsRegion != "" {
+			httpClient.Transport, err = roundtripper.NewAWSSigningTransport(httpTransport, *awsRegion, *awsRoleArn, logger)
+			if err != nil {
+				level.Error(logger).Log("msg", "failed to create AWS transport", "err", err)
+				os.Exit(1)
+			}
+		}
+
+		// version metric
+		registry.MustRegister(version.NewCollector(name))
+
+		// create the exporter
+		exporter, err := collector.NewElasticsearchCollector(
+			logger,
+			[]string{},
+			collector.WithElasticsearchURL(esURL),
+			collector.WithHTTPClient(httpClient),
+		)
+		if err != nil {
+			level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
+			os.Exit(1)
+		}
+		registry.MustRegister(exporter)
+
+		// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors.
+		// cluster info retriever
+
+		clusterInfoRetriever, ok := clusterRetrieverMap[target]
+		if !ok {
+			clusterInfoRetriever = clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
+			clusterRetrieverMap[target] = clusterInfoRetriever
+
+			// start the cluster info retriever
+			switch runErr := clusterInfoRetriever.Run(ctx); runErr {
+			case nil:
+				level.Info(logger).Log(
+					"msg", fmt.Sprintf("[%s]started cluster info retriever", esURL.Host),
+					"interval", (*esClusterInfoInterval).String(),
+				)
+			case clusterinfo.ErrInitialCallTimeout:
+				level.Info(logger).Log("msg", fmt.Sprintf("[%s]initial cluster info call timed out", esURL.Host))
+			default:
+				level.Error(logger).Log("msg", fmt.Sprintf("[%s]failed to run cluster info retriever", esURL.Host), "err", err)
+			}
+		}
+
+		registry.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
+		registry.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
+
+		if *esExportIndices || *esExportShards {
+			sC := collector.NewShards(logger, httpClient, esURL)
+			prometheus.MustRegister(sC)
+			iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
+			prometheus.MustRegister(iC)
+			if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
+				level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
+				os.Exit(1)
+			}
+			if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
+				level.Error(logger).Log("msg", "failed to register shards collector in cluster info")
+				os.Exit(1)
+			}
+		}
+
+		if *esExportSLM {
+			registry.MustRegister(collector.NewSLM(logger, httpClient, esURL))
+		}
+
+		if *esExportDataStream {
+			registry.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
+		}
+
+		if *esExportIndicesSettings {
+			registry.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
+		}
+
+		if *esExportIndicesMappings {
+			registry.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
+		}
+
+		if *esExportILM {
+			registry.MustRegister(collector.NewIlmStatus(logger, httpClient, esURL))
+			registry.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL))
+		}
+
+		// register cluster info retriever as prometheus collector
+		registry.MustRegister(clusterInfoRetriever)
+
+		h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
+		h.ServeHTTP(w, r)
+	})
+
 	// health endpoint
 	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, http.StatusText(http.StatusOK), http.StatusOK)

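A minimal Go sketch, not part of the commit, of how a client might exercise the new endpoint once the exporter is running; the listen address, target host, and PROD auth module are assumptions:

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
)

func main() {
	// Query parameters understood by the new /probe handler:
	//   target:      base URL of the Elasticsearch instance to scrape
	//   auth_module: picks ES_<module>_USERNAME / ES_<module>_PASSWORD
	q := url.Values{}
	q.Set("target", "http://es-cluster-1:9200") // hypothetical instance
	q.Set("auth_module", "PROD")                // hypothetical module

	probeURL := url.URL{
		Scheme:   "http",
		Host:     "localhost:9114", // assumed default web.listen-address
		Path:     "/probe",
		RawQuery: q.Encode(),
	}

	resp, err := http.Get(probeURL.String())
	if err != nil {
		log.Fatalf("probe request failed: %v", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("reading probe response: %v", err)
	}

	// The body is the Prometheus exposition-format output produced by
	// the per-request registry built inside the /probe handler.
	fmt.Println(string(body))
}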
pkg/clusterinfo/clusterinfo.go (+13 −13)
@@ -131,7 +131,7 @@ func (r *Retriever) updateMetrics(res *Response) {
 	u := *r.url
 	u.User = nil
 	url := u.String()
-	level.Debug(r.logger).Log("msg", "updating cluster info metrics")
+	level.Debug(r.logger).Log("msg", fmt.Sprintf("[%s]updating cluster info metrics", u.Host))
 	// scrape failed, response is nil
 	if res == nil {
 		r.up.WithLabelValues(url).Set(0.0)
@@ -175,18 +175,18 @@ func (r *Retriever) Run(ctx context.Context) error {
 			select {
 			case <-ctx.Done():
 				level.Info(r.logger).Log(
-					"msg", "context cancelled, exiting cluster info update loop",
+					"msg", fmt.Sprintf("[%s]context cancelled, exiting cluster info update loop", r.url.Host),
 					"err", ctx.Err(),
 				)
 				return
 			case <-r.sync:
 				level.Info(r.logger).Log(
-					"msg", "providing consumers with updated cluster info label",
+					"msg", fmt.Sprintf("[%s]providing consumers with updated cluster info label", r.url.Host),
 				)
 				res, err := r.fetchAndDecodeClusterInfo()
 				if err != nil {
 					level.Error(r.logger).Log(
-						"msg", "failed to retrieve cluster info from ES",
+						"msg", fmt.Sprintf("[%s]failed to retrieve cluster info from ES", r.url.Host),
 						"err", err,
 					)
 					r.updateMetrics(nil)
@@ -195,7 +195,7 @@ func (r *Retriever) Run(ctx context.Context) error {
 				r.updateMetrics(res)
 				for name, consumerCh := range r.consumerChannels {
 					level.Debug(r.logger).Log(
-						"msg", "sending update",
+						"msg", fmt.Sprintf("[%s]sending update", r.url.Host),
 						"consumer", name,
 						"res", fmt.Sprintf("%+v", res),
 					)
@@ -212,15 +212,15 @@ func (r *Retriever) Run(ctx context.Context) error {
 	}(ctx)
 	// trigger initial cluster info call
 	level.Info(r.logger).Log(
-		"msg", "triggering initial cluster info call",
+		"msg", fmt.Sprintf("[%s]triggering initial cluster info call", r.url.Host),
 	)
 	r.sync <- struct{}{}
 
 	// start a ticker routine
 	go func(ctx context.Context) {
 		if r.interval <= 0 {
 			level.Info(r.logger).Log(
-				"msg", "no periodic cluster info label update requested",
+				"msg", fmt.Sprintf("[%s]no periodic cluster info label update requested", r.url.Host),
 			)
 			return
 		}
@@ -229,13 +229,13 @@ func (r *Retriever) Run(ctx context.Context) error {
 			select {
 			case <-ctx.Done():
 				level.Info(r.logger).Log(
-					"msg", "context cancelled, exiting cluster info trigger loop",
+					"msg", fmt.Sprintf("[%s]context cancelled, exiting cluster info trigger loop", r.url.Host),
 					"err", ctx.Err(),
 				)
 				return
 			case <-ticker.C:
 				level.Debug(r.logger).Log(
-					"msg", "triggering periodic update",
+					"msg", fmt.Sprintf("[%s]triggering periodic update", r.url.Host),
 				)
 				r.sync <- struct{}{}
 			}
@@ -246,7 +246,7 @@ func (r *Retriever) Run(ctx context.Context) error {
 	select {
 	case <-startupComplete:
 		// first sync has been successful
-		level.Debug(r.logger).Log("msg", "initial clusterinfo sync succeeded")
+		level.Debug(r.logger).Log("msg", fmt.Sprintf("[%s]initial clusterinfo sync succeeded", r.url.Host))
 		return nil
 	case <-time.After(initialTimeout):
 		// initial call timed out
@@ -265,7 +265,7 @@ func (r *Retriever) fetchAndDecodeClusterInfo() (*Response, error) {
 	res, err := r.client.Get(u.String())
 	if err != nil {
 		level.Error(r.logger).Log(
-			"msg", "failed to get cluster info",
+			"msg", fmt.Sprintf("[%s]failed to get cluster info", r.url.Host),
 			"err", err,
 		)
 		return nil, err
@@ -275,14 +275,14 @@ func (r *Retriever) fetchAndDecodeClusterInfo() (*Response, error) {
 		err = res.Body.Close()
 		if err != nil {
 			level.Warn(r.logger).Log(
-				"msg", "failed to close http.Client",
+				"msg", fmt.Sprintf("[%s]failed to close http.Client", r.url.Host),
 				"err", err,
 			)
 		}
 	}()
 
 	if res.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
+		return nil, fmt.Errorf("[%s]HTTP Request failed with code %d", r.url.Host, res.StatusCode)
 	}
 
 	bts, err := io.ReadAll(res.Body)

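Because a separate retriever can now run for every probe target, the log messages in pkg/clusterinfo gain a host prefix so output from concurrent retrievers can be told apart. A trivial, self-contained illustration of the prefix pattern used throughout this file (the host value is hypothetical):

package main

import "fmt"

func main() {
	// Mirrors the fmt.Sprintf("[%s]...", r.url.Host) calls added in the diff above.
	host := "es-cluster-1:9200"
	fmt.Printf("[%s]triggering periodic update\n", host)
}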