Skip to content

GCP CloudSQL Connector with asyncpg Pool #1148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
d1manson opened this issue Apr 26, 2024 · 1 comment
Closed

GCP CloudSQL Connector with asyncpg Pool #1148

d1manson opened this issue Apr 26, 2024 · 1 comment

Comments

@d1manson
Copy link
Contributor

Not sure if i'm better asking this on the cloud sql python connector repo or here, but I'm trying to understand how to effectively call asyncpg.create_pool(...) but make it work with the cloudsql connector.

I can create a single asyncpg-style connection using this:

from google.cloud.sql.connector import Connector

async def init():  
    connector = Connector(enable_iam_auth=True, loop=asyncio.get_event_loop())
    conn =  await connector.connect_async(
                instance_connection_string="my-project:region:db-name",
                driver="asyncpg",
                db="postgres",
                user="[email protected]")
    # do init stuff...
    await conn.set_type_codec("json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog")
    return conn

but i'm not sure how to do a complete drop in replacement for async.create_pool, which is what we have in the existing codebase; I'm hoping to retain the pool object as is so that the rest of the codebase doesn't need to be modified in light of switching to CloudSQL.

Thanks!

@d1manson
Copy link
Contributor Author

d1manson commented Aug 2, 2024

In case anyone finds it useful, i did actually come back to this and got it working as follows:

from asyncpg import Pool as Pool_original, create_pool as create_pool_original
import inspect
from google.cloud.sql.connector import Connector
import asyncio

async def connect(dsn, **kwargs):
    # The dsn should be a cloudsql instance_connection_string, not a full dsn; we use the name 'dsn' for consistency
    # with the original asyncpg.connect function arguments.

    # you may want to make this Connector a singleton rather than create a new one scoped to this function call
    connector = Connector(enable_iam_auth=True, loop=loop) 

    return await connector.connect_async(
        dsn,
        driver="asyncpg",
        user="SOME_USER_HERE",
        **kwargs
    )


class Pool(Pool_original):

    async def _get_new_connection(self):
        # this function body is copy-pasted from the base class, with just the first line modified
        con = await connect(*self._connect_args, loop=self._loop,
                            connection_class=self._connection_class,
                            record_class=self._record_class,
                            **self._connect_kwargs)

        if self._init is not None:
            try:
                await self._init(con)
            except (Exception, asyncio.CancelledError) as ex:
                # If a user-defined `init` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                try:
                    # Use `close()` to close the connection gracefully.
                    # An exception in `init` isn't necessarily caused
                    # by an IO or a protocol error.  close() will
                    # do the necessary cleanup via _release_on_close().
                    await con.close()
                finally:
                    raise ex

        return con


def _borrow_default_kwargs(func, kwargs):
    signature = inspect.signature(func)
    return {
        **{
            k: v.default
            for k, v in signature.parameters.items()
            if v.default is not inspect.Parameter.empty and v.kind == v.KEYWORD_ONLY
        },
        **kwargs,
    }


def create_pool(dsn=None, **kwargs):
    return Pool(dsn, **_borrow_default_kwargs(create_pool_original, kwargs))

Note this would be somewhat simpler if it was possible to provide a custom .connect method for the _get_new_connection method to use - here - rather than having it be hardcoded to use the default connection.connect.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant