@@ -64,11 +64,12 @@ func NewTaskCollector(logger log.Logger, u *url.URL, hc *http.Client) (Collector
64
64
}
65
65
66
66
func (t * TaskCollector ) Update (ctx context.Context , ch chan <- prometheus.Metric ) error {
67
- stats , err := t .fetchAndDecodeAndAggregateTaskStats ( )
67
+ tasks , err := t .fetchTasks ( ctx )
68
68
if err != nil {
69
- err = fmt .Errorf ("failed to fetch and decode task stats: %w" , err )
70
- return err
69
+ return fmt .Errorf ("failed to fetch and decode task stats: %w" , err )
71
70
}
71
+
72
+ stats := AggregateTasks (tasks )
72
73
for action , count := range stats .CountByAction {
73
74
ch <- prometheus .MustNewConstMetric (
74
75
taskActionDesc ,
@@ -80,16 +81,17 @@ func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric)
80
81
return nil
81
82
}
82
83
83
- func (t * TaskCollector ) fetchAndDecodeAndAggregateTaskStats ( ) (* AggregatedTaskStats , error ) {
84
+ func (t * TaskCollector ) fetchTasks ( _ context. Context ) (TasksResponse , error ) {
84
85
u := t .u .ResolveReference (& url.URL {Path : "_tasks" })
85
86
q := u .Query ()
86
87
q .Set ("group_by" , "none" )
87
88
q .Set ("actions" , actionFilter )
88
89
u .RawQuery = q .Encode ()
89
90
91
+ var tr TasksResponse
90
92
res , err := t .hc .Get (u .String ())
91
93
if err != nil {
92
- return nil , fmt .Errorf ("failed to get data stream stats health from %s://%s:%s%s: %s" ,
94
+ return tr , fmt .Errorf ("failed to get data stream stats health from %s://%s:%s%s: %s" ,
93
95
u .Scheme , u .Hostname (), u .Port (), u .Path , err )
94
96
}
95
97
@@ -104,21 +106,16 @@ func (t *TaskCollector) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskSt
104
106
}()
105
107
106
108
if res .StatusCode != http .StatusOK {
107
- return nil , fmt .Errorf ("HTTP Request to %v failed with code %d" , u .String (), res .StatusCode )
109
+ return tr , fmt .Errorf ("HTTP Request to %v failed with code %d" , u .String (), res .StatusCode )
108
110
}
109
111
110
112
bts , err := io .ReadAll (res .Body )
111
113
if err != nil {
112
- return nil , err
113
- }
114
-
115
- var tr TasksResponse
116
- if err := json .Unmarshal (bts , & tr ); err != nil {
117
- return nil , err
114
+ return tr , err
118
115
}
119
116
120
- stats := AggregateTasks ( tr )
121
- return stats , nil
117
+ err = json . Unmarshal ( bts , & tr )
118
+ return tr , err
122
119
}
123
120
124
121
// TasksResponse is a representation of the Task management API.
@@ -140,7 +137,7 @@ type AggregatedTaskStats struct {
140
137
func AggregateTasks (t TasksResponse ) * AggregatedTaskStats {
141
138
actions := map [string ]int64 {}
142
139
for _ , task := range t .Tasks {
143
- actions [task .Action ] += 1
140
+ actions [task .Action ]++
144
141
}
145
142
agg := & AggregatedTaskStats {CountByAction : actions }
146
143
return agg
0 commit comments