Skip to content

Commit b588ec9

Browse files
committed
Add replication slot test
Signed-off-by: Felix Yuan <[email protected]>
1 parent cd84f17 commit b588ec9

File tree

2 files changed

+109
-9
lines changed

2 files changed

+109
-9
lines changed

Diff for: collector/replication_slots.go

+20-9
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,22 @@ var pgReplicationSlotConfirmedFlushLSN = prometheus.NewDesc(
4747

4848
var pgReplicationSlotActive = prometheus.NewDesc(
4949
"pg_replication_slot_is_active",
50-
"last lsn confirmed flushed to the replication slot",
50+
"whether the replication slot is active or not",
5151
[]string{"slot_name"}, nil,
5252
)
5353

54+
var pgReplicationSlotQuery = `SELECT
55+
slot_name,
56+
pg_current_wal_lsn() - '0/0' AS current_wal_lsn,
57+
coalesce(confirmed_flush_lsn, '0/0') - '0/0',
58+
active
59+
FROM
60+
pg_replication_slots;
61+
`
62+
5463
func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error {
5564
rows, err := db.QueryContext(ctx,
56-
`SELECT
57-
slot_name,
58-
pg_current_wal_lsn() - '0/0' AS current_wal_lsn,
59-
coalesce(confirmed_flush_lsn, '0/0') - '0/0',
60-
active
61-
FROM
62-
pg_replication_slots;`)
65+
pgReplicationSlotQuery)
6366
if err != nil {
6467
return err
6568
}
@@ -70,10 +73,18 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch cha
7073
var wal_lsn int64
7174
var flush_lsn int64
7275
var is_active bool
76+
var is_active_value int
77+
7378
if err := rows.Scan(&slot_name, &wal_lsn, &flush_lsn, &is_active); err != nil {
7479
return err
7580
}
7681

82+
if is_active {
83+
is_active_value = 1
84+
} else {
85+
is_active_value = 0
86+
}
87+
7788
ch <- prometheus.MustNewConstMetric(
7889
pgReplicationSlotCurrentWalLSN,
7990
prometheus.GaugeValue, float64(wal_lsn), slot_name,
@@ -86,7 +97,7 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch cha
8697
}
8798
ch <- prometheus.MustNewConstMetric(
8899
pgReplicationSlotActive,
89-
prometheus.GaugeValue, float64(flush_lsn), slot_name,
100+
prometheus.GaugeValue, float64(is_active_value), slot_name,
90101
)
91102
}
92103
if err := rows.Err(); err != nil {

Diff for: collector/replication_slots_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package collector
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/DATA-DOG/go-sqlmock"
8+
"github.com/prometheus/client_golang/prometheus"
9+
dto "github.com/prometheus/client_model/go"
10+
"github.com/smartystreets/goconvey/convey"
11+
)
12+
13+
func TestPgReplicationSlotCollectorActive(t *testing.T) {
14+
db, mock, err := sqlmock.New()
15+
if err != nil {
16+
t.Fatalf("Error opening a stub db connection: %s", err)
17+
}
18+
defer db.Close()
19+
20+
columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
21+
rows := sqlmock.NewRows(columns).
22+
AddRow("test_slot", 5, 3, true)
23+
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
24+
25+
ch := make(chan prometheus.Metric)
26+
go func() {
27+
defer close(ch)
28+
c := PGReplicationSlotCollector{}
29+
30+
if err := c.Update(context.Background(), db, ch); err != nil {
31+
t.Errorf("Error calling PGPostmasterCollector.Update: %s", err)
32+
}
33+
}()
34+
35+
expected := []MetricResult{
36+
{labels: labelMap{"slot_name": "test_slot"}, value: 5, metricType: dto.MetricType_GAUGE},
37+
{labels: labelMap{"slot_name": "test_slot"}, value: 3, metricType: dto.MetricType_GAUGE},
38+
{labels: labelMap{"slot_name": "test_slot"}, value: 1, metricType: dto.MetricType_GAUGE},
39+
}
40+
41+
convey.Convey("Metrics comparison", t, func() {
42+
for _, expect := range expected {
43+
m := readMetric(<-ch)
44+
convey.So(expect, convey.ShouldResemble, m)
45+
}
46+
})
47+
if err := mock.ExpectationsWereMet(); err != nil {
48+
t.Errorf("there were unfulfilled exceptions: %s", err)
49+
}
50+
}
51+
52+
func TestPgReplicationSlotCollectorInActive(t *testing.T) {
53+
db, mock, err := sqlmock.New()
54+
if err != nil {
55+
t.Fatalf("Error opening a stub db connection: %s", err)
56+
}
57+
defer db.Close()
58+
59+
columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
60+
rows := sqlmock.NewRows(columns).
61+
AddRow("test_slot", 6, 12, false)
62+
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
63+
64+
ch := make(chan prometheus.Metric)
65+
go func() {
66+
defer close(ch)
67+
c := PGReplicationSlotCollector{}
68+
69+
if err := c.Update(context.Background(), db, ch); err != nil {
70+
t.Errorf("Error calling PGPostmasterCollector.Update: %s", err)
71+
}
72+
}()
73+
74+
expected := []MetricResult{
75+
{labels: labelMap{"slot_name": "test_slot"}, value: 6, metricType: dto.MetricType_GAUGE},
76+
{labels: labelMap{"slot_name": "test_slot"}, value: 0, metricType: dto.MetricType_GAUGE},
77+
}
78+
79+
convey.Convey("Metrics comparison", t, func() {
80+
for _, expect := range expected {
81+
m := readMetric(<-ch)
82+
convey.So(expect, convey.ShouldResemble, m)
83+
}
84+
})
85+
if err := mock.ExpectationsWereMet(); err != nil {
86+
t.Errorf("there were unfulfilled exceptions: %s", err)
87+
}
88+
89+
}

0 commit comments

Comments
 (0)