Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): introduce maxResultSize limit and fix pg errors handling #911

Merged
merged 1 commit into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7,981 changes: 2,566 additions & 5,415 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
"test": "run-s db:clean db:run test:run db:clean",
"db:clean": "cd test/db && docker compose down",
"db:run": "cd test/db && docker compose up --detach --wait",
"test:run": "vitest run --coverage",
"test:update": "run-s db:clean db:run && vitest run --update && run-s db:clean"
"test:run": "PG_META_MAX_RESULT_SIZE=20971520 vitest run --coverage",
"test:update": "run-s db:clean db:run && PG_META_MAX_RESULT_SIZE=20971520 vitest run --update && run-s db:clean"
},
"engines": {
"node": ">=20",
Expand All @@ -46,10 +46,10 @@
"crypto-js": "^4.0.0",
"fastify": "^4.24.3",
"fastify-metrics": "^10.0.0",
"pg": "^8.13.1",
"pg": "npm:@supabase/[email protected]",
"pg-connection-string": "^2.7.0",
"pg-format": "^1.0.4",
"pg-protocol": "^1.7.0",
"pg-protocol": "npm:@supabase/[email protected]",
"pgsql-parser": "^13.16.0",
"pino": "^9.5.0",
"postgres-array": "^3.0.1",
Expand Down
3 changes: 1 addition & 2 deletions src/lib/PostgresMeta.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { PoolConfig } from 'pg'
import * as Parser from './Parser.js'
import PostgresMetaColumnPrivileges from './PostgresMetaColumnPrivileges.js'
import PostgresMetaColumns from './PostgresMetaColumns.js'
Expand All @@ -20,7 +19,7 @@ import PostgresMetaTypes from './PostgresMetaTypes.js'
import PostgresMetaVersion from './PostgresMetaVersion.js'
import PostgresMetaViews from './PostgresMetaViews.js'
import { init } from './db.js'
import { PostgresMetaResult } from './types.js'
import { PostgresMetaResult, PoolConfig } from './types.js'

export default class PostgresMeta {
query: (sql: string) => Promise<PostgresMetaResult<any>>
Expand Down
87 changes: 74 additions & 13 deletions src/lib/db.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pg, { PoolConfig } from 'pg'
import { DatabaseError } from 'pg-protocol'
import pg from 'pg'
import { parse as parseArray } from 'postgres-array'
import { PostgresMetaResult } from './types.js'
import { PostgresMetaResult, PoolConfig } from './types.js'

pg.types.setTypeParser(pg.types.builtins.INT8, (x) => {
const asNumber = Number(x)
Expand All @@ -21,6 +20,42 @@ pg.types.setTypeParser(1185, parseArray) // _timestamptz
pg.types.setTypeParser(600, (x) => x) // point
pg.types.setTypeParser(1017, (x) => x) // _point

// Ensure any query will have an appropriate error handler on the pool to prevent connections errors
// to bubble up all the stack eventually killing the server
const poolerQueryHandleError = (pgpool: pg.Pool, sql: string): Promise<pg.QueryResult<any>> => {
return new Promise((resolve, reject) => {
let rejected = false
const connectionErrorHandler = (err: any) => {
// If the error hasn't already be propagated to the catch
if (!rejected) {
// This is a trick to wait for the next tick, leaving a chance for handled errors such as
// RESULT_SIZE_LIMIT to take over other stream errors such as `unexpected commandComplete message`
setTimeout(() => {
rejected = true
return reject(err)
})
}
}
// This listened avoid getting uncaught exceptions for errors happening at connection level within the stream
// such as parse or RESULT_SIZE_EXCEEDED errors instead, handle the error gracefully by bubbling in up to the caller
pgpool.once('error', connectionErrorHandler)
pgpool
.query(sql)
.then((results: pg.QueryResult<any>) => {
if (!rejected) {
return resolve(results)
}
})
.catch((err: any) => {
// If the error hasn't already be handled within the error listener
if (!rejected) {
rejected = true
return reject(err)
}
})
})
}

export const init: (config: PoolConfig) => {
query: (sql: string) => Promise<PostgresMetaResult<any>>
end: () => Promise<void>
Expand Down Expand Up @@ -60,26 +95,27 @@ export const init: (config: PoolConfig) => {
// compromise: if we run `query` after `pool.end()` is called (i.e. pool is
// `null`), we temporarily create a pool and close it right after.
let pool: pg.Pool | null = new pg.Pool(config)

return {
async query(sql) {
try {
if (!pool) {
const pool = new pg.Pool(config)
let res = await pool.query(sql)
let res = await poolerQueryHandleError(pool, sql)
if (Array.isArray(res)) {
res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
}
await pool.end()
return { data: res.rows, error: null }
}

let res = await pool.query(sql)
let res = await poolerQueryHandleError(pool, sql)
if (Array.isArray(res)) {
res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
}
return { data: res.rows, error: null }
} catch (error: any) {
if (error instanceof DatabaseError) {
if (error.constructor.name === 'DatabaseError') {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note

With the new error propagation, had to slightly tweak this as the instanceof wasn't properly working. Even tough the constructor was the right one, the instance kept being an Error because of the promise wrapping.

// Roughly based on:
// - https://github.com/postgres/postgres/blob/fc4089f3c65a5f1b413a3299ba02b66a8e5e37d0/src/interfaces/libpq/fe-protocol3.c#L1018
// - https://github.com/brianc/node-postgres/blob/b1a8947738ce0af004cb926f79829bb2abc64aa6/packages/pg/lib/native/query.js#L33
Expand Down Expand Up @@ -146,17 +182,42 @@ ${' '.repeat(5 + lineNumber.toString().length + 2 + lineOffset)}^
},
}
}

return { data: null, error: { message: error.message } }
try {
// Handle stream errors and result size exceeded errors
if (error.code === 'RESULT_SIZE_EXCEEDED') {
// Force kill the connection without waiting for graceful shutdown
return {
data: null,
error: {
message: `Query result size (${error.resultSize} bytes) exceeded the configured limit (${error.maxResultSize} bytes)`,
code: error.code,
resultSize: error.resultSize,
maxResultSize: error.maxResultSize,
},
}
}
return { data: null, error: { code: error.code, message: error.message } }
} finally {
// If the error isn't a "DatabaseError" assume it's a connection related we kill the connection
// To attempt a clean reconnect on next try
await this.end()
}
}
},

async end() {
const _pool = pool
pool = null
// Gracefully wait for active connections to be idle, then close all
// connections in the pool.
if (_pool) await _pool.end()
try {
const _pool = pool
pool = null
// Gracefully wait for active connections to be idle, then close all
// connections in the pool.
if (_pool) {
await _pool.end()
}
} catch (endError) {
// Ignore any errors during cleanup just log them
console.error('Failed ending connection pool', endError)
}
},
}
}
20 changes: 7 additions & 13 deletions src/lib/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Static, Type } from '@sinclair/typebox'
import { DatabaseError } from 'pg-protocol'
import type { Options as PrettierOptions } from 'prettier'
import { PoolConfig as PgPoolConfig } from 'pg'

export interface FormatterOptions extends PrettierOptions {}

Expand Down Expand Up @@ -251,13 +252,7 @@ export const postgresPublicationSchema = Type.Object({
publish_delete: Type.Boolean(),
publish_truncate: Type.Boolean(),
tables: Type.Union([
Type.Array(
Type.Object({
id: Type.Integer(),
name: Type.String(),
schema: Type.String(),
})
),
Type.Array(Type.Object({ id: Type.Integer(), name: Type.String(), schema: Type.String() })),
Type.Null(),
]),
})
Expand Down Expand Up @@ -445,12 +440,7 @@ export const postgresTypeSchema = Type.Object({
schema: Type.String(),
format: Type.String(),
enums: Type.Array(Type.String()),
attributes: Type.Array(
Type.Object({
name: Type.String(),
type_id: Type.Integer(),
})
),
attributes: Type.Array(Type.Object({ name: Type.String(), type_id: Type.Integer() })),
comment: Type.Union([Type.String(), Type.Null()]),
})
export type PostgresType = Static<typeof postgresTypeSchema>
Expand Down Expand Up @@ -596,3 +586,7 @@ export const postgresColumnPrivilegesRevokeSchema = Type.Object({
]),
})
export type PostgresColumnPrivilegesRevoke = Static<typeof postgresColumnPrivilegesRevokeSchema>

export interface PoolConfig extends PgPoolConfig {
maxResultSize?: number
}
8 changes: 2 additions & 6 deletions src/server/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@ import { PG_META_REQ_HEADER } from './constants.js'
import routes from './routes/index.js'
import { extractRequestForLogging } from './utils.js'
// Pseudo package declared only for this module
import pkg from '#package.json' assert { type: 'json' }
import pkg from '#package.json' with { type: 'json' }

export const build = (opts: FastifyServerOptions = {}): FastifyInstance => {
const app = fastify({
disableRequestLogging: true,
requestIdHeader: PG_META_REQ_HEADER,
...opts,
})
const app = fastify({ disableRequestLogging: true, requestIdHeader: PG_META_REQ_HEADER, ...opts })

app.setErrorHandler((error, request, reply) => {
app.log.error({ error: error.toString(), request: extractRequestForLogging(request) })
Expand Down
9 changes: 7 additions & 2 deletions src/server/constants.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import crypto from 'crypto'
import { PoolConfig } from 'pg'
import { PoolConfig } from '../lib/types.js'
import { getSecret } from '../lib/secrets.js'
import { AccessControl } from './templates/swift.js'
import pkg from '#package.json' assert { type: 'json' }
import pkg from '#package.json' with { type: 'json' }

export const PG_META_HOST = process.env.PG_META_HOST || '0.0.0.0'
export const PG_META_PORT = Number(process.env.PG_META_PORT || 1337)
Expand Down Expand Up @@ -49,11 +49,16 @@ export const GENERATE_TYPES_SWIFT_ACCESS_CONTROL = process.env
? (process.env.PG_META_GENERATE_TYPES_SWIFT_ACCESS_CONTROL as AccessControl)
: 'internal'

export const PG_META_MAX_RESULT_SIZE = process.env.PG_META_MAX_RESULT_SIZE
? parseInt(process.env.PG_META_MAX_RESULT_SIZE, 10)
: 2 * 1024 * 1024 * 1024 // default to 2GB max query size result

export const DEFAULT_POOL_CONFIG: PoolConfig = {
max: 1,
connectionTimeoutMillis: PG_CONN_TIMEOUT_SECS * 1000,
ssl: PG_META_DB_SSL_ROOT_CERT ? { ca: PG_META_DB_SSL_ROOT_CERT } : undefined,
application_name: `postgres-meta ${pkg.version}`,
maxResultSize: PG_META_MAX_RESULT_SIZE,
}

export const PG_META_REQ_HEADER = process.env.PG_META_REQ_HEADER || 'request-id'
1 change: 1 addition & 0 deletions test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ import './server/query'
import './server/ssl'
import './server/table-privileges'
import './server/typegen'
import './server/result-size-limit'
7 changes: 2 additions & 5 deletions test/lib/secrets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ vi.mock('node:fs/promises', async (): Promise<typeof import('node:fs/promises')>
const originalModule =
await vi.importActual<typeof import('node:fs/promises')>('node:fs/promises')
const readFile = vi.fn()
return {
...originalModule,
readFile,
}
return { ...originalModule, readFile }
})

describe('getSecret', () => {
Expand Down Expand Up @@ -57,6 +54,6 @@ describe('getSecret', () => {
const e: NodeJS.ErrnoException = new Error('permission denied')
e.code = 'EACCES'
vi.mocked(readFile).mockRejectedValueOnce(e)
expect(getSecret('SECRET')).rejects.toThrow()
await expect(getSecret('SECRET')).rejects.toThrow()
})
})
Loading