Streaming API
Evolution OpenAI поддерживает потоковую передачу ответов (streaming), позволяя получать части ответа по мере их генерации.
Что такое Streaming
Streaming позволяет:
Получать ответы по частям вместо ожидания полного ответа
Улучшить UX - пользователь видит прогресс генерации
Снизить воспринимаемую задержку - первые слова появляются быстрее
Обрабатывать длинные ответы эффективно
Базовое использование
Синхронный streaming
from evolution_openai import OpenAI
client = OpenAI(
key_id="your_key_id",
secret="your_secret",
base_url="https://your-endpoint.cloud.ru/v1"
)
stream = client.chat.completions.create(
model="default",
messages=[{
"role": "user",
"content": "Расскажи длинную историю про космос"
}],
stream=True, # Включаем streaming
max_tokens=300
)
# Обрабатываем каждый chunk
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print() # Новая строка в конце
Асинхронный streaming
import asyncio
from evolution_openai import AsyncOpenAI
async def async_streaming():
async with AsyncOpenAI(
key_id="your_key_id",
secret="your_secret",
base_url="https://your-endpoint.cloud.ru/v1"
) as client:
stream = await client.chat.completions.create(
model="default",
messages=[{
"role": "user",
"content": "Объясни квантовую физику подробно"
}],
stream=True,
max_tokens=400
)
async for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print()
asyncio.run(async_streaming())
Структура streaming chunk
Каждый chunk содержит следующие поля:
chunk = {
"id": "chatcmpl-123",
"object": "chat.completion.chunk",
"created": 1677652288,
"model": "default",
"choices": [
{
"index": 0,
"delta": {
"role": "assistant", # Только в первом chunk
"content": "Привет" # Часть ответа
},
"finish_reason": None # null пока не закончено
}
]
}
Обработка различных типов chunk
stream = client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Привет!"}],
stream=True
)
full_response = ""
for chunk in stream:
choice = chunk.choices[0]
delta = choice.delta
# Первый chunk с ролью
if delta.role:
print(f"Роль: {delta.role}")
# Контент
if delta.content:
content = delta.content
print(content, end="", flush=True)
full_response += content
# Проверяем причину завершения
if choice.finish_reason:
print(f"\nЗавершено: {choice.finish_reason}")
break
print(f"\nПолный ответ: {full_response}")
Продвинутые возможности
Streaming с метаданными
import time
def streaming_with_stats():
start_time = time.time()
chunk_count = 0
total_content = ""
stream = client.chat.completions.create(
model="default",
messages=[{
"role": "user",
"content": "Напиши подробный рассказ про будущее"
}],
stream=True,
max_tokens=500
)
for chunk in stream:
chunk_count += 1
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
total_content += content
print(content, end="", flush=True)
end_time = time.time()
print(f"\n\nСтатистика:")
print(f"Время: {end_time - start_time:.2f} сек")
print(f"Chunks: {chunk_count}")
print(f"Символов: {len(total_content)}")
print(f"Скорость: {len(total_content)/(end_time-start_time):.1f} симв/сек")
Stop sequences в streaming
stream = client.chat.completions.create(
model="default",
messages=[{
"role": "system",
"content": "Создавай список и заканчивай словом КОНЕЦ"
}, {
"role": "user",
"content": "Дай советы по изучению Python"
}],
stream=True,
stop=["КОНЕЦ", "END"], # Остановочные последовательности
max_tokens=300
)
for chunk in stream:
choice = chunk.choices[0]
if choice.delta.content:
print(choice.delta.content, end="", flush=True)
# Проверяем причину остановки
if choice.finish_reason == "stop":
print("\n[Остановлено по stop sequence]")
elif choice.finish_reason == "length":
print("\n[Достигнут лимит токенов]")
Обработка ошибок в streaming
Базовая обработка ошибок
try:
stream = client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Тест"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
except Exception as e:
print(f"\nОшибка streaming: {e}")
Продвинутая обработка ошибок
import time
from evolution_openai.exceptions import EvolutionOpenAIError
def robust_streaming(messages, max_retries=3):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(
model="default",
messages=messages,
stream=True,
timeout=30
)
content_buffer = ""
for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
content_buffer += content
return content_buffer # Успешно завершено
except EvolutionOpenAIError as e:
print(f"\nОшибка API (попытка {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
except Exception as e:
print(f"\nНеожиданная ошибка: {e}")
break
return None # Все попытки неудачны
Streaming в веб-приложениях
Server-Sent Events (SSE)
from flask import Flask, Response
import json
app = Flask(__name__)
@app.route('/stream-chat')
def stream_chat():
message = request.args.get('message', 'Привет!')
def generate():
try:
stream = client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": message}],
stream=True,
max_tokens=200
)
for chunk in stream:
if chunk.choices[0].delta.content:
data = {
'content': chunk.choices[0].delta.content,
'done': False
}
yield f"data: {json.dumps(data)}\n\n"
# Сигнал завершения
yield f"data: {json.dumps({'done': True})}\n\n"
except Exception as e:
error_data = {'error': str(e), 'done': True}
yield f"data: {json.dumps(error_data)}\n\n"
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
FastAPI с streaming
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.get("/stream")
async def stream_endpoint(message: str = "Привет!"):
async def generate():
try:
async with AsyncOpenAI(
key_id="your_key_id",
secret="your_secret",
base_url="https://your-endpoint.cloud.ru/v1"
) as client:
stream = await client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": message}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
data = {
'content': chunk.choices[0].delta.content,
'done': False
}
yield f"data: {json.dumps(data)}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
except Exception as e:
error_data = {'error': str(e)}
yield f"data: {json.dumps(error_data)}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
Клиентская сторона (JavaScript)
<!DOCTYPE html>
<html>
<head>
<title>Streaming Chat</title>
</head>
<body>
<div id="response"></div>
<button onclick="startStream()">Начать streaming</button>
<script>
function startStream() {
const responseDiv = document.getElementById('response');
responseDiv.innerHTML = '';
const eventSource = new EventSource('/stream-chat?message=Расскажи анекдот');
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
if (data.error) {
responseDiv.innerHTML += `<div style="color: red;">Ошибка: ${data.error}</div>`;
eventSource.close();
} else if (data.done) {
responseDiv.innerHTML += '<div>✅ Завершено</div>';
eventSource.close();
} else {
responseDiv.innerHTML += data.content;
}
};
eventSource.onerror = function(event) {
console.error('SSE error:', event);
eventSource.close();
};
}
</script>
</body>
</html>
Паттерны и лучшие практики
Буферизация для UI
import time
import threading
class StreamBuffer:
def __init__(self, update_callback, buffer_delay=0.1):
self.buffer = ""
self.update_callback = update_callback
self.buffer_delay = buffer_delay
self.last_update = time.time()
self.timer = None
def add_content(self, content):
self.buffer += content
# Обновляем UI не чаще чем buffer_delay
now = time.time()
if now - self.last_update >= self.buffer_delay:
self.flush()
else:
# Планируем отложенное обновление
if self.timer:
self.timer.cancel()
self.timer = threading.Timer(self.buffer_delay, self.flush)
self.timer.start()
def flush(self):
if self.buffer:
self.update_callback(self.buffer)
self.buffer = ""
self.last_update = time.time()
if self.timer:
self.timer.cancel()
self.timer = None
# Использование
def update_ui(text):
print(f"UI Update: {text}")
buffer = StreamBuffer(update_ui, buffer_delay=0.2)
stream = client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Длинный ответ..."}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
buffer.add_content(chunk.choices[0].delta.content)
buffer.flush() # Принудительная отправка остатков
Множественный streaming
import asyncio
async def multiple_streams():
async with AsyncOpenAI(
key_id="your_key_id",
secret="your_secret",
base_url="https://your-endpoint.cloud.ru/v1"
) as client:
questions = [
"Что такое Python?",
"Объясни машинное обучение",
"Расскажи про веб-разработку"
]
async def handle_stream(question, stream_id):
print(f"\n=== Поток {stream_id}: {question} ===")
stream = await client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": question}],
stream=True,
max_tokens=100
)
async for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(f"[{stream_id}] {content}", end="", flush=True)
print(f"\n=== Поток {stream_id} завершен ===")
# Запускаем все потоки параллельно
tasks = [
handle_stream(question, i)
for i, question in enumerate(questions)
]
await asyncio.gather(*tasks)
asyncio.run(multiple_streams())
Мониторинг производительности
import time
from collections import deque
class StreamingMetrics:
def __init__(self, window_size=100):
self.chunks = deque(maxlen=window_size)
self.start_time = None
self.first_chunk_time = None
self.total_chars = 0
def start(self):
self.start_time = time.time()
def add_chunk(self, chunk_content):
now = time.time()
if self.first_chunk_time is None:
self.first_chunk_time = now
self.chunks.append({
'timestamp': now,
'content': chunk_content,
'length': len(chunk_content)
})
self.total_chars += len(chunk_content)
def get_stats(self):
if not self.chunks or not self.start_time:
return {}
now = time.time()
total_time = now - self.start_time
time_to_first_chunk = (self.first_chunk_time - self.start_time
if self.first_chunk_time else 0)
return {
'total_time': total_time,
'time_to_first_chunk': time_to_first_chunk,
'total_chunks': len(self.chunks),
'total_chars': self.total_chars,
'chars_per_second': self.total_chars / total_time if total_time > 0 else 0,
'chunks_per_second': len(self.chunks) / total_time if total_time > 0 else 0,
'avg_chunk_size': self.total_chars / len(self.chunks)
}
# Использование
metrics = StreamingMetrics()
metrics.start()
stream = client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Длинный ответ..."}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
metrics.add_chunk(content)
print(content, end="", flush=True)
stats = metrics.get_stats()
print(f"\n\nМетрики streaming:")
for key, value in stats.items():
print(f"{key}: {value:.2f}")
Советы по оптимизации
Используйте буферизацию для UI обновлений
Обрабатывайте ошибки gracefully с retry логикой
Мониторьте производительность и задержки
Ограничивайте количество одновременных streams
Правильно закрывайте streams во избежание утечек ресурсов
Используйте stop sequences для контроля генерации
Реализуйте cancellation для длинных операций