Streaming
Real-time responses with Server-Sent Events
Streaming Responses
TL;DR
Add "stream": true to your request. Responses use Server-Sent Events (SSE). Parse chunks with delta.content. Stream ends with data: [DONE]. Reduces perceived latency from 2-5s to instant feedback.
Enable streaming for real-time, token-by-token responses. This reduces perceived latency and improves the user experience of chat applications.
How Streaming Works
Without streaming, you wait for the entire response:
[Request] ────────────────────────────> [Full Response]
           <---- 2-5 seconds ---->

With streaming, tokens arrive as they're generated:

[Request] → [Token] → [Token] → [Token] → [Done]
            <50ms>     <50ms>     <50ms>
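To see the difference concretely, you can time the first token against the full response. The sketch below is illustrative only; it assumes the same placeholder key and base URL used throughout this page and simply measures timing without displaying the text.

import time
from openai import OpenAI

client = OpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

start = time.perf_counter()
first_token_at = None

stream = client.chat.completions.create(
    model="assisters-chat-v1",
    messages=[{"role": "user", "content": "Write a poem about coding"}],
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content and first_token_at is None:
        first_token_at = time.perf_counter() - start  # perceived latency

total = time.perf_counter() - start
if first_token_at is not None:
    print(f"First token after {first_token_at:.2f}s, full response after {total:.2f}s")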
Enabling Streaming

Set stream=true in your request:
from openai import OpenAI

client = OpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

stream = client.chat.completions.create(
    model="assisters-chat-v1",
    messages=[{"role": "user", "content": "Write a poem about coding"}],
    stream=True  # Enable streaming
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

import OpenAI from 'openai';
const client = new OpenAI({
  apiKey: 'ask_your_key',
  baseURL: 'https://api.assisters.dev/v1'
});

const stream = await client.chat.completions.create({
  model: 'assisters-chat-v1',
  messages: [{ role: 'user', content: 'Write a poem about coding' }],
  stream: true
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  process.stdout.write(content);
}

curl https://api.assisters.dev/v1/chat/completions \
-H "Authorization: Bearer ask_your_key" \
-H "Content-Type: application/json" \
-d '{
"model": "assisters-chat-v1",
"messages": [{"role": "user", "content": "Write a poem"}],
"stream": true
}'Stream Response Format
Streaming uses Server-Sent Events (SSE). Each event is a JSON object:
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"}}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"}}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{}},"finish_reason":"stop"]}
data: [DONE]Chunk Structure
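If you aren't using an SDK, you can read these events by hand: keep the HTTP connection open, take each line that starts with data:, and stop at [DONE]. A minimal sketch using the requests library (illustrative, not required; the key and prompt are placeholders):

import json
import requests

resp = requests.post(
    "https://api.assisters.dev/v1/chat/completions",
    headers={"Authorization": "Bearer ask_your_key"},
    json={
        "model": "assisters-chat-v1",
        "messages": [{"role": "user", "content": "Write a poem"}],
        "stream": True
    },
    stream=True  # keep the connection open and read incrementally
)

for line in resp.iter_lines():
    if not line:
        continue  # SSE events are separated by blank lines
    line = line.decode("utf-8")
    if not line.startswith("data:"):
        continue
    data = line[len("data:"):].strip()
    if data == "[DONE]":
        break
    chunk = json.loads(data)
    delta = chunk["choices"][0].get("delta", {})
    print(delta.get("content", ""), end="", flush=True)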
Chunk Structure

{
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1706745600,
  "model": "assisters-chat-v1",
  "choices": [
    {
      "index": 0,
      "delta": {
        "role": "assistant",
        "content": "Hello"
      },
      "finish_reason": null
    }
  ]
}

role appears only in the first chunk. finish_reason is null for all chunks except the last, where it is "stop".
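A typical consumer accumulates the deltas into one message and watches finish_reason to know why the stream ended. A minimal sketch, assuming the client from the Python example above; the helper name collect_stream is illustrative:

def collect_stream(stream):
    """Rebuild the full assistant message from streamed deltas."""
    text = ""
    finish_reason = None
    for chunk in stream:
        choice = chunk.choices[0]
        if choice.delta.content:
            text += choice.delta.content
        if choice.finish_reason:  # only set on the final chunk
            finish_reason = choice.finish_reason
    return text, finish_reason

stream = client.chat.completions.create(
    model="assisters-chat-v1",
    messages=[{"role": "user", "content": "Write a poem about coding"}],
    stream=True
)
text, reason = collect_stream(stream)
print(f"\nStream ended with finish_reason={reason!r}")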
Web Application Example

React Hook
import { useState, useCallback } from 'react';

function useStreamingChat() {
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (message) => {
    setIsStreaming(true);
    setResponse('');

    const res = await fetch('https://api.assisters.dev/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': 'Bearer ask_your_key',
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: 'assisters-chat-v1',
        messages: [{ role: 'user', content: message }],
        stream: true
      })
    });

    const reader = res.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // stream: true handles multi-byte characters split across reads
      const chunk = decoder.decode(value, { stream: true });
      const lines = chunk.split('\n').filter(line => line.startsWith('data:'));

      for (const line of lines) {
        const data = line.slice(5).trim();
        if (data === '[DONE]') continue;
        try {
          const json = JSON.parse(data);
          const content = json.choices[0]?.delta?.content || '';
          setResponse(prev => prev + content);
        } catch (e) {
          // Skip malformed chunks
        }
      }
    }

    setIsStreaming(false);
  }, []);

  return { response, isStreaming, sendMessage };
}

Usage
function ChatComponent() {
  const { response, isStreaming, sendMessage } = useStreamingChat();

  return (
    <div>
      <button onClick={() => sendMessage('Hello!')}>
        Send
      </button>
      <div>{response}</div>
      {isStreaming && <span>...</span>}
    </div>
  );
}

Python Async Streaming
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

async def stream_chat(message):
    stream = await client.chat.completions.create(
        model="assisters-chat-v1",
        messages=[{"role": "user", "content": message}],
        stream=True
    )

    full_response = ""
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            full_response += content
            print(content, end="", flush=True)

    return full_response

# Run
asyncio.run(stream_chat("Tell me a story"))

FastAPI Streaming
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI(api_key="ask_your_key", base_url="https://api.assisters.dev/v1")

@app.post("/chat")
async def chat(message: str):
    async def generate():
        stream = client.chat.completions.create(
            model="assisters-chat-v1",
            messages=[{"role": "user", "content": message}],
            stream=True
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

Handling Stream Errors
from openai import OpenAI, APIError

client = OpenAI(api_key="ask_your_key", base_url="https://api.assisters.dev/v1")

def safe_stream(messages):
    try:
        stream = client.chat.completions.create(
            model="assisters-chat-v1",
            messages=messages,
            stream=True
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield content
    except APIError as e:
        yield f"\n\n[Error: {e.message}]"
Token Counting with Streams

Streaming responses don't include usage stats until the end:
def stream_with_usage(messages):
    stream = client.chat.completions.create(
        model="assisters-chat-v1",
        messages=messages,
        stream=True,
        stream_options={"include_usage": True}  # Request usage in final chunk
    )

    full_response = ""
    usage = None

    for chunk in stream:
        if chunk.choices:
            content = chunk.choices[0].delta.content
            if content:
                full_response += content
                print(content, end="")

        # Usage appears in the final chunk
        if hasattr(chunk, 'usage') and chunk.usage:
            usage = chunk.usage

    print(f"\n\nTokens used: {usage.total_tokens if usage else 'unknown'}")
    return full_response

Best Practices
Always Use for Chat
Streaming dramatically improves the user experience of conversational interfaces.
Handle Disconnects
Implement reconnection logic for long responses (see the sketch after this list).
Buffer Display
Display tokens as they arrive; don't wait for complete words or sentences.
Show Typing Indicator
Show users that a response is being generated while they wait.
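There is no single right way to handle disconnects, but a common pattern is to retry with backoff when the stream fails before any output has been produced. A minimal sketch, assuming the client from the earlier examples; the helper name stream_with_retry is illustrative:

import time
from openai import OpenAI, APIError

client = OpenAI(api_key="ask_your_key", base_url="https://api.assisters.dev/v1")

def stream_with_retry(messages, max_retries=3):
    """Yield streamed content, reconnecting if the stream fails before
    any tokens arrive. Once output has started, re-raise so the caller
    decides how to resume (e.g., by resending with the partial text)."""
    for attempt in range(max_retries):
        started = False
        try:
            stream = client.chat.completions.create(
                model="assisters-chat-v1",
                messages=messages,
                stream=True
            )
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    started = True
                    yield content
            return  # stream completed normally
        except APIError:
            if started or attempt == max_retries - 1:
                raise  # don't silently replay partial output
            time.sleep(2 ** attempt)  # back off, then reconnect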
When Not to Stream
Streaming isn't always the best choice:
| Use Case | Recommendation |
|---|---|
| Chat interfaces | ✅ Stream |
| Batch processing | ❌ Don't stream |
| Short responses | Either works |
| JSON extraction | ❌ Don't stream (see the sketch below) |
| Background tasks | ❌ Don't stream |
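For structured output such as JSON extraction, the complete payload is needed before parsing, so a non-streaming call is simpler. A minimal sketch, again assuming the client from the earlier examples (the prompt is a placeholder):

import json

response = client.chat.completions.create(
    model="assisters-chat-v1",
    messages=[{"role": "user", "content": "Return the key facts as a JSON object: ..."}],
    stream=False  # wait for the complete response, then parse it
)

# Note: extra text around the JSON will make this fail; validate accordingly
data = json.loads(response.choices[0].message.content)
print(data)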
Debugging Streams
Log stream events for debugging:
import json

def debug_stream(messages):
    stream = client.chat.completions.create(
        model="assisters-chat-v1",
        messages=messages,
        stream=True
    )
    for i, chunk in enumerate(stream):
        print(f"Chunk {i}: {json.dumps(chunk.model_dump(), indent=2)}")