Python version: 3.6.6
import asyncio
import aiomysql
import sys
async def test_example(loop):
    """Run ``select 48`` through a pooled connection and verify the result.

    Creates an aiomysql pool for the ``userlogin_test`` database on
    127.0.0.1:3308, borrows one connection, executes the query on a cursor,
    prints the cursor description and asserts that the single returned
    column equals 48. The pool is always closed before returning.

    Args:
        loop: Event loop passed to ``aiomysql.create_pool``.

    Raises:
        AssertionError: If the fetched value is not 48.
        Exception: Any error raised by aiomysql while connecting or
            querying propagates to the caller after cleanup.
    """
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3308,
                                      user='finger', password='fypingfyping',
                                      db='userlogin_test', loop=loop)
    try:
        # Enter the object returned by pool.acquire() itself (do not await
        # it first): that manager's exit releases the connection back to
        # the pool. Driving Connection.__aenter__/__aexit__ by hand only
        # closes the connection without releasing it, and per the aiomysql
        # docs wait_closed() waits for all acquired connections to be
        # released.
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("select 48")
                print(cur.description)
                (r,) = await cur.fetchone()
                assert r == 48
    finally:
        # Shut the pool down on failure as well as on success.
        pool.close()
        await pool.wait_closed()
async def test_example_0(loop):
    """Run ``SELECT 42;`` through a pooled connection and verify the result.

    Creates an aiomysql pool for the ``mysql`` database on 127.0.0.1:3306
    as ``root``, borrows one connection, executes the query on a cursor,
    prints the cursor description and asserts that the single returned
    column equals 42. The pool is always closed before returning.

    Args:
        loop: Event loop passed to ``aiomysql.create_pool``.

    Raises:
        AssertionError: If the fetched value is not 42.
        Exception: Any error raised by aiomysql while connecting or
            querying propagates to the caller after cleanup.
    """
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='',
                                      db='mysql', loop=loop)
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 42;")
                print(cur.description)
                (r,) = await cur.fetchone()
                assert r == 42
    finally:
        # Close the pool even when the query or the assertion fails, so no
        # connections are left open behind an exception.
        pool.close()
        await pool.wait_closed()
# Script driver: fetch the default event loop and block until the
# test_example coroutine has finished (works on Python 3.6, which has no
# asyncio.run).
loop = asyncio.get_event_loop()
loop.run_until_complete(test_example(loop=loop))
源代码来自 aiomysql 的 demos，即这里的 test_example_0。