Skip to content

Cleanup/add gitlab exporters #819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6323907
Add wal receiver
Sticksman Jun 16, 2023
8ee74db
Finish pg_stat_walreceiver
Sticksman Jun 20, 2023
0117653
Add pg_archiver
Sticksman Jun 20, 2023
eecb3ba
Missed a label declaration
Sticksman Jun 20, 2023
a6998af
Add stat_user_indexes
Sticksman Jun 20, 2023
dff617a
Add pg statio user queries and fix a test name
Sticksman Jun 22, 2023
4b3e95c
Migrate from db -> instance
Sticksman Jun 22, 2023
29736ce
pg_index_size query
Sticksman Jun 22, 2023
9198ec9
Add total relation size query
Sticksman Jun 22, 2023
c7dad9d
Add pg_blocked query
Sticksman Jun 23, 2023
fb04e3c
Add pg_blocked queries
Sticksman Jun 23, 2023
f76c2ce
Add pg_slow
Sticksman Jun 23, 2023
5d06f64
Add long running transactions query
Sticksman Jun 23, 2023
eb86b4c
Add stuck in transaction query + xid query + fix some init
Sticksman Jun 23, 2023
b235947
Add database wraparound query
Sticksman Jun 23, 2023
eac7650
Fix test name
Sticksman Jun 23, 2023
18ba5bb
xlog location
Sticksman Jun 23, 2023
bb5dba8
Add pg_stat_activity_marginalia sampler
Sticksman Jun 23, 2023
8bbd6d4
Add pg stat activity autovacuum
Sticksman Jun 23, 2023
82a51fe
Add autovacuum active
Sticksman Jun 23, 2023
c9f83ae
Long running transactions marginalia
Sticksman Jun 23, 2023
9b4d845
Lint fixes
Sticksman Jun 23, 2023
49e1a46
Lint
Sticksman Jun 23, 2023
d822ec4
Fix the NaN tests
Sticksman Jun 23, 2023
7651eed
Remove broken tests for now
Sticksman Jun 23, 2023
534af41
Lint
Sticksman Jun 23, 2023
8d6499f
Disable all new queries by default, rename marginalia -> summary
Sticksman Jun 23, 2023
4891644
Update pg_blocked with nulls
Sticksman Jun 26, 2023
4adddb3
Db wraparound test nullable
Sticksman Jun 26, 2023
8b7ffa6
index size nullable
Sticksman Jun 26, 2023
532c04f
LongRunningTransactionsSUmmary nullable
Sticksman Jun 26, 2023
f839778
stat user indexes nullable
Sticksman Jun 26, 2023
fddba48
stat walreceiver nil
Sticksman Jun 26, 2023
6513418
statiouser_indexes nullable
Sticksman Jun 26, 2023
aa910f3
total relation size nullable
Sticksman Jun 26, 2023
2d33e61
Remove linebreak
Sticksman Jun 26, 2023
55034b3
Use labels pattern
Sticksman Jun 26, 2023
57ea9d8
stat activity summary nullable
Sticksman Jun 26, 2023
3718e52
Redo all change requests
Sticksman Jun 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
@@ -49,8 +49,12 @@ func readMetric(m prometheus.Metric) MetricResult {
func sanitizeQuery(q string) string {
q = strings.Join(strings.Fields(q), " ")
q = strings.Replace(q, "(", "\\(", -1)
q = strings.Replace(q, "?", "\\?", -1)
q = strings.Replace(q, ")", "\\)", -1)
q = strings.Replace(q, "[", "\\[", -1)
q = strings.Replace(q, "]", "\\]", -1)
q = strings.Replace(q, "*", "\\*", -1)
q = strings.Replace(q, "^", "\\^", -1)
q = strings.Replace(q, "$", "\\$", -1)
return q
}
81 changes: 81 additions & 0 deletions collector/pg_archiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
registerCollector("archiver", defaultDisabled, NewPGArchiverCollector)
}

type PGArchiverCollector struct {
log log.Logger
}

const archiverSubsystem = "archiver"

func NewPGArchiverCollector(config collectorConfig) (Collector, error) {
return &PGArchiverCollector{log: config.logger}, nil
}

var (
pgArchiverPendingWalCount = prometheus.NewDesc(
prometheus.BuildFQName(namespace, archiverSubsystem, "pending_wals"),
"Number of WAL files waiting to be archived",
[]string{}, prometheus.Labels{},
)

pgArchiverQuery = `
WITH
current_wal_file AS (
SELECT CASE WHEN NOT pg_is_in_recovery() THEN pg_walfile_name(pg_current_wal_insert_lsn()) ELSE NULL END pg_walfile_name
),
current_wal AS (
SELECT
('x'||substring(pg_walfile_name,9,8))::bit(32)::int log,
('x'||substring(pg_walfile_name,17,8))::bit(32)::int seg,
pg_walfile_name
FROM current_wal_file
),
archive_wal AS(
SELECT
('x'||substring(last_archived_wal,9,8))::bit(32)::int log,
('x'||substring(last_archived_wal,17,8))::bit(32)::int seg,
last_archived_wal
FROM pg_stat_archiver
)
SELECT coalesce(((cw.log - aw.log) * 256) + (cw.seg-aw.seg),'NaN'::float) as pending_wal_count FROM current_wal cw, archive_wal aw
`
)

func (c *PGArchiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx,
pgArchiverQuery)
var pendingWalCount float64
err := row.Scan(&pendingWalCount)
if err != nil {
return err
}
ch <- prometheus.MustNewConstMetric(
pgArchiverPendingWalCount,
prometheus.GaugeValue,
pendingWalCount,
)
return nil
}
96 changes: 96 additions & 0 deletions collector/pg_archiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"math"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPgArchiverCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}
mock.ExpectQuery(sanitizeQuery(pgArchiverQuery)).WillReturnRows(sqlmock.NewRows([]string{"pending_wal_count"}).
AddRow(5))

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGArchiverCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGArchiverCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{}, value: 5, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgArchiverNaNCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}
mock.ExpectQuery(sanitizeQuery(pgArchiverQuery)).WillReturnRows(sqlmock.NewRows([]string{"pending_wal_count"}).
AddRow(math.NaN()))

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGArchiverCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGArchiverCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{}, value: math.NaN(), metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect.labels, convey.ShouldResemble, m.labels)
convey.So(math.IsNaN(m.value), convey.ShouldResemble, math.IsNaN(expect.value))
convey.So(expect.metricType, convey.ShouldEqual, m.metricType)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
101 changes: 101 additions & 0 deletions collector/pg_blocked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const blockedSubsystem = "blocked"

