Skip to content
This repository was archived by the owner on Nov 28, 2022. It is now read-only.

Commit d87a80c

Browse files
bern4rdelli and Diogo Nicoleti
authored and
Diogo Nicoleti
committed
[JARVIS#DE-556] Send replication slots metrics to Prometheus (#60)
* Replication slots metrics * Fix metrics * Add unit tests * Improve tests * Try to change postgresql.conf * build * Try to set manually * Try to set manually * Restart PostgreSQL * set max_replication_slots * set max_replication_slots * Using different slots * Using PostgreSQL 11 * Revert to 9.6 * Using 10.0 * test 11.2 * Using user travis * Show PostgreSQL version * Install pg 11 * Using pg 11.3 * Try using packages * Return warning when pg is lower than 10 * Pass test * Support pg 9.6 * Update postgres/version.go Co-Authored-By: Diogo Nicoleti <[email protected]> * Update gauges/logical_replication.go Co-Authored-By: Diogo Nicoleti <[email protected]> * Update gauges/logical_replication.go Co-Authored-By: Diogo Nicoleti <[email protected]> * Update gauges/logical_replication.go Co-Authored-By: Diogo Nicoleti <[email protected]> * Update gauges/logical_replication.go Co-Authored-By: Diogo Nicoleti <[email protected]> * Apply suggestions from CR * Improve help message * Return lag in bytes
1 parent 41b0f58 commit d87a80c

File tree

5 files changed

+152
-0
lines changed

5 files changed

+152
-0
lines changed

Diff for: .travis.yml

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ language: go
44
go: "1.10"
55
addons:
66
postgresql: "9.6"
7+
before_script:
8+
- psql -U travis -c "ALTER SYSTEM SET wal_level TO 'logical';"
9+
- psql -U travis -c "ALTER SYSTEM SET max_replication_slots TO '10';"
10+
- sudo /etc/init.d/postgresql restart
711
services:
812
- docker
913
- postgresql

Diff for: gauges/logical_replication.go

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package gauges
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/ContaAzul/postgresql_exporter/postgres"
8+
"github.com/prometheus/client_golang/prometheus"
9+
)
10+
11+
// slots is one row read from pg_replication_slots by the gauges in this
// file. The db tags name the result-set column behind each field; each query
// selects slot_name together with one of the two numeric columns.
type slots struct {
	// Name comes from the slot_name column and is used as the value of the
	// "slot_name" metric label.
	Name string `db:"slot_name"`

	// Active comes from the active column, which the status query casts to
	// an integer.
	Active float64 `db:"active"`

	// TotalLag comes from the total_lag column computed by the lag query.
	TotalLag float64 `db:"total_lag"`
}
16+
17+
// ReplicationSlotStatus returns the state of the replication slots
18+
func (g *Gauges) ReplicationSlotStatus() *prometheus.GaugeVec {
19+
var gauge = prometheus.NewGaugeVec(
20+
prometheus.GaugeOpts{
21+
Name: "postgresql_replication_slot_status",
22+
Help: "Returns 1 if the slot is currently actively being used",
23+
ConstLabels: g.labels,
24+
},
25+
[]string{"slot_name"},
26+
)
27+
go func() {
28+
for {
29+
gauge.Reset()
30+
var slots []slots
31+
if err := g.query(
32+
`
33+
SELECT
34+
slot_name,
35+
active::int
36+
FROM pg_replication_slots
37+
WHERE slot_type = 'logical'
38+
AND "database" = current_database();
39+
`,
40+
&slots,
41+
emptyParams,
42+
); err == nil {
43+
for _, slot := range slots {
44+
gauge.With(prometheus.Labels{
45+
"slot_name": slot.Name,
46+
}).Set(slot.Active)
47+
}
48+
}
49+
time.Sleep(g.interval)
50+
}
51+
}()
52+
return gauge
53+
}
54+
55+
// ReplicationSlotLagInBytes returns the total lag in bytes from the replication slots
56+
func (g *Gauges) ReplicationSlotLagInBytes() *prometheus.GaugeVec {
57+
var gauge = prometheus.NewGaugeVec(
58+
prometheus.GaugeOpts{
59+
Name: "postgresql_replication_lag_bytes",
60+
Help: "Total lag of the replication slots in bytes",
61+
ConstLabels: g.labels,
62+
},
63+
[]string{"slot_name"},
64+
)
65+
go func() {
66+
for {
67+
gauge.Reset()
68+
var slots []slots
69+
if err := g.query(
70+
fmt.Sprintf(
71+
`
72+
SELECT
73+
slot_name,
74+
%s(%s(), confirmed_flush_lsn) AS total_lag
75+
FROM pg_replication_slots
76+
WHERE slot_type = 'logical'
77+
AND "database" = current_database();
78+
`,
79+
postgres.Version(g.version()).WalLsnDiffFunctionName(),
80+
postgres.Version(g.version()).CurrentWalLsnFunctionName(),
81+
),
82+
&slots,
83+
emptyParams,
84+
); err == nil {
85+
for _, slot := range slots {
86+
gauge.With(prometheus.Labels{
87+
"slot_name": slot.Name,
88+
}).Set(slot.TotalLag)
89+
}
90+
}
91+
time.Sleep(g.interval)
92+
}
93+
}()
94+
return gauge
95+
}

