
refactor pg_stat_bgwriter metrics into standalone collector #556


Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## master / unreleased

* [CHANGE] pg_stat_bgwriter counter metrics had the `_total` suffix added #556
* [ENHANCEMENT] Add pg_database_size_bytes metric #613

## 0.10.1 / 2022-01-14
6 changes: 6 additions & 0 deletions README.md
@@ -41,6 +41,12 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra
* `help`
Show context-sensitive help (also try --help-long and --help-man).

* `collector.database`
Enable the pg_database collector. Default is `enabled`

* `collector.bgwriter`
Enable the pg_stat_bgwriter collector. Default is `enabled`

* `web.listen-address`
Address to listen on for web interface and telemetry. Default is `:9187`.

7 changes: 6 additions & 1 deletion cmd/postgres_exporter/main.go
@@ -115,7 +115,12 @@ func main() {

prometheus.MustRegister(exporter)

pe, err := collector.NewPostgresCollector(logger, dsn)
pe, err := collector.NewPostgresCollector(
logger,
dsn,
[]string{},
collector.WithAutoDiscoverDatabases(*autoDiscoverDatabases),
)
if err != nil {
level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
os.Exit(1)
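
For orientation, here is a minimal sketch of how a caller wires up the changed constructor. It is not part of this PR; the collector package import path and the DSN handling are assumptions, and the empty filter slice simply means "run every enabled collector":

```go
// Minimal caller sketch (not part of this PR). The collector import path and
// the DATA_SOURCE_NAME environment variable are assumptions for illustration.
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus-community/postgres_exporter/collector"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	dsns := []string{os.Getenv("DATA_SOURCE_NAME")}

	pc, err := collector.NewPostgresCollector(
		logger,
		dsns,
		[]string{}, // explicit filter slice replaces the old variadic filters argument
		collector.WithAutoDiscoverDatabases(false),
	)
	if err != nil {
		os.Exit(1)
	}
	prometheus.MustRegister(pc)
}
```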
17 changes: 0 additions & 17 deletions cmd/postgres_exporter/postgres_exporter.go
@@ -163,23 +163,6 @@ func dumpMaps() {
}

var builtinMetricMaps = map[string]intermediateMetricMap{
"pg_stat_bgwriter": {
map[string]ColumnMapping{
"checkpoints_timed": {COUNTER, "Number of scheduled checkpoints that have been performed", nil, nil},
"checkpoints_req": {COUNTER, "Number of requested checkpoints that have been performed", nil, nil},
"checkpoint_write_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are written to disk, in milliseconds", nil, nil},
"checkpoint_sync_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are synchronized to disk, in milliseconds", nil, nil},
"buffers_checkpoint": {COUNTER, "Number of buffers written during checkpoints", nil, nil},
"buffers_clean": {COUNTER, "Number of buffers written by the background writer", nil, nil},
"maxwritten_clean": {COUNTER, "Number of times the background writer stopped a cleaning scan because it had written too many buffers", nil, nil},
"buffers_backend": {COUNTER, "Number of buffers written directly by a backend", nil, nil},
"buffers_backend_fsync": {COUNTER, "Number of times a backend had to execute its own fsync call (normally the background writer handles those even when the backend does its own write)", nil, nil},
"buffers_alloc": {COUNTER, "Number of buffers allocated", nil, nil},
"stats_reset": {COUNTER, "Time at which these statistics were last reset", nil, nil},
},
true,
0,
},
"pg_stat_database": {
map[string]ColumnMapping{
"datid": {LABEL, "OID of a database", nil, nil},
61 changes: 46 additions & 15 deletions collector/collector.go
@@ -87,10 +87,27 @@ type PostgresCollector struct {
logger log.Logger

servers map[string]*server

// autoDiscoverDatabases will cause the collector to query the database
// to find other servers and also scrape them.
autoDiscoverDatabases bool
}

type Option func(*PostgresCollector) error

// NewPostgresCollector creates a new PostgresCollector.
func NewPostgresCollector(logger log.Logger, dsns []string, filters ...string) (*PostgresCollector, error) {
func NewPostgresCollector(logger log.Logger, dsns []string, filters []string, options ...Option) (*PostgresCollector, error) {
p := &PostgresCollector{
logger: logger,
}
// Apply options to customize the collector
for _, o := range options {
err := o(p)
if err != nil {
return nil, err
}
}

f := make(map[string]bool)
for _, filter := range filters {
enabled, exist := collectorState[filter]
@@ -121,48 +138,62 @@ func NewPostgresCollector(logger log.Logger, dsns []string, filters ...string) (
}
}

p.Collectors = collectors

servers := make(map[string]*server)
for _, dsn := range dsns {
s, err := makeServer(dsn)
if err != nil {
return nil, err
}
// Manually provided servers are always classified as "primary"
s.isPrimary = true

// TODO(@sysadmind): We need to discover the downstream servers and add them here.
// if p.autoDiscoverDatabases {
// }
Contributor:

I think we're intending to drop the autoDiscoverDatabases feature. IIRC it's misused via queries.yaml to turn this exporter into a generic SQL exporter.

Contributor (Author):

I'm happy to hear that. I was going to ask you about the usage of this flag because it is challenging to understand from the code. I'm not surprised that it causes issues for users.

Contributor:

Yea, it's a confusing mess. Part of why I haven't done much maintenance work on this exporter myself. 😹


servers[dsn] = s
}

return &PostgresCollector{
Collectors: collectors,
logger: logger,
servers: servers,
}, nil
p.servers = servers

return p, nil
}

func WithAutoDiscoverDatabases(discover bool) Option {
return func(p *PostgresCollector) error {
p.autoDiscoverDatabases = discover
return nil
}
}
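
The constructor change above adopts the functional-options pattern, with WithAutoDiscoverDatabases as its first option. As a generic, self-contained illustration of that pattern (the Config and With* names below are hypothetical and not taken from the exporter):

```go
// Generic functional-options illustration; all names here are hypothetical.
package main

import "fmt"

type Config struct {
	autoDiscover bool
	timeoutSecs  int
}

// Option mutates a Config and may reject invalid values.
type Option func(*Config) error

func WithAutoDiscover(enabled bool) Option {
	return func(c *Config) error {
		c.autoDiscover = enabled
		return nil
	}
}

func WithTimeout(secs int) Option {
	return func(c *Config) error {
		if secs <= 0 {
			return fmt.Errorf("timeout must be positive, got %d", secs)
		}
		c.timeoutSecs = secs
		return nil
	}
}

// New sets defaults first, then lets each option override them.
func New(opts ...Option) (*Config, error) {
	c := &Config{timeoutSecs: 30}
	for _, o := range opts {
		if err := o(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := New(WithAutoDiscover(true), WithTimeout(10))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c) // &{autoDiscover:true timeoutSecs:10}
}
```

Because the options are applied at the top of NewPostgresCollector, the later filter and server setup can already rely on fields such as autoDiscoverDatabases.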

// Describe implements the prometheus.Collector interface.
func (n PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- scrapeDurationDesc
ch <- scrapeSuccessDesc
}

// Collect implements the prometheus.Collector interface.
func (n PostgresCollector) Collect(ch chan<- prometheus.Metric) {
func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
ctx := context.TODO()
wg := sync.WaitGroup{}
wg.Add(len(n.servers))
for _, s := range n.servers {
wg.Add(len(p.servers))
for _, s := range p.servers {
go func(s *server) {
n.subCollect(ctx, s, ch)
p.subCollect(ctx, s, ch)
wg.Done()
}(s)
}
wg.Wait()
}

func (n PostgresCollector) subCollect(ctx context.Context, server *server, ch chan<- prometheus.Metric) {
func (p PostgresCollector) subCollect(ctx context.Context, server *server, ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
wg.Add(len(n.Collectors))
for name, c := range n.Collectors {
wg.Add(len(p.Collectors))
for name, c := range p.Collectors {
go func(name string, c Collector) {
execute(ctx, name, c, server, ch, n.logger)
execute(ctx, name, c, server, ch, p.logger)
wg.Done()
}(name, c)
}
212 changes: 212 additions & 0 deletions collector/pg_stat_bgwriter.go
@@ -0,0 +1,212 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
registerCollector("bgwriter", defaultEnabled, NewPGStatBGWriterCollector)
}

type PGStatBGWriterCollector struct {
}

func NewPGStatBGWriterCollector(logger log.Logger) (Collector, error) {
return &PGStatBGWriterCollector{}, nil
}
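
The init/registerCollector pair above is how a collector opts into the new registry, and a further collector would follow the same shape. A hypothetical sketch, not part of this PR (the "archiver" name is illustrative only, and Update merely mirrors the signature used by the bgwriter collector below):

```go
// Hypothetical additional collector (illustration only, not in this PR).
func init() {
	registerCollector("archiver", defaultEnabled, NewPGStatArchiverCollector)
}

type PGStatArchiverCollector struct{}

func NewPGStatArchiverCollector(logger log.Logger) (Collector, error) {
	return &PGStatArchiverCollector{}, nil
}

// Update would query pg_stat_archiver and emit metrics; the body is omitted in this sketch.
func (PGStatArchiverCollector) Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error {
	return nil
}
```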

const bgWriterSubsystem = "stat_bgwriter"

var statBGWriter = map[string]*prometheus.Desc{
"checkpoints_timed": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoints_timed_total"),
"Number of scheduled checkpoints that have been performed",
[]string{"server"},
prometheus.Labels{},
),
"checkpoints_req": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoints_req_total"),
"Number of requested checkpoints that have been performed",
[]string{"server"},
prometheus.Labels{},
),
"checkpoint_write_time": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoint_write_time_total"),
"Total amount of time that has been spent in the portion of checkpoint processing where files are written to disk, in milliseconds",
[]string{"server"},
prometheus.Labels{},
),
"checkpoint_sync_time": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoint_sync_time_total"),
"Total amount of time that has been spent in the portion of checkpoint processing where files are synchronized to disk, in milliseconds",
[]string{"server"},
prometheus.Labels{},
),
"buffers_checkpoint": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_checkpoint_total"),
"Number of buffers written during checkpoints",
[]string{"server"},
prometheus.Labels{},
),
"buffers_clean": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_clean_total"),
"Number of buffers written by the background writer",
[]string{"server"},
prometheus.Labels{},
),
"maxwritten_clean": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "maxwritten_clean_total"),
"Number of times the background writer stopped a cleaning scan because it had written too many buffers",
[]string{"server"},
prometheus.Labels{},
),
"buffers_backend": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_backend_total"),
"Number of buffers written directly by a backend",
[]string{"server"},
prometheus.Labels{},
),
"buffers_backend_fsync": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_backend_fsync_total"),
"Number of times a backend had to execute its own fsync call (normally the background writer handles those even when the backend does its own write)",
[]string{"server"},
prometheus.Labels{},
),
"buffers_alloc": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_alloc_total"),
"Number of buffers allocated",
[]string{"server"},
prometheus.Labels{},
),
"stats_reset": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "stats_reset_total"),
"Time at which these statistics were last reset",
[]string{"server"},
prometheus.Labels{},
),
}

func (PGStatBGWriterCollector) Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error {
db, err := server.GetDB()
if err != nil {
return err
}

row := db.QueryRowContext(ctx,
`SELECT
checkpoints_timed
,checkpoints_req
,checkpoint_write_time
,checkpoint_sync_time
,buffers_checkpoint
,buffers_clean
,maxwritten_clean
,buffers_backend
,buffers_backend_fsync
,buffers_alloc
,stats_reset
FROM pg_stat_bgwriter;`)

var cpt int
var cpr int
var cpwt int
var cpst int
var bcp int
var bc int
var mwc int
var bb int
var bbf int
var ba int
var sr time.Time

err = row.Scan(&cpt, &cpr, &cpwt, &cpst, &bcp, &bc, &mwc, &bb, &bbf, &ba, &sr)
if err != nil {
return err
}

ch <- prometheus.MustNewConstMetric(
statBGWriter["checkpoints_timed"],
prometheus.CounterValue,
float64(cpt),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["checkpoints_req"],
prometheus.CounterValue,
float64(cpr),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["checkpoint_write_time"],
prometheus.CounterValue,
float64(cpwt),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["checkpoint_sync_time"],
prometheus.CounterValue,
float64(cpst),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_checkpoint"],
prometheus.CounterValue,
float64(bcp),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_clean"],
prometheus.CounterValue,
float64(bc),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["maxwritten_clean"],
prometheus.CounterValue,
float64(mwc),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_backend"],
prometheus.CounterValue,
float64(bb),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_backend_fsync"],
prometheus.CounterValue,
float64(bbf),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_alloc"],
prometheus.CounterValue,
float64(ba),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["stats_reset"],
prometheus.CounterValue,
float64(sr.Unix()),
server.GetName(),
)

return nil
}
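
To sanity-check the column list against a live server, the same query the collector runs can be issued with plain database/sql. A standalone sketch, assuming the lib/pq driver and a locally reachable DSN (both are assumptions, not part of this PR); note that the two checkpoint time columns are double precision in PostgreSQL, so they are scanned into float64 here:

```go
// Standalone sketch (not part of this PR): run the collector's query directly.
// The DSN and the lib/pq driver are assumptions; adjust for your environment.
package main

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "postgresql://postgres@localhost:5432/postgres?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var (
		checkpointsTimed, checkpointsReq                  int64
		checkpointWriteTime, checkpointSyncTime           float64 // double precision columns
		buffersCheckpoint, buffersClean, maxwrittenClean  int64
		buffersBackend, buffersBackendFsync, buffersAlloc int64
		statsReset                                        time.Time
	)

	err = db.QueryRow(`SELECT checkpoints_timed, checkpoints_req,
		checkpoint_write_time, checkpoint_sync_time,
		buffers_checkpoint, buffers_clean, maxwritten_clean,
		buffers_backend, buffers_backend_fsync, buffers_alloc,
		stats_reset FROM pg_stat_bgwriter`).Scan(
		&checkpointsTimed, &checkpointsReq,
		&checkpointWriteTime, &checkpointSyncTime,
		&buffersCheckpoint, &buffersClean, &maxwrittenClean,
		&buffersBackend, &buffersBackendFsync, &buffersAlloc,
		&statsReset,
	)
	if err != nil {
		log.Fatal(err)
	}

	// The collector exports stats_reset as a Unix timestamp.
	fmt.Println("checkpoints_timed:", checkpointsTimed, "stats_reset:", statsReset.Unix())
}
```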
7 changes: 4 additions & 3 deletions collector/server.go
@@ -22,9 +22,10 @@ import (
)

type server struct {
dsn string
name string
db *sql.DB
dsn string
name string
db *sql.DB
isPrimary bool // Certain queries are only run on the primary server
}

func makeServer(dsn string) (*server, error) {