LoginSign up
GitHub

Memory

Memory adapters in Lightfast Core provide persistent storage for conversation history, session metadata, and stream management. They enable stateful conversations that maintain context across multiple interactions.

Memory Interface

All memory adapters implement the core Memory interface:

interface Memory<TMessage extends UIMessage = UIMessage, TContext = {}> {
  // Message operations
  appendMessage(params: { 
    sessionId: string; 
    message: TMessage;
    context?: TContext;
  }): Promise<void>;
  
  getMessages(sessionId: string): Promise<TMessage[]>;

  // Session operations
  createSession(params: { 
    sessionId: string; 
    resourceId: string;
    context?: TContext;
  }): Promise<void>;
  
  getSession(sessionId: string): Promise<{ resourceId: string } | null>;

  // Stream operations
  createStream(params: { 
    sessionId: string; 
    streamId: string;
    context?: TContext;
  }): Promise<void>;
  
  getSessionStreams(sessionId: string): Promise<string[]>;
}

Built-in Adapters

Lightfast Core provides two memory adapters out of the box:

Redis Memory

The RedisMemory adapter uses Upstash Redis for scalable, persistent storage:

import { RedisMemory } from "lightfast/agent/memory/adapters/redis";

const memory = new RedisMemory({
  url: process.env.KV_REST_API_URL,
  token: process.env.KV_REST_API_TOKEN,
});

Features:

  • Persistent storage across server restarts
  • Scalable for production use
  • JSON storage for complex message structures
  • TTL support for streams (24 hours default)
  • No TTL for sessions and messages (persist forever)

Key Structure:

session:{sessionId}:metadata    # Session metadata
session:{sessionId}:messages    # Conversation history
session:{sessionId}:streams     # Active streams
stream:{streamId}               # Stream data

In-Memory Adapter

The InMemoryMemory adapter stores data in process memory:

import { InMemoryMemory } from "lightfast/memory/adapters/in-memory";

const memory = new InMemoryMemory();

Features:

  • Zero configuration
  • Fast access
  • Perfect for development and testing
  • Data lost on server restart

Use Cases:

  • Local development
  • Unit testing
  • Prototyping
  • Stateless deployments with external state

Session Management

Sessions are the core organizational unit for conversations:

Creating Sessions

Sessions are automatically created on the first message:

// In fetchRequestHandler (handled automatically)
if (!sessionExists) {
  await memory.createSession({
    sessionId: "session-123",
    resourceId: "user-456",  // Usually the authenticated user ID
    context: {               // Optional metadata
      createdAt: Date.now(),
      source: "web",
    },
  });
}

Session Ownership

Sessions are bound to a resourceId (typically a user ID) for security:

// The framework automatically validates ownership
const session = await memory.getSession(sessionId);
if (session.resourceId !== currentUserId) {
  throw new SessionForbiddenError();
}

Session IDs

Choose session IDs based on your use case:

// User-specific singleton session
const sessionId = `user-${userId}`;

// Conversation-specific sessions
const sessionId = `conv-${conversationId}`;

// Time-based sessions
const sessionId = `${userId}-${Date.now()}`;

// Random sessions
const sessionId = crypto.randomUUID();

Message Management

Messages represent the conversation history:

Message Structure

Messages follow the Vercel AI SDK UIMessage format:

interface UIMessage {
  id: string;
  role: "user" | "assistant" | "system";
  content: string;
  createdAt?: Date;
  // Additional fields based on your needs
}

Appending Messages

Messages are automatically appended by the framework:

// User messages (on request)
await memory.appendMessage({
  sessionId,
  message: {
    id: "msg-1",
    role: "user",
    content: "Hello, how are you?",
  },
});

// Assistant messages (on completion)
await memory.appendMessage({
  sessionId,
  message: {
    id: "msg-2",
    role: "assistant",
    content: "I'm doing well, thank you!",
  },
});

Retrieving History

Get the full conversation history:

const messages = await memory.getMessages(sessionId);

// Messages are returned in chronological order
messages.forEach(msg => {
  console.log(`${msg.role}: ${msg.content}`);
});

Stream Management

Streams enable resumable responses for long-running generations:

Stream Creation

Streams are automatically created for each response:

// In agent.stream() (handled automatically)
const streamId = generateId();
await memory.createStream({
  sessionId,
  streamId,
  context: {
    startedAt: Date.now(),
  },
});

Stream Resumption

Enable resumable streams in your handler:

return fetchRequestHandler({
  agent,
  sessionId,
  memory,
  req,
  resourceId: userId,
  enableResume: true,  // Enable stream resumption
});

When enabled, GET requests will resume the most recent stream:

// GET /api/chat/session-123
// Automatically resumes the last stream if available

Stream TTL

Redis memory implements automatic cleanup:

  • Streams expire after 24 hours
  • Sessions and messages persist forever
  • Customize TTL in your adapter if needed

Custom Context

Pass additional context through memory operations:

interface MyContext {
  ipAddress: string;
  userAgent: string;
  timestamp: number;
}

const memory = new RedisMemory<UIMessage, MyContext>();

// Context flows through operations
await memory.createSession({
  sessionId,
  resourceId,
  context: {
    ipAddress: req.headers.get("x-forwarded-for"),
    userAgent: req.headers.get("user-agent"),
    timestamp: Date.now(),
  },
});

Creating Custom Adapters

Implement the Memory interface for custom storage backends:

import { Memory } from "lightfast/memory";
import { Database } from "your-database";

export class DatabaseMemory implements Memory {
  private db: Database;
  
  constructor(connectionString: string) {
    this.db = new Database(connectionString);
  }
  
  async appendMessage({ sessionId, message }) {
    await this.db.messages.insert({
      session_id: sessionId,
      message_id: message.id,
      role: message.role,
      content: message.content,
      created_at: new Date(),
    });
  }
  
  async getMessages(sessionId: string) {
    const rows = await this.db.messages
      .where({ session_id: sessionId })
      .orderBy("created_at", "asc");
    
    return rows.map(row => ({
      id: row.message_id,
      role: row.role,
      content: row.content,
      createdAt: row.created_at,
    }));
  }
  
  async createSession({ sessionId, resourceId }) {
    await this.db.sessions.insert({
      session_id: sessionId,
      resource_id: resourceId,
      created_at: new Date(),
    });
  }
  
  async getSession(sessionId: string) {
    const session = await this.db.sessions
      .where({ session_id: sessionId })
      .first();
    
    return session ? { resourceId: session.resource_id } : null;
  }
  
  async createStream({ sessionId, streamId }) {
    await this.db.streams.insert({
      stream_id: streamId,
      session_id: sessionId,
      created_at: new Date(),
    });
  }
  
  async getSessionStreams(sessionId: string) {
    const streams = await this.db.streams
      .where({ session_id: sessionId })
      .orderBy("created_at", "desc");
    
    return streams.map(s => s.stream_id);
  }
}

Performance Optimization

Caching Strategies

Implement caching for frequently accessed sessions:

class CachedMemory implements Memory {
  private cache = new Map<string, UIMessage[]>();
  private delegate: Memory;
  
  constructor(delegate: Memory) {
    this.delegate = delegate;
  }
  
  async getMessages(sessionId: string) {
    // Check cache first
    if (this.cache.has(sessionId)) {
      return this.cache.get(sessionId)!;
    }
    
    // Fall back to delegate
    const messages = await this.delegate.getMessages(sessionId);
    this.cache.set(sessionId, messages);
    
    // Expire cache after 5 minutes
    setTimeout(() => {
      this.cache.delete(sessionId);
    }, 5 * 60 * 1000);
    
    return messages;
  }
  
  async appendMessage(params) {
    // Invalidate cache on write
    this.cache.delete(params.sessionId);
    return this.delegate.appendMessage(params);
  }
  
  // Delegate other methods...
}

Batch Operations

Optimize for bulk operations when possible:

class BatchMemory implements Memory {
  private pendingMessages: Map<string, UIMessage[]> = new Map();
  
  async appendMessage({ sessionId, message }) {
    if (!this.pendingMessages.has(sessionId)) {
      this.pendingMessages.set(sessionId, []);
    }
    
    this.pendingMessages.get(sessionId)!.push(message);
    
    // Flush after delay or threshold
    if (this.pendingMessages.get(sessionId)!.length >= 10) {
      await this.flush(sessionId);
    }
  }
  
  private async flush(sessionId: string) {
    const messages = this.pendingMessages.get(sessionId);
    if (!messages || messages.length === 0) return;
    
    // Batch insert
    await this.batchInsert(sessionId, messages);
    this.pendingMessages.delete(sessionId);
  }
}

Error Handling

Memory operations can fail. The framework handles errors gracefully:

// Automatic error conversion in runtime
try {
  await memory.appendMessage({ sessionId, message });
} catch (error) {
  // Converted to appropriate API error
  if (error.message.includes("connection")) {
    throw new ServiceUnavailableError("Memory service unavailable");
  }
  throw new InternalServerError("Failed to save message");
}

Best Practices

1. Choose the Right Adapter

  • Development: Use InMemoryMemory for simplicity
  • Production: Use RedisMemory for persistence
  • High Scale: Consider custom adapters with your database

2. Session ID Strategy

// Good: Predictable, user-scoped
const sessionId = `${userId}-${conversationId}`;

// Good: Random, secure
const sessionId = crypto.randomUUID();

// Avoid: Guessable
const sessionId = "session-1";

3. Message Validation

Always validate messages before storage:

class ValidatingMemory implements Memory {
  async appendMessage({ message, ...params }) {
    // Validate message structure
    if (!message.id || !message.role || !message.content) {
      throw new Error("Invalid message structure");
    }
    
    // Sanitize content
    message.content = sanitizeHtml(message.content);
    
    return this.delegate.appendMessage({ ...params, message });
  }
}

4. Monitor Memory Usage

Track memory operations for performance:

class MonitoredMemory implements Memory {
  async getMessages(sessionId: string) {
    const start = Date.now();
    
    try {
      const messages = await this.delegate.getMessages(sessionId);
      
      metrics.record("memory.get_messages", {
        duration: Date.now() - start,
        messageCount: messages.length,
        sessionId,
      });
      
      return messages;
    } catch (error) {
      metrics.record("memory.error", {
        operation: "getMessages",
        error: error.message,
      });
      throw error;
    }
  }
}

5. Implement Cleanup

Remove old sessions periodically:

async function cleanupOldSessions(memory: Memory, daysOld = 30) {
  const cutoff = Date.now() - (daysOld * 24 * 60 * 60 * 1000);
  
  // Custom cleanup logic based on your adapter
  // This would require extending the Memory interface
}

Next Steps