func init() {
registerCollector(blockedSubsystem, defaultDisabled, NewPGBlockedCollector)
}

type PGBlockedCollector struct {
log log.Logger
}

func NewPGBlockedCollector(config collectorConfig) (Collector, error) {
return &PGBlockedCollector{log: config.logger}, nil
}

var (
blockedQueries = prometheus.NewDesc(
prometheus.BuildFQName(namespace, blockedSubsystem, "queries"),
"The current number of blocked queries",
[]string{"table"},
prometheus.Labels{},
)

blockedQuery = `
SELECT
count(blocked.transactionid) AS queries,
'__transaction__' AS table
FROM pg_catalog.pg_locks blocked
WHERE NOT blocked.granted AND locktype = 'transactionid'
GROUP BY locktype
UNION
SELECT
count(blocked.relation) AS queries,
blocked.relation::regclass::text AS table
FROM pg_catalog.pg_locks blocked
WHERE NOT blocked.granted AND locktype != 'transactionid'
GROUP BY relation
`
)

func (PGBlockedCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
blockedQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var table sql.NullString
var queries sql.NullFloat64

if err := rows.Scan(&queries, &table); err != nil {
return err
}

tableLabel := "unknown"
if table.Valid {
tableLabel = table.String
}

queriesMetric := 0.0
if queries.Valid {
queriesMetric = queries.Float64
}
ch <- prometheus.MustNewConstMetric(
blockedQueries,
prometheus.GaugeValue,
queriesMetric,
tableLabel,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
101 changes: 101 additions & 0 deletions collector/pg_blocked_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPgBlockedCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"queries",
"table",
}
rows := sqlmock.NewRows(columns).
AddRow(1000, "pgbouncer")

mock.ExpectQuery(sanitizeQuery(blockedQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGBlockedCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGBlockedCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"table": "pgbouncer"}, value: 1000, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgBlockedCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"queries",
"table",
}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil)

mock.ExpectQuery(sanitizeQuery(blockedQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGBlockedCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGBlockedCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"table": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
112 changes: 112 additions & 0 deletions collector/pg_database_wraparound.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const databaseWraparoundSubsystem = "database_wraparound"

func init() {
registerCollector(databaseWraparoundSubsystem, defaultDisabled, NewPGDatabaseWraparoundCollector)
}

type PGDatabaseWraparoundCollector struct {
log log.Logger
}

func NewPGDatabaseWraparoundCollector(config collectorConfig) (Collector, error) {
return &PGDatabaseWraparoundCollector{log: config.logger}, nil
}

var (
databaseWraparoundAgeDatfrozenxid = prometheus.NewDesc(
prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datfrozenxid_seconds"),
"Age of the oldest transaction ID that has not been frozen.",
[]string{"datname"},
prometheus.Labels{},
)
databaseWraparoundAgeDatminmxid = prometheus.NewDesc(
prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datminmxid"),
"Age of the oldest multi-transaction ID that has been replaced with a transaction ID.",
[]string{"datname"},
prometheus.Labels{},
)

databaseWraparoundQuery = `
SELECT
datname,
age(d.datfrozenxid) as age_datfrozenxid,
mxid_age(d.datminmxid) as age_datminmxid
FROM
pg_catalog.pg_database d
WHERE
d.datallowconn
`
)

func (PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
databaseWraparoundQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var datname sql.NullString
var ageDatfrozenxid, ageDatminmxid sql.NullFloat64

if err := rows.Scan(&datname, &ageDatfrozenxid, &ageDatminmxid); err != nil {
return err
}

datnameLabel := "unknown"
if datname.Valid {
datnameLabel = datname.String
}

ageDatfrozenxidMetric := 0.0
if ageDatfrozenxid.Valid {
ageDatfrozenxidMetric = ageDatfrozenxid.Float64
}

ch <- prometheus.MustNewConstMetric(
databaseWraparoundAgeDatfrozenxid,
prometheus.GaugeValue,
ageDatfrozenxidMetric, datnameLabel,
)

ageDatminmxidMetric := 0.0
if ageDatminmxid.Valid {
ageDatminmxidMetric = ageDatminmxid.Float64
}
ch <- prometheus.MustNewConstMetric(
databaseWraparoundAgeDatminmxid,
prometheus.GaugeValue,
ageDatminmxidMetric, datnameLabel,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
105 changes: 105 additions & 0 deletions collector/pg_database_wraparound_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPGDatabaseWraparoundCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"datname",
"age_datfrozenxid",
"age_datminmxid",
}
rows := sqlmock.NewRows(columns).
AddRow("newreddit", 87126426, 0)

mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGDatabaseWraparoundCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datname": "newreddit"}, value: 87126426, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"datname": "newreddit"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPGDatabaseWraparoundCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"datname",
"age_datfrozenxid",
"age_datminmxid",
}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil)

mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGDatabaseWraparoundCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"datname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
106 changes: 106 additions & 0 deletions collector/pg_index_size.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const indexSizeSubsystem = "index_size"

func init() {
registerCollector(indexSizeSubsystem, defaultDisabled, NewPGIndexSizeCollector)
}

type PGIndexSizeCollector struct {
log log.Logger
}

func NewPGIndexSizeCollector(config collectorConfig) (Collector, error) {
return &PGIndexSizeCollector{log: config.logger}, nil
}

var (
indexSizeDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, indexSizeSubsystem, "bytes"),
"Size of the index as per pg_table_size function",
[]string{"schemaname", "relname", "indexrelname"},
prometheus.Labels{},
)

indexSizeQuery = `
SELECT
schemaname,
tablename as relname,
indexname as indexrelname,
pg_class.relpages * 8192::bigint as index_size
FROM
pg_indexes inner join pg_namespace on pg_indexes.schemaname = pg_namespace.nspname
inner join pg_class on pg_class.relnamespace = pg_namespace.oid and pg_class.relname = pg_indexes.indexname
WHERE
pg_indexes.schemaname != 'pg_catalog'
`
)

