Skip to content

Add metrics collection for data stream statistics #592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ elasticsearch_exporter --help
| es.shards | 1.0.3rc1 | If true, query stats for all indices in the cluster, including shard-level stats (implies `es.indices=true`). | false |
| es.snapshots | 1.0.4rc1 | If true, query stats for the cluster snapshots. | false |
| es.slm | | If true, query stats for SLM. | false |
| es.data_stream | | If true, query stats for Data Streams. | false |
| es.timeout | 1.0.2 | Timeout for trying to get stats from Elasticsearch. (ex: 20s) | 5s |
| es.ca | 1.0.2 | Path to PEM file that contains trusted Certificate Authorities for the Elasticsearch connection. | |
| es.client-private-key | 1.0.2 | Path to PEM file that contains the private key for client auth when connecting to Elasticsearch. | |
Expand Down Expand Up @@ -89,6 +90,7 @@ es.indices_settings | `indices` `monitor` (per index or `*`) |
es.shards | not sure if `indices` or `cluster` `monitor` or both |
es.snapshots | `cluster:admin/snapshot/status` and `cluster:admin/repository/get` | [ES Forum Post](https://discuss.elastic.co/t/permissions-for-backup-user-with-x-pack/88057)
es.slm | `read_slm`
es.data_stream | `monitor` or `manage` (per index or `*`) |

Further Information
- [Build in Users](https://www.elastic.co/guide/en/elastic-stack-overview/7.3/built-in-users.html)
Expand Down Expand Up @@ -240,6 +242,11 @@ Further Information
| elasticsearch_slm_stats_snapshots_deleted_total | counter | 1 | Snapshots deleted by policy
| elasticsearch_slm_stats_snapshot_deletion_failures_total | counter | 1 | Snapshot deletion failures by policy
| elasticsearch_slm_stats_operation_mode | gauge | 1 | SLM operation mode (Running, stopping, stopped)
| elasticsearch_data_stream_stats_up | gauge | 0 | Up metric for Data Stream collection
| elasticsearch_data_stream_stats_total_scrapes | counter | 0 | Total scrapes for Data Stream stats
| elasticsearch_data_stream_stats_json_parse_failures | counter | 0 | Number of parsing failures for Data Stream stats
| elasticsearch_data_stream_backing_indices_total | gauge | 1 | Number of backing indices for Data Stream
| elasticsearch_data_stream_store_size_bytes | gauge | 1 | Current size of data stream backing indices in bytes


### Alerts & Recording Rules
Expand Down
185 changes: 185 additions & 0 deletions collector/data_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

// dataStreamMetric describes one exported data stream metric: its
// Prometheus value type and descriptor, plus extractor functions that
// pull the value and label values out of a decoded per-stream entry.
type dataStreamMetric struct {
	Type prometheus.ValueType
	Desc *prometheus.Desc
	// Value extracts the metric value from a single data stream's stats.
	Value func(dataStreamStats DataStreamStatsDataStream) float64
	// Labels returns label values in the same order as Desc's labels.
	Labels func(dataStreamStats DataStreamStatsDataStream) []string
}

var (
	// defaultDataStreamLabels is the label set attached to every
	// per-stream metric.
	defaultDataStreamLabels = []string{"data_stream"}
	// defaultDataStreamLabelValues resolves defaultDataStreamLabels to
	// the stream's own name.
	defaultDataStreamLabelValues = func(dataStreamStats DataStreamStatsDataStream) []string {
		return []string{dataStreamStats.DataStream}
	}
)

// DataStream collects data stream statistics from an Elasticsearch
// node and exposes them as Prometheus metrics.
type DataStream struct {
	logger log.Logger   // logger for scrape warnings
	client *http.Client // HTTP client used to query Elasticsearch
	url    *url.URL     // base URL of the Elasticsearch node

	// Collector self-health: up reflects the last scrape's success,
	// totalScrapes and jsonParseFailures count scrape activity/errors.
	up                              prometheus.Gauge
	totalScrapes, jsonParseFailures prometheus.Counter

	// dataStreamMetrics lists the per-stream metrics emitted by Collect.
	dataStreamMetrics []*dataStreamMetric
}

// NewDataStream returns a DataStream collector that scrapes the node
// behind url with the given client and registers its metric
// descriptors. logger is used for warnings during collection.
func NewDataStream(logger log.Logger, client *http.Client, url *url.URL) *DataStream {
	return &DataStream{
		logger: logger,
		client: client,
		url:    url,

		up: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: prometheus.BuildFQName(namespace, "data_stream_stats", "up"),
			Help: "Was the last scrape of the ElasticSearch Data Stream stats endpoint successful.",
		}),
		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
			Name: prometheus.BuildFQName(namespace, "data_stream_stats", "total_scrapes"),
			// Fixed typo: "Data STream" -> "Data Stream".
			Help: "Current total ElasticSearch Data Stream scrapes.",
		}),
		jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Name: prometheus.BuildFQName(namespace, "data_stream_stats", "json_parse_failures"),
			Help: "Number of errors while parsing JSON.",
		}),
		dataStreamMetrics: []*dataStreamMetric{
			{
				// Gauge, not counter: the number of backing indices
				// shrinks when old backing indices are deleted, and the
				// README documents this metric as a gauge.
				Type: prometheus.GaugeValue,
				Desc: prometheus.NewDesc(
					prometheus.BuildFQName(namespace, "data_stream", "backing_indices_total"),
					"Number of backing indices",
					defaultDataStreamLabels, nil,
				),
				Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
					return float64(dataStreamStats.BackingIndices)
				},
				Labels: defaultDataStreamLabelValues,
			},
			{
				// Gauge, not counter: store size can decrease as data is
				// deleted; documented as a gauge in the README.
				Type: prometheus.GaugeValue,
				Desc: prometheus.NewDesc(
					prometheus.BuildFQName(namespace, "data_stream", "store_size_bytes"),
					"Store size of data stream",
					defaultDataStreamLabels, nil,
				),
				Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
					return float64(dataStreamStats.StoreSizeBytes)
				},
				Labels: defaultDataStreamLabelValues,
			},
		},
	}
}

