Use context managers for aio connectors

Not sure why we didn't do this initially, but this ensures that we
always close all connectors properly, and also gives much clearer scope
regarding their life-cycles.
This commit is contained in:
Hugo Osvaldo Barrera 2021-08-16 19:15:18 +02:00
parent 54e829262d
commit cf1d082628
2 changed files with 52 additions and 63 deletions

View file

@@ -63,8 +63,5 @@ async def aio_session(event_loop):
 @pytest.fixture
 async def aio_connector(event_loop):
-    conn = aiohttp.TCPConnector(limit_per_host=16)
-    try:
+    async with aiohttp.TCPConnector(limit_per_host=16) as conn:
         yield conn
-    finally:
-        await conn.close()

View file

@@ -127,27 +127,25 @@ def sync(ctx, collections, force_delete):
     from .tasks import sync_collection

     async def main(collections):
-        conn = aiohttp.TCPConnector(limit_per_host=16)
-
-        tasks = []
-        for pair_name, collections in collections:
-            async for collection, config in prepare_pair(
-                pair_name=pair_name,
-                collections=collections,
-                config=ctx.config,
-                connector=conn,
-            ):
-                tasks.append(
-                    sync_collection(
-                        collection=collection,
-                        general=config,
-                        force_delete=force_delete,
-                        connector=conn,
-                    )
-                )
-
-        await asyncio.gather(*tasks)
-        await conn.close()
+        async with aiohttp.TCPConnector(limit_per_host=16) as conn:
+            tasks = []
+            for pair_name, collections in collections:
+                async for collection, config in prepare_pair(
+                    pair_name=pair_name,
+                    collections=collections,
+                    config=ctx.config,
+                    connector=conn,
+                ):
+                    tasks.append(
+                        sync_collection(
+                            collection=collection,
+                            general=config,
+                            force_delete=force_delete,
+                            connector=conn,
+                        )
+                    )
+
+            await asyncio.gather(*tasks)

     asyncio.run(main(collections))
@@ -166,28 +164,26 @@ def metasync(ctx, collections):
     from .tasks import prepare_pair

     async def main(collections):
-        conn = aiohttp.TCPConnector(limit_per_host=16)
-        for pair_name, collections in collections:
-            collections = prepare_pair(
-                pair_name=pair_name,
-                collections=collections,
-                config=ctx.config,
-                connector=conn,
-            )
-            await asyncio.gather(
-                *[
-                    metasync_collection(
-                        collection=collection,
-                        general=config,
-                        connector=conn,
-                    )
-                    async for collection, config in collections
-                ]
-            )
-        await conn.close()
+        async with aiohttp.TCPConnector(limit_per_host=16) as conn:
+            for pair_name, collections in collections:
+                collections = prepare_pair(
+                    pair_name=pair_name,
+                    collections=collections,
+                    config=ctx.config,
+                    connector=conn,
+                )
+                await asyncio.gather(
+                    *[
+                        metasync_collection(
+                            collection=collection,
+                            general=config,
+                            connector=conn,
+                        )
+                        async for collection, config in collections
+                    ]
+                )

     asyncio.run(main(collections))
@@ -213,18 +209,15 @@ def discover(ctx, pairs, list):
     config = ctx.config

     async def main():
-        conn = aiohttp.TCPConnector(limit_per_host=16)
-
-        for pair_name in pairs or config.pairs:
-            await discover_collections(
-                status_path=config.general["status_path"],
-                pair=config.get_pair(pair_name),
-                from_cache=False,
-                list_collections=list,
-                connector=conn,
-            )
-        await conn.close()
+        async with aiohttp.TCPConnector(limit_per_host=16) as conn:
+            for pair_name in pairs or config.pairs:
+                await discover_collections(
+                    status_path=config.general["status_path"],
+                    pair=config.get_pair(pair_name),
+                    from_cache=False,
+                    list_collections=list,
+                    connector=conn,
+                )

     asyncio.run(main())
@@ -267,14 +260,13 @@ def repair(ctx, collection, repair_unsafe_uid):
     click.confirm("Do you want to continue?", abort=True)

     async def main():
-        conn = aiohttp.TCPConnector(limit_per_host=16)
-        await repair_collection(
-            ctx.config,
-            collection,
-            repair_unsafe_uid=repair_unsafe_uid,
-            connector=conn,
-        )
-        await conn.close()
+        async with aiohttp.TCPConnector(limit_per_host=16) as conn:
+            await repair_collection(
+                ctx.config,
+                collection,
+                repair_unsafe_uid=repair_unsafe_uid,
+                connector=conn,
+            )

     asyncio.run(main())