func (PGIndexSizeCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
indexSizeQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var schemaname, relname, indexrelname sql.NullString
var indexSize sql.NullFloat64

if err := rows.Scan(&schemaname, &relname, &indexrelname, &indexSize); err != nil {
return err
}
schemanameLabel := "unknown"
if schemaname.Valid {
schemanameLabel = schemaname.String
}
relnameLabel := "unknown"
if relname.Valid {
relnameLabel = relname.String
}
indexrelnameLabel := "unknown"
if indexrelname.Valid {
indexrelnameLabel = indexrelname.String
}
labels := []string{schemanameLabel, relnameLabel, indexrelnameLabel}

indexSizeMetric := 0.0
if indexSize.Valid {
indexSizeMetric = indexSize.Float64
}
ch <- prometheus.MustNewConstMetric(
indexSizeDesc,
prometheus.GaugeValue,
indexSizeMetric,
labels...,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
105 changes: 105 additions & 0 deletions collector/pg_index_size_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPgIndexSizeCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"indexrelname",
"index_size",
}
rows := sqlmock.NewRows(columns).
AddRow("public", "foo", "foo_key", 100)

mock.ExpectQuery(sanitizeQuery(indexSizeQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGIndexSizeCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGIndexSizeCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "public", "relname": "foo", "indexrelname": "foo_key"}, value: 100, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgIndexSizeCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"indexrelname",
"index_size",
}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil)

