Asyncio Queue: Timeout Behavior and Error Handling
A practical guide to asyncio.Queue timeout behavior, error handling with QueueFull/QueueEmpty, graceful shutdown patterns, and detection techniques for production async code.
asyncio.Queue is Python’s built-in async queue, but it has a critical gap: get() and put() don’t support timeouts directly. A bare await queue.get() blocks forever — no timeout, no escape hatch. This guide covers the timeout and error handling patterns you need for production queue code, from asyncio.wait_for wrappers to graceful shutdown with sentinels.
How to Detect This in Your Code
Before diving into the fix, here’s how to find the dangerous patterns in your own codebase:
Detection Checklist
| Pattern | grep Command | Risk |
|---|---|---|
| Bare await queue.get() | grep -rn "await queue\.get()" src/ | ⚠️ Hung consumer: blocks forever if queue empties |
| Bare await queue.put() | grep -rn "await queue\.put(" src/ | ⚠️ Deadlocked producer: blocks forever if queue is full |
| task_done() outside finally | grep -rn "task_done()" src/ | ⚠️ Silent join() deadlock: processing error skips the call |
| No sentinel on shutdown | grep -rn "queue\.put(None)" src/ | ⚠️ Graceful shutdown won’t work: consumers hang on empty queue |
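The grep commands above only match a variable literally named queue. As a rough complement, the sketch below scans source text for awaits on any name ending in "queue" and skips lines that already contain wait_for(. The helper name find_bare_queue_awaits and the regular expression are illustrative assumptions, not part of any library. It works line by line, so it cannot see a surrounding asyncio.timeout block or a call split across lines.

```python
import re

# Hypothetical pattern: an await on get()/put()/join() of any (possibly dotted)
# name that ends in "queue", e.g. queue, task_queue, self.msg_queue.
BARE_QUEUE_AWAIT = re.compile(r"await\s+[\w.]*queue\.(get|put|join)\(", re.IGNORECASE)

def find_bare_queue_awaits(source):
    """Return (line_number, stripped_line) pairs for unwrapped queue awaits."""
    hits = []
    for number, line in enumerate(source.splitlines(), start=1):
        # Lines that already mention wait_for( are treated as safe.
        if BARE_QUEUE_AWAIT.search(line) and "wait_for(" not in line:
            hits.append((number, line.strip()))
    return hits

sample = """\
item = await queue.get()
item = await asyncio.wait_for(queue.get(), timeout=5.0)
await task_queue.put(job)
"""
print(find_bare_queue_awaits(sample))
# [(1, 'item = await queue.get()'), (3, 'await task_queue.put(job)')]
```

Treat the hits as a review list rather than a verdict: some bare awaits are intentional, for example in a worker that is always stopped by cancellation.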
Quick Fix Snippet
Replace every bare await queue.get() with a timeout-safe version:

    import asyncio

    TIMEOUT = 5.0  # Match to your task's SLA

    async def safe_queue_get(queue):
        try:
            return await asyncio.wait_for(queue.get(), timeout=TIMEOUT)
        except asyncio.TimeoutError:
            print(f"[WARN] Queue get timed out after {TIMEOUT}s")
            return None  # Ambiguous if None is also used as a shutdown sentinel

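One caveat with returning None on timeout: later sections use None as the shutdown sentinel, so a caller cannot tell "nothing arrived" from "please stop". A minimal sketch of one way around this, assuming a module-level marker object (TIMED_OUT and get_or_marker are hypothetical names introduced here):

```python
import asyncio

# Hypothetical marker: a unique object that can never collide with a queue item.
TIMED_OUT = object()

async def get_or_marker(queue, timeout):
    """Return the next item, or TIMED_OUT if nothing arrives in time."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return TIMED_OUT

async def demo():
    queue = asyncio.Queue()
    queue.put_nowait(None)  # a real shutdown sentinel
    first = await get_or_marker(queue, timeout=0.05)
    second = await get_or_marker(queue, timeout=0.05)  # queue is empty by now
    return first, second

first, second = asyncio.run(demo())
print(first is None, second is TIMED_OUT)  # True True
```

Callers compare with "is TIMED_OUT" for the timeout case and "is None" for the sentinel, so the two conditions stay distinct.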
Common Offenders in Real Code
- WebSocket message handlers: await message_queue.get() inside a consumer loop. If the WebSocket disconnects, the handler hangs instead of timing out and reconnecting.
- Task worker pools: await task_queue.get() in a while True: loop. When the producer finishes, workers hang forever instead of checking for a sentinel.
- Batch processors: await queue.join() after processing. If any worker skips task_done() after an exception, join() blocks the awaiting task indefinitely. Always wrap task_done() in finally.
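For the first two offenders, one possible shape is a consumer that wakes up periodically to check a stop flag instead of waiting on the queue forever. This is a sketch under assumptions: consume_until_stopped, the stop event, and the poll interval are illustrative names, and a real WebSocket handler would set the event from its disconnect callback.

```python
import asyncio

async def consume_until_stopped(queue, stop, handle, poll=0.05):
    """Process items until `stop` is set, re-checking it every `poll` seconds."""
    while not stop.is_set():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=poll)
        except asyncio.TimeoutError:
            continue  # nothing arrived; loop around and re-check the stop flag
        handle(item)

async def demo():
    queue = asyncio.Queue()
    stop = asyncio.Event()
    seen = []
    worker = asyncio.create_task(consume_until_stopped(queue, stop, seen.append))
    for message in ("a", "b"):
        await queue.put(message)
    await asyncio.sleep(0.1)  # give the worker time to drain the queue
    stop.set()                # e.g. the connection closed
    await asyncio.wait_for(worker, timeout=1.0)
    return seen

print(asyncio.run(demo()))  # ['a', 'b']
```

The poll interval bounds how long the worker can outlive the stop signal, which is the trade-off against waking up more often.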
See Python asyncio.Queue docs and asyncio.wait_for docs for the official API reference.
The Problem: No Direct Timeout in get()/put()
Unlike Python’s queue.Queue, asyncio.Queue doesn’t support timeouts on get() or put() directly.

    import asyncio

    async def no_timeout_demo():
        queue = asyncio.Queue(maxsize=1)
        await queue.put("first")
        # The queue is now full, so this would block until a consumer frees a slot:
        # await queue.put("second")  # put() has no timeout parameter
        # On an empty queue, this would block until a producer adds an item:
        # await queue.get()  # get() has no timeout parameter either

    asyncio.run(no_timeout_demo())

Pattern 1: Using asyncio.wait_for() for Timeout
You have to wrap get()/put() in asyncio.wait_for() to apply timeouts:

    import asyncio

    async def timeout_demo():
        queue = asyncio.Queue(maxsize=1)
        await queue.put("first")

        try:
            # This times out after 0.1s if nothing is available
            item = await asyncio.wait_for(queue.get(), timeout=0.1)
            print(f"Got item: {item}")
        except asyncio.TimeoutError:
            print("Get timed out")

        try:
            # This times out if queue is full
            await asyncio.wait_for(queue.put("second"), timeout=0.1)
            print("Put succeeded")
        except asyncio.TimeoutError:
            print("Put timed out")

    asyncio.run(timeout_demo())

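A reasonable worry with this pattern is whether a timed-out get() keeps waiting in the background and swallows the next item. On timeout, wait_for cancels the awaitable it wraps, so the abandoned get() should no longer be registered on the queue. The short check below illustrates that behavior. It is a sketch, not a guarantee about every Python version.

```python
import asyncio

async def demo():
    queue = asyncio.Queue()
    try:
        await asyncio.wait_for(queue.get(), timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    # The timed-out get() was cancelled, so the item below stays in the queue
    # until someone asks for it again.
    queue.put_nowait("late item")
    return timed_out, queue.qsize(), queue.get_nowait()

print(asyncio.run(demo()))  # (True, 1, 'late item')
```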
Pattern 2: Handling QueueFull and QueueEmpty Errors
The non-blocking variants put_nowait() and get_nowait() never wait. They are plain methods, not coroutines, so call them without await; they raise QueueFull or QueueEmpty immediately:

    import asyncio

    async def error_handling_demo():
        queue = asyncio.Queue(maxsize=1)
        await queue.put("item")

        try:
            # This raises QueueFull (put_nowait is not awaited)
            queue.put_nowait("another")
        except asyncio.QueueFull:
            print("Queue is full")

        # Remove the item to make room
        item = await queue.get()
        print(f"Got: {item}")

        try:
            # This raises QueueEmpty (get_nowait is not awaited)
            queue.get_nowait()
        except asyncio.QueueEmpty:
            print("Queue is empty")

    asyncio.run(error_handling_demo())

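In practice these exceptions are usually wrapped in small helpers. The sketch below shows two illustrative ones (try_put and drain are hypothetical names, not asyncio APIs): a put that reports failure instead of raising, and a loop that empties whatever is currently buffered.

```python
import asyncio

def try_put(queue, item):
    """Enqueue without blocking; return False instead of raising when full."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False

def drain(queue):
    """Remove and return everything currently buffered, without blocking."""
    items = []
    while True:
        try:
            items.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            return items

async def demo():
    queue = asyncio.Queue(maxsize=2)
    accepted = [try_put(queue, n) for n in range(3)]  # the third put is rejected
    return accepted, drain(queue), queue.empty()

print(asyncio.run(demo()))  # ([True, True, False], [0, 1], True)
```

Catching QueueEmpty inside the loop avoids relying on a separate empty() check before each get_nowait() call.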
Pattern 3: Graceful Shutdown with Queue
Proper shutdown requires careful use of None sentinels and timeout handling:

    import asyncio

    async def graceful_shutdown_demo():
        queue = asyncio.Queue(maxsize=5)
        consumer_names = ["A", "B"]

        async def producer():
            for i in range(5):
                await queue.put(f"item-{i}")
                await asyncio.sleep(0.1)
            # Send exactly one sentinel per consumer for graceful shutdown
            for _ in consumer_names:
                await queue.put(None)

        async def consumer(name):
            while True:
                try:
                    # Use timeout to avoid hanging forever
                    item = await asyncio.wait_for(queue.get(), timeout=1.0)
                    if item is None:
                        print(f"Consumer {name} shutting down")
                        break
                    print(f"Consumer {name} got {item}")
                    await asyncio.sleep(0.2)
                except asyncio.TimeoutError:
                    print(f"Consumer {name} timed out, shutting down")
                    break

        tasks = [asyncio.create_task(producer())]
        tasks += [asyncio.create_task(consumer(name)) for name in consumer_names]
        await asyncio.gather(*tasks)

    asyncio.run(graceful_shutdown_demo())

Edge Cases I Missed
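- put_nowait and get_nowait: These raise immediately when the queue is full or empty, unlike their awaited counterparts, which block.
- asyncio.wait_for vs asyncio.timeout: wait_for is a function; asyncio.timeout is a context manager added in Python 3.11. Both work, but wait_for is more common in older code.
- Timeouts around join() and task_done(): join() waits until every item has been matched by a task_done() call. If you wrap join() in a timeout, the resulting TimeoutError often means a consumer skipped task_done(), which is easy to mistake for slow processing.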
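A related subtlety when combining timeouts with task_done(): if get() times out, no item was fetched, so task_done() must not run for that iteration. Calling it more often than items were put raises ValueError. A minimal sketch of a loop that keeps the two apart (the worker function and idle_timeout parameter are illustrative assumptions):

```python
import asyncio

async def worker(queue, results, idle_timeout=0.05):
    """Consume until the queue stays empty for `idle_timeout` seconds."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return  # no item was fetched, so there is nothing to mark done
        try:
            results.append(item * 2)
        finally:
            queue.task_done()  # exactly one call per successful get()

async def demo():
    queue = asyncio.Queue()
    results = []
    for n in range(3):
        queue.put_nowait(n)
    task = asyncio.create_task(worker(queue, results))
    await asyncio.wait_for(queue.join(), timeout=1.0)
    await task  # the worker exits once the queue has been idle long enough
    try:
        queue.task_done()  # one call too many
    except ValueError:
        extra_call_rejected = True
    else:
        extra_call_rejected = False
    return results, extra_call_rejected

print(asyncio.run(demo()))  # ([0, 2, 4], True)
```

Keeping the get() in its own try block, separate from the processing try/finally, is what prevents the timeout path from reaching task_done().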
Verdict
The asyncio.Queue API mirrors queue.Queue in what it does but is more constrained: get() and put() take no timeout argument. You either wrap calls in a timeout or rely on task_done() semantics to manage lifecycles. My understanding of the blocking behavior was correct, but I underestimated the need for explicit timeout handling.
How to Apply This to Existing Code
| Pattern | Before | After | Benefit |
|---|---|---|---|
| Blocking get() | item = await queue.get() | item = await asyncio.wait_for(queue.get(), timeout=5.0) | Prevents hung consumers |
| Non-blocking check | try: item = queue.get_nowait() | Same, but wrap in while not queue.empty(): loop | Explicit polling without blocking |
| Graceful shutdown | No sentinel handling | Send None per consumer + timeout wrapper on get() | Reliable shutdown without orphan tasks |
| task_done() safety | queue.task_done() on success only | finally: queue.task_done() | Prevents join() deadlocks |
| Python 3.11+ | wait_for(queue.get(), timeout=5) | async with asyncio.timeout(5): item = await queue.get() | Cleaner scoping for multiple operations |
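The last row is easiest to see when several operations share one deadline. The sketch below assumes Python 3.11 or newer, where asyncio.timeout exists and raises the built-in TimeoutError. The get_pair helper is a hypothetical name, and the hasattr guard only exists so the snippet is skipped on older interpreters.

```python
import asyncio

async def get_pair(queue, seconds):
    """Fetch two items under one shared deadline (needs Python 3.11+)."""
    async with asyncio.timeout(seconds):
        first = await queue.get()
        second = await queue.get()
    return first, second

async def demo():
    queue = asyncio.Queue()
    queue.put_nowait("a")
    queue.put_nowait("b")
    pair = await get_pair(queue, 0.5)
    queue.put_nowait("c")  # only one item: the second get() hits the deadline
    try:
        await get_pair(queue, 0.05)
    except TimeoutError:
        return pair, "timed out"
    return pair, "completed"

if hasattr(asyncio, "timeout"):  # skip on interpreters older than 3.11
    print(asyncio.run(demo()))  # (('a', 'b'), 'timed out')
```

Note that in the timed-out call the first item ("c") has already been removed from the queue and is discarded with the exception. If partial results matter, collect them outside the timeout block.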
Migration Steps for a Real Codebase
- Identify all bare await queue.get() calls. Grep for await queue.get() and await queue.put(). Every call that isn’t wrapped in a timeout is a potential hang point.
- Wrap in wait_for with a reasonable timeout. Match the timeout to your task’s SLA. For consumer loops, start with 5x the expected response time.
- Add the finally: task_done() pattern. If any get() → process → task_done() chain lacks a finally, the queue’s join() can hang after the first processing error. This is the single most common silent bug in production queue code.
- Test with forced failures. Inject asyncio.TimeoutError into one consumer and verify the pipeline recovers. Without task_done() in finally, the test reveals the hang immediately.
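The last step can be sketched as a small harness that feeds one failing item to a worker and checks whether join() still completes. Everything here is illustrative: process, buggy_worker, fixed_worker, and join_completes are hypothetical names, and a RuntimeError stands in for whatever failure you inject.

```python
import asyncio

def process(item):
    if item == "bad":
        raise RuntimeError("forced failure")

async def buggy_worker(queue):
    while True:
        item = await queue.get()
        try:
            process(item)
            queue.task_done()  # skipped whenever process() raises
        except RuntimeError:
            pass

async def fixed_worker(queue):
    while True:
        item = await queue.get()
        try:
            process(item)
        except RuntimeError:
            pass
        finally:
            queue.task_done()  # reached on success and on failure

async def join_completes(worker):
    queue = asyncio.Queue()
    for item in ("ok", "bad", "ok"):
        queue.put_nowait(item)
    task = asyncio.create_task(worker(queue))
    try:
        await asyncio.wait_for(queue.join(), timeout=0.2)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        task.cancel()  # the worker loops forever, so stop it explicitly

async def demo():
    return await join_completes(buggy_worker), await join_completes(fixed_worker)

print(asyncio.run(demo()))  # (False, True)
```

Wrapping join() in a timeout inside the test is what turns a silent hang into a failing assertion.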
Key Takeaways
- Wrap every queue.get() and queue.put() in asyncio.wait_for(..., timeout=N). Bare queue operations block indefinitely. A timeout layer prevents hung consumers and deadlocked pipelines. Works across Python 3.7+.
- For Python 3.11+, prefer async with asyncio.timeout(N): as a context manager. It gives cleaner scoping than nesting wait_for calls, especially when multiple async operations share the same deadline.
- Use get_nowait()/put_nowait() for non-blocking checks. These raise QueueEmpty/QueueFull immediately. Ideal for polling patterns where blocking would waste cycles. Combine with await asyncio.sleep(0) for cooperative polling.
- Always call task_done() in a finally block. If a consumer errors between get() and task_done(), queue.join() hangs forever. This is the most common silent bug in queue-based async code.
- Send one None sentinel per consumer for graceful shutdown. Wrap get() in a timeout as a safety net in case a sentinel is lost or the producer crashes mid-way.