Asyncio Queue: Timeout Behavior and Error Handling

A practical guide to asyncio.Queue timeout behavior, error handling with QueueFull/QueueEmpty, graceful shutdown patterns, and detection techniques for production async code.

asyncio.Queue is Python’s built-in async queue, but it has a critical gap: get() and put() don’t support timeouts directly. A bare await queue.get() blocks forever — no timeout, no escape hatch. This guide covers the timeout and error handling patterns you need for production queue code, from asyncio.wait_for wrappers to graceful shutdown with sentinels.

How to Detect This in Your Code

Before diving into the fix, here’s how to find the dangerous patterns in your own codebase:

Detection Checklist

Patterngrep CommandRisk
Bare await queue.get()grep -rn "await queue\.get()" src/⚠️ Hung consumer — blocks forever if queue empties
Bare await queue.put()grep -rn "await queue\.put(" src/⚠️ Deadlocked producer — blocks forever if queue is full
task_done() outside finallygrep -rn "task_done()" src/⚠️ Silent join() deadlock — processing error skips the call
No sentinel on shutdowngrep -rn "queue\.put(None)" src/⚠️ Graceful shutdown won’t work — consumers hang on empty queue

Quick Fix Snippet

Replace every bare await queue.get() with a timeout-safe version:

import asyncio

TIMEOUT = 5.0  # Match to your task's SLA

async def safe_queue_get(queue):
    try:
        return await asyncio.wait_for(queue.get(), timeout=TIMEOUT)
    except asyncio.TimeoutError:
        print(f"[WARN] Queue get timed out after {TIMEOUT}s")
        return None

Common Offenders in Real Code

  • WebSocket message handlers: await message_queue.get() inside a consumer loop — if the WebSocket disconnects, the handler hangs instead of timing out and reconnecting.
  • Task worker pools: await task_queue.get() in a while True: loop — when the producer finishes, workers hang forever instead of checking for a sentinel.
  • Batch processors: await queue.join() after processing — if any worker skips task_done() after an exception, join() blocks the main thread indefinitely. Always wrap task_done() in finally.

See Python asyncio.Queue docs and asyncio.wait_for docs for the official API reference.

The Problem: No Direct Timeout in get()/put()

Unlike Python’s queue.Queue, asyncio.Queue doesn’t support timeouts on get() or put() directly.

import asyncio

async def no_timeout_demo():
    queue = asyncio.Queue(maxsize=1)
    await queue.put("first")

    # This blocks forever if no one consumes the item
    # await queue.get() # No timeout supported directly

    # This will also block until space is available
    # await queue.put("second") # No timeout either

asyncio.run(no_timeout_demo())

Pattern 1: Using asyncio.wait_for() for Timeout

You have to wrap get()/put() in asyncio.wait_for() to apply timeouts:

import asyncio

async def timeout_demo():
    queue = asyncio.Queue(maxsize=1)
    await queue.put("first")

    try:
        # This times out after 0.1s if nothing is available
        item = await asyncio.wait_for(queue.get(), timeout=0.1)
        print(f"Got item: {item}")
    except asyncio.TimeoutError:
        print("Get timed out")

    try:
        # This times out if queue is full
        await asyncio.wait_for(queue.put("second"), timeout=0.1)
        print("Put succeeded")
    except asyncio.TimeoutError:
        print("Put timed out")

asyncio.run(timeout_demo())

Pattern 2: Handling QueueFull and QueueEmpty Errors

Even without timeouts, these exceptions can be raised:

import asyncio

async def error_handling_demo():
    queue = asyncio.Queue(maxsize=1)
    await queue.put("item")

    try:
        # This raises QueueFull
        await queue.put_nowait("another")
    except asyncio.QueueFull:
        print("Queue is full")

    # Remove the item to make room
    item = await queue.get()
    print(f"Got: {item}")

    try:
        # This raises QueueEmpty
        await queue.get_nowait()
    except asyncio.QueueEmpty:
        print("Queue is empty")

asyncio.run(error_handling_demo())

Pattern 3: Graceful Shutdown with Queue