mock.ExpectQuery(sanitizeQuery(indexSizeQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGIndexSizeCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGIndexSizeCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
93 changes: 93 additions & 0 deletions collector/pg_long_running_transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const longRunningTransactionsSubsystem = "long_running_transactions"

func init() {
registerCollector(longRunningTransactionsSubsystem, defaultDisabled, NewPGLongRunningTransactionsCollector)
}

type PGLongRunningTransactionsCollector struct {
log log.Logger
}

func NewPGLongRunningTransactionsCollector(config collectorConfig) (Collector, error) {
return &PGLongRunningTransactionsCollector{log: config.logger}, nil
}

var (
longRunningTransactionsCount = prometheus.NewDesc(
"pg_long_running_transactions",
"Current number of long running transactions",
[]string{},
prometheus.Labels{},
)

longRunningTransactionsAgeInSeconds = prometheus.NewDesc(
prometheus.BuildFQName(namespace, longRunningTransactionsSubsystem, "age_in_seconds"),
"The current maximum transaction age in seconds",
[]string{},
prometheus.Labels{},
)

longRunningTransactionsQuery = `
SELECT
COUNT(*) as transactions,
MAX(EXTRACT(EPOCH FROM (clock_timestamp() - xact_start))) AS age_in_seconds
FROM pg_catalog.pg_stat_activity
WHERE state is distinct from 'idle' AND (now() - xact_start) > '1 minutes'::interval AND query not like 'autovacuum:%'
`
)

func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
longRunningTransactionsQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var transactions, ageInSeconds float64

if err := rows.Scan(&transactions, &ageInSeconds); err != nil {
return err
}

ch <- prometheus.MustNewConstMetric(
longRunningTransactionsCount,
prometheus.GaugeValue,
transactions,
)
ch <- prometheus.MustNewConstMetric(
longRunningTransactionsAgeInSeconds,
prometheus.GaugeValue,
ageInSeconds,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
109 changes: 109 additions & 0 deletions collector/pg_long_running_transactions_summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const longRunningTransactionsSummarySubsystem = "long_running_transactions_summary"

func init() {
registerCollector(longRunningTransactionsSummarySubsystem, defaultDisabled, NewPGLongRunningTransactionsSummaryCollector)
}

type PGLongRunningTransactionsSummaryCollector struct {
log log.Logger
}

func NewPGLongRunningTransactionsSummaryCollector(config collectorConfig) (Collector, error) {
return &PGLongRunningTransactionsSummaryCollector{log: config.logger}, nil
}

var (
longRunningTransactionsSummaryMaxAgeInSeconds = prometheus.NewDesc(
prometheus.BuildFQName(namespace, longRunningTransactionsSummarySubsystem, "max_age_in_seconds"),
"The current maximum transaction age in seconds",
[]string{"application", "endpoint"},
prometheus.Labels{},
)

longRunningTransactionsSummaryQuery = `
SELECT
activity.matches[1] AS application,
activity.matches[2] AS endpoint,
MAX(age_in_seconds) AS max_age_in_seconds
FROM (
SELECT
regexp_matches(query, '^\s*(?:\/\*(?:application:(\w+),?)?(?:correlation_id:\w+,?)?(?:jid:\w+,?)?(?:endpoint_id:([\w/\-\.:\#\s]+),?)?.*?\*\/)?\s*(\w+)') AS matches,
EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds
FROM
pg_catalog.pg_stat_activity
WHERE state <> 'idle'
AND (clock_timestamp() - xact_start) > '30 seconds'::interval
AND query NOT LIKE 'autovacuum:%'
) activity
GROUP BY application, endpoint
ORDER BY max_age_in_seconds DESC
`
)

func (PGLongRunningTransactionsSummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
longRunningTransactionsSummaryQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var application, endpoint sql.NullString
var maxAgeInSeconds sql.NullFloat64

if err := rows.Scan(&application, &endpoint, &maxAgeInSeconds); err != nil {
return err
}

applicationLabel := "unknown"
if application.Valid {
applicationLabel = application.String
}
endpointLabel := "unknown"
if endpoint.Valid {
endpointLabel = endpoint.String
}
labels := []string{applicationLabel, endpointLabel}

maxAgeInSecondsMetric := 0.0
if maxAgeInSeconds.Valid {
maxAgeInSecondsMetric = maxAgeInSeconds.Float64
}
ch <- prometheus.MustNewConstMetric(
longRunningTransactionsSummaryMaxAgeInSeconds,
prometheus.GaugeValue,
maxAgeInSecondsMetric,
labels...,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
63 changes: 63 additions & 0 deletions collector/pg_long_running_transactions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPGLongRunningTransactionsCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"transactions",
"age_in_seconds",
}
rows := sqlmock.NewRows(columns).
AddRow(20, 1200)

mock.ExpectQuery(sanitizeQuery(longRunningTransactionsQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGLongRunningTransactionsCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGLongRunningTransactionsCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, value: 20, metricType: dto.MetricType_GAUGE},
{labels: labelMap{}, value: 1200, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
83 changes: 83 additions & 0 deletions collector/pg_oldest_blocked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const oldestBlockedSubsystem = "oldest_blocked"

func init() {
registerCollector(oldestBlockedSubsystem, defaultDisabled, NewPGOldestBlockedCollector)
}

type PGOldestBlockedCollector struct {
log log.Logger
}

func NewPGOldestBlockedCollector(config collectorConfig) (Collector, error) {
return &PGOldestBlockedCollector{log: config.logger}, nil
}

var (
oldestBlockedAgeSeconds = prometheus.NewDesc(
prometheus.BuildFQName(namespace, oldestBlockedSubsystem, "age_seconds"),
"Largest number of seconds any transaction is currently waiting on a lock",
[]string{},
prometheus.Labels{},
)

oldestBlockedQuery = `
SELECT
coalesce(extract('epoch' from max(clock_timestamp() - state_change)), 0) age_seconds
FROM
pg_catalog.pg_stat_activity
WHERE
wait_event_type = 'Lock'
AND state='active'
`
)

func (PGOldestBlockedCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
oldestBlockedQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var ageSeconds float64

if err := rows.Scan(&ageSeconds); err != nil {
return err
}

ch <- prometheus.MustNewConstMetric(
oldestBlockedAgeSeconds,
prometheus.GaugeValue,
ageSeconds,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
61 changes: 61 additions & 0 deletions collector/pg_oldest_blocked_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPgOldestBlockedCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"age_seconds",
}
rows := sqlmock.NewRows(columns).
AddRow(100000)

mock.ExpectQuery(sanitizeQuery(oldestBlockedQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGOldestBlockedCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGOldestBlockedCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, value: 100000, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
1 change: 1 addition & 0 deletions collector/pg_replication_slot_test.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
)

func TestPgReplicationSlotCollectorActive(t *testing.T) {

db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
82 changes: 82 additions & 0 deletions collector/pg_slow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const slowSubsystem = "slow"

func init() {
registerCollector(slowSubsystem, defaultDisabled, NewPGSlowCollector)
}

type PGSlowCollector struct {
log log.Logger
}

func NewPGSlowCollector(config collectorConfig) (Collector, error) {
return &PGSlowCollector{log: config.logger}, nil
}

var (
slowQueries = prometheus.NewDesc(
prometheus.BuildFQName(namespace, slowSubsystem, "queries"),
"Current number of slow queries",
[]string{},
prometheus.Labels{},
)

slowQuery = `
SELECT
COUNT(*) AS queries
FROM
pg_catalog.pg_stat_activity
WHERE
state = 'active' AND (now() - query_start) > '1 seconds'::interval
`
)

func (PGSlowCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
slowQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var queries float64

if err := rows.Scan(&queries); err != nil {
return err
}

ch <- prometheus.MustNewConstMetric(
slowQueries,
prometheus.GaugeValue,
queries,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
61 changes: 61 additions & 0 deletions collector/pg_slow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPGSlowCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"queries",
}
rows := sqlmock.NewRows(columns).
AddRow(25)

mock.ExpectQuery(sanitizeQuery(slowQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGSlowCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGSlowCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, value: 25, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
85 changes: 85 additions & 0 deletions collector/pg_stat_activity_autovacuum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const statActivityAutovacuumSubsystem = "stat_activity_autovacuum"

func init() {
registerCollector(statActivityAutovacuumSubsystem, defaultDisabled, NewPGStatActivityAutovacuumCollector)
}

type PGStatActivityAutovacuumCollector struct {
log log.Logger
}

func NewPGStatActivityAutovacuumCollector(config collectorConfig) (Collector, error) {
return &PGStatActivityAutovacuumCollector{log: config.logger}, nil
}

var (
statActivityAutovacuumAgeInSeconds = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statActivityAutovacuumSubsystem, "age_in_seconds"),
"The age of the vacuum process in seconds",
[]string{"relname"},
prometheus.Labels{},
)

statActivityAutovacuumQuery = `
SELECT
SPLIT_PART(query, '.', 2) AS relname,
EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds
FROM
pg_catalog.pg_stat_activity
WHERE
query like 'autovacuum:%' AND
EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) > 300
`
)

func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
statActivityAutovacuumQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var relname string
var ageInSeconds float64

if err := rows.Scan(&relname, &ageInSeconds); err != nil {
return err
}

ch <- prometheus.MustNewConstMetric(
statActivityAutovacuumAgeInSeconds,
prometheus.GaugeValue,
ageInSeconds, relname,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
91 changes: 91 additions & 0 deletions collector/pg_stat_activity_autovacuum_active.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const statActivityAutovacuumActiveSubsystem = "stat_activity_autovacuum_active"

func init() {
registerCollector(statActivityAutovacuumActiveSubsystem, defaultDisabled, NewPGStatActivityAutovacuumActiveCollector)
}

type PGStatActivityAutovacuumActiveCollector struct {
log log.Logger
}

func NewPGStatActivityAutovacuumActiveCollector(config collectorConfig) (Collector, error) {
return &PGStatActivityAutovacuumActiveCollector{log: config.logger}, nil
}

var (
statActivityAutovacuumActiveWorkersCount = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statActivityAutovacuumActiveSubsystem, "workers"),
"Current number of statActivityAutovacuumActive queries",
[]string{"phase", "mode"},
prometheus.Labels{},
)

statActivityAutovacuumActiveQuery = `
SELECT
v.phase,
CASE
when a.query ~ '^autovacuum.*to prevent wraparound' then 'wraparound'
when a.query ~* '^vacuum' then 'user'
when a.pid is null then 'idle'
ELSE 'regular'
END as mode,
count(1) as workers_count
FROM pg_stat_progress_vacuum v
LEFT JOIN pg_catalog.pg_stat_activity a using (pid)
GROUP BY 1,2
`
)

func (PGStatActivityAutovacuumActiveCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
statActivityAutovacuumActiveQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var phase, mode string
var workersCount float64

if err := rows.Scan(&phase, &mode, &workersCount); err != nil {
return err
}
labels := []string{phase, mode}

ch <- prometheus.MustNewConstMetric(
statActivityAutovacuumActiveWorkersCount,
prometheus.GaugeValue,
workersCount,
labels...,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
62 changes: 62 additions & 0 deletions collector/pg_stat_activity_autovacuum_active_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPGStatActivityAutovacuumActiveCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"phase",
"mode",
"workers_count",
}
rows := sqlmock.NewRows(columns).
AddRow("Scanning heap", "regular", 2)
mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumActiveQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatActivityAutovacuumActiveCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatActivityAutovacuumActiveCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"phase": "Scanning heap", "mode": "regular"}, value: 2, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
62 changes: 62 additions & 0 deletions collector/pg_stat_activity_autovacuum_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPGStatActivityAutovacuumCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"relname",
"timestamp_seconds",
}
rows := sqlmock.NewRows(columns).
AddRow("test", 3600)

mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatActivityAutovacuumCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatActivityAutovacuumCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"relname": "test"}, value: 3600, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
152 changes: 152 additions & 0 deletions collector/pg_stat_activity_summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const statActivitySummarySubsystem = "stat_activity_summary"

func init() {
registerCollector(statActivitySummarySubsystem, defaultEnabled, NewPGStatActivitySummaryCollector)
}

type PGStatActivitySummaryCollector struct {
log log.Logger
}

func NewPGStatActivitySummaryCollector(config collectorConfig) (Collector, error) {
return &PGStatActivitySummaryCollector{log: config.logger}, nil
}

var (
statActivitySummaryActiveCount = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statActivitySummarySubsystem, "active_count"),
"Number of active queries at time of sample",
[]string{"usename", "application", "endpoint", "command", "state", "wait_event", "wait_event_type"},
prometheus.Labels{},
)
statActivitySummaryMaxTxAgeInSeconds = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statActivitySummarySubsystem, "max_tx_age_in_seconds"),
"Number of active queries at time of sample",
[]string{"usename", "application", "endpoint", "command", "state", "wait_event", "wait_event_type"},
prometheus.Labels{},
)

statActivitySummaryQuery = `
SELECT
usename AS usename,
a.matches[1] AS application,
a.matches[2] AS endpoint,
a.matches[3] AS command,
a.state AS state,
a.wait_event AS wait_event,
a.wait_event_type AS wait_event_type,
COUNT(*) active_count,
MAX(age_in_seconds) AS max_tx_age_in_seconds
FROM (
SELECT
usename,
regexp_matches(query, '^\s*(?:\/\*(?:application:(\w+),?)?(?:correlation_id:\w+,?)?(?:jid:\w+,?)?(?:endpoint_id:([\w/\-\.:\#\s]+),?)?.*?\*\/)?\s*(\w+)') AS matches,
state,
wait_event,
wait_event_type,
EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds
FROM
pg_catalog.pg_stat_activity
) a
GROUP BY usename, application, endpoint, command, state, wait_event, wait_event_type
ORDER BY active_count DESC
`
)

func (PGStatActivitySummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
statActivitySummaryQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var usename, application, endpoint, command, state, waitEvent, waitEventType sql.NullString
var count, maxTxAge sql.NullFloat64

if err := rows.Scan(&usename, &application, &endpoint, &command, &state, &waitEvent, &waitEventType, &count, &maxTxAge); err != nil {
return err
}
usenameLabel := "unknown"
if usename.Valid {
usenameLabel = usename.String
}
applicationLabel := "unknown"
if application.Valid {
applicationLabel = application.String
}
endpointLabel := "unknown"
if endpoint.Valid {
endpointLabel = endpoint.String
}
commandLabel := "unknown"
if command.Valid {
commandLabel = command.String
}
stateLabel := "unknown"
if state.Valid {
stateLabel = state.String
}
waitEventLabel := "unknown"
if waitEvent.Valid {
waitEventLabel = waitEvent.String
}
waitEventTypeLabel := "unknown"
if waitEventType.Valid {
waitEventTypeLabel = waitEventType.String
}
labels := []string{usenameLabel, applicationLabel, endpointLabel, commandLabel, stateLabel, waitEventLabel, waitEventTypeLabel}

countMetric := 0.0
if count.Valid {
countMetric = count.Float64
}
ch <- prometheus.MustNewConstMetric(
statActivitySummaryActiveCount,
prometheus.GaugeValue,
countMetric,
labels...,
)

maxTxAgeMetric := 0.0
if maxTxAge.Valid {
maxTxAgeMetric = maxTxAge.Float64
}
ch <- prometheus.MustNewConstMetric(
statActivitySummaryMaxTxAgeInSeconds,
prometheus.GaugeValue,
maxTxAgeMetric,
labels...,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
136 changes: 136 additions & 0 deletions collector/pg_stat_user_indexes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
registerCollector(statUserIndexesSubsystem, defaultDisabled, NewPGStatUserIndexesCollector)
}

type PGStatUserIndexesCollector struct {
log log.Logger
}

const statUserIndexesSubsystem = "stat_user_indexes"

func NewPGStatUserIndexesCollector(config collectorConfig) (Collector, error) {
return &PGStatUserIndexesCollector{log: config.logger}, nil
}

var (
statUserIndexesIdxScan = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statUserIndexesSubsystem, "idx_scans_total"),
"Number of index scans initiated on this index",
[]string{"schemaname", "relname", "indexrelname"},
prometheus.Labels{},
)
statUserIndexesIdxTupRead = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statUserIndexesSubsystem, "idx_tup_reads_total"),
"Number of index entries returned by scans on this index",
[]string{"schemaname", "relname", "indexrelname"},
prometheus.Labels{},
)
statUserIndexesIdxTupFetch = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statUserIndexesSubsystem, "idx_tup_fetches_total"),
"Number of live table rows fetched by simple index scans using this index",
[]string{"schemaname", "relname", "indexrelname"},
prometheus.Labels{},
)

