# 60. `asyncio`

# 60. `asyncio`

The `asyncio` module is CPython’s standard library framework for asynchronous I/O. It provides an event loop, tasks, futures, transports, streams, synchronization primitives, subprocess integration, timers, and APIs for coordinating many waiting operations in one thread.

For CPython internals, `asyncio` matters because it exposes how `async def`, coroutine objects, `await`, futures, event loops, callbacks, and I/O readiness fit together.

## 60.1 The Role of `asyncio`

`asyncio` is built for concurrent waiting.

It works well when a program spends much of its time waiting for:

```text
network sockets
subprocess pipes
timers
file descriptor readiness
server connections
client responses
coordination between tasks
```

A minimal example:

```python
import asyncio

async def main():
    await asyncio.sleep(1)
    print("done")

asyncio.run(main())
```

The program creates a coroutine, starts an event loop, waits for the timer, resumes the coroutine, prints, and shuts down the loop.

The core model:

```text
coroutines describe async work
tasks schedule coroutines
the event loop drives tasks
await suspends execution
I/O readiness or timers resume execution
```

## 60.2 `async def`

An `async def` function does not execute immediately when called.

```python
async def fetch():
    return 42

coro = fetch()
print(coro)
```

Calling `fetch()` creates a coroutine object.

Execution begins only when the coroutine is awaited or scheduled by an event loop.

```python
import asyncio

async def fetch():
    return 42

async def main():
    result = await fetch()
    print(result)

asyncio.run(main())
```

Conceptually:

```text
call async function
    ↓
create coroutine object
    ↓
schedule or await it
    ↓
run until await point
    ↓
suspend
    ↓
resume later
```

This is different from a normal function call, which executes immediately.

## 60.3 Coroutine Objects

A coroutine object is a resumable execution object.

It contains state similar to a generator:

```text
code object
suspended frame
current await target
running flag
locals
instruction position
```

You can inspect some fields:

```python
async def f():
    await asyncio.sleep(1)

coro = f()

print(coro.cr_code)
print(coro.cr_frame)
print(coro.cr_running)
print(coro.cr_await)

coro.close()
```

A coroutine can be in one of several states:

| State | Meaning |
|---|---|
| Created | Created but not started |
| Running | Currently executing |
| Suspended | Paused at `await` |
| Closed | Finished, cancelled, or closed |

The `inspect` module exposes these states through `inspect.getcoroutinestate()`.

## 60.4 `await`

`await` suspends the current coroutine until an awaitable produces a result.

Example:

```python
async def main():
    result = await some_async_operation()
```

The awaited object must be awaitable. Common awaitables include:

| Awaitable | Meaning |
|---|---|
| Coroutine object | Result of calling an `async def` function |
| Task | Scheduled coroutine |
| Future | Placeholder for a result |
| Object with `__await__` | Custom awaitable |

At runtime, `await` is a controlled suspension point.

Conceptually:

```text
current coroutine reaches await
    ↓
returns control to event loop
    ↓
event loop runs other work
    ↓
awaited object completes
    ↓
current coroutine resumes
```

`await` does not create an OS thread. It cooperatively yields control to the event loop.

## 60.5 Event Loop

The event loop is the scheduler.

It manages:

```text
ready callbacks
scheduled timers
I/O readiness
tasks
futures
subprocess events
signal handlers on supported platforms
```

A simplified event loop iteration looks like:

```text
run ready callbacks
check timers
wait for I/O readiness
mark futures ready
resume tasks
repeat
```

At the Python level, most programs use:

```python
asyncio.run(main())
```

This creates and manages the loop for a top-level coroutine.

Lower-level loop access:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    print(loop)

asyncio.run(main())
```

Inside a running coroutine, `get_running_loop()` returns the active event loop.

## 60.6 `asyncio.run()`

`asyncio.run()` is the standard top-level entry point.

```python
import asyncio

async def main():
    await asyncio.sleep(1)

asyncio.run(main())
```

It performs:

```text
create event loop
set it as current
run top-level coroutine
finalize asynchronous generators
shutdown default executor
close event loop
```

Use it once at the outermost program boundary.

Bad pattern:

```python
async def inner():
    asyncio.run(other())
```

You cannot call `asyncio.run()` from inside an already running event loop. In async code, use `await` instead.

## 60.7 Tasks

A task wraps a coroutine and schedules it on the event loop.

```python
import asyncio

async def worker():
    await asyncio.sleep(1)
    return "done"

async def main():
    task = asyncio.create_task(worker())
    result = await task
    print(result)

