How to Stream Claude API Responses in Python with the Official SDK


You've called the Claude API, your request went through, and now your user is staring at a blank screen for two or three seconds waiting for the full response to arrive. For short outputs that's annoying; for long ones it's a deal-breaker. Streaming fixes this by sending tokens to your application as they are generated, so you can display them immediately.

This guide shows you exactly how to implement streaming with the official anthropic Python SDK — both the synchronous and asynchronous flavours — along with error handling, chunk accumulation, and a few real-world patterns worth knowing.

What you'll learn

  • How to set up the Anthropic SDK and authenticate correctly
  • How to open a synchronous streaming request and process each chunk
  • How to run the same stream in an async context with asyncio
  • How to accumulate the full response text alongside the stream
  • Common pitfalls and how to avoid them

Prerequisites

You need Python 3.8 or later, an Anthropic API key (set as the environment variable ANTHROPIC_API_KEY), and the SDK installed:

pip install anthropic

The SDK handles HTTP, retries, and SSE (Server-Sent Events) parsing for you. You don't need to touch httpx or requests directly for streaming.
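
A quick sanity check that the install worked and the key is visible to your process (this only inspects the local environment; it makes no API call):

import os
import anthropic

# Print the installed SDK version and whether the key variable is set.
print(anthropic.__version__)
print("API key set:", bool(os.environ.get("ANTHROPIC_API_KEY")))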

Why streaming matters for user experience

Language models generate text one token at a time. Without streaming, your server buffers the entire sequence and sends it only when generation is complete. The user waits, and perceived latency is at least the full generation time, no matter how fast the network is.

With streaming, the time to first token — the moment the user sees something on screen — drops to a fraction of a second. That shift from "waiting" to "watching" makes a measurable difference in how responsive your application feels. Chat interfaces, copilots, and document generators all benefit from this pattern.

Setting up the client

The anthropic.Anthropic client reads your API key from the environment automatically, so there's rarely a reason to pass it explicitly in code.

import anthropic

# The client picks up ANTHROPIC_API_KEY from the environment automatically.
client = anthropic.Anthropic()

If you need to pass the key explicitly — for example, when pulling it from a secrets manager at runtime — use anthropic.Anthropic(api_key=secret_value). Avoid hardcoding keys in source files.
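
For example, a minimal sketch in which MY_ANTHROPIC_KEY is a made-up variable name standing in for wherever your secret actually lives:

import os
import anthropic

# Hypothetical: the key arrives under a non-default name, e.g. injected
# by deployment tooling or fetched from a secrets manager at startup.
secret_value = os.environ["MY_ANTHROPIC_KEY"]
client = anthropic.Anthropic(api_key=secret_value)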

Synchronous streaming

The SDK exposes streaming through a context manager on client.messages.stream(). Inside the with block you iterate over stream.text_stream, which yields decoded text deltas as strings.

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain how TCP handshakes work in plain English."}
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # newline after the stream ends

The end="" and flush=True arguments on print are important. Without them, Python buffers output to the terminal in chunks and you lose the visual effect of streaming. In a web application you'd replace this with a write to your response object instead.

Accessing the final message object

Once the stream closes, you can retrieve the fully assembled Message object, which includes stop reason, token usage, and the complete text. This is useful when you need the metadata after displaying the output.

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "List five uses of the Rust programming language."}
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    message = stream.get_final_message()

print()
print(f"Stop reason : {message.stop_reason}")
print(f"Input tokens: {message.usage.input_tokens}")
print(f"Output tokens: {message.usage.output_tokens}")

The get_final_message() call must happen inside the with block, before the context manager closes the underlying connection.

Accumulating the full text while streaming

Sometimes you want to both stream to the user and keep the complete response for logging or post-processing. The straightforward approach is to build up a list and join it after the loop.

chunks = []

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=512,
    messages=[
        {"role": "user", "content": "What is idempotency in REST APIs?"}
    ],
) as stream:
    for text in stream.text_stream:
        chunks.append(text)
        print(text, end="", flush=True)

full_response = "".join(chunks)
print(f"\n\nTotal characters: {len(full_response)}")

Appending to a list and joining once is faster than repeated string concatenation in a tight loop. It's a minor optimisation here, but a good habit for high-throughput applications.
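
If you don't need per-chunk access to the accumulated text, the SDK can assemble it for you: stream.get_final_text() returns the concatenated text of the response. Like get_final_message(), it must be called before the with block exits:

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=512,
    messages=[
        {"role": "user", "content": "What is idempotency in REST APIs?"}
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Concatenated text of the response, assembled by the SDK.
    full_response = stream.get_final_text()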

Asynchronous streaming with asyncio

If your application is already async — a FastAPI endpoint, a Discord bot, or any asyncio-based service — use the AsyncAnthropic client. The interface mirrors the synchronous version but uses async with and async for.

import asyncio
import anthropic

async def stream_response(prompt: str) -> str:
    client = anthropic.AsyncAnthropic()
    collected = []

    async with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            collected.append(text)
            print(text, end="", flush=True)

    print()
    return "".join(collected)


asyncio.run(stream_response("Describe the CAP theorem in two paragraphs."))

Do not mix anthropic.Anthropic (the sync client) inside an async function. Calling a blocking network operation inside a coroutine blocks the event loop and defeats the purpose of async I/O.
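
If you have legacy synchronous code you can't convert yet, one stopgap is to push the blocking call onto a worker thread with asyncio.to_thread (Python 3.9+) so the event loop stays responsive. A sketch, not a substitute for AsyncAnthropic:

import asyncio
import anthropic

sync_client = anthropic.Anthropic()

def blocking_ask(prompt: str) -> str:
    # An ordinary blocking request; acceptable inside a worker thread.
    message = sync_client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}],
    )
    return message.content[0].text

async def main() -> None:
    # to_thread keeps the event loop free while the request runs.
    text = await asyncio.to_thread(blocking_ask, "Define backpressure.")
    print(text)

asyncio.run(main())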

Streaming inside a FastAPI endpoint

Pairing async streaming with FastAPI's StreamingResponse is one of the most practical patterns you'll use. Each chunk is yielded as it arrives and sent immediately to the HTTP client.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
claude = anthropic.AsyncAnthropic()


async def token_generator(prompt: str):
    async with claude.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text


@app.get("/ask")
async def ask(q: str):
    return StreamingResponse(token_generator(q), media_type="text/plain")

For a browser-facing API you might prefer media_type="text/event-stream" and format each chunk as an SSE message (data: {chunk}\n\n), which lets your front end consume the stream with the EventSource API.
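
Here's a sketch of that SSE variant, building on the app and claude objects above. The /ask-sse route name and the [DONE] sentinel are conventions chosen for this example, not part of the API:

import json

async def sse_generator(prompt: str):
    async with claude.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            # json.dumps escapes newlines so a chunk can't break SSE framing.
            yield f"data: {json.dumps(text)}\n\n"
    yield "data: [DONE]\n\n"


@app.get("/ask-sse")
async def ask_sse(q: str):
    return StreamingResponse(sse_generator(q), media_type="text/event-stream")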

Handling errors in a stream

The SDK raises typed exceptions from the anthropic namespace. Wrap your streaming block in a try/except and catch the ones you care about.

import anthropic

client = anthropic.Anthropic()

try:
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "Summarise the history of DNS."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.AuthenticationError:
    print("\nBad API key. Check your ANTHROPIC_API_KEY environment variable.")
except anthropic.RateLimitError:
    print("\nRate limited. Back off and retry.")
except anthropic.APIStatusError as e:
    print(f"\nAPI error {e.status_code}: {e.message}")
except anthropic.APIConnectionError:
    print("\nNetwork error. Is your connection up?")

APIStatusError catches any HTTP-level error the API returns (4xx or 5xx) and exposes the status code so you can branch your retry logic accordingly. APIConnectionError covers lower-level network failures that occur before a response starts.
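
A minimal retry wrapper with exponential backoff around the retryable cases, reusing the client above. Note that it restarts the request from scratch, so it only returns text once a stream completes in full:

import time

def stream_with_retry(prompt: str, attempts: int = 3) -> str:
    for attempt in range(attempts):
        try:
            chunks = []
            with client.messages.stream(
                model="claude-3-5-sonnet-20241022",
                max_tokens=256,
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                for text in stream.text_stream:
                    chunks.append(text)
            return "".join(chunks)
        except (anthropic.RateLimitError, anthropic.APIConnectionError):
            if attempt == attempts - 1:
                raise  # out of attempts; let the caller handle it
            time.sleep(2 ** attempt)  # back off: 1s, 2s, 4s, ...
    raise RuntimeError("unreachable")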

Common pitfalls and gotchas

Forgetting flush=True in a terminal script

Python's stdout is line-buffered by default in a terminal and block-buffered when piped. If you stream into a pipeline (python script.py | grep something) without flush=True, output will appear in unpredictable bursts rather than token by token. Make flushing explicit wherever the streaming experience matters.

Calling get_final_message() outside the context manager

Once the with block exits, the underlying connection is closed. Calls to stream.get_final_message() or stream.get_final_text() after that point can no longer read from the stream, so do all your metadata extraction before the block closes.

Using the sync client in async code

The sync client uses httpx in blocking mode. Calling it from inside a coroutine will block the event loop for the entire duration of the request. Use AsyncAnthropic in any code that runs under asyncio, including FastAPI, Starlette, and aiohttp servers.

Not handling partial chunks in your UI

Text deltas are not always complete words or sentences. A single chunk might be "conc" followed by "urrent". If you're building a web UI that parses tokens for markdown rendering, buffer until you have a complete line or logical unit rather than trying to render every individual delta.
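
A minimal line-buffering sketch: deltas accumulate in a buffer, and only complete lines are handed to the renderer (a print standing in for your UI update):

import anthropic

client = anthropic.Anthropic()
buffer = ""

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=512,
    messages=[
        {"role": "user", "content": "Write a three-item markdown list of sorting algorithms."}
    ],
) as stream:
    for text in stream.text_stream:
        buffer += text
        # Render complete lines only; keep the trailing partial line buffered.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            print(f"render: {line}")

if buffer:
    print(f"render: {buffer}")  # flush whatever remains after the stream ends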

Ignoring stop_reason

When generation ends because max_tokens was reached, the stop_reason will be "max_tokens" rather than "end_turn". If your application relies on the model finishing its thought completely, check this field and either increase max_tokens or prompt the user to continue.
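
A quick check using get_final_message() from earlier, reusing the client above; max_tokens is deliberately small here so truncation is likely:

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=64,  # deliberately small to provoke truncation
    messages=[{"role": "user", "content": "Explain Raft consensus in detail."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    message = stream.get_final_message()

print()
if message.stop_reason == "max_tokens":
    print("Truncated: raise max_tokens or ask the model to continue.")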

Next steps

You now have the building blocks to add streaming to any Python project that calls Claude. Here's what to do next:

  • Add streaming to an existing endpoint: If you have a synchronous handler that calls client.messages.create(), swap it for the stream() context manager and wire the generator into a StreamingResponse.
  • Handle multi-turn conversations: Pass the full conversation history in the messages list and stream just the latest assistant turn. Accumulate the response text and append it to the history as an assistant message before the next call; see the sketch after this list.
  • Log token usage: Pull message.usage from get_final_message() and write it to your observability stack. Input and output token counts are what drives your bill.
  • Implement retry logic: Catch RateLimitError and APIConnectionError with exponential backoff, as sketched in the error-handling section above. A stream that has already started can't be retried transparently, because part of the response may already have been consumed.
  • Read the SDK source: The anthropic package on GitHub is well-structured. Spending 20 minutes reading _streaming.py will clarify how the SSE parser works and what edge cases it already handles for you.
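
To make the multi-turn item concrete, here's a minimal sketch; the history list and ask() helper are illustrative names, not SDK API:

import anthropic

client = anthropic.Anthropic()
history = []  # alternating user/assistant turns, oldest first

def ask(user_text: str) -> str:
    history.append({"role": "user", "content": user_text})
    chunks = []
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=history,
    ) as stream:
        for text in stream.text_stream:
            chunks.append(text)
            print(text, end="", flush=True)
    print()
    reply = "".join(chunks)
    # Record the assistant turn so the next call sees the whole conversation.
    history.append({"role": "assistant", "content": reply})
    return reply

ask("Name a classic distributed systems paper.")
ask("Summarise it in one sentence.")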