import asyncio import threading from operator import itemgetter from time import perf_counter as time import sqlite3 try: import pyodbc # running on Windows pyodbc_ok = True except ImportError: import psycopg2 # running on linux pyodbc_ok = False connection_list = [] connections_active = [] connection_lock = threading.Lock() def get_connection(): with connection_lock: try: # look for an inactive connection pos = connections_active.index(False) except ValueError: # if not found, create 10 more pos = len(connections_active) add_connections(10) conn = connection_list[pos] connections_active[pos] = True return conn def add_connections(n): for _ in range(n): if pyodbc_ok: conn = pyodbc.connect( driver='sql server', server='np:(local)', database='db', uid='sa', pwd='password') # conn = sqlite3.connect('/sqlite_db', check_same_thread=False) else: conn = psycopg2.connect(database='db') # conn = sqlite3.connect('/home/frank/sqlite_db', check_same_thread=False) connection_list.append(conn) connections_active.append(False) def release_connection(conn): # make connection available for reuse pos = connection_list.index(conn) connections_active[pos] = False def close_all_connections(): for conn in connection_list: conn.close() # actually close connection def db_task(pos): start = time() conn = get_connection() cur = conn.cursor() cur.execute('SELECT * FROM ar_trans') rows = cur.fetchall() release_connection(conn) end = time() return (pos, start, len(rows), end) async def run_tasks(): try: while True: print('start') tasks = [loop.run_in_executor( None, db_task, '{:>03}'.format(pos)) for pos in range(25)] await asyncio.wait(tasks) # results = [task.result() for task in tasks] # for result in sorted(results, key=itemgetter(3)): # print('{} {:.6f} {} {:.6f}'.format(*result)) print('done') await asyncio.sleep(10) # run every 10 seconds except asyncio.CancelledError: # respond to cancel() in shutdown() below for task in tasks: task.cancel() print('cancel tasks') async def counter(): cnt = 0 try: while True: cnt += 1 print(cnt, '{:.6f}'.format(time())) await asyncio.sleep(1) except asyncio.CancelledError: print('cancel counter') async def shutdown(loop, cnt, tsk): cnt.cancel() tsk.cancel() await asyncio.wait([cnt, tsk], loop=loop) close_all_connections() loop.stop() def stop(loop, cnt, tsk): input('Press to stop\n') asyncio.run_coroutine_threadsafe(shutdown(loop, cnt, tsk), loop) loop = asyncio.get_event_loop() cnt = asyncio.ensure_future(counter()) tsk = asyncio.ensure_future(run_tasks()) threading.Thread(target=stop, args=(loop, cnt, tsk)).start() loop.run_forever()