statUserIndexesQuery = `
SELECT
schemaname,
relname,
indexrelname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
`
)

func (c *PGStatUserIndexesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
statUserIndexesQuery)

if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var schemaname, relname, indexrelname sql.NullString
var idxScan, idxTupRead, idxTupFetch sql.NullFloat64

if err := rows.Scan(&schemaname, &relname, &indexrelname, &idxScan, &idxTupRead, &idxTupFetch); err != nil {
return err
}
schemanameLabel := "unknown"
if schemaname.Valid {
schemanameLabel = schemaname.String
}
relnameLabel := "unknown"
if relname.Valid {
relnameLabel = relname.String
}
indexrelnameLabel := "unknown"
if indexrelname.Valid {
indexrelnameLabel = indexrelname.String
}
labels := []string{schemanameLabel, relnameLabel, indexrelnameLabel}

idxScanMetric := 0.0
if idxScan.Valid {
idxScanMetric = idxScan.Float64
}
ch <- prometheus.MustNewConstMetric(
statUserIndexesIdxScan,
prometheus.CounterValue,
idxScanMetric,
labels...,
)

idxTupReadMetric := 0.0
if idxTupRead.Valid {
idxTupReadMetric = idxTupRead.Float64
}
ch <- prometheus.MustNewConstMetric(
statUserIndexesIdxTupRead,
prometheus.CounterValue,
idxTupReadMetric,
labels...,
)