asyncio.run(main())
```

Without `create_task()`, simply creating a coroutine object does not schedule it.

```python
coro = worker()       # created, not running
task = create_task(coro)  # scheduled
```

A task manages:

```text
coroutine execution
result storage
exception storage
cancellation
callbacks
state transitions
```

Task states include:

| State | Meaning |
|---|---|
| Pending | Scheduled or waiting |
| Running | Currently being advanced |
| Done | Returned, raised, or cancelled |

## 60.8 Futures

A future is a placeholder for a result that will be available later.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    loop.call_soon(fut.set_result, 42)

    result = await fut
    print(result)

asyncio.run(main())
```

A future can be:

```text
pending
done with result
done with exception
cancelled
```

Tasks are future-like. A task is a future that drives a coroutine.

Conceptually:

```text
Future
    stores eventual result

Task
    Future + coroutine runner
```

Library code often uses futures to bridge callback-style APIs into `await`.

## 60.9 Cooperative Scheduling

`asyncio` scheduling is cooperative.

A task runs until it reaches an await point, returns, raises, or is cancelled.

This means CPU-bound code blocks the event loop:

```python
async def bad():
    while True:
        pass
```

While `bad()` runs, no other task can proceed.

Good async code yields control at waiting points:

```python
async def good():
    while True:
        await asyncio.sleep(0)
```

`asyncio.sleep(0)` gives other tasks a chance to run.

For CPU-bound work, use:

```text
process pools
thread pools for blocking I/O
native libraries
separate services
```

## 60.10 Timers

`asyncio.sleep()` suspends the current coroutine for a duration.

```python
await asyncio.sleep(1.5)
```

Internally, the loop schedules a timer and resumes the task later.

Conceptually:

```text
task calls sleep
    ↓
future created
    ↓
timer scheduled
    ↓
task suspends
    ↓
timer fires
    ↓
future completes
    ↓
task resumes
```

The event loop can run other tasks while waiting.

## 60.11 Running Concurrent Tasks

Use `asyncio.gather()` to wait for several awaitables.

```python
import asyncio

async def fetch(i):
    await asyncio.sleep(1)
    return i

async def main():
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3),
    )
    print(results)

asyncio.run(main())
```

All three tasks can wait concurrently.

Expected runtime is about one second, not three, because the sleeps overlap.

`gather()` preserves input order in its result list.

## 60.12 `TaskGroup`

`TaskGroup` provides structured task management.

```python
import asyncio

async def worker(i):
    await asyncio.sleep(1)
    return i

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(worker(1))
        t2 = tg.create_task(worker(2))

    print(t1.result(), t2.result())

asyncio.run(main())
```

The task group waits for all child tasks before exiting.

If one task fails, the group cancels the others and raises an exception group.

This gives async code a clear ownership structure:

```text
parent task
    owns task group
        owns child tasks
```

Structured concurrency reduces leaked background tasks.

## 60.13 Cancellation

Cancellation is cooperative.

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("cancelled")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("observed cancellation")

asyncio.run(main())
```

Calling `task.cancel()` requests cancellation. The task receives `CancelledError` at an await point.

Important rule:

```text
catch CancelledError only to clean up, then usually re-raise it
```

Swallowing cancellation can break timeouts, task groups, and shutdown logic.

## 60.14 Timeouts

`asyncio.timeout()` limits how long an operation may run.

```python
import asyncio

async def main():
    try:
        async with asyncio.timeout(1):
            await asyncio.sleep(10)
    except TimeoutError:
        print("timed out")

asyncio.run(main())
```

Timeouts are implemented through cancellation.

Conceptually:

```text
start protected block
    ↓
schedule timeout
    ↓
if timeout fires, cancel current task
    ↓
convert cancellation to TimeoutError at boundary
```

Timeout behavior depends on cooperative cancellation. If the coroutine blocks the event loop with CPU work, the timeout cannot fire promptly.

## 60.15 Shielding

`asyncio.shield()` protects an awaitable from outer cancellation.

```python
await asyncio.shield(task)
```

If the awaiting coroutine is cancelled, the shielded task may continue running.

Use this sparingly. Shielding can make shutdown harder and can leave background work alive longer than expected.

Typical use cases:

```text
must-finish cleanup
commit or rollback boundary
protocol close handshake
```

## 60.16 Streams

`asyncio` streams provide high-level TCP I/O.

Server:

```python
import asyncio

async def handle(reader, writer):
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "127.0.0.1", 8000)

    async with server:
        await server.serve_forever()

asyncio.run(main())
```

Client:

```python
reader, writer = await asyncio.open_connection("127.0.0.1", 8000)

writer.write(b"hello")
await writer.drain()

