Skip to content

Commit fbd1ef2

Browse files
committed
Add connection_holder_class to Pool for custom connection handling #1251
1 parent 5b14653 commit fbd1ef2

File tree

1 file changed

+49
-25
lines changed

1 file changed

+49
-25
lines changed

Diff for: asyncpg/pool.py

+49-25
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from __future__ import annotations
88

99
import asyncio
10+
import typing
1011
from collections.abc import Awaitable, Callable
1112
import functools
1213
import inspect
@@ -341,7 +342,7 @@ class Pool:
341342
'_queue', '_loop', '_minsize', '_maxsize',
342343
'_init', '_connect', '_reset', '_connect_args', '_connect_kwargs',
343344
'_holders', '_initialized', '_initializing', '_closing',
344-
'_closed', '_connection_class', '_record_class', '_generation',
345+
'_closed', '_connection_class', '_connection_holder_class', '_record_class', '_generation',
345346
'_setup', '_max_queries', '_max_inactive_connection_lifetime'
346347
)
347348

@@ -356,6 +357,7 @@ def __init__(self, *connect_args,
356357
reset=None,
357358
loop,
358359
connection_class,
360+
connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
359361
record_class,
360362
**connect_kwargs):
361363

@@ -408,6 +410,7 @@ def __init__(self, *connect_args,
408410
self._queue = None
409411

410412
self._connection_class = connection_class
413+
self._connection_holder_class = connection_holder_class
411414
self._record_class = record_class
412415

413416
self._closing = False
@@ -443,37 +446,48 @@ async def _async__init__(self):
443446
self._initialized = True
444447

445448
async def _initialize(self):
449+
self._initialize_connections_queue()
450+
if self._minsize:
451+
await self._initialize_connections()
452+
453+
def _initialize_connections_queue(self) -> None:
446454
self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
447455
for _ in range(self._maxsize):
448-
ch = PoolConnectionHolder(
449-
self,
456+
ch = self._connection_holder_class(
457+
pool=self,
458+
setup=self._setup,
450459
max_queries=self._max_queries,
451460
max_inactive_time=self._max_inactive_connection_lifetime,
452-
setup=self._setup)
453-
461+
)
454462
self._holders.append(ch)
455463
self._queue.put_nowait(ch)
456464

457-
if self._minsize:
458-
# Since we use a LIFO queue, the first items in the queue will be
459-
# the last ones in `self._holders`. We want to pre-connect the
460-
# first few connections in the queue, therefore we want to walk
461-
# `self._holders` in reverse.
462-
463-
# Connect the first connection holder in the queue so that
464-
# any connection issues are visible early.
465-
first_ch = self._holders[-1] # type: PoolConnectionHolder
466-
await first_ch.connect()
467-
468-
if self._minsize > 1:
469-
connect_tasks = []
470-
for i, ch in enumerate(reversed(self._holders[:-1])):
471-
# `minsize - 1` because we already have first_ch
472-
if i >= self._minsize - 1:
473-
break
474-
connect_tasks.append(ch.connect())
475-
476-
await asyncio.gather(*connect_tasks)
465+
466+
async def _initialize_connections(self) -> None:
467+
468+
if not self._minsize:
469+
raise exceptions.InterfaceError(
470+
'pool is already initialized with min_size > 0')
471+
472+
# Since we use a LIFO queue, the first items in the queue will be
473+
# the last ones in `self._holders`. We want to pre-connect the
474+
# first few connections in the queue, therefore we want to walk
475+
# `self._holders` in reverse.
476+
477+
# Connect the first connection holder in the queue so that
478+
# any connection issues are visible early.
479+
first_ch = self._holders[-1] # type: PoolConnectionHolder
480+
await first_ch.connect()
481+
482+
if self._minsize > 1:
483+
connect_tasks = []
484+
for i, ch in enumerate(reversed(self._holders[:-1])):
485+
# `minsize - 1` because we already have first_ch
486+
if i >= self._minsize - 1:
487+
break
488+
connect_tasks.append(ch.connect())
489+
490+
await asyncio.gather(*connect_tasks)
477491

478492
def is_closing(self):
479493
"""Return ``True`` if the pool is closing or is closed.
@@ -1083,6 +1097,7 @@ def create_pool(dsn=None, *,
10831097
reset=None,
10841098
loop=None,
10851099
connection_class=connection.Connection,
1100+
connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
10861101
record_class=protocol.Record,
10871102
**connect_kwargs):
10881103
r"""Create a connection pool.
@@ -1142,6 +1157,11 @@ def create_pool(dsn=None, *,
11421157
The class to use for connections. Must be a subclass of
11431158
:class:`~asyncpg.connection.Connection`.
11441159
1160+
:param PoolConnectionHolder connection_holder_class:
1161+
The class to use for connection holders. This class is used
1162+
to manage the connection lifecycle in the pool. Must be a subclass of
1163+
:class:`~asyncpg.pool.PoolConnectionHolder`
1164+
11451165
:param type record_class:
11461166
If specified, the class to use for records returned by queries on
11471167
the connections in this pool. Must be a subclass of
@@ -1230,10 +1250,14 @@ def create_pool(dsn=None, *,
12301250
12311251
.. versionchanged:: 0.30.0
12321252
Added the *connect* and *reset* parameters.
1253+
1254+
.. versionchanged:: 0.31.0
1255+
Added the *pool_connection_holder_class* parameter.
12331256
"""
12341257
return Pool(
12351258
dsn,
12361259
connection_class=connection_class,
1260+
connection_holder_class=connection_holder_class,
12371261
record_class=record_class,
12381262
min_size=min_size,
12391263
max_size=max_size,

0 commit comments

Comments
 (0)