Asynchronous Programming in Python

What is asynchronous programming in Python

Asynchronous programming is a paradigm that enables programs to perform multiple tasks concurrently without blocking the execution of the main thread.

Unlike traditional synchronous programming, where tasks execute one after another, asynchronous programming allows a program to initiate an operation (e.g., a network request) and then yield control to other tasks while waiting for the operation to complete.

Once the operation is done, control is returned to the original task to continue its execution.
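
As a minimal sketch of this hand-off, the snippet below starts two coroutines and records the order in which their steps run. The names worker and events are illustrative, and asyncio.sleep() stands in for a real operation such as a network request.

```python
import asyncio

events = []  # records the order in which steps actually run

async def worker(name: str, delay: float) -> None:
    events.append(f"{name}: start")
    # Simulated I/O wait; control goes back to the event loop here.
    await asyncio.sleep(delay)
    events.append(f"{name}: done")

async def main() -> None:
    # Both workers are started together; neither blocks the other.
    await asyncio.gather(worker("slow", 0.2), worker("fast", 0.1))

asyncio.run(main())
print(events)
# ['slow: start', 'fast: start', 'fast: done', 'slow: done']
```

The "fast" worker finishes first even though it was started second, because the "slow" worker yielded control while it was waiting.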

Core idea: Concurrency without Parallelism (Often)
The fundamental concept behind asynchronous programming in Python is concurrency, not necessarily true parallelism (which typically requires multiple CPU cores or processes). Python’s Global Interpreter Lock (GIL) limits true multithreading for CPU-bound tasks, but asynchronous programming excels at I/O-bound tasks.

The “waiting” in asynchronous programming typically refers to I/O operations like:

  • Network requests: Fetching data from a web API, downloading files.
  • Database queries: Waiting for a response from a database server.
  • File I/O: Reading from or writing to disk.
  • Time delays: asyncio.sleep().

During these waiting periods, the program doesn’t sit idle; the event loop runs other tasks that are ready to proceed.
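
The effect on total running time can be sketched as follows. Here fake_io is a hypothetical stand-in for any of the I/O operations listed above, and the 0.2-second delays are arbitrary.

```python
import asyncio
import time

async def fake_io(label: str, seconds: float) -> str:
    # asyncio.sleep stands in for a network call, a query, or a disk read.
    await asyncio.sleep(seconds)
    return label

async def run_sequentially() -> float:
    start = time.perf_counter()
    await fake_io("api", 0.2)
    await fake_io("db", 0.2)
    await fake_io("file", 0.2)
    return time.perf_counter() - start

async def run_concurrently() -> float:
    start = time.perf_counter()
    await asyncio.gather(fake_io("api", 0.2), fake_io("db", 0.2), fake_io("file", 0.2))
    return time.perf_counter() - start

sequential = asyncio.run(run_sequentially())
concurrent = asyncio.run(run_concurrently())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential version takes roughly the sum of the waits, while the concurrent version takes roughly as long as the longest single wait.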

Key components and concepts

  1. Event Loop:
  • The event loop is the heart of asynchronous programming. It’s a single-threaded loop that monitors for events (e.g., “network data arrived,” “timer expired”) and dispatches tasks to handle those events. It manages the execution flow of coroutines.
  • The asyncio library is Python’s built-in library for writing concurrent code using the async/await syntax. The event loop is a central part of asyncio.
    asyncio.run() starts the event loop and is the usual entry point. Inside a coroutine, asyncio.get_running_loop() returns the running loop; asyncio.get_event_loop() is an older, lower-level way to obtain and manage the event loop directly.
  2. Coroutines (Cooperative Routines):
  • Functions defined with async def can pause their execution at await points and resume later. They are “cooperative” because they explicitly yield control to the event loop.
  • Calling an async def function does not execute it immediately; it returns a coroutine object. To run it, you must await it or schedule it on the event loop (e.g., using asyncio.create_task()).
  3. async and await:
  • async def marks a function as a coroutine function. It tells Python that this function can contain await expressions and will run cooperatively on the event loop.
  • await is used inside an async def function to pause its execution until an “awaitable” (another coroutine, task, future, etc.) completes. When a coroutine awaits something that is not yet complete, control returns to the event loop, which can then run other pending tasks. await can only be used inside an async def function.
  4. Awaitables:
  • An awaitable is any object that can be “awaited”. The most common awaitables are:
    • Coroutines: Objects returned by calling an async def function.
    • Tasks: Objects created by asyncio.create_task() to schedule a coroutine for execution on the event loop.
    • Futures: Low-level objects representing the result of an asynchronous operation. Tasks are built on top of Future.
import asyncio

async def greet(name, delay):
    await asyncio.sleep(delay)
    print(f"Hello, {name}!")

async def main():
    # Create coroutine objects (not yet running)
    coro1 = greet("Alice", 2)
    coro2 = greet("Bob", 1)
    # Schedule the coroutines as tasks so they run concurrently
    task1 = asyncio.create_task(coro1)
    task2 = asyncio.create_task(coro2)
    # Await the completion of both tasks
    await task1
    await task2
    # Or simply await a coroutine if you don't need an explicit task object
    await greet("Charlie", 0.5)
    # Run multiple awaitables concurrently and gather their results
    # (greet returns None, so results is [None, None])
    results = await asyncio.gather(
        greet("David", 0.3),
        greet("Eve", 0.1),
    )
    print("All greetings done.")

if __name__ == "__main__":
    asyncio.run(main())
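
To make the distinction between coroutine objects, tasks, and futures more concrete, here is a small sketch that inspects each stage. The compute function and the dictionary keys are illustrative names only.

```python
import asyncio

async def compute(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

async def main() -> dict:
    coro = compute(21)                        # nothing has run yet
    is_coroutine = asyncio.iscoroutine(coro)

    task = asyncio.create_task(coro)          # now scheduled on the running loop
    is_future = isinstance(task, asyncio.Future)  # Task is a subclass of Future
    done_before_await = task.done()           # the task has not had a chance to run

    result = await task                       # suspend main() until the task finishes
    return {
        "is_coroutine": is_coroutine,
        "is_future": is_future,
        "done_before_await": done_before_await,
        "result": result,
        "done_after_await": task.done(),
    }

info = asyncio.run(main())
print(info)
```

Creating the coroutine object does no work, create_task() only schedules it, and the result becomes available once the task has been awaited.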

Key Libraries and Tools

  1. asyncio (Standard Library):
    asyncio is the foundation for asynchronous I/O in Python. It provides the event loop, coroutine utilities, task management, and primitives for networking, subprocesses, and synchronization.
    Key functions/classes:
  • asyncio.run(coro): Runs the given coroutine until it completes, managing the event loop automatically. Best for the top-level entry point.
  • asyncio.create_task(coro): Schedules a coroutine to run on the event loop as a task.
  • asyncio.sleep(delay): An awaitable that pauses the current coroutine for delay seconds, yielding control.
  • asyncio.gather(*awaitables): Runs multiple awaitables concurrently and waits for all of them to complete. Returns their results in the order they were passed.
  • asyncio.Queue, asyncio.Lock, asyncio.Semaphore: Asynchronous queue and synchronization primitives.
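
Two of these pieces can be combined in a short sketch: asyncio.Semaphore caps how many coroutines run a section at once, and asyncio.gather() still returns results in argument order. The fetch function, the limit of 2, and the stats dictionary are assumptions made for illustration.

```python
import asyncio

async def fetch(item_id: int, limit: asyncio.Semaphore, stats: dict) -> int:
    async with limit:                       # at most 2 coroutines inside at once
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01 * (5 - item_id))  # later items finish sooner
        stats["active"] -= 1
    return item_id * 10

async def main():
    limit = asyncio.Semaphore(2)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fetch(i, limit, stats) for i in range(5)))
    return results, stats["peak"]

results, peak = asyncio.run(main())
print(results, peak)
# [0, 10, 20, 30, 40] 2
```

Even though the items complete in a different order, the result list follows the order in which the awaitables were passed to gather().
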
  2. contextlib (Standard Library) - AsyncExitStack & asynccontextmanager:
    AsyncExitStack provides a robust way to manage the lifecycle of multiple synchronous or asynchronous context managers, especially when their number is dynamic. asynccontextmanager is a decorator for easily creating asynchronous context managers.
  • AsyncExitStack (Class) is a stack-like structure that collects cleanup operations (from __aexit__, __exit__, or custom callbacks) and executes them in LIFO order when the stack exits.

    import asyncio
    from contextlib import AsyncExitStack

    async def connect_db(uri):
        # Simulate connecting to a database asynchronously
        print(f"Connecting to {uri}...")
        class MockConnection:
            async def close(self):
                print(f"Closing connection to {uri}...")
        return MockConnection()

    async def process_data(connection_uris):
        async with AsyncExitStack() as stack:
            connections = []
            for uri in connection_uris:
                conn = await connect_db(uri)
                # MockConnection is not a context manager, so register its
                # close() coroutine as an async cleanup callback instead.
                stack.push_async_callback(conn.close)
                connections.append(conn)
            # Every connection in the list is closed automatically (in LIFO order)
            # when the async with block completes.
            print("All connections established. Processing data...")
            # Simulate the data processing
            await asyncio.sleep(0.1)
            print("Data processing finished.")

    asyncio.run(process_data(["db_url_1", "db_url_2", "db_url_3"]))
  • @asynccontextmanager

    from contextlib import asynccontextmanager
    import asyncio

    @asynccontextmanager
    async def my_async_resource(name):
        print(f"Acquiring async resource: {name}")
        await asyncio.sleep(0.05)  # Simulate async setup
        try:
            yield f"DATA_FROM_{name}"  # Yield control to the 'async with' block
        finally:
            print(f"Releasing async resource: {name}")
            await asyncio.sleep(0.02)  # Simulate async teardown

    async def use_my_resource():
        async with my_async_resource("DB_Connection") as db:
            print(f"Using {db}...")
            await asyncio.sleep(0.1)
            print("Finished using DB_Connection.")

    # asyncio.run(use_my_resource())

async with works with any asynchronous context manager, including those created with @asynccontextmanager. It ensures that asynchronous resources are properly set up (__aenter__) before a block of code executes and properly torn down (__aexit__) afterward, even if errors occur within that block. It is the asynchronous version of with.
How it works:

  1. When Python encounters async with expression as variable:, it first evaluates expression. This expression is expected to return an asynchronous context manager object.
  2. It then awaits the __aenter__ method of that context manager object. The result of __aenter__ is assigned to variable (if as variable is used).
  3. The code inside the async with block then executes.
  4. Crucially, regardless of whether the async with block finishes normally or exits due to an exception, Python guarantees that it will await the __aexit__ method of the context manager.
  5. The __aexit__ method receives information about any exception that occurred within the block, allowing the context manager to handle or suppress it.
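
The steps above can be traced with a hand-written context manager. This is only a sketch: TrackedResource is a hypothetical class that logs each protocol step, and the ValueError is raised on purpose to show the exception path.

```python
import asyncio

class TrackedResource:
    """Hypothetical resource that records each protocol step."""

    def __init__(self) -> None:
        self.log = []

    async def __aenter__(self):
        await asyncio.sleep(0)              # stand-in for async setup
        self.log.append("enter")
        return self                         # bound to the name after `as`

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # exc_type is None on a normal exit, or the exception class otherwise.
        self.log.append(f"exit:{exc_type.__name__ if exc_type else None}")
        return False                        # False: do not suppress the exception

async def main() -> list:
    resource = TrackedResource()
    try:
        async with resource as r:
            r.log.append("body")
            raise ValueError("boom")
    except ValueError:
        resource.log.append("caught")
    return resource.log

log = asyncio.run(main())
print(log)
# ['enter', 'body', 'exit:ValueError', 'caught']
```

Returning True from __aexit__ instead would suppress the exception, and the except branch would never run.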

The asynccontextmanager decorator provides a convenient, generator-based way to create asynchronous context managers without having to write a full class with __aenter__ and __aexit__ methods. It wraps an async def generator function and transforms it into an object that adheres to the asynchronous context manager protocol.
How it works:

  1. Define an async def function decorated with @asynccontextmanager, such as my_async_resource.
  2. Inside this function, any code before the yield statement is considered the asynchronous setup logic (what would typically go in __aenter__).
  3. The yield statement is the point where control is passed to the async with block. The value yielded (e.g., f"DATA_FROM_{name}") becomes the value bound to the as variable in the async with statement (e.g., db).
  4. Any code after the yield statement (typically in a finally block to ensure execution) is considered the asynchronous teardown logic (what would typically go in __aexit__). This code will run when the async with block is exited, whether normally or due to an exception.
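
A decorated generator can also be entered through AsyncExitStack.enter_async_context(), which is one way to see the setup, yield, and teardown phases together with the LIFO cleanup order. In this sketch the resource function and the names "a", "b", "c" are made up for illustration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

log = []

@asynccontextmanager
async def resource(name: str):
    log.append(f"open {name}")          # setup, runs on entry
    try:
        yield name.upper()              # value handed to the caller
    finally:
        log.append(f"close {name}")     # teardown, runs on exit

async def main() -> list:
    async with AsyncExitStack() as stack:
        handles = [await stack.enter_async_context(resource(n)) for n in ("a", "b", "c")]
        log.append(f"using {handles}")
    return log

asyncio.run(main())
print(log)
# ['open a', 'open b', 'open c', "using ['A', 'B', 'C']", 'close c', 'close b', 'close a']
```

The resources are opened in order and closed in reverse, which is usually what is wanted when later resources depend on earlier ones.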

Concurrency in FastAPI (vs Java Spring)

Understanding concurrency is fundamental to building high-performance web applications. While both Python’s FastAPI and Java’s Spring can handle many requests simultaneously, their underlying concurrency models differ significantly.

Concurrency vs. Parallelism

  • Concurrency is dealing with many things at once: tasks make progress in overlapping time periods, but may simply take turns on a single CPU core.
  • Parallelism is truly doing many things at once, with tasks literally executing simultaneously on different CPU cores or processors.
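
As a small illustration of concurrency without parallelism, the sketch below runs two coroutines that take turns on one thread. The function name take_turns and the use of asyncio.sleep(0) as an explicit yield point are choices made for this example.

```python
import asyncio
import threading

async def take_turns(name: str, seen: list) -> None:
    for _ in range(3):
        seen.append((name, threading.get_ident()))
        await asyncio.sleep(0)          # hand control back so the other task gets a turn

async def main() -> list:
    seen = []
    await asyncio.gather(take_turns("A", seen), take_turns("B", seen))
    return seen

seen = asyncio.run(main())
order = [name for name, _ in seen]
thread_ids = {tid for _, tid in seen}
print(order, len(thread_ids))
# ['A', 'B', 'A', 'B', 'A', 'B'] 1
```

Both tasks make progress in an interleaved fashion, yet every step runs on the same OS thread, so nothing here executes in parallel.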

Baseline: Java Spring’s Traditional Concurrency Model (Thread-Per-Request)

In Spring MVC, before reactive Spring WebFlux became popular, the traditional concurrency model is thread-based and often relies on blocking I/O:

  • Core Unit: Operating System (OS) Threads.
  • Request Handling:
    • When a request arrives, the web server (e.g., Tomcat embedded in Spring Boot) picks an available thread from its thread pool.
    • This thread is then dedicated to handling that specific request from start to finish.
  • I/O Handling (Blocking):
    • If the thread needs to perform an I/O operation (e.g., fetching data from a database, calling an external API, reading from a file), it becomes blocked (or “waits”).
    • While blocked, the thread cannot perform any other work; it effectively sits idle until the I/O operation completes.
  • Achieving Concurrency: By having a pool of many threads (e.g., 200-400), the server can handle multiple simultaneous requests, each consuming one thread.
  • Achieving Parallelism: On a multi-core CPU, the OS can truly run multiple threads in parallel across different cores.
  • Pros:
    • Simple Mental Model: The code flows sequentially, making it easier to read and debug.
    • Good for CPU-Bound Tasks: If a request handler is purely CPU-intensive, a dedicated thread effectively utilizes a CPU core.
  • Cons:
    • Resource Consumption: Each OS thread consumes significant memory (stack space) and OS resources. For very high concurrency (thousands of requests), this can lead to memory exhaustion and performance degradation (the “C10k problem”).
    • Context Switching Overhead: The OS incurs overhead when switching between a large number of active threads.
    • I/O-Bound Bottleneck: Threads are idle while waiting for I/O, wasting valuable server resources.
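
The thread-per-request idea can be imitated in Python to show why the pool size bounds throughput for blocking handlers. This is a rough analogue rather than Spring code: handle_request, serve, the pool size of 2, and the 0.2-second delay are all assumptions for the sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def handle_request(request_id: int) -> str:
    # The worker thread is held for the full duration of the blocking call.
    time.sleep(0.2)                     # stand-in for a blocking database query
    return f"response {request_id}"

def serve(num_requests: int, pool_size: int):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        responses = list(pool.map(handle_request, range(num_requests)))
    return responses, time.perf_counter() - start

responses, elapsed = serve(num_requests=4, pool_size=2)
print(responses, f"{elapsed:.2f}s")
```

With four requests and two threads, the requests are served in two waves, so the total time is about twice the duration of a single blocking call even though the threads spend nearly all of it idle.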

FastAPI’s Concurrency Model (Asynchronous I/O with Coroutines)

FastAPI is built on ASGI (Asynchronous Server Gateway Interface), leveraging Python’s async/await syntax, which enables asynchronous I/O and coroutines. This model is fundamentally different from traditional Spring:

  • Core Unit: Coroutines (Python’s lightweight concurrency primitives).
  • Request Handling:
    • An ASGI server (like Uvicorn, which FastAPI is commonly run with) can start a small number of worker processes (often matching the number of CPU cores).
    • Within each worker process, there is typically one event loop running on a single OS thread.
    • When a request arrives, the event loop creates and schedules coroutines to handle it.
  • I/O Handling (Non-Blocking):
    • When a coroutine encounters an awaitable I/O operation (e.g., await database.fetch_data(), or await client.get(…) on an httpx.AsyncClient), it yields control back to the event loop instead of blocking the thread.
    • The event loop immediately picks up another pending coroutine or a new incoming request.
    • When the I/O operation completes, the event loop receives a notification and resumes the previously suspended coroutine from where it left off.
  • Achieving Concurrency: A single thread (running an event loop) can efficiently manage thousands of concurrent I/O-bound connections by switching between coroutines that are waiting for I/O.
  • Achieving Parallelism: This is achieved by running multiple Uvicorn worker processes (each with its own event loop and threads), allowing different processes to utilize different CPU cores.
  • Pros:
    • High Concurrency for I/O-Bound Tasks: Excellent for modern web APIs that frequently interact with databases, external services, or file systems.
    • Low Resource Consumption: Coroutines are very lightweight; their overhead (memory, context switching) is significantly lower than OS threads.
    • Scalability: More scalable for I/O-intensive workloads.
  • Cons:
    • Programming Model Shift: Requires understanding async/await.
    • Blocking Code Issue: If you accidentally run a blocking (non-async) I/O operation or a CPU-bound task directly within an async def coroutine without explicitly offloading it, it will block the entire event loop, severely degrading performance for all other concurrent requests in that worker.
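
The blocking-code issue can be observed with plain asyncio. In this sketch a heartbeat coroutine stands in for the other requests sharing the loop, and measure counts how many heartbeats happen while a handler runs. All names are illustrative, and asyncio.to_thread() requires Python 3.9 or later.

```python
import asyncio
import time

async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Stands in for "all the other requests" sharing the event loop.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def measure(handler) -> int:
    ticks, stop = [], asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)              # let the heartbeat start
    before = len(ticks)
    await handler()
    after = len(ticks)
    stop.set()
    await beat
    return after - before               # heartbeats that ran during the handler

async def blocking_handler() -> None:
    time.sleep(0.2)                     # blocks the thread, and with it the whole loop

async def offloaded_handler() -> None:
    await asyncio.to_thread(time.sleep, 0.2)  # same call, run in a worker thread

async def main():
    return await measure(blocking_handler), await measure(offloaded_handler)

blocked_ticks, offloaded_ticks = asyncio.run(main())
print(blocked_ticks, offloaded_ticks)
```

The blocking handler lets no heartbeats through while it runs, whereas the offloaded version leaves the loop free to keep ticking.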

FastAPI’s Automatic Offloading:
If an entire endpoint is defined as a standard def function (not async def), FastAPI will automatically run that entire endpoint in its thread pool. Similarly, if a def dependency function is used by an async def endpoint, that dependency function will also be automatically run in the thread pool.
Explicitly Control Offloading:
The automatic behavior above covers many common cases. For explicit control, run_in_threadpool lets you offload a specific synchronous call made from inside an async def function to the thread pool.

# main.py - CORRECT WAY (offloads to thread pool)
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool  # Also importable from fastapi.concurrency
import time  # For simulation in example

app = FastAPI()

def perform_heavy_computation(data: int) -> int:
    """
    A synchronous, blocking function.
    """
    print(f"Starting heavy computation for {data}...")
    time.sleep(3)  # Simulate 3 seconds of blocking work
    result = data * data * 123456789
    print(f"Finished heavy computation for {data}.")
    return result

@app.get("/offloaded_cpu_task/{data}")
async def offloaded_cpu_task_endpoint(data: int):
    print("Endpoint received request (offloaded_cpu_task_endpoint)")
    # Explicitly offload the blocking function to the thread pool.
    # This coroutine awaits the result, but the event loop is NOT blocked:
    # it can process other requests while the function runs in a worker thread.
    result = await run_in_threadpool(perform_heavy_computation, data)
    print("Endpoint finished request (offloaded_cpu_task_endpoint)")
    return {"message": "Offloaded task completed", "result": result}

How FastAPI handles Different Concurrency Scenarios

FastAPI is designed to handle both I/O-bound and CPU-bound tasks gracefully:

1. I/O-Bound Tasks (Native Asynchronous Strength)

Use async def for endpoint functions and await for all I/O operations (e.g., database clients, HTTP clients like httpx, file I/O libraries that support asyncio).

from fastapi import FastAPI
import httpx  # An async HTTP client

app = FastAPI()

@app.get("/item/{item_id}")
async def read_item(item_id: int):
    # This will yield control while waiting for the external API response
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://some-external-api.com/items/{item_id}")
        return response.json()

While client.get() is waiting for the response, the event loop can process other incoming requests or resume other coroutines.

2. CPU-Bound Tasks (Automatic Thread Pool Offloading)

A blocking (synchronous) function or a CPU-intensive calculation that cannot easily be made asynchronous can still be used in FastAPI, provided it runs off the event loop.

  • In an async def endpoint, calling a regular def function that performs blocking I/O or heavy computation directly will block the event loop. Offload it explicitly with run_in_threadpool (provided by Starlette and built on AnyIO) so that it runs in a separate thread pool and the main event loop stays free.
  • If the entire endpoint is def (synchronous), FastAPI will automatically run the entire endpoint in this thread pool.
    from fastapi import FastAPI
    from fastapi.concurrency import run_in_threadpool
    import time

    app = FastAPI()

    def complex_calculation(n: int) -> int:
        # This is a blocking function
        time.sleep(n)  # Simulate heavy computation
        return n * 2

    @app.get("/calculate/{n}")
    async def get_calculation(n: int):
        # Explicitly run complex_calculation in the thread pool
        result = await run_in_threadpool(complex_calculation, n)
        return {"result": result}

    # Alternatively, define the endpoint itself as synchronous:
    @app.get("/calculate_sync/{n}")
    def get_calculation_sync(n: int):
        # FastAPI will automatically run this entire sync endpoint in a thread pool
        result = complex_calculation(n)
        return {"result": result}

This allows the main event loop to remain free for I/O-bound tasks, ensuring overall responsiveness.

Achieving True Parallelism with FastAPI

While async/await provides concurrency, true parallelism across multiple CPU cores is achieved by running multiple worker processes for the ASGI application.

  • Uvicorn Workers: Uvicorn is typically run with the --workers flag:
    uvicorn main:app --workers 4
    This command starts 4 Uvicorn worker processes. Each process runs its own event loop and manages its own set of concurrent coroutines. This setup allows the FastAPI application to fully utilize multiple CPU cores for CPU-bound tasks (distributed across processes) and efficiently handle I/O-bound tasks within each process. The --reload flag is intended for development and is not combined with --workers.