14
14
package collector
15
15
16
16
import (
17
+ "context"
17
18
"encoding/json"
18
19
"fmt"
19
20
"io"
20
21
"net/http"
21
22
"net/url"
22
- "path"
23
23
24
+ "github.com/alecthomas/kingpin/v2"
24
25
"github.com/go-kit/log"
25
26
"github.com/go-kit/log/level"
26
27
"github.com/prometheus/client_golang/prometheus"
27
28
)
28
29
29
- type taskByAction struct {
30
- Type prometheus.ValueType
31
- Desc * prometheus.Desc
32
- Value func (action string , count int64 ) float64
33
- Labels func (action string , count int64 ) []string
34
- }
35
-
36
- var (
37
- taskLabels = []string {"cluster" , "action" }
38
- )
30
+ // actionFilter is a global because the collector interface doesn't expose any way to take
31
+ // constructor args.
32
+ var actionFilter string
39
33
40
- // Task Information Struct
41
- type Task struct {
42
- logger log.Logger
43
- client * http.Client
44
- url * url.URL
45
- actions string
34
+ var taskActionDesc = prometheus .NewDesc (
35
+ prometheus .BuildFQName (namespace , "task_stats" , "action_total" ),
36
+ "Number of tasks of a certain action" ,
37
+ []string {"action" }, nil )
46
38
47
- up prometheus.Gauge
48
- totalScrapes , jsonParseFailures prometheus.Counter
39
+ func init () {
40
+ kingpin .Flag ("tasks.actions" ,
41
+ "Filter on task actions. Used in same way as Task API actions param" ).
42
+ Default ("indices:*" ).StringVar (& actionFilter )
43
+ registerCollector ("tasks" , defaultDisabled , NewTaskCollector )
44
+ }
49
45
50
- byActionMetrics []* taskByAction
46
+ // TaskCollector holds the logger, HTTP client, and base URL used to collect task metrics.
47
+ type TaskCollector struct {
48
+ logger log.Logger
49
+ hc * http.Client
50
+ u * url.URL
51
51
}
52
52
53
- // NewTask defines Task Prometheus metrics
54
- func NewTask (logger log.Logger , client * http.Client , url * url.URL , actions string ) * Task {
55
- return & Task {
56
- logger : logger ,
57
- client : client ,
58
- url : url ,
59
- actions : actions ,
60
-
61
- up : prometheus .NewGauge (prometheus.GaugeOpts {
62
- Name : prometheus .BuildFQName (namespace , "task_stats" , "up" ),
63
- Help : "Was the last scrape of the ElasticSearch Task endpoint successful." ,
64
- }),
65
- totalScrapes : prometheus .NewCounter (prometheus.CounterOpts {
66
- Name : prometheus .BuildFQName (namespace , "task_stats" , "total_scrapes" ),
67
- Help : "Current total Elasticsearch snapshots scrapes." ,
68
- }),
69
- jsonParseFailures : prometheus .NewCounter (prometheus.CounterOpts {
70
- Name : prometheus .BuildFQName (namespace , "task_stats" , "json_parse_failures" ),
71
- Help : "Number of errors while parsing JSON." ,
72
- }),
73
- byActionMetrics : []* taskByAction {
74
- {
75
- Type : prometheus .GaugeValue ,
76
- Desc : prometheus .NewDesc (
77
- prometheus .BuildFQName (namespace , "task_stats" , "action_total" ),
78
- "Number of tasks of a certain action" ,
79
- []string {"action" }, nil ,
80
- ),
81
- Value : func (action string , count int64 ) float64 {
82
- return float64 (count )
83
- },
84
- Labels : func (action string , count int64 ) []string {
85
- return []string {action }
86
- },
87
- },
88
- },
89
- }
53
+ // NewTaskCollector logs the configured action filter and returns a Collector for task metrics.
54
+ func NewTaskCollector (logger log.Logger , u * url.URL , hc * http.Client ) (Collector , error ) {
55
+ level .Info (logger ).Log ("msg" , "task collector created" ,
56
+ "actionFilter" , actionFilter ,
57
+ )
58
+
59
+ return & TaskCollector {
60
+ logger : logger ,
61
+ hc : hc ,
62
+ u : u ,
63
+ }, nil
90
64
}
91
65
92
- // Describe adds Task metrics descriptions
93
- func (t * Task ) Describe (ch chan <- * prometheus.Desc ) {
94
- for _ , metric := range t .byActionMetrics {
95
- ch <- metric .Desc
66
+ func (t * TaskCollector ) Update (ctx context.Context , ch chan <- prometheus.Metric ) error {
67
+ stats , err := t .fetchAndDecodeAndAggregateTaskStats ()
68
+ if err != nil {
69
+ err = fmt .Errorf ("failed to fetch and decode task stats: %w" , err )
70
+ return err
96
71
}
97
-
98
- ch <- t .up .Desc ()
99
- ch <- t .totalScrapes .Desc ()
100
- ch <- t .jsonParseFailures .Desc ()
72
+ for action , count := range stats .CountByAction {
73
+ ch <- prometheus .MustNewConstMetric (
74
+ taskActionDesc ,
75
+ prometheus .GaugeValue ,
76
+ float64 (count ),
77
+ action ,
78
+ )
79
+ }
80
+ return nil
101
81
}
102
82
103
- func (t * Task ) fetchAndDecodeAndAggregateTaskStats () (* AggregatedTaskStats , error ) {
104
- u := * t .url
105
- u .Path = path .Join (u .Path , "/_tasks" )
106
- u .RawQuery = "group_by=none&actions=" + t .actions
107
- res , err := t .client .Get (u .String ())
83
+ func (t * TaskCollector ) fetchAndDecodeAndAggregateTaskStats () (* AggregatedTaskStats , error ) {
84
+ u := t .u .ResolveReference (& url.URL {Path : "_tasks" })
85
+ q := u .Query ()
86
+ q .Set ("group_by" , "none" )
87
+ q .Set ("actions" , actionFilter )
88
+ u .RawQuery = q .Encode ()
89
+
90
+ res , err := t .hc .Get (u .String ())
108
91
if err != nil {
109
92
return nil , fmt .Errorf ("failed to get data stream stats health from %s://%s:%s%s: %s" ,
110
93
u .Scheme , u .Hostname (), u .Port (), u .Path , err )
@@ -126,49 +109,39 @@ func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, erro
126
109
127
110
bts , err := io .ReadAll (res .Body )
128
111
if err != nil {
129
- t .jsonParseFailures .Inc ()
130
112
return nil , err
131
113
}
132
114
133
115
var tr TasksResponse
134
116
if err := json .Unmarshal (bts , & tr ); err != nil {
135
- t .jsonParseFailures .Inc ()
136
117
return nil , err
137
118
}
138
119
139
120
stats := AggregateTasks (tr )
140
121
return stats , nil
141
122
}
142
123
143
- // Collect gets Task metric values
144
- func (ds * Task ) Collect (ch chan <- prometheus.Metric ) {
145
- ds .totalScrapes .Inc ()
146
- defer func () {
147
- ch <- ds .up
148
- ch <- ds .totalScrapes
149
- ch <- ds .jsonParseFailures
150
- }()
124
+ // TasksResponse is a representation of the Task management API response.
125
+ type TasksResponse struct {
126
+ Tasks []TaskResponse `json:"tasks"`
127
+ }
151
128
152
- stats , err := ds .fetchAndDecodeAndAggregateTaskStats ()
153
- if err != nil {
154
- ds .up .Set (0 )
155
- level .Warn (ds .logger ).Log (
156
- "msg" , "failed to fetch and decode task stats" ,
157
- "err" , err ,
158
- )
159
- return
160
- }
129
+ // TaskResponse is a representation of an individual task item returned by the task API endpoint.
130
+ //
131
+ // We only parse a very limited amount of this API for use in aggregation.
132
+ type TaskResponse struct {
133
+ Action string `json:"action"`
134
+ }
161
135
162
- for action , count := range stats .CountByAction {
163
- for _ , metric := range ds .byActionMetrics {
164
- ch <- prometheus .MustNewConstMetric (
165
- metric .Desc ,
166
- metric .Type ,
167
- metric .Value (action , count ),
168
- metric .Labels (action , count )... ,
169
- )
170
- }
171
- }
136
+ type AggregatedTaskStats struct {
137
+ CountByAction map [string ]int64
138
+ }
172
139
173
- ds .up .Set (1 )
140
+ func AggregateTasks (t TasksResponse ) * AggregatedTaskStats {
141
+ actions := map [string ]int64 {}
142
+ for _ , task := range t .Tasks {
143
+ actions [task .Action ] += 1
144
+ }
145
+ agg := & AggregatedTaskStats {CountByAction : actions }
146
+ return agg
174
147
}
0 commit comments