idxTupFetchMetric := 0.0
if idxTupFetch.Valid {
idxTupFetchMetric = idxTupFetch.Float64
}
ch <- prometheus.MustNewConstMetric(
statUserIndexesIdxTupFetch,
prometheus.CounterValue,
idxTupFetchMetric,
labels...,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
113 changes: 113 additions & 0 deletions collector/pg_stat_user_indexes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPgStatUserIndexesCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"indexrelname",
"idx_scan",
"idx_tup_read",
"idx_tup_fetch",
}
rows := sqlmock.NewRows(columns).
AddRow("public", "pgbench_accounts", "pgbench_accounts_pkey", 5, 6, 7)

mock.ExpectQuery(sanitizeQuery(statUserIndexesQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatUserIndexesCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatUserIndexesCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "public", "relname": "pgbench_accounts", "indexrelname": "pgbench_accounts_pkey"}, value: 5, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schemaname": "public", "relname": "pgbench_accounts", "indexrelname": "pgbench_accounts_pkey"}, value: 6, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schemaname": "public", "relname": "pgbench_accounts", "indexrelname": "pgbench_accounts_pkey"}, value: 7, metricType: dto.MetricType_COUNTER},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgStatUserIndexesCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"indexrelname",
"idx_scan",
"idx_tup_read",
"idx_tup_fetch",
}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil, nil)

mock.ExpectQuery(sanitizeQuery(statUserIndexesQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatUserIndexesCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatUserIndexesCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
306 changes: 306 additions & 0 deletions collector/pg_stat_walreceiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector)
}

type PGStatWalReceiverCollector struct {
log log.Logger
}

const statWalReceiverSubsystem = "stat_wal_receiver"

func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) {
return &PGStatWalReceiverCollector{log: config.logger}, nil
}

var (
statWalReceiverStatus = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "status"),
"Activity status of the WAL receiver process",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverReceiveStartLsn = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"),
"First write-ahead log location used when WAL receiver is started represented as a decimal",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverReceiveStartTli = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"),
"First timeline number used when WAL receiver is started",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverFlushedLSN = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"),
"Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverReceivedTli = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"),
"Timeline number of last write-ahead log location received and flushed to disk",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverLastMsgSendTime = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"),
"Send time of last message received from origin WAL sender",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverLastMsgReceiptTime = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"),
"Send time of last message received from origin WAL sender",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverLatestEndLsn = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"),
"Last write-ahead log location reported to origin WAL sender as integer",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverLatestEndTime = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"),
"Time of last write-ahead log location reported to origin WAL sender",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)
statWalReceiverUpstreamNode = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"),
"Node ID of the upstream node",
[]string{"upstream_host", "slot_name"},
prometheus.Labels{},
)

pgStatWalColumnQuery = `
SELECT
column_name
FROM information_schema.columns
WHERE
table_name = 'pg_stat_wal_receiver' and
column_name = 'flushed_lsn'
`

pgStatWalReceiverQueryWithNoFlushedLSN = `
SELECT
trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host,
slot_name,
case status
when 'stopped' then 0
when 'starting' then 1
when 'streaming' then 2
when 'waiting' then 3
when 'restarting' then 4
when 'stopping' then 5 else -1
end as status,
(receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn,
receive_start_tli,
received_tli,
extract(epoch from last_msg_send_time) as last_msg_send_time,
extract(epoch from last_msg_receipt_time) as last_msg_receipt_time,
(latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn,
extract(epoch from latest_end_time) as latest_end_time,
substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node
FROM pg_catalog.pg_stat_wal_receiver
`

pgStatWalReceiverQueryWithFlushedLSN = `
SELECT
trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host,
slot_name,
case status
when 'stopped' then 0
when 'starting' then 1
when 'streaming' then 2
when 'waiting' then 3
when 'restarting' then 4
when 'stopping' then 5 else -1
end as status,
(receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn,
receive_start_tli,
(flushed_lsn- '0/0') % (2^52)::bigint as flushed_lsn,
received_tli,
extract(epoch from last_msg_send_time) as last_msg_send_time,
extract(epoch from last_msg_receipt_time) as last_msg_receipt_time,
(latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn,
extract(epoch from latest_end_time) as latest_end_time,
substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node
FROM pg_catalog.pg_stat_wal_receiver
`
)

func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery)
if err != nil {
return err
}

defer hasFlushedLSNRows.Close()
hasFlushedLSN := hasFlushedLSNRows.Next()
var query string
if hasFlushedLSN {
query = pgStatWalReceiverQueryWithFlushedLSN
} else {
query = pgStatWalReceiverQueryWithNoFlushedLSN
}
rows, err := db.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var upstreamHost, slotName sql.NullString
var status, receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64
var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64

if hasFlushedLSN {
if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil {
return err
}
} else {
if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil {
return err
}
}
upstreamHostLabel := "unknown"
if upstreamHost.Valid {
upstreamHostLabel = upstreamHost.String
}
slotNameLabel := "unknown"
if slotName.Valid {
slotNameLabel = slotName.String
}
labels := []string{upstreamHostLabel, slotNameLabel}

