Python asyncio의 이벤트 루프 내부 동작부터 gather·wait·TaskGroup 선택 기준, aiohttp·asyncpg 실전 패턴, Semaphore·Queue 동시성 제어, 프로덕션에서 자주 발생하는 5가지 실수와 디버깅 전략까지 코드 중심으로 정리.
Python asyncio는 단순한 비동기 라이브러리가 아니다. 이벤트 루프, 코루틴, Task, Future가 맞물려 돌아가는 동시성 런타임이다. 제대로 이해하지 않으면 "왜 빠르지 않지?"라는 질문만 남는다. 이 글은 asyncio의 내부 동작 원리부터 프로덕션에서 실제로 쓰이는 패턴까지, 코드 중심으로 정리한다.
이 글이 필요한 사람: requests로 짠 I/O 코드를 aiohttp로 전환하려는 개발자, asyncio를 쓰는데 왜 빠른지 모르는 팀원, gather·wait·TaskGroup 중 뭘 써야 할지 헷갈리는 엔지니어, 프로덕션 asyncio 앱에서 에러가 silently 사라지는 현상을 겪은 개발자.
asyncio가 필요한 이유 — 동기 코드의 한계
Python으로 웹 크롤러, API 클라이언트, DB 쿼리를 순차 실행하면 CPU는 대부분의 시간을 "기다리는 데" 쓴다. 100개의 HTTP 요청을 순서대로 날리면 첫 번째 응답을 기다리는 동안 나머지 99개는 시작도 못 한다. 네트워크 레이턴시가 평균 50ms라면 100개 요청 처리에 최소 5초가 걸린다.
이 문제를 해결하는 방법은 세 가지다. 멀티스레딩은 스레드마다 독립된 실행 흐름을 만들지만, Python의 GIL(Global Interpreter Lock) 때문에 CPU 연산은 실질적으로 병렬화되지 않는다. I/O 대기 중에는 GIL이 해제돼 스레드가 교대로 실행되지만, 스레드 수가 늘어날수록 컨텍스트 스위칭 비용과 메모리 오버헤드가 증가한다. 멀티프로세싱은 GIL을 우회하지만 프로세스 간 통신 비용이 크고, I/O 바운드 워크로드에는 과도하다. asyncio는 단일 스레드 안에서 협력적 멀티태스킹(cooperative multitasking)을 구현한다. 코루틴이 I/O 대기 시점에 자발적으로 제어권을 이벤트 루프에 넘기고, 루프는 준비된 다른 코루틴을 실행한다.
결론부터 말하면: asyncio는 I/O 바운드 워크로드에서 스레드보다 적은 메모리로 더 많은 동시 연결을 처리한다. CPU 바운드 작업에는 asyncio가 답이 아니다. 그 경우엔 multiprocessing 또는 concurrent.futures.ProcessPoolExecutor를 써야 한다.
단일 스레드에서 여러 코루틴이 이벤트 루프를 통해 협력적으로 실행되는 구조
이벤트 루프의 작동 원리
이벤트 루프는 asyncio의 심장이다. 내부적으로 실행 대기 중인 코루틴 큐를 관리하고, OS의 I/O 멀티플렉싱 메커니즘(Linux의 epoll, macOS의 kqueue, Windows의 IOCP)을 활용해 I/O 이벤트를 감지한다. 코루틴이 await으로 I/O 작업을 요청하면 해당 파일 디스크립터를 루프에 등록하고 제어권을 반납한다. 루프는 epoll_wait을 호출해 준비된 I/O를 기다리다가, 이벤트가 오면 해당 코루틴을 재개한다.
Python 3.10부터 asyncio.run()이 안정화되면서 이벤트 루프를 직접 생성하거나 관리할 필요가 사라졌다. 프로덕션 코드에서 loop.run_until_complete()나 asyncio.get_event_loop()를 직접 사용하는 것은 더 이상 권장되지 않는다.
이벤트 루프 기초 — asyncio.run()이 정답
import asyncio
import time
async def fetch_data(name: str, delay: float) -> str:
print(f"[{name}] 시작")
await asyncio.sleep(delay) # I/O 대기 시뮬레이션
print(f"[{name}] 완료 ({delay}s)")
return f"{name} 결과"
async def main():
start = time.perf_counter()
# 순차 실행: 총 3초 소요
result1 = await fetch_data("A", 1.0)
result2 = await fetch_data("B", 2.0)
elapsed = time.perf_counter() - start
print(f"순차 실행: {elapsed:.2f}s") # 약 3.0s
asyncio.run(main())
# asyncio.run()은 내부적으로:
# 1. 새 이벤트 루프 생성
# 2. main() 코루틴 실행
# 3. 루프 종료 및 정리 (pending tasks 취소, 소켓 닫기 등)
async/await 핵심 패턴 — 코루틴, Task, Future
async def로 정의된 함수는 호출 시 코루틴 객체를 반환한다. 호출 자체로는 아무 것도 실행되지 않는다. 이벤트 루프에 넘겨져 await되거나, asyncio.create_task()로 Task로 감싸져야 실제로 실행된다.
코루틴(Coroutine), Task, Future의 차이를 명확히 알아야 한다. 코루틴은 실행 가능한 객체지만 스케줄링되지 않은 상태다. Task는 코루틴을 이벤트 루프에 등록해 즉시 실행을 예약한 것이다. Future는 아직 완료되지 않은 연산의 결과를 담는 컨테이너로, Task는 Future의 서브클래스다. 대부분의 경우 직접 Future를 생성할 일은 없다. 저수준 콜백 기반 코드와 코루틴을 연결할 때 사용된다.
코루틴 vs Task — 실행 시점의 차이
import asyncio
async def work(name: str) -> str:
print(f"{name}: 실행 시작")
await asyncio.sleep(1)
return f"{name}: 완료"
async def main():
# 코루틴 객체 생성 (아직 실행 안 됨)
coro = work("코루틴")
print("코루틴 객체 생성됨 (실행 전)")
# Task 생성: 이벤트 루프에 즉시 등록, 백그라운드에서 시작
task = asyncio.create_task(work("Task"))
print("Task 등록됨 (백그라운드 실행 시작)")
# 다른 작업 수행 중에도 task는 실행됨
await asyncio.sleep(0) # 이벤트 루프에 제어권 양보 → task가 시작됨
# 코루틴은 여기서 처음 실행됨
result_coro = await coro
result_task = await task
print(result_coro)
print(result_task)
asyncio.run(main())
동시성 패턴 — gather, wait, TaskGroup
여러 코루틴을 동시에 실행하는 방법은 세 가지다. 각각 용도가 다르다.
asyncio.gather()는 가장 일반적인 선택이다. 여러 코루틴/Task를 동시에 실행하고 모든 결과를 순서대로 리스트로 반환한다. return_exceptions=True를 사용하면 일부 코루틴에서 예외가 발생해도 나머지를 계속 실행한다. 기본값(False)에서는 첫 번째 예외 발생 시 즉시 전파되지만 나머지 Task는 취소되지 않는다 — 이것은 흔한 리소스 누수의 원인이다.
asyncio.wait()는 더 세밀한 제어가 필요할 때 쓴다. return_when 파라미터로 FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED를 지정할 수 있다. 타임아웃도 가능하다. 반환값이 (done, pending) 두 세트라는 점이 gather와 다르다.
asyncio.TaskGroup(Python 3.11+)은 구조적 동시성(structured concurrency)을 구현한다. 컨텍스트 매니저로 사용하며, 블록 내 모든 Task가 완료되거나 하나라도 예외가 발생하면 나머지를 자동으로 취소한다. 리소스 누수 없이 에러 처리가 가능해 최신 코드에서 가장 권장되는 방식이다.
gather vs wait vs TaskGroup 비교
import asyncio
from typing import Any
async def fetch(url: str, delay: float) -> str:
await asyncio.sleep(delay)
if "error" in url:
raise ValueError(f"fetch 실패: {url}")
return f"응답: {url}"
# ── 1. asyncio.gather ──────────────────────────────────────
async def use_gather():
results = await asyncio.gather(
fetch("api/users", 0.5),
fetch("api/posts", 1.0),
fetch("api/error", 0.3),
return_exceptions=True # 예외도 결과 리스트에 포함
)
for r in results:
if isinstance(r, Exception):
print(f"에러: {r}")
else:
print(r)
# ── 2. asyncio.wait ────────────────────────────────────────
async def use_wait():
tasks = {
asyncio.create_task(fetch("api/fast", 0.2)),
asyncio.create_task(fetch("api/slow", 2.0)),
}
done, pending = await asyncio.wait(
tasks,
timeout=0.5,
return_when=asyncio.FIRST_COMPLETED
)
for t in done:
print(f"완료: {t.result()}")
for t in pending:
t.cancel() # 타임아웃된 Task 직접 취소 필요
print("타임아웃으로 취소됨")
# ── 3. TaskGroup (Python 3.11+, 권장) ──────────────────────
async def use_taskgroup():
results: list[str] = []
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch("api/users", 0.5))
t2 = tg.create_task(fetch("api/posts", 1.0))
# 블록 종료 시 모든 task 완료 대기
# 예외 발생 시 나머지 task 자동 취소
results = [t1.result(), t2.result()]
except* ValueError as eg: # ExceptionGroup 처리
for e in eg.exceptions:
print(f"TaskGroup 내 예외: {e}")
return results
gather는 예외 발생 시 나머지 Task를 취소하지 않는다. TaskGroup은 자동으로 정리한다
실전 패턴 — aiohttp, asyncpg, aiofiles
asyncio의 가치는 실제 I/O 작업에서 드러난다. 세 가지 핵심 라이브러리를 알면 대부분의 프로덕션 시나리오를 커버할 수 있다.
aiohttp는 asyncio 기반 HTTP 클라이언트/서버다. requests 대비 비동기 HTTP 요청에서 압도적인 처리량을 보인다. ClientSession을 매 요청마다 새로 만들지 말고 반드시 재사용해야 한다. 세션 생성 자체가 커넥션 풀을 초기화하기 때문에 오버헤드가 크다.
asyncpg는 PostgreSQL 전용 asyncio 드라이버로, psycopg2 대비 쿼리 처리 성능이 3~5배 빠르다. 연결 풀(asyncpg.create_pool())을 애플리케이션 수명 동안 유지하는 패턴이 권장된다. aiofiles는 파일 I/O를 비동기로 처리한다. 파일 읽기/쓰기는 기본적으로 블로킹 I/O라 asyncio 루프를 블로킹할 수 있는데, aiofiles는 스레드풀을 통해 이를 논블로킹으로 래핑한다.
aiohttp — 동시 HTTP 요청 + 연결 풀 재사용
import asyncio
import aiohttp
from typing import Any
async def fetch_json(
session: aiohttp.ClientSession,
url: str
) -> dict[str, Any]:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
resp.raise_for_status()
return await resp.json()
async def fetch_all(urls: list[str]) -> list[dict]:
# ClientSession은 한 번만 생성 — 커넥션 풀 재사용
connector = aiohttp.TCPConnector(limit=50) # 동시 연결 상한
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_json(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = []
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"실패 [{url}]: {result}")
else:
successful.append(result)
return successful
# 100개 URL 동시 요청
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 101)]
results = asyncio.run(fetch_all(urls))
print(f"성공: {len(results)}개")
asyncpg — 연결 풀 + 트랜잭션
import asyncio
import asyncpg
async def setup_pool() -> asyncpg.Pool:
return await asyncpg.create_pool(
dsn="postgresql://user:password@localhost:5432/mydb",
min_size=5,
max_size=20,
command_timeout=60
)
async def get_users(pool: asyncpg.Pool, limit: int = 100) -> list[dict]:
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, email, created_at FROM users ORDER BY created_at DESC LIMIT $1",
limit
)
return [dict(row) for row in rows]
async def create_user_with_profile(
pool: asyncpg.Pool,
email: str,
name: str
) -> int:
async with pool.acquire() as conn:
async with conn.transaction(): # 트랜잭션 — 실패 시 자동 롤백
user_id = await conn.fetchval(
"INSERT INTO users (email) VALUES ($1) RETURNING id",
email
)
await conn.execute(
"INSERT INTO profiles (user_id, name) VALUES ($1, $2)",
user_id, name
)
return user_id
async def main():
pool = await setup_pool()
try:
users = await get_users(pool, limit=10)
print(f"유저 {len(users)}명 조회")
# 여러 유저 동시 생성
tasks = [
create_user_with_profile(pool, f"user{i}@example.com", f"유저{i}")
for i in range(10)
]
ids = await asyncio.gather(*tasks)
print(f"생성된 ID: {ids}")
finally:
await pool.close()
asyncio.run(main())
동시성 제어 — Semaphore, Lock, Queue
동시 요청 수를 제한하지 않으면 외부 API 속도 제한(rate limit)에 걸리거나, DB 커넥션 풀이 소진되거나, 메모리가 폭발한다. asyncio는 동시성 제어를 위한 프리미티브를 제공한다.
asyncio.Semaphore는 동시에 실행 가능한 코루틴 수를 제한한다. 1,000개 URL을 크롤링할 때 한 번에 50개씩만 요청하고 싶다면 Semaphore(50)을 쓴다. asyncio.Lock은 공유 자원에 대한 상호 배제를 보장한다. 캐시 딕셔너리를 여러 코루틴이 동시에 쓸 때 사용한다. asyncio.Queue는 생산자-소비자 패턴을 구현할 때 적합하다. 크롤러에서 URL을 큐에 넣고, 워커 풀이 큐를 소비하는 구조다.
Semaphore — 동시 요청 수 제한
import asyncio
import aiohttp
async def fetch_with_limit(
session: aiohttp.ClientSession,
sem: asyncio.Semaphore,
url: str
) -> str:
async with sem: # 세마포어 획득 — 동시 실행 수 제한
async with session.get(url) as resp:
return await resp.text()
async def crawl(urls: list[str], concurrency: int = 50) -> list[str]:
sem = asyncio.Semaphore(concurrency)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, sem, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
# ── Queue를 활용한 워커 풀 패턴 ────────────────────────────
async def producer(queue: asyncio.Queue, items: list[str]):
for item in items:
await queue.put(item)
# sentinel 값으로 워커 종료 신호
for _ in range(5): # 워커 수만큼
await queue.put(None)
async def worker(worker_id: int, queue: asyncio.Queue, results: list):
while True:
item = await queue.get()
if item is None: # 종료 신호
queue.task_done()
break
try:
# 실제 처리 로직
await asyncio.sleep(0.1)
results.append(f"worker{worker_id}: {item} 처리 완료")
finally:
queue.task_done()
async def process_with_worker_pool(items: list[str]) -> list[str]:
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
results: list[str] = []
workers = [asyncio.create_task(worker(i, queue, results)) for i in range(5)]
await producer(queue, items)
await asyncio.gather(*workers)
return results
Semaphore로 동시성 상한을 두고 Queue로 작업을 분배하는 프로덕션 패턴
프로덕션 고려사항 — 타임아웃, 취소, 디버깅
asyncio 앱이 프로덕션에서 자주 겪는 세 가지 문제가 있다. 무한 대기, 에러가 silently 사라지는 현상, 블로킹 코드가 이벤트 루프를 멈추는 현상이다.
타임아웃은 asyncio.wait_for()로 처리한다. 지정한 시간 내에 완료되지 않으면 asyncio.TimeoutError를 발생시키고 내부 Task를 취소한다. Python 3.11+에서는 asyncio.timeout() 컨텍스트 매니저를 사용할 수 있다.
블로킹 코드는 asyncio의 가장 흔한 함정이다. time.sleep(), requests.get(), 오래 걸리는 CPU 연산을 코루틴 안에서 직접 호출하면 이벤트 루프 전체가 블로킹된다. 블로킹 코드는 loop.run_in_executor()로 스레드풀에 오프로드해야 한다.
에러 은닉은 Task를 생성하고 결과를 await하지 않을 때 발생한다. Task에서 예외가 발생해도 아무도 await하지 않으면 예외가 소비되지 않고 사라진다. Python 3.11+에서는 asyncio.get_event_loop().set_exception_handler()로 처리되지 않은 Task 예외를 잡을 수 있다.
타임아웃, 블로킹 코드 오프로드, 에러 처리
import asyncio
import time
import requests # 블로킹 라이브러리
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
# ── 타임아웃 처리 ──────────────────────────────────────────
async def with_timeout():
# asyncio.wait_for: Python 3.8+
try:
result = await asyncio.wait_for(
some_slow_coroutine(),
timeout=5.0
)
except asyncio.TimeoutError:
print("5초 초과, 취소됨")
# asyncio.timeout: Python 3.11+ 권장
try:
async with asyncio.timeout(5.0):
result = await some_slow_coroutine()
except TimeoutError:
print("타임아웃")
async def some_slow_coroutine():
await asyncio.sleep(10)
return "완료"
# ── 블로킹 코드 오프로드 ───────────────────────────────────
async def run_blocking_io(url: str) -> str:
loop = asyncio.get_running_loop()
# requests.get은 블로킹 — 스레드풀에서 실행
response = await loop.run_in_executor(
executor,
lambda: requests.get(url, timeout=10)
)
return response.text
# CPU 바운드 작업도 동일하게 오프로드
async def run_cpu_intensive(data: list[int]) -> int:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, sum, data)
# ── Task 예외 처리 ─────────────────────────────────────────
async def may_fail():
await asyncio.sleep(1)
raise RuntimeError("예상치 못한 에러")
async def safe_task_usage():
# 나쁜 예: Task 생성 후 결과를 await하지 않으면 예외가 사라짐
# task = asyncio.create_task(may_fail()) # 예외 무시됨!
# 좋은 예 1: 나중에 await
task = asyncio.create_task(may_fail())
try:
await task
except RuntimeError as e:
print(f"Task 에러 처리: {e}")
# 좋은 예 2: done callback으로 에러 로깅
def handle_task_result(task: asyncio.Task):
if not task.cancelled() and task.exception():
print(f"[ERROR] Task 실패: {task.exception()}")
task2 = asyncio.create_task(may_fail())
task2.add_done_callback(handle_task_result)
await asyncio.sleep(2) # task2가 완료될 때까지 대기
디버깅 도구와 흔한 실수
asyncio는 디버깅이 어렵기로 유명하다. 비동기 스택 트레이스는 동기 코드보다 추적이 힘들고, 블로킹 코드가 이벤트 루프를 멈춰도 명확한 에러 메시지가 나오지 않는 경우가 많다.
디버그 모드를 활성화하면 asyncio가 내부 경고를 더 많이 출력한다. PYTHONASYNCIODEBUG=1 환경변수로 활성화하거나, asyncio.run(main(), debug=True)로 켤 수 있다. 느린 콜백(기본 100ms 초과)을 감지해 경고를 출력한다. asyncio의 가장 흔한 실수 5가지를 정리한다.
흔한 실수 5가지
코루틴을 await 없이 호출: my_coroutine()처럼 호출하면 코루틴 객체만 생성되고 실행되지 않는다. Python은 RuntimeWarning: coroutine was never awaited를 발생시킨다.
time.sleep() 사용: 코루틴 안에서 time.sleep(1)을 쓰면 이벤트 루프 전체가 1초 블로킹된다. 반드시 await asyncio.sleep(1)을 쓸 것.
ClientSession을 매 요청마다 새로 생성: aiohttp는 세션마다 커넥션 풀을 만든다. 루프 안에서 async with aiohttp.ClientSession() as session을 반복하면 성능이 급격히 저하된다.
gather의 예외 처리 누락: return_exceptions=False(기본값)일 때 하나의 예외가 발생하면 나머지 Task는 취소되지 않고 백그라운드에서 계속 실행된다. 이 Task들이 리소스를 점유할 수 있다.
async 함수를 동기 컨텍스트에서 직접 호출: 이미 실행 중인 이벤트 루프 안에서 asyncio.run()을 다시 호출하면 RuntimeError: This event loop is already running이 발생한다. Jupyter처럼 이미 루프가 있는 환경에서는 await를 직접 쓰거나 nest_asyncio를 사용한다.
asyncio 앱의 성능을 측정할 때는 time.perf_counter()와 함께 실행 중인 Task 수를 함께 모니터링한다. len(asyncio.all_tasks())가 지속적으로 증가한다면 Task 누수가 있는 것이다. 프로덕션 FastAPI 앱이라면 uvicorn이 이벤트 루프를 관리하므로 asyncio.run()을 사용하지 말고 async def 엔드포인트를 직접 작성하면 된다.
마지막으로 asyncio와 동기 코드를 섞는 법을 알아두자. 동기 함수에서 코루틴을 실행해야 한다면 asyncio.run()을 쓴다. 단, 이미 이벤트 루프가 실행 중인 환경에서는 asyncio.run()을 쓸 수 없다. 그 경우 loop.create_task()나 asyncio.ensure_future()를 써야 한다. FastAPI, Starlette, aiohttp 서버 환경이라면 모든 핸들러를 async def로 작성하는 것이 가장 단순하고 올바른 방법이다.
asyncio 도입 체크리스트
I/O 바운드 워크로드인가? (Y → asyncio, N → multiprocessing 검토)
모든 I/O 라이브러리가 asyncio 지원 버전인가? (requests → aiohttp, psycopg2 → asyncpg)