Skip to content

Commit a8d5f78

Browse files
committed
Fix PreparedStatement to set a unique name
PostgreSQL prepared statements and portals are referenced by names, where setting an empty string is ok, as it means it is unnamed prepared statement. [1] However there is a weird issue happening under high loads where a different set of parameters are being set to the wrong prepared statements: `bind message supplies 10 parameters, but prepared statement “...” requires 12` And also sporadically we get `ERROR: portal "" cannot be run` making it super hard to troubleshoot as all portals currently share the same name. Setting a unique name for each prepared statement has fixed the problem in our production setting, however I am not able to pinpoint why the problem exists in the first place, as "" seems to be a valid portal name. [1] - https://www.postgresql.org/docs/11/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
1 parent dd06237 commit a8d5f78

File tree

3 files changed

+31
-10
lines changed

3 files changed

+31
-10
lines changed

src/main/scala/com/twitter/finagle/postgres/PostgresClientImpl.scala

+4-5
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class PostgresClientImpl(
158158
typeMap().flatMap { _ =>
159159
for {
160160
service <- factory()
161-
statement = new PreparedStatementImpl("", sql, service)
161+
statement = new PreparedStatementImpl(sql, service)
162162
result <- statement.selectToStream(params: _*)(f)
163163
} yield result
164164
}
@@ -171,7 +171,7 @@ class PostgresClientImpl(
171171
typeMap().flatMap { _ =>
172172
for {
173173
service <- factory()
174-
statement = new PreparedStatementImpl("", sql, service)
174+
statement = new PreparedStatementImpl(sql, service)
175175
OK(count) <- statement.exec(params: _*)
176176
} yield count
177177
}
@@ -215,11 +215,12 @@ class PostgresClientImpl(
215215

216216

217217
private[this] class PreparedStatementImpl(
218-
name: String,
219218
sql: String,
220219
service: Service[PgRequest, PgResponse]
221220
) extends PreparedStatement {
222221

222+
private[this] val name = s"fin-pg-$id-" + counter.incrementAndGet
223+
223224
def closeService = service.close()
224225

225226
private[this] def parse(params: Param[_]*): Future[Unit] = {
@@ -306,8 +307,6 @@ class PostgresClientImpl(
306307
}.ensure(service.close())
307308
}
308309
}
309-
310-
private[this] def genName() = s"fin-pg-$id-" + counter.incrementAndGet
311310
}
312311

313312

src/test/scala/com/twitter/finagle/postgres/integration/IntegrationSpec.scala

+26-4
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,14 @@ class IntegrationSpec extends Spec {
3939
useSsl = sys.env.getOrElse("USE_PG_SSL", "0") == "1"
4040
sslHost = sys.env.get("PG_SSL_HOST")
4141
} yield {
42-
43-
4442
val queryTimeout = Duration.fromSeconds(2)
4543

46-
def getClient: PostgresClientImpl = {
44+
def getClient: PostgresClientImpl = getConcurrentClient(maxConcurrency = 1)
45+
def getConcurrentClient(maxConcurrency: Int): PostgresClientImpl = {
4746
val client = Postgres.Client()
4847
.withCredentials(user, password)
4948
.database(dbname)
50-
.withSessionPool.maxSize(1)
49+
.withSessionPool.maxSize(maxConcurrency)
5150
.conditionally(useSsl, c => sslHost.fold(c.withTransport.tls)(c.withTransport.tls(_)))
5251
.newRichClient(hostPort)
5352

@@ -440,6 +439,29 @@ class IntegrationSpec extends Spec {
440439
client.status must equal(Status.Closed)
441440
}
442441
}
442+
443+
"generate unique names per prepared statement" in {
444+
val client = getConcurrentClient(maxConcurrency = 10)
445+
cleanDb(client)
446+
447+
val concurrentQueries = (1 to 1000).par.map {
448+
case number if number % 2 == 0 =>
449+
client.prepareAndExecute(
450+
"INSERT INTO %s (str_field, int_field, double_field, bool_field) VALUES ($1, $2, $3, $4)".format(IntegrationSpec.pgTestTable),
451+
number.toString, number, number, true
452+
)
453+
case number =>
454+
client.prepareAndExecute(
455+
"INSERT INTO %s (str_field) VALUES ($1)".format(IntegrationSpec.pgTestTable),
456+
number.toString
457+
)
458+
}
459+
460+
val result = Await.result(
461+
Future.collect(concurrentQueries.seq)
462+
)
463+
result.size must equal(1000)
464+
}
443465
}
444466

445467
}

version.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "0.13.0-SNAPSHOT"
1+
version in ThisBuild := "0.13.1-SNAPSHOT"

0 commit comments

Comments
 (0)