statusMetric := 0.0
if status.Valid {
statusMetric = float64(status.Int64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverStatus,
prometheus.GaugeValue,
statusMetric,
labels...)

receiveStartLsnMetric := 0.0
if receiveStartLsn.Valid {
receiveStartLsnMetric = float64(receiveStartLsn.Int64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverReceiveStartLsn,
prometheus.CounterValue,
receiveStartLsnMetric,
labels...)

receiveStartTliMetric := 0.0
if receiveStartTli.Valid {
receiveStartTliMetric = float64(receiveStartTli.Int64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverReceiveStartTli,
prometheus.GaugeValue,
receiveStartTliMetric,
labels...)

if hasFlushedLSN {
flushedLsnMetric := 0.0
if flushedLsn.Valid {
flushedLsnMetric = float64(flushedLsn.Int64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverFlushedLSN,
prometheus.CounterValue,
flushedLsnMetric,
labels...)
}

receivedTliMetric := 0.0
if receivedTli.Valid {
receivedTliMetric = float64(receivedTli.Int64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverReceivedTli,
prometheus.GaugeValue,
receivedTliMetric,
labels...)

lastMsgSendTimeMetric := 0.0
if lastMsgSendTime.Valid {
lastMsgSendTimeMetric = float64(lastMsgSendTime.Float64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverLastMsgSendTime,
prometheus.CounterValue,
lastMsgSendTimeMetric,
labels...)

lastMsgReceiptTimeMetric := 0.0
if lastMsgReceiptTime.Valid {
lastMsgReceiptTimeMetric = float64(lastMsgReceiptTime.Float64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverLastMsgReceiptTime,
prometheus.CounterValue,
lastMsgReceiptTimeMetric,
labels...)

latestEndLsnMetric := 0.0
if latestEndLsn.Valid {
latestEndLsnMetric = float64(latestEndLsn.Int64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverLatestEndLsn,
prometheus.CounterValue,
latestEndLsnMetric,
labels...)

latestEndTimeMetric := 0.0
if latestEndTime.Valid {
latestEndTimeMetric = float64(latestEndTime.Float64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverLatestEndTime,
prometheus.CounterValue,
latestEndTimeMetric,
labels...)

upstreamNodeMetric := 0.0
if upstreamNode.Valid {
upstreamNodeMetric = float64(upstreamNode.Int64)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverUpstreamNode,
prometheus.GaugeValue,
upstreamNodeMetric,
labels...)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
266 changes: 266 additions & 0 deletions collector/pg_stat_walreceiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}
infoSchemaColumns := []string{
"column_name",
}

infoSchemaRows := sqlmock.NewRows(infoSchemaColumns).
AddRow(
"flushed_lsn",
)

mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows)

columns := []string{
"upstream_host",
"slot_name",
"status",
"receive_start_lsn",
"receive_start_tli",
"flushed_lsn",
"received_tli",
"last_msg_send_time",
"last_msg_receipt_time",
"latest_end_lsn",
"latest_end_time",
"upstream_node",
}
rows := sqlmock.NewRows(columns).
AddRow(
"foo",
"bar",
2,
1200668684563608,
1687321285,
1200668684563609,
1687321280,
1687321275,
1687321276,
1200668684563610,
1687321277,
5,
)
mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatWalReceiverCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}

}

func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}
infoSchemaColumns := []string{
"column_name",
}

infoSchemaRows := sqlmock.NewRows(infoSchemaColumns)

mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows)

columns := []string{
"upstream_host",
"slot_name",
"status",
"receive_start_lsn",
"receive_start_tli",
"received_tli",
"last_msg_send_time",
"last_msg_receipt_time",
"latest_end_lsn",
"latest_end_time",
"upstream_node",
}
rows := sqlmock.NewRows(columns).
AddRow(
"foo",
"bar",
2,
1200668684563608,
1687321285,
1687321280,
1687321275,
1687321276,
1200668684563610,
1687321277,
5,
)
mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithNoFlushedLSN)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatWalReceiverCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}

}

func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}
infoSchemaColumns := []string{
"column_name",
}

infoSchemaRows := sqlmock.NewRows(infoSchemaColumns).
AddRow(
"flushed_lsn",
)

mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows)

columns := []string{
"upstream_host",
"slot_name",
"status",
"receive_start_lsn",
"receive_start_tli",
"flushed_lsn",
"received_tli",
"last_msg_send_time",
"last_msg_receipt_time",
"latest_end_lsn",
"latest_end_time",
"upstream_node",
}
rows := sqlmock.NewRows(columns).
AddRow(
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
)
mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatWalReceiverCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}

}
118 changes: 118 additions & 0 deletions collector/pg_statio_user_indexes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
registerCollector(statioUserIndexesSubsystem, defaultDisabled, NewPGStatioUserIndexesCollector)
}

type PGStatioUserIndexesCollector struct {
log log.Logger
}

const statioUserIndexesSubsystem = "statio_user_indexes"

func NewPGStatioUserIndexesCollector(config collectorConfig) (Collector, error) {
return &PGStatioUserIndexesCollector{log: config.logger}, nil
}

var (
statioUserIndexesIdxBlksRead = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statioUserIndexesSubsystem, "idx_blks_read"),
"Number of disk blocks read from this index",
[]string{"schemaname", "relname", "indexrelname"},
prometheus.Labels{},
)
statioUserIndexesIdxBlksHit = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statioUserIndexesSubsystem, "idx_blks_hit"),
"Number of buffer hits in this index",
[]string{"schemaname", "relname", "indexrelname"},
prometheus.Labels{},
)

statioUserIndexesQuery = `
SELECT
schemaname,
relname,
indexrelname,
idx_blks_read,
idx_blks_hit
FROM pg_statio_user_indexes
`
)

func (c *PGStatioUserIndexesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
statioUserIndexesQuery)

if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var schemaname, relname, indexrelname sql.NullString
var idxBlksRead, idxBlksHit sql.NullFloat64

if err := rows.Scan(&schemaname, &relname, &indexrelname, &idxBlksRead, &idxBlksHit); err != nil {
return err
}
schemanameLabel := "unknown"
if schemaname.Valid {
schemanameLabel = schemaname.String
}
relnameLabel := "unknown"
if relname.Valid {
relnameLabel = relname.String
}
indexrelnameLabel := "unknown"
if indexrelname.Valid {
indexrelnameLabel = indexrelname.String
}
labels := []string{schemanameLabel, relnameLabel, indexrelnameLabel}

idxBlksReadMetric := 0.0
if idxBlksRead.Valid {
idxBlksReadMetric = idxBlksRead.Float64
}
ch <- prometheus.MustNewConstMetric(
statioUserIndexesIdxBlksRead,
prometheus.CounterValue,
idxBlksReadMetric,
labels...,
)

idxBlksHitMetric := 0.0
if idxBlksHit.Valid {
idxBlksHitMetric = idxBlksHit.Float64
}
ch <- prometheus.MustNewConstMetric(
statioUserIndexesIdxBlksHit,
prometheus.CounterValue,
idxBlksHitMetric,
labels...,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
109 changes: 109 additions & 0 deletions collector/pg_statio_user_indexes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPgStatioUserIndexesCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"indexrelname",
"idx_blks_read",
"idx_blks_hit",
}
rows := sqlmock.NewRows(columns).
AddRow("public", "pgtest_accounts", "pgtest_accounts_pkey", 8, 9)

mock.ExpectQuery(sanitizeQuery(statioUserIndexesQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatioUserIndexesCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatioUserIndexesCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "public", "relname": "pgtest_accounts", "indexrelname": "pgtest_accounts_pkey"}, value: 8, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schemaname": "public", "relname": "pgtest_accounts", "indexrelname": "pgtest_accounts_pkey"}, value: 9, metricType: dto.MetricType_COUNTER},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgStatioUserIndexesCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"indexrelname",
"idx_blks_read",
"idx_blks_hit",
}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil)