Proper shutdown requires careful use of None sentinels and timeout handling:

import asyncio

async def graceful_shutdown_demo():
    queue = asyncio.Queue(maxsize=5)

    async def producer():
        for i in range(5):
            await queue.put(f"item-{i}")
            await asyncio.sleep(0.1)
        # Send sentinels for graceful shutdown
        for _ in range(3):  # number of consumers
            await queue.put(None)

    async def consumer(name):
        while True:
            try:
                # Use timeout to avoid hanging forever
                item = await asyncio.wait_for(queue.get(), timeout=1.0)
                if item is None:
                    print(f"Consumer {name} shutting down")
                    break
                print(f"Consumer {name} got {item}")
                await asyncio.sleep(0.2)
            except asyncio.TimeoutError:
                print(f"Consumer {name} timed out, shutting down")
                break

    tasks = [
        asyncio.create_task(producer()),
        asyncio.create_task(consumer("A")),
        asyncio.create_task(consumer("B"))
    ]

    await asyncio.gather(*tasks)

# asyncio.run(graceful_shutdown_demo())

Edge Cases I Missed

  1. put_nowait and get_nowait: These will error immediately on queue full/empty, unlike their await counterparts which block.
  2. asyncio.wait_for vs asyncio.timeout: wait_for is a function; asyncio.timeout is a context manager. Both work but wait_for is more common in older code.
  3. Timeout on task_done(): join() waits for task_done() — a TimeoutError here can be subtle if you don’t catch it.

Verdict

The asyncio.Queue API is consistent with queue.Queue in terms of what it does but more constrained — no direct timeout support. This requires either wrapping in timeouts or relying on task_done() semantics to manage lifecycles. My understanding of blocking behavior is correct, but I underestimated the need for explicit timeout handling.

How to Apply This to Existing Code

PatternBeforeAfterBenefit
Blocking get()item = await queue.get()item = await asyncio.wait_for(queue.get(), timeout=5.0)Prevents hung consumers
Non-blocking checktry: item = queue.get_nowait()Same, but wrap in while not queue.empty(): loopExplicit polling without blocking
Graceful shutdownNo sentinel handlingSend None per consumer + timeout wrapper on get()Reliable shutdown without orphan tasks
task_done() safetyqueue.task_done() on success onlyfinally: queue.task_done()Prevents join() deadlocks
Python 3.11+wait_for(queue.get(), timeout=5)async with asyncio.timeout(5): item = await queue.get()Cleaner scoping for multiple operations

Migration Steps for a Real Codebase

  1. Identify all bare await queue.get() calls — grep for await queue.get() and await queue.put(). Every call that doesn’t wrap in a timeout is a potential hang point.

  2. Wrap in wait_for with a reasonable timeout — match the timeout to your task’s SLA. For consumer loops, start with 5x the expected response time.

  3. Add the finally: task_done() pattern — if any get() → process → task_done() chain lacks a finally, the queue’s join() can hang after the first processing error. This is the single most common silent bug in production queue code.

  4. Test with forced failures — inject asyncio.TimeoutError into one consumer and verify the pipeline recovers. Without task_done() in finally, the test reveals the hang immediately.

Key Takeaways

  • Wrap every queue.get() and queue.put() in asyncio.wait_for(timeout=N) — bare queue operations block indefinitely. A timeout layer prevents hung consumers and deadlocked pipelines. Works across Python 3.7+.
  • For Python 3.11+, prefer async with asyncio.timeout(N): as a context manager — cleaner scoping than nesting wait_for calls, especially when multiple async operations share the same deadline.
  • Use get_nowait() / put_nowait() for non-blocking checks — these raise QueueEmpty/QueueFull immediately. Ideal for polling patterns where blocking would waste cycles. Combine with await asyncio.sleep(0) for cooperative polling.
  • Always call task_done() in a finally block — if a consumer errors between get() and task_done(), queue.join() hangs forever. This is the most common silent bug in queue-based async code.
  • Send one None sentinel per consumer for graceful shutdown — wrap get() in a timeout as a safety net in case a sentinel is lost or the producer crashes mid-way.