data = await reader.read(100)
writer.close()
await writer.wait_closed()
```

Streams wrap lower-level transports and protocols into reader and writer objects.

## 60.17 Backpressure

`writer.write()` buffers data. It may not send immediately.

```python
writer.write(data)
await writer.drain()
```

`drain()` waits until the transport buffer has room.

This is backpressure.

Without `drain()`, a fast producer can buffer too much data and increase memory usage.

Conceptually:

```text
producer writes faster than socket can send
    ↓
transport buffer grows
    ↓
drain waits for buffer to shrink
```

Correct async I/O respects backpressure.

## 60.18 Synchronization Primitives

`asyncio` provides task-level synchronization primitives.

| Primitive | Purpose |
|---|---|
| `asyncio.Lock` | Mutual exclusion between tasks |
| `asyncio.Event` | One-bit notification |
| `asyncio.Condition` | Wait and notify |
| `asyncio.Semaphore` | Limit concurrent access |
| `asyncio.Queue` | Async producer-consumer channel |
| `asyncio.Barrier` | Wait for a group of tasks |

Example semaphore:

```python
import asyncio

sem = asyncio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await do_request(url)
```

These primitives coordinate tasks in one event loop. They are not process locks or thread locks.

## 60.19 Async Queues

`asyncio.Queue` supports async producer-consumer pipelines.

```python
import asyncio

async def producer(q):
    for i in range(5):
        await q.put(i)
    await q.put(None)

async def consumer(q):
    while True:
        item = await q.get()
        try:
            if item is None:
                return
            print(item)
        finally:
            q.task_done()

async def main():
    q = asyncio.Queue()

    await asyncio.gather(
        producer(q),
        consumer(q),
    )

asyncio.run(main())
```

Queues provide natural backpressure with `maxsize`.

```python
q = asyncio.Queue(maxsize=100)
```

When the queue is full, `put()` waits.

## 60.20 Blocking Calls

Blocking calls stop the event loop.

Bad:

```python
async def main():
    time.sleep(5)
```

Good:

```python
async def main():
    await asyncio.sleep(5)
```

For blocking functions that cannot be rewritten as async, use a thread executor:

```python
import asyncio
import time

def blocking():
    time.sleep(5)
    return 42

async def main():
    result = await asyncio.to_thread(blocking)
    print(result)

asyncio.run(main())
```

`asyncio.to_thread()` runs the function in a thread and returns an awaitable.

This helps with blocking I/O. It does not make CPU-bound Python code parallel in traditional CPython because of the GIL.

## 60.21 Executors

The event loop can run blocking work in executors.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_work(x):
    return x * x

async def main():
    loop = asyncio.get_running_loop()

    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_work, 10)

    print(result)

asyncio.run(main())
```

Executor choices:

| Executor | Good for |
|---|---|
| Thread pool | Blocking I/O |
| Process pool | CPU-bound Python code |
| Default executor | Convenient blocking I/O offload |

This is the bridge between async scheduling and blocking APIs.

## 60.22 Subprocesses

`asyncio` can manage subprocesses asynchronously.

```python
import asyncio

async def main():
    proc = await asyncio.create_subprocess_exec(
        "python",
        "-c",
        "print('hello')",
        stdout=asyncio.subprocess.PIPE,
    )

    out, err = await proc.communicate()
    print(out.decode())

asyncio.run(main())
```

This lets the event loop wait for process output without blocking other tasks.

## 60.23 Low-Level Callbacks

The event loop supports callback scheduling.

```python
import asyncio

def callback():
    print("soon")

async def main():
    loop = asyncio.get_running_loop()
    loop.call_soon(callback)

    await asyncio.sleep(0)

asyncio.run(main())
```

Timers:

```python
loop.call_later(1.0, callback)
```

Thread-safe scheduling from another thread:

```python
loop.call_soon_threadsafe(callback)
```

Most application code should prefer coroutines and tasks, but callbacks are important for bridging low-level event sources.

## 60.24 Transports and Protocols

Before streams, `asyncio` exposed transports and protocols.

A protocol object receives callbacks:

```text
connection_made
data_received
connection_lost
```

A transport object performs I/O:

```text
write
close
pause_reading
resume_reading
```

Conceptually:

```text
event loop
    ↓ I/O event
protocol callback
    ↓
transport operation
```

Streams are built on top of this lower-level abstraction.

Transports and protocols are useful for high-performance protocol implementations or compatibility with older asyncio code.

## 60.25 Event Loop Implementations

`asyncio` has platform-specific event loop implementations.

Common mechanisms include:

| Platform area | Mechanism |
|---|---|
| Unix sockets | Selector-based readiness |
| Windows I/O | Proactor-based APIs |
| Timers | Event loop scheduled callbacks |
| Subprocesses | Platform-specific child watchers or proactor support |

Third-party event loops such as `uvloop` can replace the default loop in some environments.