mock.ExpectQuery(sanitizeQuery(statioUserIndexesQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatioUserIndexesCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatioUserIndexesCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
86 changes: 86 additions & 0 deletions collector/pg_stuck_idle_in_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const stuckIdleInTransactionSubsystem = "stuck_in_transaction"

func init() {
registerCollector(stuckIdleInTransactionSubsystem, defaultDisabled, NewPGStuckIdleInTransactionCollector)
}

type PGStuckIdleInTransactionCollector struct {
log log.Logger
}

func NewPGStuckIdleInTransactionCollector(config collectorConfig) (Collector, error) {
return &PGStuckIdleInTransactionCollector{log: config.logger}, nil
}

var (
stuckIdleInTransactionQueries = prometheus.NewDesc(
prometheus.BuildFQName(namespace, longRunningTransactionsSubsystem, "queries"),
"Current number of queries that are stuck being idle in transactions",
[]string{},
prometheus.Labels{},
)

stuckIdleInTransactionQuery = `
SELECT
COUNT(*) AS queries
FROM pg_catalog.pg_stat_activity
WHERE
state = 'idle in transaction' AND (now() - query_start) > '10 minutes'::interval
`
)

func (PGStuckIdleInTransactionCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
stuckIdleInTransactionQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var queries float64

if err := rows.Scan(&queries); err != nil {
return err
}

ch <- prometheus.MustNewConstMetric(
stuckIdleInTransactionQueries,
prometheus.GaugeValue,
queries,
)
ch <- prometheus.MustNewConstMetric(
longRunningTransactionsAgeInSeconds,
prometheus.GaugeValue,
queries,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
61 changes: 61 additions & 0 deletions collector/pg_stuck_idle_in_transaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPGStuckIdleInTransactionCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"queries",
}
rows := sqlmock.NewRows(columns).
AddRow(30)

mock.ExpectQuery(sanitizeQuery(stuckIdleInTransactionQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStuckIdleInTransactionCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStuckIdleInTransactionCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, value: 30, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
98 changes: 98 additions & 0 deletions collector/pg_total_relation_size.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const totalRelationSizeSubsystem = "total_relation_size"

func init() {
registerCollector(totalRelationSizeSubsystem, defaultDisabled, NewPGTotalRelationSizeCollector)
}

type PGTotalRelationSizeCollector struct {
log log.Logger
}

func NewPGTotalRelationSizeCollector(config collectorConfig) (Collector, error) {
return &PGTotalRelationSizeCollector{log: config.logger}, nil
}

var (
totalRelationSizeBytes = prometheus.NewDesc(
prometheus.BuildFQName(namespace, totalRelationSizeSubsystem, "bytes"),
"total disk space usage for the specified table and associated indexes",
[]string{"schemaname", "relname"},
prometheus.Labels{},
)

totalRelationSizeQuery = `
SELECT
relnamespace::regnamespace as schemaname,
relname as relname,
pg_total_relation_size(oid) bytes
FROM pg_class
WHERE relkind = 'r';
`
)

func (PGTotalRelationSizeCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
totalRelationSizeQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var schemaname, relname sql.NullString
var bytes sql.NullFloat64

if err := rows.Scan(&schemaname, &relname, &bytes); err != nil {
return err
}
schemanameLabel := "unknown"
if schemaname.Valid {
schemanameLabel = schemaname.String
}
relnameLabel := "unknown"
if relname.Valid {
relnameLabel = relname.String
}
labels := []string{schemanameLabel, relnameLabel}

bytesMetric := 0.0
if bytes.Valid {
bytesMetric = bytes.Float64
}
ch <- prometheus.MustNewConstMetric(
totalRelationSizeBytes,
prometheus.GaugeValue,
bytesMetric,
labels...,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
103 changes: 103 additions & 0 deletions collector/pg_total_relation_size_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestPgTotalRelationSizeCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"bytes",
}
rows := sqlmock.NewRows(columns).
AddRow("public", "bar", 200)

mock.ExpectQuery(sanitizeQuery(totalRelationSizeQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGTotalRelationSizeCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGTotalRelationSizeCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "public", "relname": "bar"}, value: 200, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgTotalRelationSizeCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"schemaname",
"relname",
"bytes",
}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil)

mock.ExpectQuery(sanitizeQuery(totalRelationSizeQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGTotalRelationSizeCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGTotalRelationSizeCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"schemaname": "unknown", "relname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
99 changes: 99 additions & 0 deletions collector/pg_xid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const xidSubsystem = "xid"

func init() {
registerCollector(xidSubsystem, defaultDisabled, NewPGXidCollector)
}

type PGXidCollector struct {
log log.Logger
}

func NewPGXidCollector(config collectorConfig) (Collector, error) {
return &PGXidCollector{log: config.logger}, nil
}

var (
xidCurrent = prometheus.NewDesc(
prometheus.BuildFQName(namespace, xidSubsystem, "current"),
"Current 64-bit transaction id of the query used to collect this metric (truncated to low 52 bits)",
[]string{}, prometheus.Labels{},
)
xidXmin = prometheus.NewDesc(
prometheus.BuildFQName(namespace, xidSubsystem, "xmin"),
"Oldest transaction id of a transaction still in progress, i.e. not known committed or aborted (truncated to low 52 bits)",
[]string{}, prometheus.Labels{},
)
xidXminAge = prometheus.NewDesc(
prometheus.BuildFQName(namespace, xidSubsystem, "xmin_age"),
"Age of oldest transaction still not committed or aborted measured in transaction ids",
[]string{}, prometheus.Labels{},
)

xidQuery = `
SELECT
CASE WHEN pg_is_in_recovery() THEN 'NaN'::float ELSE txid_current() % (2^52)::bigint END AS current,
CASE WHEN pg_is_in_recovery() THEN 'NaN'::float ELSE txid_snapshot_xmin(txid_current_snapshot()) % (2^52)::bigint END AS xmin,
CASE WHEN pg_is_in_recovery() THEN 'NaN'::float ELSE txid_current() - txid_snapshot_xmin(txid_current_snapshot()) END AS xmin_age
`
)

func (PGXidCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
xidQuery)

if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var current, xmin, xminAge float64

if err := rows.Scan(&current, &xmin, &xminAge); err != nil {
return err
}

ch <- prometheus.MustNewConstMetric(
xidCurrent,
prometheus.CounterValue,
current,
)
ch <- prometheus.MustNewConstMetric(
xidXmin,
prometheus.CounterValue,
xmin,
)
ch <- prometheus.MustNewConstMetric(
xidXminAge,
prometheus.GaugeValue,
xminAge,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}
Loading