python -version : 3.6.6
import asyncio import aiomysql import sys async def test_example(loop): pool = await aiomysql.create_pool(host='127.0.0.1', port=3308, user='finger', password='fypingfyping', db='userlogin_test', loop=loop) mgr = await pool.acquire() aexit = aiomysql.connection.Connection.__aexit__ aenter = await aiomysql.connection.Connection.__aenter__(mgr) conn = aenter try: mg = await conn.cursor() aexit_ = aiomysql.cursors.Cursor.__aexit__ aenter_ = await aiomysql.cursors.Cursor.__aenter__(mg) cur = aenter_ try: await cur.execute("select 48") print(cur.description) (r,) = await cur.fetchone() assert r==48 except: if not await aexit_(mg,*sys.exc_info()): raise else: await aexit_(mg,None,None,None) except : if not await aexit(mgr,*sys.exc_info()): raise else: await aexit(mgr,None,None,None) pool.close() await pool.wait_closed() async def test_example_0(loop): pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='', db='mysql', loop=loop) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") print(cur.description) (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() loop = asyncio.get_event_loop() loop.run_until_complete(test_example(loop))
源代码是 aiomysql的demos,这里的test_example_0