
Commit 140ed1c

collector: use collector interface for tasks

Signed-off-by: Aaron Delaney <[email protected]>

1 parent dd0a9f3 · commit 140ed1c

5 files changed: +112 −200 lines

collector/cluster_settings_test.go (−13)
@@ -14,7 +14,6 @@
 package collector
 
 import (
-	"context"
 	"io"
 	"net/http"
 	"net/http/httptest"
@@ -24,21 +23,9 @@ import (
 	"testing"
 
 	"github.com/go-kit/log"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 )
 
-type wrapCollector struct {
-	c Collector
-}
-
-func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) {
-}
-
-func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
-	w.c.Update(context.Background(), ch)
-}
-
 func TestClusterSettingsStats(t *testing.T) {
 	// Testcases created using:
 	// docker run -d -p 9200:9200 elasticsearch:VERSION-alpine
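Note: the wrapCollector adapter deleted above is still referenced by collector/tasks_test.go below, so this commit presumably relocates it into a shared test helper (one of the five changed files not reproduced in this excerpt). Reconstructed from the deleted lines, that shared helper would look like the following sketch; the file name is hypothetical:

// collector/wrap_collector_test.go (hypothetical location)
package collector

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
)

// wrapCollector adapts this exporter's Collector interface (Update) to the
// prometheus.Collector interface (Describe/Collect) so tests can drive a
// collector through testutil.CollectAndCompare.
type wrapCollector struct {
	c Collector
}

func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
	w.c.Update(context.Background(), ch)
}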

collector/tasks.go (+73 −100)
@@ -14,97 +14,80 @@
 package collector
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
 	"net/http"
 	"net/url"
-	"path"
 
+	"github.com/alecthomas/kingpin/v2"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-type taskByAction struct {
-	Type   prometheus.ValueType
-	Desc   *prometheus.Desc
-	Value  func(action string, count int64) float64
-	Labels func(action string, count int64) []string
-}
-
-var (
-	taskLabels = []string{"cluster", "action"}
-)
+// filterByTask global required because collector interface doesn't expose any way to take
+// constructor args.
+var actionFilter string
 
-// Task Information Struct
-type Task struct {
-	logger  log.Logger
-	client  *http.Client
-	url     *url.URL
-	actions string
+var taskActionDesc = prometheus.NewDesc(
+	prometheus.BuildFQName(namespace, "task_stats", "action_total"),
+	"Number of tasks of a certain action",
+	[]string{"action"}, nil)
 
-	up                              prometheus.Gauge
-	totalScrapes, jsonParseFailures prometheus.Counter
+func init() {
+	kingpin.Flag("tasks.actions",
+		"Filter on task actions. Used in same way as Task API actions param").
+		Default("indices:*").StringVar(&actionFilter)
+	registerCollector("tasks", defaultDisabled, NewTaskCollector)
+}
 
-	byActionMetrics []*taskByAction
+// Task Information Struct
+type TaskCollector struct {
+	logger log.Logger
+	hc     *http.Client
+	u      *url.URL
 }
 
-// NewTask defines Task Prometheus metrics
-func NewTask(logger log.Logger, client *http.Client, url *url.URL, actions string) *Task {
-	return &Task{
-		logger:  logger,
-		client:  client,
-		url:     url,
-		actions: actions,
-
-		up: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: prometheus.BuildFQName(namespace, "task_stats", "up"),
-			Help: "Was the last scrape of the ElasticSearch Task endpoint successful.",
-		}),
-		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
-			Name: prometheus.BuildFQName(namespace, "task_stats", "total_scrapes"),
-			Help: "Current total Elasticsearch snapshots scrapes.",
-		}),
-		jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
-			Name: prometheus.BuildFQName(namespace, "task_stats", "json_parse_failures"),
-			Help: "Number of errors while parsing JSON.",
-		}),
-		byActionMetrics: []*taskByAction{
-			{
-				Type: prometheus.GaugeValue,
-				Desc: prometheus.NewDesc(
-					prometheus.BuildFQName(namespace, "task_stats", "action_total"),
-					"Number of tasks of a certain action",
-					[]string{"action"}, nil,
-				),
-				Value: func(action string, count int64) float64 {
-					return float64(count)
-				},
-				Labels: func(action string, count int64) []string {
-					return []string{action}
-				},
-			},
-		},
-	}
+// NewTaskCollector defines Task Prometheus metrics
+func NewTaskCollector(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+	level.Info(logger).Log("msg", "task collector created",
+		"actionFilter", actionFilter,
+	)
+
+	return &TaskCollector{
+		logger: logger,
+		hc:     hc,
+		u:      u,
+	}, nil
 }
 
-// Describe adds Task metrics descriptions
-func (t *Task) Describe(ch chan<- *prometheus.Desc) {
-	for _, metric := range t.byActionMetrics {
-		ch <- metric.Desc
+func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
+	stats, err := t.fetchAndDecodeAndAggregateTaskStats()
+	if err != nil {
+		err = fmt.Errorf("failed to fetch and decode task stats: %w", err)
+		return err
 	}
-
-	ch <- t.up.Desc()
-	ch <- t.totalScrapes.Desc()
-	ch <- t.jsonParseFailures.Desc()
+	for action, count := range stats.CountByAction {
+		ch <- prometheus.MustNewConstMetric(
+			taskActionDesc,
+			prometheus.GaugeValue,
+			float64(count),
+			action,
+		)
+	}
+	return nil
 }
 
-func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) {
-	u := *t.url
-	u.Path = path.Join(u.Path, "/_tasks")
-	u.RawQuery = "group_by=none&actions=" + t.actions
-	res, err := t.client.Get(u.String())
+func (t *TaskCollector) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) {
+	u := t.u.ResolveReference(&url.URL{Path: "_tasks"})
+	q := u.Query()
+	q.Set("group_by", "none")
+	q.Set("actions", actionFilter)
+	u.RawQuery = q.Encode()
+
+	res, err := t.hc.Get(u.String())
 	if err != nil {
 		return nil, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s",
 			u.Scheme, u.Hostname(), u.Port(), u.Path, err)
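An aside on the new URL construction in this hunk: unlike the old path.Join code, ResolveReference treats "_tasks" as a relative reference, so against a base URL whose path does not end in a slash it replaces the last path segment rather than appending to it, and q.Encode() sorts and percent-escapes the parameters. A minimal, self-contained sketch of what it produces (the localhost address is illustrative):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	base, _ := url.Parse("http://localhost:9200")
	// Same construction as fetchAndDecodeAndAggregateTaskStats above.
	u := base.ResolveReference(&url.URL{Path: "_tasks"})
	q := u.Query()
	q.Set("group_by", "none")
	q.Set("actions", "indices:*")
	u.RawQuery = q.Encode() // keys sorted, values percent-escaped
	fmt.Println(u)          // http://localhost:9200/_tasks?actions=indices%3A%2A&group_by=none
}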
@@ -126,49 +109,39 @@ func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, erro
 
 	bts, err := io.ReadAll(res.Body)
 	if err != nil {
-		t.jsonParseFailures.Inc()
 		return nil, err
 	}
 
 	var tr TasksResponse
 	if err := json.Unmarshal(bts, &tr); err != nil {
-		t.jsonParseFailures.Inc()
 		return nil, err
 	}
 
 	stats := AggregateTasks(tr)
 	return stats, nil
 }
 
-// Collect gets Task metric values
-func (ds *Task) Collect(ch chan<- prometheus.Metric) {
-	ds.totalScrapes.Inc()
-	defer func() {
-		ch <- ds.up
-		ch <- ds.totalScrapes
-		ch <- ds.jsonParseFailures
-	}()
+// TasksResponse is a representation of the Task management API.
+type TasksResponse struct {
+	Tasks []TaskResponse `json:"tasks"`
+}
 
-	stats, err := ds.fetchAndDecodeAndAggregateTaskStats()
-	if err != nil {
-		ds.up.Set(0)
-		level.Warn(ds.logger).Log(
-			"msg", "failed to fetch and decode task stats",
-			"err", err,
-		)
-		return
-	}
+// TaskResponse is a representation of the individual task item returned by task API endpoint.
+//
+// We only parse a very limited amount of this API for use in aggregation.
+type TaskResponse struct {
+	Action string `json:"action"`
+}
 
-	for action, count := range stats.CountByAction {
-		for _, metric := range ds.byActionMetrics {
-			ch <- prometheus.MustNewConstMetric(
-				metric.Desc,
-				metric.Type,
-				metric.Value(action, count),
-				metric.Labels(action, count)...,
-			)
-		}
-	}
+type AggregatedTaskStats struct {
+	CountByAction map[string]int64
+}
 
-	ds.up.Set(1)
+func AggregateTasks(t TasksResponse) *AggregatedTaskStats {
+	actions := map[string]int64{}
+	for _, task := range t.Tasks {
+		actions[task.Action] += 1
+	}
+	agg := &AggregatedTaskStats{CountByAction: actions}
+	return agg
 }
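For context, the Collector interface and registerCollector used above belong to the exporter's collector framework and are not part of this diff. Judging only from how NewTaskCollector is registered and how Update is invoked, they presumably look roughly like the sketch below; the registry map, the factoryFunc name, and the defaultDisabled value are assumptions, not the repository's actual code:

package collector

import (
	"context"
	"net/http"
	"net/url"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

// Collector is the interface each sub-collector implements; Update replaces
// the old prometheus.Collector Describe/Collect pair.
type Collector interface {
	Update(ctx context.Context, ch chan<- prometheus.Metric) error
}

// factoryFunc matches NewTaskCollector's signature (assumed name).
type factoryFunc func(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error)

// Assumption: defaultDisabled marks a collector as off unless enabled by flag.
const defaultDisabled = false

// factories is a hypothetical registry keyed by collector name.
var factories = map[string]factoryFunc{}

// registerCollector records a factory; the real implementation presumably
// also wires up a per-collector enable/disable kingpin flag.
func registerCollector(name string, isDefaultEnabled bool, factory factoryFunc) {
	factories[name] = factory
}

With that registration in place, the tasks collector would presumably be switched on at runtime via a per-collector flag (name assumed, e.g. --collector.tasks) together with the new --tasks.actions filter this commit adds.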

collector/tasks_response.go (−39)

This file was deleted.

collector/tasks_test.go (+39 −38)
@@ -18,60 +18,61 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"strings"
 	"testing"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 )
 
 func TestTasks(t *testing.T) {
 	// Test data was collected by running the following:
+	// # create container
 	// docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.17.11
 	// sleep 15
-	// # start some busy work
-	// for i in $(seq 1 1000); do \
-	//   curl -o /dev/null -s -X POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' \
-	//   -d'{"abc": "'$i'"}'; done &
-	// curl -X POST "localhost:9200/a1/_delete_by_query?requests_per_second=1&wait_for_completion=false" \
-	//   -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}
+	// # start some busy work in background
+	// for i in $(seq 1 500)
+	// do
+	//   curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a1": "'"$i"'"}'
+	//   sleep .01
+	//   curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a2": "'"$i"'"}'
+	//   sleep .01
+	//   curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a3": "'"$i"'"}'
+	//   sleep .01
+	// done &
 	// # try and collect a good sample
 	// curl -X GET 'localhost:9200/_tasks?group_by=none&actions=indices:*'
-	// docker rm elasticsearch
+	// # cleanup
+	// docker rm --force elasticsearch
 	tcs := map[string]string{
-		"7.17": `{"tasks":[{"node":"NVe9ksxcSu6AJTKlIfI24A","id":17223,"type":"transport","action":"indices:data/write/delete/byquery","start_time_in_millis":1695214684290,"running_time_in_nanos":8003510219,"cancellable":true,"cancelled":false,"headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20890,"type":"transport","action":"indices:data/write/index","start_time_in_millis":1695214692292,"running_time_in_nanos":1611966,"cancellable":false,"headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20891,"type":"transport","action":"indices:data/write/bulk[s]","start_time_in_millis":1695214692292,"running_time_in_nanos":1467298,"cancellable":false,"parent_task_id":"NVe9ksxcSu6AJTKlIfI24A:20890","headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20892,"type":"direct","action":"indices:data/write/bulk[s][p]","start_time_in_millis":1695214692292,"running_time_in_nanos":1437170,"cancellable":false,"parent_task_id":"NVe9ksxcSu6AJTKlIfI24A:20891","headers":{}}]}`,
+		"7.17": `{"tasks":[{"node":"9lWCm1y_QkujaAg75bVx7A","id":70,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464655,"running_time_in_nanos":308640039,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":73,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464683,"running_time_in_nanos":280672000,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":76,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464711,"running_time_in_nanos":253247906,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":93,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464904,"running_time_in_nanos":60230460,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":50,"type":"transport","action":"indices:data/write/index","start_time_in_millis":1695900464229,"running_time_in_nanos":734480468,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":51,"type":"transport","action":"indices:admin/auto_create","start_time_in_millis":1695900464235,"running_time_in_nanos":729223933,"cancellable":false,"headers":{}}]}`,
 	}
+	want := `# HELP elasticsearch_task_stats_action_total Number of tasks of a certain action
+# TYPE elasticsearch_task_stats_action_total gauge
+elasticsearch_task_stats_action_total{action="indices:admin/auto_create"} 1
+elasticsearch_task_stats_action_total{action="indices:admin/index_template/put"} 4
+elasticsearch_task_stats_action_total{action="indices:data/write/index"} 1
+`
 	for ver, out := range tcs {
-		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
-			fmt.Fprintln(w, out)
-		}))
-		defer ts.Close()
+		t.Run(ver, func(t *testing.T) {
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+				fmt.Fprintln(w, out)
+			}))
+			defer ts.Close()
 
-		u, err := url.Parse(ts.URL)
-		if err != nil {
-			t.Fatalf("Failed to parse URL: %s", err)
-		}
+			u, err := url.Parse(ts.URL)
+			if err != nil {
+				t.Fatalf("Failed to parse URL: %s", err)
+			}
 
-		task := NewTask(log.NewNopLogger(), http.DefaultClient, u, "indices:*")
-		stats, err := task.fetchAndDecodeAndAggregateTaskStats()
-		if err != nil {
-			t.Fatalf("Failed to fetch or decode data stream stats: %s", err)
-		}
-		t.Logf("[%s] Task Response: %+v", ver, stats)
+			c, err := NewTaskCollector(log.NewNopLogger(), u, ts.Client())
+			if err != nil {
+				t.Fatalf("Failed to create collector: %v", err)
+			}
 
-		// validate actions aggregations
-		if len(stats.CountByAction) != 4 {
-			t.Fatal("expected to get 4 tasks")
-		}
-		if stats.CountByAction["indices:data/write/index"] != 1 {
-			t.Fatal("excpected action indices:data/write/delete/byquery to have count 1")
-		}
-		if stats.CountByAction["indices:data/write/bulk[s]"] != 1 {
-			t.Fatal("excpected action indices:data/write/bulk[s] to have count 1")
-		}
-		if stats.CountByAction["indices:data/write/bulk[s][p]"] != 1 {
-			t.Fatal("excpected action indices:data/write/bulk[s][p] to have count 1")
-		}
-		if stats.CountByAction["indices:data/write/delete/byquery"] != 1 {
-			t.Fatal("excpected action indices:data/write/delete/byquery to have count 1")
-		}
+			if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want)); err != nil {
+				t.Fatalf("Metrics did not match: %v", err)
+			}
+		})
 	}
 }
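The expected `want` text above follows directly from the new AggregateTasks function: the 7.17 fixture contains four indices:admin/index_template/put tasks, one indices:data/write/index task, and one indices:admin/auto_create task. A condensed check of that arithmetic, written as a sketch of a Go example test reusing the types from collector/tasks.go:

package collector

import "fmt"

func ExampleAggregateTasks() {
	// Six tasks, mirroring the actions in the 7.17 fixture above.
	tr := TasksResponse{Tasks: []TaskResponse{
		{Action: "indices:admin/index_template/put"},
		{Action: "indices:admin/index_template/put"},
		{Action: "indices:admin/index_template/put"},
		{Action: "indices:admin/index_template/put"},
		{Action: "indices:data/write/index"},
		{Action: "indices:admin/auto_create"},
	}}
	stats := AggregateTasks(tr)
	fmt.Println(stats.CountByAction["indices:admin/index_template/put"])
	fmt.Println(stats.CountByAction["indices:data/write/index"])
	fmt.Println(stats.CountByAction["indices:admin/auto_create"])
	// Output:
	// 4
	// 1
	// 1
}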