// Describe sends the descriptors of every metric this collector can
// emit — the collector-health metrics plus each per-stream metric —
// on the provided channel.
func (ds *DataStream) Describe(ch chan<- *prometheus.Desc) {
	ch <- ds.up.Desc()
	ch <- ds.totalScrapes.Desc()
	ch <- ds.jsonParseFailures.Desc()

	for _, m := range ds.dataStreamMetrics {
		ch <- m.Desc
	}
}

// fetchAndDecodeDataStreamStats queries the /_data_stream/*/_stats
// endpoint and decodes the JSON response. jsonParseFailures is
// incremented when the body cannot be read or unmarshalled, matching
// the convention used by the other collectors in this package.
func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse, error) {
	var dsr DataStreamStatsResponse

	u := *ds.url
	u.Path = path.Join(u.Path, "/_data_stream/*/_stats")
	res, err := ds.client.Get(u.String())
	if err != nil {
		// Fixed message: this endpoint serves data stream stats, not
		// cluster health (copy-paste leftover).
		return dsr, fmt.Errorf("failed to get data stream stats from %s://%s:%s%s: %s",
			u.Scheme, u.Hostname(), u.Port(), u.Path, err)
	}

	defer func() {
		// Close the response body so the transport can reuse the
		// connection; use a scoped err to avoid clobbering the outer one.
		if err := res.Body.Close(); err != nil {
			_ = level.Warn(ds.logger).Log(
				"msg", "failed to close response body",
				"err", err,
			)
		}
	}()

	if res.StatusCode != http.StatusOK {
		return dsr, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
	}

	bts, err := ioutil.ReadAll(res.Body)
	if err != nil {
		ds.jsonParseFailures.Inc()
		return dsr, err
	}

	if err := json.Unmarshal(bts, &dsr); err != nil {
		ds.jsonParseFailures.Inc()
		return dsr, err
	}

	return dsr, nil
}

// Collect fetches the current data stream statistics and delivers
// them as Prometheus metrics. The collector-health metrics (up,
// total scrapes, JSON parse failures) are always emitted, even when
// the fetch fails.
func (ds *DataStream) Collect(ch chan<- prometheus.Metric) {
	ds.totalScrapes.Inc()
	defer func() {
		ch <- ds.up
		ch <- ds.totalScrapes
		ch <- ds.jsonParseFailures
	}()

	dataStreamStatsResp, err := ds.fetchAndDecodeDataStreamStats()
	if err != nil {
		ds.up.Set(0)
		_ = level.Warn(ds.logger).Log(
			"msg", "failed to fetch and decode data stream stats",
			"err", err,
		)
		return
	}

	ds.up.Set(1)

	// Removed leftover debug fmt.Printf that dumped every data stream
	// to stdout on each scrape.
	for _, metric := range ds.dataStreamMetrics {
		for _, dataStream := range dataStreamStatsResp.DataStreamStats {
			ch <- prometheus.MustNewConstMetric(
				metric.Desc,
				metric.Type,
				metric.Value(dataStream),
				metric.Labels(dataStream)...,
			)
		}
	}
}
38 changes: 38 additions & 0 deletions collector/data_stream_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

