diff --git a/examples/global_pool/README.md b/examples/global_pool/README.md new file mode 100644 index 00000000..c7956b78 --- /dev/null +++ b/examples/global_pool/README.md @@ -0,0 +1,67 @@ + +This example is about how to use asyncpg conveniently in both server environment and non-server environment. Users from pymongo will be comfortable for demo provided. The demo does not require to write data type schema like norm orm do. The process is adapted for non-async datadb users: first to get db handler, then table handler ,and then run code. + +The demo code provide PG class,which save connection pool in global dictionary with database name as key. Each database has one and only one pool. Different pg instances can share same db pool. + +# Non Server Environment +```build +from pg import PG + +# define your data process logic here +async def run_many_sqls_in_transaction(table): + sqls = [ + "sql1", + "sql2", + "sql3", + ] + await table.trans(sqls) +async def run_one_sql(table): + sql="INSERT INTO ......" + await table.execute(sql) +async def select(table): + sql="select ......" + b = await table.select(sql) + print(11, b) + +pgdb = PG("host", "port", "user", "password", "database") +table = pgdb['test'] +table.run(run_many_sqls_in_transaction) +table.run(run_one_sql) +table.run(select) + + +``` + +In non-server environment, data process logic is written in async function. Run this function in PG class method "run" will guarantee connection pool will be terminated. + +# Server Environment +```build +# example by fastapi + +from pg import PG +from fastapi import FastAPI +from fastapi.testclient import TestClient + +app = FastAPI() + +# Do not need init connection pool here,pg class will init automatically when pool is none. +@app.on_event("startup") +async def startup_event(): + pass + +@app.on_event("shutdown") +async def shutdown_event(): + pgdb = PG("host", "port", "user", "password", "database") + pgdb.terminate_pool() + + +@app.get("/xxx") +async def handlers(): + pgdb = PG("host", "port", "user", "password", "database") + table = pgdb['test'] + data= await table.select("select ....") + return data + + +``` +The PG class will create pool with first call. The pool will be terminated during server lifetime shutdown phase. \ No newline at end of file diff --git a/examples/global_pool/pg.py b/examples/global_pool/pg.py new file mode 100644 index 00000000..44193c1c --- /dev/null +++ b/examples/global_pool/pg.py @@ -0,0 +1,75 @@ +import asyncio + +import asyncpg + +DB = {} + + +class PG: + def __init__(self, host, port, user, pwd, dbName): + self.user = user + self.pwd = pwd + self.host = host + self.port = port + self.dbName = dbName + self.db = None + self.table = None + + def __getitem__(self, tb): + self.table = tb + return self + + async def db_pool(self): + global DB + if DB.get(self.dbName, None) is None: + DB[self.dbName] = await asyncpg.create_pool( + host=self.host, + port=self.port, + user=self.user, + password=self.pwd, + database=self.dbName, + ) + + self.db = DB[self.dbName] + print("DB", self.dbName, self.db) + return self + + # 如果测算程序不调用,切数据库会出现pgadmin看不到表格的问题. + def terminate_pool(self): + print("will terminate", self.db) + if self.db is not None: + self.db.terminate() + DB.pop(self.dbName, None) + + async def check(self): + if self.dbName is None: + raise "no db name" + if self.table is None: + raise "no table name" + if self.db is None: + await self.db_pool() + + async def execute(self, sql): + print("sql===", sql) + await self.check() + async with self.db.acquire() as conn: + await conn.execute(sql) + + async def trans(self, sqls): + await self.check() + async with self.db.acquire() as conn: + async with conn.transaction(): + for sql in sqls: + print("sql===", sql) + await conn.execute(sql) + + async def select(self, sql): + await self.check() + async with self.db.acquire() as conn: + q = await conn.fetch(sql) + return [dict(i) for i in q] + + def run(self, f): + print(self.dbName, self.table) + asyncio.get_event_loop().run_until_complete(f(self)) + self.terminate_pool()