Streaming
Lightfast Core provides comprehensive streaming support for real-time AI responses, including smooth streaming, resumable streams, and custom transformations.
How Streaming Works
The agent streaming pipeline:
- Message Processing - Validate and prepare messages
- Context Assembly - Merge system, request, and runtime contexts
- Tool Resolution - Resolve tool factories with context
- Stream Generation - Call the language model
- Transformation - Apply streaming transformations
- Response Delivery - Stream to client
Basic Streaming
Every agent automatically supports streaming:
const agent = createAgent({
name: "streamer",
system: "You are a helpful assistant.",
model: gateway("openai/gpt-5-nano"),
tools: {},
createRuntimeContext: () => ({}),
});
// Stream response
const { result, streamId } = await agent.stream({
sessionId: "session-123",
messages: [{ role: "user", content: "Hello!" }],
memory,
resourceId: "user-123",
systemContext: { sessionId: "session-123", resourceId: "user-123" },
requestContext: {},
});
// Convert to HTTP response
return result.toUIMessageStreamResponse();
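For context, here is a minimal sketch of wiring that call into an HTTP route. It assumes a Next.js App Router route at app/api/chat/[sessionId]/route.ts and that you already export a configured agent and memory from your own modules; the import paths and the resourceId lookup are placeholders to adapt to your setup.
// Assumed module paths - replace with wherever your agent and memory live.
import { agent } from "@/ai/agent";
import { memory } from "@/ai/memory";

export async function POST(req: Request) {
  // Derive the session id from the path, e.g. /api/chat/session-123
  const sessionId = new URL(req.url).pathname.split("/").pop() ?? "session-123";
  const { messages } = await req.json();

  const { result } = await agent.stream({
    sessionId,
    messages,
    memory,
    resourceId: "user-123", // in practice, derive this from your auth layer
    systemContext: { sessionId, resourceId: "user-123" },
    requestContext: {},
  });

  return result.toUIMessageStreamResponse();
}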
Smooth Streaming
Apply smooth streaming for better UX:
import { smoothStream } from "ai";
const agent = createAgent({
// ... other config
experimental_transform: smoothStream({
delayInMs: 25, // Delay between chunks
chunking: "word", // Chunk by word boundaries
}),
});
Chunking Options
"word"
- Break at word boundaries (recommended)"sentence"
- Break at sentence boundaries"character"
- Character-by-character (typewriter effect)"none"
- No chunking, raw stream
Delay Configuration
// Fast streaming (minimal delay)
experimental_transform: smoothStream({
delayInMs: 10,
chunking: "word",
})
// Moderate streaming (balanced)
experimental_transform: smoothStream({
delayInMs: 25,
chunking: "word",
})
// Slow streaming (dramatic effect)
experimental_transform: smoothStream({
delayInMs: 50,
chunking: "character",
})
Stream Events
Monitor stream progress with callbacks:
const agent = createAgent({
// ... config
onChunk: ({ chunk }) => {
// Called for each stream chunk
if (chunk.type === "text-delta") {
console.log("Text:", chunk.text);
} else if (chunk.type === "tool-call") {
console.log("Tool called:", chunk.toolName);
} else if (chunk.type === "tool-result") {
console.log("Tool result:", chunk.result);
}
},
onStepFinish: (step) => {
// Called after each step completes
console.log("Step completed:", {
text: step.text,
toolCalls: step.toolCalls,
toolResults: step.toolResults,
});
},
onFinish: (result) => {
// Called when streaming completes
console.log("Streaming complete:", {
text: result.text,
usage: result.usage,
finishReason: result.finishReason,
});
},
});
Stream Types
Text Streaming
Basic text generation:
// Client receives text chunks
{
type: "text-delta",
text: "Hello, "
}
{
type: "text-delta",
text: "how can "
}
{
type: "text-delta",
text: "I help?"
}
Tool Call Streaming
When tools are invoked:
// Tool call start
{
type: "tool-call",
toolCallId: "call-123",
toolName: "search",
args: { query: "weather" }
}
// Tool execution
{
type: "tool-result",
toolCallId: "call-123",
toolName: "search",
result: { results: [...] }
}
Reasoning Streaming (Anthropic)
For models with thinking/reasoning:
// Include reasoning in the response stream
return result.toUIMessageStreamResponse({
  sendReasoning: true, // forward reasoning tokens to the client
});
// Client receives reasoning chunks
{
type: "reasoning-delta",
reasoning: "Let me think about this..."
}
Resumable Streams
Enable stream resumption for reliability:
// In your handler
return fetchRequestHandler({
agent,
sessionId,
memory,
req,
resourceId: userId,
enableResume: true, // Enable resumable streams
});
How Resumption Works
- Stream Creation - Each stream gets a unique ID
- Storage - Stream ID stored in memory (Redis)
- Interruption - Connection drops
- Resume Request - GET request to same endpoint
- Continuation - Stream resumes from last position
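On the server, this usually means exposing the same handler for both methods. The following is a sketch that assumes a Next.js App Router route module and reuses the agent, memory, and userId values from the handler above (imports omitted):
// app/api/chat/[sessionId]/route.ts (assumed location)
const handler = (req: Request) =>
  fetchRequestHandler({
    agent,
    sessionId: new URL(req.url).pathname.split("/").pop() ?? "",
    memory,
    req,
    resourceId: userId,
    enableResume: true,
  });

// POST starts a stream; GET on the same route resumes an interrupted one.
export { handler as POST, handler as GET };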
Client Implementation
// Start streaming
async function startStream(sessionId: string, message: string) {
  const response = await fetch(`/api/chat/${sessionId}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      messages: [{ role: "user", content: message }],
    }),
  });
  return processStream(response, sessionId);
}
// Resume if interrupted
async function resumeStream(sessionId: string) {
const response = await fetch(`/api/chat/${sessionId}`, {
method: "GET", // GET resumes the stream
});
if (response.status === 204) {
// No stream to resume
return null;
}
  return processStream(response, sessionId);
}
// Process stream with error handling
async function processStream(response: Response, sessionId: string) {
  const reader = response.body?.getReader();
  if (!reader) return;
  const decoder = new TextDecoder();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // Process chunk ({ stream: true } keeps multi-byte characters intact across chunks)
      const text = decoder.decode(value, { stream: true });
      console.log("Received:", text);
    }
  } catch (error) {
    // Connection dropped, try resuming
    console.log("Stream interrupted, resuming...");
    return resumeStream(sessionId);
  }
}
Custom Transformations
Create custom stream transformations:
// Custom transform that adds timestamps
const timestampTransform = () => {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue({
...chunk,
timestamp: Date.now(),
});
},
});
};
const agent = createAgent({
// ... config
experimental_transform: timestampTransform(),
});
Filtering Transformation
Filter certain types of chunks:
const filterTransform = (allowedTypes: string[]) => {
return new TransformStream({
transform(chunk, controller) {
if (allowedTypes.includes(chunk.type)) {
controller.enqueue(chunk);
}
},
});
};
// Only stream text, not tool calls
experimental_transform: filterTransform(["text-delta"]),
Buffering Transformation
Buffer chunks for batch processing:
const bufferTransform = (bufferSize: number) => {
let buffer: any[] = [];
return new TransformStream({
transform(chunk, controller) {
buffer.push(chunk);
if (buffer.length >= bufferSize) {
controller.enqueue({
type: "buffered-chunks",
chunks: buffer,
});
buffer = [];
}
},
flush(controller) {
if (buffer.length > 0) {
controller.enqueue({
type: "buffered-chunks",
chunks: buffer,
});
}
},
});
};
experimental_transform: bufferTransform(5),
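On the receiving side, a small helper can flatten the batches back into individual events. This is only a sketch against the "buffered-chunks" shape produced above:
// Flatten a possibly-buffered chunk back into individual events.
type MaybeBuffered = { type: string; chunks?: unknown[] };

function unpackChunks(chunk: MaybeBuffered): unknown[] {
  return chunk.type === "buffered-chunks" && Array.isArray(chunk.chunks)
    ? chunk.chunks
    : [chunk];
}

// Usage: unpackChunks(incoming).forEach(handleEvent); // handleEvent is your own handler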
Client-Side Streaming
Using Vercel AI SDK React
"use client";
import { useChat } from "ai/react";
export function ChatComponent() {
const {
messages,
input,
handleInputChange,
handleSubmit,
isLoading,
error,
} = useChat({
api: "/api/chat",
streamProtocol: "data", // Use data protocol for richer events
});
return (
<div>
{messages.map((message) => (
<div key={message.id}>
<strong>{message.role}:</strong>
<span>{message.content}</span>
</div>
))}
{isLoading && <div>Assistant is typing...</div>}
{error && <div>Error: {error.message}</div>}
<form onSubmit={handleSubmit}>
<input
value={input}
onChange={handleInputChange}
disabled={isLoading}
/>
<button type="submit" disabled={isLoading}>
Send
</button>
</form>
</div>
);
}
Manual Stream Processing
async function* streamChat(message: string) {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: [{ role: "user", content: message }],
}),
});
if (!response.body) throw new Error("No response body");
const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffered = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffered += decoder.decode(value, { stream: true });
    // Keep the last, possibly incomplete, line in the buffer until more data arrives
    const lines = buffered.split("\n");
    buffered = lines.pop() ?? "";
    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") return;
        try {
          const parsed = JSON.parse(data);
          yield parsed;
        } catch (e) {
          console.error("Failed to parse:", data);
        }
      }
    }
  }
}
// Usage
for await (const chunk of streamChat("Hello!")) {
console.log("Received chunk:", chunk);
}
Stream Performance
Optimize Chunk Size
Balance between responsiveness and efficiency:
// Small chunks - more responsive
experimental_transform: smoothStream({
delayInMs: 10,
chunking: "character",
})
// Large chunks - more efficient
experimental_transform: smoothStream({
delayInMs: 50,
chunking: "sentence",
})
Network Considerations
Configure for different network conditions:
const getStreamConfig = (connectionSpeed: "slow" | "medium" | "fast") => {
switch (connectionSpeed) {
case "slow":
return {
delayInMs: 100,
chunking: "sentence" as const,
};
case "medium":
return {
delayInMs: 25,
chunking: "word" as const,
};
case "fast":
return {
delayInMs: 5,
chunking: "word" as const,
};
}
};
experimental_transform: smoothStream(
getStreamConfig(detectConnectionSpeed())
),
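detectConnectionSpeed is not defined above; one possible sketch uses the browser's Network Information API (not available in every browser). If the transform is configured server-side, the client would need to report this value along with its request.
// Browser-side heuristic; navigator.connection is not universally supported,
// so default to "medium" when it is missing.
function detectConnectionSpeed(): "slow" | "medium" | "fast" {
  const effectiveType = (navigator as any).connection?.effectiveType;
  if (effectiveType === "slow-2g" || effectiveType === "2g") return "slow";
  if (effectiveType === "3g") return "medium";
  if (effectiveType === "4g") return "fast";
  return "medium";
}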
Compression
Compression can reduce payload size for large responses, but the Content-Encoding header must match what your platform or reverse proxy actually applies to the body:
// In your response headers
return result.toUIMessageStreamResponse({
headers: {
"Content-Encoding": "gzip",
"Transfer-Encoding": "chunked",
},
});
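If nothing upstream compresses the body for you, here is a sketch of doing it in the handler with the standard CompressionStream API (available in modern Node and edge runtimes); note that gzip buffering can delay individual events on slow streams:
// Compress the UI message stream ourselves so the header matches the body.
const uiResponse = result.toUIMessageStreamResponse();
const headers = new Headers(uiResponse.headers);
headers.set("Content-Encoding", "gzip");

return new Response(
  uiResponse.body?.pipeThrough(new CompressionStream("gzip")) ?? null,
  { status: uiResponse.status, headers }
);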
Error Handling
Handle streaming errors gracefully:
const agent = createAgent({
// ... config
onError: ({ error }) => {
console.error("Stream error:", error);
// Attempt recovery
if (error.code === "ECONNRESET") {
// Connection reset, client should retry
return { retry: true, delay: 1000 };
}
// Fatal error
return { retry: false, message: "Stream failed" };
},
});
Client-Side Error Recovery
class StreamManager {
private retryCount = 0;
private maxRetries = 3;
async streamWithRetry(message: string): Promise<void> {
try {
await this.stream(message);
this.retryCount = 0; // Reset on success
} catch (error) {
if (this.retryCount < this.maxRetries) {
this.retryCount++;
const delay = Math.min(1000 * Math.pow(2, this.retryCount), 10000);
console.log(`Retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
return this.streamWithRetry(message);
}
throw error; // Max retries exceeded
}
}
private async stream(message: string): Promise<void> {
// Streaming implementation
}
}
Monitoring Streams
Track streaming metrics:
// `metrics` stands in for whatever observability client you already use
const startTime = Date.now();
let chunkCount = 0;

const agent = createAgent({
  // ... config
  onChunk: ({ chunk }) => {
    chunkCount++;
    metrics.increment("stream.chunks", {
      type: chunk.type,
    });
  },
onFinish: (result) => {
metrics.record("stream.complete", {
duration: Date.now() - startTime,
tokens: result.usage.totalTokens,
chunks: chunkCount,
});
},
onError: ({ error }) => {
metrics.increment("stream.errors", {
error: error.code,
});
},
});
Best Practices
1. Choose Appropriate Delays
Match delay to use case:
// Conversational - smooth, natural
delayInMs: 25
// Code generation - faster feedback
delayInMs: 10
// Dramatic effect - slower
delayInMs: 50
2. Handle Disconnections
Always implement reconnection logic:
// Server: Enable resumable streams
enableResume: true
// Client: Implement retry logic
onError: () => reconnect()
3. Optimize for Mobile
Adapt to network conditions:
const isMobile = /mobile/i.test(navigator.userAgent);
const isSlowConnection = navigator.connection?.effectiveType === "2g";
const streamConfig = {
delayInMs: isMobile || isSlowConnection ? 50 : 25,
  chunking: isMobile ? ("sentence" as const) : ("word" as const),
};
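Then feed the computed values into the transform (assuming the options above are ones your smoothStream version accepts):
experimental_transform: smoothStream(streamConfig),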
4. Provide Feedback
Show streaming status to users:
// Show typing indicator
{isLoading && <TypingIndicator />}
// Show partial content
{streamedContent && <div>{streamedContent}</div>}
// Show progress
{progress && <ProgressBar value={progress} />}
5. Test Streaming
Test various scenarios:
describe("Streaming", () => {
it("should handle interruptions", async () => {
// Start stream
const stream = await startStream();
// Simulate interruption
stream.abort();
// Resume should work
const resumed = await resumeStream();
expect(resumed).toBeDefined();
});
it("should apply transformations", async () => {
const chunks: any[] = [];
await processStream((chunk) => {
chunks.push(chunk);
});
// Verify transformation applied
expect(chunks.every(c => c.timestamp)).toBe(true);
});
});
Next Steps
- Learn about Memory Adapters for state persistence
- Explore Error Handling for robust streams
- See Integration Examples for complete implementations