Catching Async Deadlocks in Python asyncio Before They Freeze Your App
Your asyncio application is running fine, then suddenly it stops doing anything. No exception, no traceback, no log output: just silence. You've hit an async deadlock, and Python isn't going to tell you where it came from.
Async deadlocks are harder to spot than their threading counterparts because there's no stack of blocked threads to inspect. Everything lives on a single event loop, and when that loop stalls, your whole app stalls with it.
What you'll learn
- How async deadlocks form in an event loop and why they're hard to see
- The most common patterns that cause them in real asyncio code
- How to detect a stalled event loop at runtime
- Debugging techniques using asyncio built-ins and third-party tools
- Defensive coding patterns that prevent deadlocks from forming in the first place
Prerequisites
You should be comfortable writing coroutines with async def and await, and know what the event loop does at a high level. All examples target Python 3.10+ where the asyncio API is stable and expressive.
How Async Deadlocks Actually Form
In threaded code, a deadlock happens when two threads each hold a lock the other needs. In asyncio, the mechanism is different but the result is the same: nothing makes progress.
The event loop runs one coroutine at a time. A coroutine yields control when it reaches an await that suspends, which lets the loop pick up the next waiting task. A deadlock occurs when every task is waiting for something that only another task can provide, so none of them can run.
The most common trigger is a coroutine that blocks the event loop entirely, starving all other tasks. When task A is waiting for task B to produce a result, and task B can never run because task A never yields, you're stuck.
Pattern 1: Calling Blocking Code Without an Executor
This is the single most common cause of a frozen asyncio app. You call a synchronous, blocking function directly inside a coroutine without wrapping it in an executor.
import asyncio
import time

async def fetch_data():
    time.sleep(5)  # blocks the entire event loop for 5 seconds
    return "done"

async def main():
    await asyncio.gather(
        fetch_data(),
        fetch_data(),
    )

asyncio.run(main())
Both coroutines look concurrent, but time.sleep blocks the OS thread the event loop runs on. The loop cannot switch to the second coroutine until the first sleep finishes. Replace blocking calls with their async equivalents, or offload them to a thread pool:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()

async def fetch_data():
    loop = asyncio.get_running_loop()
    # Run the blocking call on a worker thread so the loop stays free
    await loop.run_in_executor(executor, time.sleep, 5)
    return "done"
For CPU-bound work that takes real time, use ProcessPoolExecutor instead so you aren't competing with the GIL.
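On Python 3.9 and later, asyncio.to_thread is a shorter way to do the same thread offloading using the loop's default thread pool. The sketch below is illustrative only: blocking_io is a hypothetical stand-in for whatever synchronous call your code makes, and the sleep durations are arbitrary.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    # Hypothetical stand-in for a synchronous library call
    time.sleep(seconds)
    return "done"


async def fetch_data(seconds: float = 0.2) -> str:
    # to_thread runs the function on a worker thread and awaits its result
    return await asyncio.to_thread(blocking_io, seconds)


async def main() -> list:
    # Both calls sleep at the same time, so the total is close to one sleep
    return await asyncio.gather(fetch_data(), fetch_data())


if __name__ == "__main__":
    start = time.monotonic()
    print(asyncio.run(main()), f"{time.monotonic() - start:.2f}s")
```

Like run_in_executor with a thread pool, this helps with blocking I/O; CPU-bound work still contends for the GIL.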
Pattern 2: Awaiting a Future That Nobody Will Resolve
A bare asyncio.Future is a promise that some other piece of code will call future.set_result(). If that code never runs, any coroutine awaiting the future waits forever. (If the future has already been cancelled by the time the resolver runs, set_result() raises InvalidStateError instead.)
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Forgot to schedule anything that calls future.set_result()
    result = await future  # hangs indefinitely
    print(result)

asyncio.run(main())
Always pair a future with the coroutine responsible for resolving it, and use asyncio.wait_for with a timeout as a safety net:
import asyncio

async def resolver(future):
    await asyncio.sleep(1)
    future.set_result("resolved")

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Keep a reference so the resolver task is not garbage collected early
    resolver_task = asyncio.create_task(resolver(future))
    try:
        result = await asyncio.wait_for(future, timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("future was never resolved")

asyncio.run(main())
Pattern 3: Lock Acquisition Cycles
Asyncio provides asyncio.Lock, and it can deadlock just like a threading lock if you create a cycle. Task A holds lock 1 and waits for lock 2. Task B holds lock 2 and waits for lock 1. Both suspend inside lock.acquire() (which async with calls for you), and the loop is left with nothing it can run.
import asyncio

lock1 = asyncio.Lock()
lock2 = asyncio.Lock()

async def task_a():
    async with lock1:
        await asyncio.sleep(0)  # yield to let task_b acquire lock2
        async with lock2:  # now waits forever
            print("task_a done")

async def task_b():
    async with lock2:
        await asyncio.sleep(0)
        async with lock1:  # now waits forever
            print("task_b done")

async def main():
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())
The fix is consistent lock ordering: always acquire locks in the same global order across all coroutines. If every task acquires lock1 before lock2, a cycle cannot form.
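One way to enforce that ordering is to route every multi-lock acquisition through a single helper. The following is a minimal sketch rather than an asyncio facility: hold_in_order is a name invented for illustration, and ranking locks by id() is just one assumed way to get a stable global order.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def hold_in_order(*locks):
    # id() gives each lock a stable rank for the life of the process,
    # so every caller acquires any pair of locks in the same order
    ordered = sorted(locks, key=id)
    acquired = []
    try:
        for lock in ordered:
            await lock.acquire()
            acquired.append(lock)
        yield
    finally:
        # Release in reverse order of acquisition
        for lock in reversed(acquired):
            lock.release()


async def task_a(lock1, lock2, log):
    async with hold_in_order(lock1, lock2):
        await asyncio.sleep(0)
        log.append("task_a done")


async def task_b(lock1, lock2, log):
    # Asks for the locks in the opposite order; the helper reorders them
    async with hold_in_order(lock2, lock1):
        await asyncio.sleep(0)
        log.append("task_b done")


async def main():
    lock1, lock2 = asyncio.Lock(), asyncio.Lock()
    log = []
    work = asyncio.gather(task_a(lock1, lock2, log), task_b(lock1, lock2, log))
    # The timeout is only a safety net; with ordered acquisition it should not fire
    await asyncio.wait_for(work, timeout=1.0)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The helper only protects code that goes through it. A coroutine that takes one lock directly and then asks for another can still create a cycle.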
Pattern 4: Queues With No Consumer
An asyncio.Queue with await queue.join() waits until every item has been processed. If the consumer task crashes or is never started, the producer hangs at join() indefinitely.
import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(i)
    await queue.join()  # blocks if consumer never calls task_done()
    print("all items processed")

async def main():
    queue = asyncio.Queue()
    # Oops: forgot to start the consumer task
    await producer(queue)

asyncio.run(main())
Always create consumer tasks before calling queue.join(), and consider wrapping producers in a timeout so a missing consumer surfaces as an error rather than a hang.
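A corrected version might look like the sketch below. The start_consumer and join_timeout parameters are assumptions added for illustration, so the same code can show both the working path and the timeout path.

```python
import asyncio


async def consumer(queue, processed):
    while True:
        item = await queue.get()
        try:
            processed.append(item)
        finally:
            # Always mark the item done, even if processing raised
            queue.task_done()


async def producer(queue, join_timeout):
    for i in range(5):
        await queue.put(i)
    # A missing or stuck consumer now surfaces as a timeout, not a hang
    await asyncio.wait_for(queue.join(), timeout=join_timeout)


async def main(start_consumer=True, join_timeout=2.0):
    queue = asyncio.Queue()
    processed = []
    worker = None
    if start_consumer:
        # Start the consumer before the producer waits on join()
        worker = asyncio.create_task(consumer(queue, processed))
    try:
        await producer(queue, join_timeout)
    finally:
        if worker is not None:
            worker.cancel()  # the consumer loops forever, so stop it explicitly
    return processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling main(start_consumer=False) reproduces the original bug, except that it ends in asyncio.TimeoutError after join_timeout seconds instead of hanging.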
Detecting a Stalled Event Loop at Runtime
You can't always reproduce a deadlock locally. For production systems, you need runtime detection.
Enable asyncio debug mode
Set the environment variable PYTHONASYNCIODEBUG=1 or call asyncio.run(main(), debug=True). In debug mode, asyncio logs a warning whenever a coroutine blocks the event loop for longer than 100 milliseconds. This won't catch every deadlock, but it surfaces blocking calls immediately.
PYTHONASYNCIODEBUG=1 python myapp.py
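The 100 millisecond threshold is the loop's slow_callback_duration attribute, and you can lower it to catch shorter stalls. Here is a small sketch, assuming a deliberately blocking coroutine (blocks_briefly is a made-up name) so that debug mode has something to report through the asyncio logger:

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.WARNING)


async def blocks_briefly():
    # Deliberate blocking call so debug mode has something to report
    time.sleep(0.2)


async def main():
    loop = asyncio.get_running_loop()
    # Default is 0.1 seconds; lower it to flag shorter stalls
    loop.slow_callback_duration = 0.05
    await asyncio.create_task(blocks_briefly())


if __name__ == "__main__":
    asyncio.run(main(), debug=True)
```

The warning is emitted only after the slow step finishes, so debug mode flags blocking calls that eventually return. A loop that is blocked forever never gets the chance to log.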
Use a watchdog task
A lightweight ticker coroutine runs alongside your application and records a timestamp each time the loop schedules it. If the loop stalls, the ticker stops being scheduled too, so a watchdog running on a separate thread can notice the stale timestamp and report it.
import asyncio
import threading
import time

_last_tick = time.monotonic()

async def loop_ticker():
    global _last_tick
    while True:
        _last_tick = time.monotonic()
        await asyncio.sleep(1)

def watchdog(threshold_seconds=5):
    while True:
        time.sleep(threshold_seconds)
        age = time.monotonic() - _last_tick
        if age > threshold_seconds:
            print(f"WARNING: event loop may be stalled ({age:.1f}s since last tick)")

async def main():
    ticker = asyncio.create_task(loop_ticker())  # keep a reference to the task
    threading.Thread(target=watchdog, daemon=True).start()
    # ... rest of your application
    await asyncio.sleep(60)

asyncio.run(main())
Dump all running tasks on demand
When you suspect a hang, asyncio.all_tasks() gives you every task currently scheduled. Print their stack frames to see where each one is stuck:
import asyncio
import signal
import sys

def dump_tasks(signum, frame):
    # Print the stack of every unfinished task on the current loop
    loop = asyncio.get_event_loop()
    for task in asyncio.all_tasks(loop):
        task.print_stack(file=sys.stderr)

signal.signal(signal.SIGUSR1, dump_tasks)
Send SIGUSR1 to the process while it's hung and you'll get a full coroutine stack dump in your logs. On Windows, use signal.SIGBREAK instead.
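If full stack dumps are too noisy, a one-line-per-task summary is often enough to see which await each task is parked on. This is a rough sketch: describe_tasks and stuck are hypothetical names, and the output format is an arbitrary choice.

```python
import asyncio


def describe_tasks(loop=None):
    """Return one line per unfinished task: its name and where it is suspended."""
    lines = []
    for task in asyncio.all_tasks(loop):
        frames = task.get_stack(limit=1)
        if frames:
            frame = frames[-1]
            code = frame.f_code
            where = f"{code.co_name} at {code.co_filename}:{frame.f_lineno}"
        else:
            where = "no frame available"
        lines.append(f"{task.get_name()}: {where}")
    return lines


async def stuck():
    await asyncio.Event().wait()  # never set, so this task waits forever


async def main():
    task = asyncio.create_task(stuck(), name="stuck-worker")
    await asyncio.sleep(0)  # let the task run up to its await
    report = describe_tasks()
    task.cancel()
    return report


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

Naming tasks when you create them (the name argument to create_task) makes a summary like this far easier to read than the default Task-N labels.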
Using Third-Party Tools
The aiomonitor library attaches a telnet console to a running asyncio application. You can connect to it while the app is live and inspect tasks, stack traces, and loop state without restarting. It's particularly useful for long-running services where you cannot reproduce the deadlock in a test environment. Install it with pip install aiomonitor and start it alongside your app loop.
The py-spy sampling profiler works at the OS level and can attach to a running Python process without modifying your code. Run py-spy dump --pid <pid> to get a snapshot of every thread's current call stack. Because it reads the process's memory from outside and never needs the GIL, it works even when the event loop thread is completely blocked.
Common Pitfalls to Watch For
- Mixing asyncio.run calls: Calling asyncio.run from inside an already-running event loop raises a RuntimeError in recent Python versions, but in some environments it silently creates a nested loop that stalls. Use asyncio.get_event_loop().run_until_complete only when you know there is no running loop.
- Synchronous teardown in __del__: Object destructors run outside the event loop's control. If your __del__ method waits on async work, even indirectly through a synchronous wrapper, it will block whichever thread runs the garbage collector.
- Forgetting await on a coroutine call: Calling a coroutine function without await returns a coroutine object but does not schedule it. This is a logic bug that looks like a deadlock when another task is waiting for work that never starts. Run Python with the -W error::RuntimeWarning flag to catch unawaited coroutines during development.
- Shield misuse: asyncio.shield protects a coroutine from cancellation, but if you shield something that itself blocks indefinitely, you lose the ability to cancel it as an escape valve. Use shield conservatively.
- Long-lived asyncio.gather without error handling: If one task in a gather call raises an exception and you haven't set return_exceptions=True, the exception propagates to the caller right away, but the other tasks are not cancelled and keep running in the background. With return_exceptions=True, gather waits for every task, so if one deadlocks silently the gather never returns and the exceptions from the other tasks are never surfaced.
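The gather behaviour is easy to check for yourself. In the sketch below (fails and slow are made-up coroutines with arbitrary delays), gather raises as soon as one awaitable fails, while the sibling task is left running rather than cancelled:

```python
import asyncio


async def fails():
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def slow():
    await asyncio.sleep(0.05)
    return "finished"


async def main():
    slow_task = asyncio.create_task(slow())
    still_running = None
    try:
        await asyncio.gather(fails(), slow_task)
    except ValueError:
        # gather has already raised, yet the sibling is still pending
        still_running = not slow_task.done()
    # The sibling was never cancelled, so it completes normally
    result = await slow_task
    return still_running, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the sibling were hung instead of merely slow, it would stay pending indefinitely unless you cancel it yourself.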
Defensive Patterns That Prevent Deadlocks
The most effective approach is to make deadlocks structurally impossible rather than relying on detection after the fact.
Put a timeout on every external wait. Wrapping every await that touches I/O, a lock, or a queue with asyncio.wait_for(coro, timeout=N) means a stall becomes a TimeoutError you can log and handle, not a silent freeze.
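As a minimal sketch of that advice applied to a lock, the helper below bounds how long a coroutine will wait to acquire it. acquire_or_fail is a hypothetical name, and the 0.1 second timeout is arbitrary.

```python
import asyncio


async def acquire_or_fail(lock, timeout):
    """Acquire lock within timeout seconds, or raise asyncio.TimeoutError."""
    await asyncio.wait_for(lock.acquire(), timeout=timeout)


async def main():
    lock = asyncio.Lock()
    await lock.acquire()  # simulate another task holding the lock
    try:
        await acquire_or_fail(lock, timeout=0.1)
    except asyncio.TimeoutError:
        # The stall is now an error you can log and handle
        return "timed out waiting for lock"
    else:
        lock.release()
        return "acquired"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The same wrapper shape works for queue.get() and queue.join(); pick timeouts that are generous enough not to fire during normal load.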
Prefer asyncio.TaskGroup (available since Python 3.11) over bare gather. When one task in a group raises, the group cancels its siblings and propagates the exception, so a hung sibling can't hold up the failure indefinitely.
import asyncio

async def main():
    # worker_one and worker_two stand for your own coroutine functions
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker_one())
        tg.create_task(worker_two())
Keep critical sections short. Acquire a lock, do the minimum necessary work, and release it immediately. The longer you hold a lock, the wider the window for a cycle to form.
Audit every synchronous call inside a coroutine. If a function doesn't have an async signature, it might be blocking. Check its implementation or wrap it in run_in_executor as a precaution.
Wrapping Up
Async deadlocks are fixable once you know the patterns behind them. Here are concrete steps you can take right now:
- Run your application with PYTHONASYNCIODEBUG=1 and fix every blocking-call warning it surfaces.
- Add a watchdog thread to your production service so a stalled loop becomes an observable alert rather than a silent outage.
- Add asyncio.wait_for timeouts to any await that touches a lock, queue, or external resource.
- Replace bare asyncio.gather with asyncio.TaskGroup in Python 3.11+ to get automatic exception propagation and task cancellation.
- Wire up a SIGUSR1 handler that dumps all task stacks so you can diagnose a live hang without a restart.
None of these changes require a major refactor. Start with debug mode and timeouts: those two alone will catch the majority of deadlocks before they reach users.