Streaming

Lightfast Core provides comprehensive streaming support for real-time AI responses, including smooth streaming, resumable streams, and custom transformations.

How Streaming Works

The agent streaming pipeline runs through six stages (a simplified sketch follows the list):

  1. Message Processing - Validate and prepare messages
  2. Context Assembly - Merge system, request, and runtime contexts
  3. Tool Resolution - Resolve tool factories with context
  4. Stream Generation - Call the language model
  5. Transformation - Apply streaming transformations
  6. Response Delivery - Stream to client
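
To make the stages concrete, here is a simplified, hypothetical sketch of the flow; the helper logic is illustrative, not Lightfast's actual internals:

import { streamText, type LanguageModel, type ModelMessage } from "ai";

async function runPipeline(opts: {
  model: LanguageModel;
  system: string;
  messages: ModelMessage[];
  systemContext: Record<string, unknown>;
  requestContext: Record<string, unknown>;
  createRuntimeContext: () => Record<string, unknown>;
}) {
  // 1. Message Processing - reject invalid input before touching the model
  if (opts.messages.length === 0) throw new Error("messages must not be empty");

  // 2. Context Assembly - later sources override earlier ones
  const context = {
    ...opts.systemContext,
    ...opts.requestContext,
    ...opts.createRuntimeContext(),
  };

  // 3. Tool Resolution - tool factories would receive `context` here (elided)
  const tools = {};

  // 4. Stream Generation + 5. Transformation run inside streamText
  const result = streamText({
    model: opts.model,
    system: opts.system,
    messages: opts.messages,
    tools,
  });

  // 6. Response Delivery - hand the stream back to the client
  return result.toUIMessageStreamResponse();
}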

Basic Streaming

Every agent automatically supports streaming:

const agent = createAgent({
  name: "streamer",
  system: "You are a helpful assistant.",
  model: gateway("openai/gpt-5-nano"),
  tools: {},
  createRuntimeContext: () => ({}),
});

// Stream response
const { result, streamId } = await agent.stream({
  sessionId: "session-123",
  messages: [{ role: "user", content: "Hello!" }],
  memory,
  resourceId: "user-123",
  systemContext: { sessionId: "session-123", resourceId: "user-123" },
  requestContext: {},
});

// Convert to HTTP response
return result.toUIMessageStreamResponse();

Smooth Streaming

Apply smooth streaming for better UX:

import { smoothStream } from "ai";

const agent = createAgent({
  // ... other config
  experimental_transform: smoothStream({
    delayInMs: 25,      // Delay between chunks
    chunking: "word",   // Chunk by word boundaries
  }),
});

Chunking Options

  • "word" - Break at word boundaries (recommended)
  • "sentence" - Break at sentence boundaries
  • "character" - Character-by-character (typewriter effect)
  • "none" - No chunking, raw stream

Delay Configuration

// Fast streaming (minimal delay)
experimental_transform: smoothStream({
  delayInMs: 10,
  chunking: "word",
})

// Moderate streaming (balanced)
experimental_transform: smoothStream({
  delayInMs: 25,
  chunking: "word",
})

// Slow streaming (dramatic typewriter effect)
experimental_transform: smoothStream({
  delayInMs: 50,
  chunking: /[\s\S]/,  // release one character at a time
})

Stream Events

Monitor stream progress with callbacks:

const agent = createAgent({
  // ... config
  
  onChunk: ({ chunk }) => {
    // Called for each stream chunk
    if (chunk.type === "text-delta") {
      console.log("Text:", chunk.text);
    } else if (chunk.type === "tool-call") {
      console.log("Tool called:", chunk.toolName);
    } else if (chunk.type === "tool-result") {
      console.log("Tool result:", chunk.result);
    }
  },
  
  onStepFinish: (step) => {
    // Called after each step completes
    console.log("Step completed:", {
      text: step.text,
      toolCalls: step.toolCalls,
      toolResults: step.toolResults,
    });
  },
  
  onFinish: (result) => {
    // Called when streaming completes
    console.log("Streaming complete:", {
      text: result.text,
      usage: result.usage,
      finishReason: result.finishReason,
    });
  },
});

Stream Types

Text Streaming

Basic text generation:

// Client receives text chunks
{
  type: "text-delta",
  text: "Hello, "
}
{
  type: "text-delta",
  text: "how can "
}
{
  type: "text-delta",
  text: "I help?"
}

Tool Call Streaming

When tools are invoked:

// Tool call start
{
  type: "tool-call",
  toolCallId: "call-123",
  toolName: "search",
  args: { query: "weather" }
}

// Tool execution
{
  type: "tool-result",
  toolCallId: "call-123",
  toolName: "search",
  result: { results: [...] }
}

Reasoning Streaming (Anthropic)

For models with thinking/reasoning:

// Forward reasoning when converting the result to a response
return result.toUIMessageStreamResponse({
  sendReasoning: true,  // Include reasoning parts in the stream
});

// Client receives reasoning chunks
{
  type: "reasoning-delta",
  reasoning: "Let me think about this..."
}

Resumable Streams

Enable stream resumption for reliability:

// In your handler
return fetchRequestHandler({
  agent,
  sessionId,
  memory,
  req,
  resourceId: userId,
  enableResume: true,  // Enable resumable streams
});
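
In a framework route, the same handler can serve both verbs. A hypothetical Next.js App Router sketch, assuming agent, memory, and userId are in scope:

// app/api/chat/[sessionId]/route.ts (sketch)
const handle = (req: Request, sessionId: string) =>
  fetchRequestHandler({
    agent,
    sessionId,
    memory,
    req,
    resourceId: userId,
    enableResume: true,
  });

// POST starts a new stream
export async function POST(req: Request, ctx: { params: { sessionId: string } }) {
  return handle(req, ctx.params.sessionId);
}

// GET resumes an interrupted stream at the same endpoint
export async function GET(req: Request, ctx: { params: { sessionId: string } }) {
  return handle(req, ctx.params.sessionId);
}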

How Resumption Works

  1. Stream Creation - Each stream gets a unique ID
  2. Storage - Stream ID stored in memory (Redis)
  3. Interruption - Connection drops
  4. Resume Request - GET request to same endpoint
  5. Continuation - Stream resumes from last position

Client Implementation

// Start streaming
async function startStream(sessionId: string, message: string) {
  const response = await fetch(`/api/chat/${sessionId}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      messages: [{ role: "user", content: message }],
    }),
  });
  
  return processStream(response, sessionId);
}

// Resume if interrupted
async function resumeStream(sessionId: string) {
  const response = await fetch(`/api/chat/${sessionId}`, {
    method: "GET",  // GET resumes the stream
  });
  
  if (response.status === 204) {
    // No stream to resume
    return null;
  }
  
  return processStream(response, sessionId);
}

// Process stream with error handling
async function processStream(response: Response, sessionId: string) {
  const reader = response.body?.getReader();
  if (!reader) return;
  
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      // Process chunk
      const text = new TextDecoder().decode(value);
      console.log("Received:", text);
    }
  } catch (error) {
    // Connection dropped, try resuming
    console.log("Stream interrupted, resuming...");
    return resumeStream(sessionId);
  }
}

Custom Transformations

Create custom stream transformations:

// Custom transform factory that adds a timestamp to every chunk.
// Like smoothStream(), it returns a function that produces a fresh
// TransformStream for each request.
const timestampTransform = () => () =>
  new TransformStream({
    transform(chunk, controller) {
      controller.enqueue({
        ...chunk,
        timestamp: Date.now(),
      });
    },
  });

const agent = createAgent({
  // ... config
  experimental_transform: timestampTransform(),
});

Filtering Transformation

Filter certain types of chunks:

// Filter factory: only chunk types in allowedTypes pass through
const filterTransform = (allowedTypes: string[]) => () =>
  new TransformStream({
    transform(chunk, controller) {
      if (allowedTypes.includes(chunk.type)) {
        controller.enqueue(chunk);
      }
    },
  });

// Only stream text, not tool calls
experimental_transform: filterTransform(["text-delta"]),

Buffering Transformation

Buffer chunks for batch processing:

const bufferTransform = (bufferSize: number) => () => {
  // The buffer lives inside the inner factory so each stream gets its own
  let buffer: any[] = [];
  
  return new TransformStream({
    transform(chunk, controller) {
      buffer.push(chunk);
      
      if (buffer.length >= bufferSize) {
        controller.enqueue({
          type: "buffered-chunks",
          chunks: buffer,
        });
        buffer = [];
      }
    },
    
    // Emit any remaining chunks when the stream ends
    flush(controller) {
      if (buffer.length > 0) {
        controller.enqueue({
          type: "buffered-chunks",
          chunks: buffer,
        });
      }
    },
  });
};

experimental_transform: bufferTransform(5),

Client-Side Streaming

Using Vercel AI SDK React

"use client";

import { useChat } from "ai/react";

export function ChatComponent() {
  const { 
    messages, 
    input, 
    handleInputChange, 
    handleSubmit,
    isLoading,
    error,
  } = useChat({
    api: "/api/chat",
    streamProtocol: "data",  // Use data protocol for richer events
  });
  
  return (
    <div>
      {messages.map((message) => (
        <div key={message.id}>
          <strong>{message.role}:</strong>
          <span>{message.content}</span>
        </div>
      ))}
      
      {isLoading && <div>Assistant is typing...</div>}
      {error && <div>Error: {error.message}</div>}
      
      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={handleInputChange}
          disabled={isLoading}
        />
        <button type="submit" disabled={isLoading}>
          Send
        </button>
      </form>
    </div>
  );
}

Manual Stream Processing

async function* streamChat(message: string) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      messages: [{ role: "user", content: message }],
    }),
  });
  
  if (!response.body) throw new Error("No response body");
  
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    const chunk = decoder.decode(value);
    const lines = chunk.split("\n");
    
    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") return;
        
        try {
          const parsed = JSON.parse(data);
          yield parsed;
        } catch (e) {
          console.error("Failed to parse:", data);
        }
      }
    }
  }
}

// Usage
for await (const chunk of streamChat("Hello!")) {
  console.log("Received chunk:", chunk);
}

Stream Performance

Optimize Chunk Size

Balance between responsiveness and efficiency:

// Small chunks - more responsive
experimental_transform: smoothStream({
  delayInMs: 10,
  chunking: /[\s\S]/,  // one character at a time
})

// Large chunks - more efficient
experimental_transform: smoothStream({
  delayInMs: 50,
  chunking: "line",
})

Network Considerations

Configure for different network conditions:

const getStreamConfig = (connectionSpeed: "slow" | "medium" | "fast") => {
  switch (connectionSpeed) {
    case "slow":
      return {
        delayInMs: 100,
        chunking: "sentence" as const,
      };
    case "medium":
      return {
        delayInMs: 25,
        chunking: "word" as const,
      };
    case "fast":
      return {
        delayInMs: 5,
        chunking: "word" as const,
      };
  }
};

experimental_transform: smoothStream(
  getStreamConfig(detectConnectionSpeed())
),
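
detectConnectionSpeed is not defined above; a minimal browser-side sketch using the non-standard Network Information API might look like:

// Hypothetical helper mapping effectiveType onto the three buckets above
function detectConnectionSpeed(): "slow" | "medium" | "fast" {
  const effectiveType = (navigator as any).connection?.effectiveType;
  if (effectiveType === "slow-2g" || effectiveType === "2g") return "slow";
  if (effectiveType === "3g") return "medium";
  return "fast"; // "4g" or API unavailable
}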

Compression

Compression is normally applied by your hosting platform or reverse proxy. If you set encoding headers yourself, they must match the actual body encoding; advertising gzip without compressing the body will break clients:

// Only set Content-Encoding when the body really is gzip-compressed.
// Transfer-Encoding is managed by the runtime; don't set it manually.
return result.toUIMessageStreamResponse({
  headers: {
    "Content-Encoding": "gzip",
  },
});
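
A sketch of compressing in the handler itself, in runtimes that expose the web-standard CompressionStream (note that compression buffers output and can reduce streaming granularity):

// Compress the actual stream body before attaching the matching header
const response = result.toUIMessageStreamResponse();
const compressed = response.body!.pipeThrough(new CompressionStream("gzip"));

return new Response(compressed, {
  headers: {
    ...Object.fromEntries(response.headers),
    "Content-Encoding": "gzip",
  },
});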

Error Handling

Handle streaming errors gracefully:

const agent = createAgent({
  // ... config
  
  onError: ({ error }) => {
    console.error("Stream error:", error);
    
    // Attempt recovery
    if (error.code === "ECONNRESET") {
      // Connection reset, client should retry
      return { retry: true, delay: 1000 };
    }
    
    // Fatal error
    return { retry: false, message: "Stream failed" };
  },
});

Client-Side Error Recovery

class StreamManager {
  private retryCount = 0;
  private maxRetries = 3;
  
  async streamWithRetry(message: string): Promise<void> {
    try {
      await this.stream(message);
      this.retryCount = 0; // Reset on success
    } catch (error) {
      if (this.retryCount < this.maxRetries) {
        this.retryCount++;
        const delay = Math.min(1000 * Math.pow(2, this.retryCount), 10000);
        
        console.log(`Retrying in ${delay}ms...`);
        await new Promise(resolve => setTimeout(resolve, delay));
        
        return this.streamWithRetry(message);
      }
      
      throw error; // Max retries exceeded
    }
  }
  
  private async stream(message: string): Promise<void> {
    // Streaming implementation
  }
}
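
Usage is then a single call; backoff and retry counting are handled internally:

const manager = new StreamManager();
await manager.streamWithRetry("Hello!");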

Monitoring Streams

Track streaming metrics:

const startTime = Date.now();
let chunkCount = 0;

const agent = createAgent({
  // ... config
  
  onChunk: ({ chunk }) => {
    chunkCount++;
    metrics.increment("stream.chunks", {
      type: chunk.type,
    });
  },
  
  onFinish: (result) => {
    metrics.record("stream.complete", {
      duration: Date.now() - startTime,
      tokens: result.usage.totalTokens,
      chunks: chunkCount,
    });
  },
  
  onError: ({ error }) => {
    metrics.increment("stream.errors", {
      error: error.code,
    });
  },
});

Best Practices

1. Choose Appropriate Delays

Match delay to use case:

// Conversational - smooth, natural
delayInMs: 25

// Code generation - faster feedback
delayInMs: 10

// Dramatic effect - slower
delayInMs: 50

2. Handle Disconnections

Always implement reconnection logic:

// Server: Enable resumable streams
enableResume: true

// Client: Implement retry logic
onError: () => reconnect()

3. Optimize for Mobile

Adapt to network conditions:

// navigator.connection is a non-standard API; guard its absence
const isMobile = /mobile/i.test(navigator.userAgent);
const isSlowConnection = (navigator as any).connection?.effectiveType === "2g";

const streamConfig = {
  delayInMs: isMobile || isSlowConnection ? 50 : 25,
  chunking: isMobile ? ("line" as const) : ("word" as const),
};

4. Provide Feedback

Show streaming status to users:

// Show typing indicator
{isLoading && <TypingIndicator />}

// Show partial content
{streamedContent && <div>{streamedContent}</div>}

// Show progress
{progress && <ProgressBar value={progress} />}

5. Test Streaming

Test various scenarios:

describe("Streaming", () => {
  it("should handle interruptions", async () => {
    // Start stream
    const stream = await startStream();
    
    // Simulate interruption
    stream.abort();
    
    // Resume should work
    const resumed = await resumeStream();
    expect(resumed).toBeDefined();
  });
  
  it("should apply transformations", async () => {
    const chunks: any[] = [];
    
    await processStream((chunk) => {
      chunks.push(chunk);
    });
    
    // Verify transformation applied
    expect(chunks.every(c => c.timestamp)).toBe(true);
  });
});
