Async/Await in Python: Patterns Beyond the Basics

Exploring structured concurrency, task groups, and error propagation in Python asyncio — with testable code snippets.

I’ve been writing Python async code for years, but I keep hitting the same edge cases: silently swallowed exceptions, tasks that outlive their scope, and dangling coroutines. This post tests my understanding of three patterns that fix these issues.

The Problem: Fire-and-Forget Is a Leak

Starting a task without tracking it is the most common async antipattern:

import asyncio

async def leaky_example():
    # No reference is kept and nothing awaits this task, so its outcome is never observed
    asyncio.create_task(background_work())
    await asyncio.sleep(0.1)
    print("Done, but was background_work really done?")

async def background_work():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong in background!")

asyncio.run(leaky_example())

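In this run the ValueError never even fires: leaky_example() returns after 0.1 s, and asyncio.run() cancels the still-sleeping background task during shutdown. Had the task failed before the loop closed, the only trace would be a "Task exception was never retrieved" log entry when the task is garbage-collected. Either way the process exits cleanly and the caller never learns what happened.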
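One way to make the failure visible is to keep a reference to the task and await it before returning. The sketch below assumes the caller can afford to wait for the background work; tracked_example is a name made up for illustration, and the delays are shortened so it runs quickly.

```python
import asyncio

async def background_work():
    await asyncio.sleep(0.05)
    raise ValueError("Something went wrong in background!")

async def tracked_example() -> str:
    # Hold a strong reference so the task is tracked for its whole lifetime
    task = asyncio.create_task(background_work())
    await asyncio.sleep(0.01)  # the caller's own work
    try:
        await task  # awaiting the task re-raises its exception here
    except ValueError as exc:
        return f"caught: {exc}"
    return "no error"

# asyncio.run(tracked_example())
```

Awaiting the task is what turns a lost exception into an ordinary one; a done callback that logs task.exception() would be another option when the caller cannot wait.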

Pattern 1: Task Groups (Python 3.11+)

TaskGroup enforces structured concurrency — all child tasks complete (or error) before the group exits:

import asyncio

async def task_group_demo():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("A", 0.3))
        tg.create_task(worker("B", 0.5))
        tg.create_task(worker("C", 0.1))
    print("All tasks complete")

async def worker(name: str, delay: float):
    await asyncio.sleep(delay)
    print(f"Worker {name} done")
    return name

# asyncio.run(task_group_demo())

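If any child raises, the remaining siblings are cancelled and the group waits for them to finish unwinding. The failure then propagates out of the async with block wrapped in an ExceptionGroup, so nothing fails silently.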

Pattern 2: Timeout with asyncio.timeout (3.11+)

asyncio.wait_for() applies a timeout to a single awaitable. asyncio.timeout() is an async context manager, so one deadline can cover a whole block of awaits:

import asyncio

async def timeout_demo():
    try:
        async with asyncio.timeout(1.0):
            result = await slow_operation()
            print(f"Got result: {result}")
    except TimeoutError:
        print("Operation timed out")

async def slow_operation():
    await asyncio.sleep(3)
    return 42

# asyncio.run(timeout_demo())

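One gotcha I missed: asyncio.timeout(0) expires almost immediately (at the next point where the block yields to the event loop), while asyncio.timeout(None) disables the timeout entirely. If a None slips through where you expected a float, the block waits forever instead of failing.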

Pattern 3: Async Queue With Backpressure

Producer-consumer with controlled concurrency using asyncio.Queue:

import asyncio

async def queue_demo():
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=5)  # None is the shutdown sentinel

    async def producer():
        for i in range(10):
            item = f"item-{i}"
            await queue.put(item)
            print(f"Produced {item}")
        await queue.put(None)  # sentinel

    async def consumer(n: int):
        while True:
            item = await queue.get()
            if item is None:
                await queue.put(None)  # pass sentinel for other consumers
                break
            await asyncio.sleep(0.1)  # simulate work
            print(f"Consumer {n} processed {item}")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer(1))
        tg.create_task(consumer(2))

# asyncio.run(queue_demo())

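The sentinel pattern (None as poison pill) works for N consumers in two ways. The code above sends a single sentinel and has each consumer put it back before exiting, so one None eventually reaches every consumer (the last copy stays in the queue). The alternative is for the producer to send N sentinels. The common bug is doing neither: one sentinel, no re-queue, three consumers, and two of them hang forever.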

Edge Cases I Missed

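  1. ExceptionGroup handling: TaskGroup wraps child failures in an ExceptionGroup, so except ValueError no longer matches a ValueError raised by a child. Use except* ValueError (3.11+) to handle individual exception types inside the group.
  2. Queue maxsize=0: asyncio.Queue(0) is unbounded, and so is the default asyncio.Queue(). Backpressure requires a positive maxsize.
  3. Shield vs TaskGroup: asyncio.shield() keeps the inner awaitable from being cancelled, but the child task awaiting it still is. If a sibling raises, that child receives CancelledError and the group exits without waiting for the shielded work, which keeps running outside the group’s scope. Shield doesn’t opt a task out of structured concurrency; it lets work escape it.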

Verdict

My understanding of structured concurrency was superficial. I knew TaskGroup existed but didn’t understand ExceptionGroup propagation. Pattern 1 alone fixes 80% of the async bugs I’ve encountered in production. [1]

Score: 7.5/10 — solid on patterns, weak on edge cases (ExceptionGroup handling).

Key Takeaways

  • Use TaskGroup (3.11+) as your default concurrency primitive: Structured concurrency ensures no task is orphaned and all exceptions propagate. It replaces manual asyncio.gather() + cancellation boilerplate. Use except* with concrete exception types (for example except* ValueError) to handle partial failures when some tasks succeed and others fail; except* ExceptionGroup itself is not allowed.
  • Prefer asyncio.timeout() context manager over wait_for(): The context manager scopes the deadline to a block of code rather than a single await, reducing nesting. Use wait_for() only when targeting a single coroutine in pre-3.11 codebases.
  • Match the shutdown protocol to the consumer count: A consumed sentinel stops exactly one consumer. Either send N sentinels for N consumers or have each consumer re-queue the sentinel before exiting, as the Pattern 3 example does. The most common queue shutdown bug is one sentinel for three consumers with no re-queue, which leaves two hanging forever.
  • Validate timeout values at call sites: asyncio.timeout(None) means infinite wait; asyncio.timeout(0) means immediate timeout. A None passed where you expected a number creates a silent hang. Always coerce or guard dynamic timeout values.
  • asyncio.shield() does not exempt a child from TaskGroup cancellation: If a sibling task raises inside a TaskGroup, every child task is cancelled, including one that is awaiting a shield. The shielded inner work itself is not cancelled; it keeps running after the group has exited, outside structured concurrency cleanup.

Migration Guide: Async Code Audit Checklist

If you’re migrating a codebase to structured concurrency, run through this checklist:

  1. Replace bare asyncio.create_task() calls with tg.create_task() inside an async with asyncio.TaskGroup() as tg: block. Any create_task() call whose result is neither stored nor awaited is a candidate task leak. The Python asyncio docs describe TaskGroup as an alternative to gather() with stronger safety guarantees.

  2. Audit wait_for() calls for asyncio.timeout() replacement: Each wait_for() wrapping multiple awaits should become a timeout() context manager. Single-coroutine wait_for() calls can stay for pre-3.11 compatibility.

  3. Check queue shutdown logic for sentinel count: Every producer-consumer pattern using sentinels needs either as many sentinels as consumers or consumers that re-queue the sentinel before exiting. Add a test case where one consumer processes zero items (sentinel arrives immediately) to verify all consumers terminate.

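  4. Add except* handlers around TaskGroup blocks: Existing handlers such as except ValueError won’t match, because the child’s exception arrives wrapped in an ExceptionGroup. A bare except Exception still catches the group, but only as a whole. Wrap TaskGroup blocks in try/except* with the concrete exception types and inspect .exceptions on the caught group to handle partial failures correctly.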

  5. Verify shield usage inside TaskGroups: If your code awaits asyncio.shield() inside a TaskGroup child, sibling failure still cancels that child and the group exits without waiting for the shielded work. Keep a reference to the inner task, await it elsewhere, and make sure it isn’t relying on resources the parent scope cleans up.

References

  • [1] (citation needed)