Diff for: gauges/logical_replication_test.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package gauges
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestReplicationSlotStatus(t *testing.T) {
13+
var assert = assert.New(t)
14+
db, gauges, close := prepare(t)
15+
defer close()
16+
dropTestLogicalReplicationSlot := createTestLogicalReplicationSlot("test_status", t, db)
17+
defer dropTestLogicalReplicationSlot()
18+
var metrics = evaluate(t, gauges.ReplicationSlotStatus())
19+
assert.Len(metrics, 1)
20+
assertEqual(t, 0, metrics[0])
21+
assertNoErrs(t, gauges)
22+
}
23+
24+
func TestReplicationSlotLagInMegabytes(t *testing.T) {
25+
var assert = assert.New(t)
26+
db, gauges, close := prepare(t)
27+
defer close()
28+
dropTestLogicalReplicationSlot := createTestLogicalReplicationSlot("test_lag", t, db)
29+
defer dropTestLogicalReplicationSlot()
30+
var metrics = evaluate(t, gauges.ReplicationDelayInBytes())
31+
assert.Len(metrics, 1)
32+
assertEqual(t, 0, metrics[0])
33+
assertNoErrs(t, gauges)
34+
}
35+
36+
func createTestLogicalReplicationSlot(slotName string, t *testing.T, db *sql.DB) func() {
37+
_, err := db.Exec(fmt.Sprintf("SELECT * FROM pg_create_logical_replication_slot('%s', 'test_decoding');", slotName))
38+
require.NoError(t, err)
39+
return func() {
40+
_, err := db.Exec(fmt.Sprintf("SELECT pg_drop_replication_slot('%s');", slotName))
41+
assert.New(t).NoError(err)
42+
}
43+
}

Diff for: main.go

+2
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,6 @@ func watch(db *sql.DB, reg prometheus.Registerer, name string) {
120120
reg.MustRegister(gauges.LastTimeVacuumRan())
121121
reg.MustRegister(gauges.LastTimeAutoVacuumRan())
122122
reg.MustRegister(gauges.VacuumRunningTotal())
123+
reg.MustRegister(gauges.ReplicationSlotStatus())
124+
reg.MustRegister(gauges.ReplicationSlotLagInBytes())
123125
}

Diff for: postgres/version.go

+8
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,13 @@ func (v Version) LastWalReplayedLsnFunctionName() string {
4747
return "pg_last_wal_replay_lsn"
4848
}
4949
return "pg_last_xlog_replay_location"
50+
}
5051

52+
// CurrentWalLsnFunctionName returns the name of the function that gets current
53+
// write-ahead log write location according to the postgres version
54+
func (v Version) CurrentWalLsnFunctionName() string {
55+
if v.IsEqualOrGreaterThan10() {
56+
return "pg_current_wal_lsn"
57+
}
58+
return "pg_current_xlog_location"
5159
}

0 commit comments

Comments
 (0)