Skip to content

Export query itself together with queryId in stat_statement metrics #940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra
* `[no-]collector.stat_statements`
Enable the `stat_statements` collector (default: disabled).

* `[no-]collector.stat_statements.include_query`
Enable selecting statement query together with queryId. (default: disabled)

* `--collector.stat_statements.query_length`
Maximum length of the statement text. Default is 120.

* `[no-]collector.stat_user_tables`
Enable the `stat_user_tables` collector (default: enabled).

Expand Down
7 changes: 4 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ const (
// Namespace for all metrics.
namespace = "pg"

defaultEnabled = true
defaultDisabled = false
collectorFlagPrefix = "collector."
defaultEnabled = true
defaultDisabled = false
)

var (
Expand Down Expand Up @@ -74,7 +75,7 @@ func registerCollector(name string, isDefaultEnabled bool, createFunc func(colle
}

// Create flag for this collector
flagName := fmt.Sprintf("collector.%s", name)
flagName := collectorFlagPrefix + name
flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState)
defaultValue := fmt.Sprintf("%v", isDefaultEnabled)

Expand Down
89 changes: 79 additions & 10 deletions collector/pg_stat_statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,51 @@ package collector
import (
"context"
"database/sql"
"fmt"
"log/slog"

"github.com/alecthomas/kingpin/v2"
"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
)

const statStatementsSubsystem = "stat_statements"

var (
includeQueryFlag *bool = nil
statementLengthFlag *uint = nil
)

func init() {
// WARNING:
// Disabled by default because this set of metrics can be quite expensive on a busy server
// Every unique query will cause a new timeseries to be created
registerCollector(statStatementsSubsystem, defaultDisabled, NewPGStatStatementsCollector)

includeQueryFlag = kingpin.Flag(
fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".include_query"),
"Enable selecting statement query together with queryId. (default: disabled)").
Default(fmt.Sprintf("%v", defaultDisabled)).
Bool()
statementLengthFlag = kingpin.Flag(
fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".query_length"),
"Maximum length of the statement text.").
Default("120").
Uint()
}

type PGStatStatementsCollector struct {
log *slog.Logger
log *slog.Logger
includeQueryStatement bool
statementLength uint
}

func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) {
return &PGStatStatementsCollector{log: config.logger}, nil
return &PGStatStatementsCollector{
log: config.logger,
includeQueryStatement: *includeQueryFlag,
statementLength: *statementLengthFlag,
}, nil
}

var (
Expand Down Expand Up @@ -71,10 +95,22 @@ var (
prometheus.Labels{},
)

statStatementsQuery = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statStatementsSubsystem, "query_id"),
"SQL Query to queryid mapping",
[]string{"queryid", "query"},
prometheus.Labels{},
)
)

const (
pgStatStatementQuerySelect = `LEFT(pg_stat_statements.query, %d) as query,`

pgStatStatementsQuery = `SELECT
pg_get_userbyid(userid) as user,
pg_database.datname,
pg_stat_statements.queryid,
%s
pg_stat_statements.calls as calls_total,
pg_stat_statements.total_time / 1000.0 as seconds_total,
pg_stat_statements.rows as rows_total,
Expand All @@ -96,6 +132,7 @@ var (
pg_get_userbyid(userid) as user,
pg_database.datname,
pg_stat_statements.queryid,
%s
pg_stat_statements.calls as calls_total,
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
pg_stat_statements.rows as rows_total,
Expand All @@ -117,6 +154,7 @@ var (
pg_get_userbyid(userid) as user,
pg_database.datname,
pg_stat_statements.queryid,
%s
pg_stat_statements.calls as calls_total,
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
pg_stat_statements.rows as rows_total,
Expand All @@ -135,30 +173,42 @@ var (
LIMIT 100;`
)

func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
var query string
func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
var queryTemplate string
switch {
case instance.version.GE(semver.MustParse("17.0.0")):
query = pgStatStatementsQuery_PG17
queryTemplate = pgStatStatementsQuery_PG17
case instance.version.GE(semver.MustParse("13.0.0")):
query = pgStatStatementsNewQuery
queryTemplate = pgStatStatementsNewQuery
default:
query = pgStatStatementsQuery
queryTemplate = pgStatStatementsQuery
}
var querySelect = ""
if c.includeQueryStatement {
querySelect = fmt.Sprintf(pgStatStatementQuerySelect, c.statementLength)
}
query := fmt.Sprintf(queryTemplate, querySelect)

db := instance.getDB()
rows, err := db.QueryContext(ctx, query)

var presentQueryIds = make(map[string]struct{})

if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var user, datname, queryid sql.NullString
var user, datname, queryid, statement sql.NullString
var callsTotal, rowsTotal sql.NullInt64
var secondsTotal, blockReadSecondsTotal, blockWriteSecondsTotal sql.NullFloat64

if err := rows.Scan(&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal); err != nil {
var columns []any
if c.includeQueryStatement {
columns = []any{&user, &datname, &queryid, &statement, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
} else {
columns = []any{&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
}
if err := rows.Scan(columns...); err != nil {
return err
}

Expand Down Expand Up @@ -229,6 +279,25 @@ func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance,
blockWriteSecondsTotalMetric,
userLabel, datnameLabel, queryidLabel,
)

if c.includeQueryStatement {
_, ok := presentQueryIds[queryidLabel]
if !ok {
presentQueryIds[queryidLabel] = struct{}{}

queryLabel := "unknown"
if statement.Valid {
queryLabel = statement.String
}

ch <- prometheus.MustNewConstMetric(
statStatementsQuery,
prometheus.CounterValue,
1,
queryidLabel, queryLabel,
)
}
}
}
if err := rows.Err(); err != nil {
return err
Expand Down
Loading