The event loop abstraction lets most asyncio programs avoid direct OS readiness APIs.

## 60.26 Debug Mode

`asyncio` has debug support.

Enable with:

```python
import asyncio

asyncio.run(main(), debug=True)
```

or environment variables and loop settings.

Debug mode can help detect:

```text
slow callbacks
forgotten awaits
pending task destruction
wrong-thread loop access
resource warnings
```

It adds overhead, so it is primarily for development and testing.

## 60.27 Async Context Managers

Async context managers use `async with`.

```python
class Connection:
    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
```

Usage:

```python
async with Connection() as conn:
    await conn.query("select 1")
```

This pattern is essential when cleanup itself requires awaiting, such as closing network connections gracefully.

## 60.28 Async Iterators

Async iterators use `async for`.

```python
class Counter:
    def __init__(self, n):
        self.i = 0
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration

        self.i += 1
        await asyncio.sleep(0.1)
        return self.i
```

Usage:

```python
async for item in Counter(3):
    print(item)
```

This supports asynchronous streams of values, such as network messages or paginated API results.

## 60.29 Async Generators

An async generator is declared with `async def` and `yield`.

```python
async def ticker(n):
    for i in range(n):
        await asyncio.sleep(1)
        yield i
```

Usage:

```python
async for value in ticker(3):
    print(value)
```

Async generators combine coroutine suspension with streamed output.

They have cleanup semantics because they may hold resources across yields. `asyncio.run()` finalizes remaining async generators during shutdown.

## 60.30 Context Variables

`asyncio` integrates with `contextvars`.

Context variables allow task-local context.

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id")

async def worker():
    print(request_id.get())

async def main():
    request_id.set("abc")
    await worker()

asyncio.run(main())
```

Context is preserved across awaits and task scheduling in controlled ways.

This is important for:

```text
request IDs
tracing
logging context
auth context
locale or tenant state
```

Unlike thread-local storage, context variables work with asynchronous task switching.

## 60.31 Common Failure Modes

Common `asyncio` bugs include:

| Mistake | Consequence |
|---|---|
| Calling coroutine without awaiting | Coroutine never runs |
| Blocking with `time.sleep()` | Event loop stalls |
| CPU-bound loop in coroutine | Other tasks starve |
| Forgetting `writer.drain()` | Memory growth |
| Swallowing `CancelledError` | Broken cancellation |
| Creating orphan tasks | Leaked background work |
| Calling `asyncio.run()` inside loop | Runtime error |
| Sharing non-async locks | Deadlocks or blocking |
| Ignoring task exceptions | Lost failures |
| Using async for CPU parallelism | No real CPU parallelism |

Most errors come from treating async code like threaded code. `asyncio` is cooperative scheduling, not preemptive scheduling.

## 60.32 Relationship to CPython Internals

`asyncio` connects to several CPython subsystems:

| CPython area | Connection |
|---|---|
| Compiler | `async def`, `await`, `async for`, `async with` syntax |
| Code objects | Coroutine flags and async generator flags |
| Object model | Coroutine, task, future, async iterator objects |
| Frames | Suspended coroutine frames |
| Exceptions | Cancellation and exception propagation |
| Context variables | Task-local context propagation |
| Event loop | Scheduling and callback execution |
| Selectors | OS readiness polling |
| Subprocess support | Async process integration |
| Thread pools | Blocking work offload |

At the language level, `async` and `await` define suspension semantics. At the library level, `asyncio` provides the scheduler and I/O integration needed to make those semantics useful.

## 60.33 Why `asyncio` Matters

`asyncio` matters because modern Python programs often handle many concurrent I/O operations:

```text
HTTP clients
web servers
database clients
message brokers
chat systems
crawlers
proxies
stream processors
automation systems
```

For these workloads, creating one OS thread per operation can be expensive. `asyncio` lets one event loop manage many waiting operations with explicit suspension points.

The core tradeoff:

| Strength | Cost |
|---|---|
| High concurrency for I/O-bound workloads | Requires async-compatible APIs |
| Low per-task overhead | Blocking code stalls everything |
| Structured cancellation and timeouts | Cancellation must be handled carefully |
| Clear scheduling points | CPU-bound work needs another strategy |

## 60.34 Chapter Summary

The `asyncio` module is CPython’s standard asynchronous I/O framework. It uses an event loop to drive coroutine objects, tasks, futures, timers, sockets, subprocesses, and callbacks.

For CPython internals, `asyncio` is important because it connects language-level coroutine machinery to runtime scheduling. It shows how suspended frames, awaitables, tasks, cancellation, event loops, context variables, and OS I/O readiness cooperate to provide concurrent execution in one thread.