// DataStreamStatsResponse is a representation of the Data Stream stats
// as returned by the /_data_stream/*/_stats endpoint.
type DataStreamStatsResponse struct {
	// Shards summarizes shard counts across all data streams ("_shards").
	Shards DataStreamStatsShards `json:"_shards"`
	// DataStreamCount is the total number of data streams.
	DataStreamCount int64 `json:"data_stream_count"`
	// BackingIndices is the total number of backing indices.
	BackingIndices int64 `json:"backing_indices"`
	// TotalStoreSizeBytes is the combined store size of all streams.
	TotalStoreSizeBytes int64 `json:"total_store_size_bytes"`
	// DataStreamStats holds the per-stream entries ("data_streams").
	DataStreamStats []DataStreamStatsDataStream `json:"data_streams"`
}

// DataStreamStatsShards defines data stream stats shards information
// structure: how many shards were queried and how many succeeded/failed.
type DataStreamStatsShards struct {
	Total      int64 `json:"total"`
	Successful int64 `json:"successful"`
	Failed     int64 `json:"failed"`
}

// DataStreamStatsDataStream defines the structure of per data stream stats.
type DataStreamStatsDataStream struct {
	// DataStream is the stream's name.
	DataStream string `json:"data_stream"`
	// BackingIndices is the number of indices backing this stream.
	BackingIndices int64 `json:"backing_indices"`
	// StoreSizeBytes is the current store size of the backing indices.
	StoreSizeBytes int64 `json:"store_size_bytes"`
	// MaximumTimestamp is the highest @timestamp in the stream
	// (epoch milliseconds in the sample fixture).
	MaximumTimestamp int64 `json:"maximum_timestamp"`
}
57 changes: 57 additions & 0 deletions collector/data_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/go-kit/log"
)

// TestDataStream verifies that fetchAndDecodeDataStreamStats parses a
// recorded /_data_stream/*/_stats response for each listed
// Elasticsearch version.
func TestDataStream(t *testing.T) {
	tcs := map[string]string{
		"7.15.0": `{"_shards":{"total":30,"successful":30,"failed":0},"data_stream_count":2,"backing_indices":7,"total_store_size_bytes":1103028116,"data_streams":[{"data_stream":"foo","backing_indices":5,"store_size_bytes":429205396,"maximum_timestamp":1656079894000},{"data_stream":"bar","backing_indices":2,"store_size_bytes":673822720,"maximum_timestamp":1656028796000}]}`,
	}
	for ver, out := range tcs {
		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, out)
		}))
		defer ts.Close()

		u, err := url.Parse(ts.URL)
		if err != nil {
			t.Fatalf("Failed to parse URL: %s", err)
		}
		s := NewDataStream(log.NewNopLogger(), http.DefaultClient, u)
		stats, err := s.fetchAndDecodeDataStreamStats()
		if err != nil {
			t.Fatalf("Failed to fetch or decode data stream stats: %s", err)
		}
		t.Logf("[%s] Data Stream Response: %+v", ver, stats)
		dataStreamStats := stats.DataStreamStats[0]

		// Failure messages now include got/want values; fixed "valuee" typo.
		if dataStreamStats.BackingIndices != 5 {
			t.Errorf("Bad number of backing indices: got %d, want 5", dataStreamStats.BackingIndices)
		}

		if dataStreamStats.StoreSizeBytes != 429205396 {
			t.Errorf("Bad store size bytes value: got %d, want 429205396", dataStreamStats.StoreSizeBytes)
		}
	}
}
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func main() {
esExportSLM = kingpin.Flag("es.slm",
"Export stats for SLM snapshots.").
Default("false").Bool()
esExportDataStream = kingpin.Flag("es.data_stream",
"Export stats for Data Streams.").
Default("false").Bool()
esClusterInfoInterval = kingpin.Flag("es.clusterinfo.interval",
"Cluster info update interval for the cluster label").
Default("5m").Duration()
Expand Down Expand Up @@ -201,6 +204,10 @@ func main() {
prometheus.MustRegister(collector.NewSLM(logger, httpClient, esURL))
}

if *esExportDataStream {
prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
}

if *esExportClusterSettings {
prometheus.MustRegister(collector.NewClusterSettings(logger, httpClient, esURL))
}
Expand Down