Memory
Memory adapters in Lightfast Core provide persistent storage for conversation history, session metadata, and stream management. They enable stateful conversations that maintain context across multiple interactions.
Memory Interface
All memory adapters implement the core Memory interface:
interface Memory<TMessage extends UIMessage = UIMessage, TContext = {}> {
// Message operations
appendMessage(params: {
sessionId: string;
message: TMessage;
context?: TContext;
}): Promise<void>;
getMessages(sessionId: string): Promise<TMessage[]>;
// Session operations
createSession(params: {
sessionId: string;
resourceId: string;
context?: TContext;
}): Promise<void>;
getSession(sessionId: string): Promise<{ resourceId: string } | null>;
// Stream operations
createStream(params: {
sessionId: string;
streamId: string;
context?: TContext;
}): Promise<void>;
getSessionStreams(sessionId: string): Promise<string[]>;
}
Built-in Adapters
Lightfast Core provides two memory adapters out of the box:
Redis Memory
The RedisMemory adapter uses Upstash Redis for scalable, persistent storage:
import { RedisMemory } from "lightfast/memory/adapters/redis";
const memory = new RedisMemory({
url: process.env.KV_REST_API_URL,
token: process.env.KV_REST_API_TOKEN,
});
Features:
- Persistent storage across server restarts
- Scalable for production use
- JSON storage for complex message structures
- TTL support for streams (24 hours default)
- No TTL for sessions and messages (persist forever)
Key Structure:
session:{sessionId}:metadata # Session metadata
session:{sessionId}:messages # Conversation history
session:{sessionId}:streams # Active streams
stream:{streamId} # Stream data
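The key layout above can be captured in a small helper so that key names are built in one place. This is a minimal sketch: the function names sessionKeys and streamKey are hypothetical and are not part of Lightfast Core.

```typescript
// Hypothetical helpers that mirror the key structure listed above.
// They are illustrative only and are not exported by Lightfast Core.
function sessionKeys(sessionId: string) {
  return {
    metadata: `session:${sessionId}:metadata`, // Session metadata
    messages: `session:${sessionId}:messages`, // Conversation history
    streams: `session:${sessionId}:streams`, // Active streams
  };
}

function streamKey(streamId: string): string {
  return `stream:${streamId}`; // Stream data
}

const keys = sessionKeys("session-123");
console.log(keys.messages); // session:session-123:messages
console.log(streamKey("stream-abc")); // stream:stream-abc
```

Building keys through one helper makes it easier to inspect or migrate stored data, because every reader and writer agrees on the naming scheme.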
In-Memory Adapter
The InMemoryMemory adapter stores data in process memory:
import { InMemoryMemory } from "lightfast/memory/adapters/in-memory";
const memory = new InMemoryMemory();
Features:
- Zero configuration
- Fast access
- Well suited to development and testing
- Data is lost when the server restarts
Use Cases:
- Local development
- Unit testing
- Prototyping
- Stateless deployments with external state
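To make the trade-offs above concrete, here is a rough sketch of how a Map-backed store with the same six operations might look. It is not the actual InMemoryMemory source; the class name MapBackedMemory, the SketchMessage type, and the newest-first stream ordering are all assumptions made for illustration.

```typescript
// Minimal stand-in for the message shape used in this document.
type SketchMessage = {
  id: string;
  role: "user" | "assistant" | "system";
  content: string;
};

// Illustrative Map-backed store; NOT the real InMemoryMemory implementation.
class MapBackedMemory {
  private sessions = new Map<string, { resourceId: string }>();
  private messages = new Map<string, SketchMessage[]>();
  private streams = new Map<string, string[]>();

  async createSession(params: { sessionId: string; resourceId: string }): Promise<void> {
    this.sessions.set(params.sessionId, { resourceId: params.resourceId });
  }

  async getSession(sessionId: string): Promise<{ resourceId: string } | null> {
    return this.sessions.get(sessionId) ?? null;
  }

  async appendMessage(params: { sessionId: string; message: SketchMessage }): Promise<void> {
    const list = this.messages.get(params.sessionId) ?? [];
    list.push(params.message);
    this.messages.set(params.sessionId, list);
  }

  async getMessages(sessionId: string): Promise<SketchMessage[]> {
    // Return a copy so callers cannot mutate the stored history.
    return [...(this.messages.get(sessionId) ?? [])];
  }

  async createStream(params: { sessionId: string; streamId: string }): Promise<void> {
    const list = this.streams.get(params.sessionId) ?? [];
    list.unshift(params.streamId); // Newest first (an assumption about ordering)
    this.streams.set(params.sessionId, list);
  }

  async getSessionStreams(sessionId: string): Promise<string[]> {
    return [...(this.streams.get(sessionId) ?? [])];
  }
}
```

Because everything lives in plain Map objects, there is nothing to configure, and all data disappears when the process exits.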
Session Management
Sessions are the core organizational unit for conversations.
Creating Sessions
Sessions are automatically created on the first message:
// In fetchRequestHandler (handled automatically)
if (!sessionExists) {
await memory.createSession({
sessionId: "session-123",
resourceId: "user-456", // Usually the authenticated user ID
context: { // Optional metadata
createdAt: Date.now(),
source: "web",
},
});
}
Session Ownership
Sessions are bound to a resourceId (typically a user ID) for security:
// The framework automatically validates ownership
const session = await memory.getSession(sessionId);
// getSession returns null when the session does not exist yet
if (session !== null && session.resourceId !== currentUserId) {
throw new SessionForbiddenError();
}
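The ownership rule can also be written as a standalone helper that distinguishes a missing session from a forbidden one. The sketch below is an assumption about how such a check might be structured: checkSessionOwner, SessionLookup, and ForbiddenSketchError are hypothetical names, and the framework's own SessionForbiddenError may differ.

```typescript
// Hypothetical error type for this sketch only.
class ForbiddenSketchError extends Error {}

// Only the part of the Memory interface that this check needs.
interface SessionLookup {
  getSession(sessionId: string): Promise<{ resourceId: string } | null>;
}

// Resolves to true when the session exists and is owned by the caller,
// false when no session exists, and rejects when another resource owns it.
async function checkSessionOwner(
  memory: SessionLookup,
  sessionId: string,
  currentUserId: string,
): Promise<boolean> {
  const session = await memory.getSession(sessionId);
  if (session === null) {
    return false; // The caller may go on to create the session
  }
  if (session.resourceId !== currentUserId) {
    throw new ForbiddenSketchError(`Session ${sessionId} belongs to another resource`);
  }
  return true;
}
```

Returning false for a missing session lets the caller decide whether to create it, while a mismatch always fails loudly.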
Session IDs
Choose session IDs based on your use case:
// User-specific singleton session
const sessionId = `user-${userId}`;
// Conversation-specific sessions
const sessionId = `conv-${conversationId}`;
// Time-based sessions
const sessionId = `${userId}-${Date.now()}`;
// Random sessions
const sessionId = crypto.randomUUID();
Message Management
Messages represent the conversation history.
Message Structure
Messages follow the Vercel AI SDK UIMessage format:
interface UIMessage {
id: string;
role: "user" | "assistant" | "system";
content: string;
createdAt?: Date;
// Additional fields based on your needs
}
Appending Messages
Messages are automatically appended by the framework:
// User messages (on request)
await memory.appendMessage({
sessionId,
message: {
id: "msg-1",
role: "user",
content: "Hello, how are you?",
},
});
// Assistant messages (on completion)
await memory.appendMessage({
sessionId,
message: {
id: "msg-2",
role: "assistant",
content: "I'm doing well, thank you!",
},
});
Retrieving History
Get the full conversation history:
const messages = await memory.getMessages(sessionId);
// Messages are returned in chronological order
messages.forEach(msg => {
console.log(`${msg.role}: ${msg.content}`);
});
Stream Management
Streams enable resumable responses for long-running generations.
Stream Creation
Streams are automatically created for each response:
// In agent.stream() (handled automatically)
const streamId = generateId();
await memory.createStream({
sessionId,
streamId,
context: {
startedAt: Date.now(),
},
});
Stream Resumption
Enable resumable streams in your handler:
return fetchRequestHandler({
agent,
sessionId,
memory,
req,
resourceId: userId,
enableResume: true, // Enable stream resumption
});
When enabled, GET requests will resume the most recent stream:
// GET /api/chat/session-123
// Automatically resumes the last stream if available
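As a rough illustration of what "the most recent stream" means, a handler could look up the session's stream IDs and take the first one. This sketch assumes the adapter returns IDs newest-first; latestStreamId and StreamLookup are hypothetical names, and the framework's internal resume logic may work differently.

```typescript
// Only the part of the Memory interface that this lookup needs.
interface StreamLookup {
  getSessionStreams(sessionId: string): Promise<string[]>;
}

// Assumes stream IDs are returned newest-first.
// Resolves to null when the session has no streams to resume.
async function latestStreamId(
  memory: StreamLookup,
  sessionId: string,
): Promise<string | null> {
  const streamIds = await memory.getSessionStreams(sessionId);
  return streamIds[0] ?? null;
}
```

If an adapter returns stream IDs oldest-first instead, the last element would be the one to resume.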
Stream TTL
The RedisMemory adapter cleans up stream data automatically:
- Streams expire after 24 hours
- Sessions and messages persist forever
- Customize TTL in your adapter if needed
Custom Context
Pass additional context through memory operations:
interface MyContext {
ipAddress: string;
userAgent: string;
timestamp: number;
}
const memory = new RedisMemory<UIMessage, MyContext>({
url: process.env.KV_REST_API_URL,
token: process.env.KV_REST_API_TOKEN,
});
// Context flows through operations
await memory.createSession({
sessionId,
resourceId,
context: {
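// headers.get() returns null when a header is absent, so fall back to a default
ipAddress: req.headers.get("x-forwarded-for") ?? "unknown",
userAgent: req.headers.get("user-agent") ?? "unknown",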
timestamp: Date.now(),
},
});
Creating Custom Adapters
Implement the Memory
interface for custom storage backends:
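import type { UIMessage } from "ai";
import type { Memory } from "lightfast/memory";
// "your-database" is a placeholder for your own database client
import { Database } from "your-database";
export class DatabaseMemory implements Memory {
private db: Database;
constructor(connectionString: string) {
this.db = new Database(connectionString);
}
async appendMessage({ sessionId, message }: { sessionId: string; message: UIMessage }) {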
await this.db.messages.insert({
session_id: sessionId,
message_id: message.id,
role: message.role,
content: message.content,
created_at: new Date(),
});
}
async getMessages(sessionId: string) {
const rows = await this.db.messages
.where({ session_id: sessionId })
.orderBy("created_at", "asc");
return rows.map(row => ({
id: row.message_id,
role: row.role,
content: row.content,
createdAt: row.created_at,
}));
}
async createSession({ sessionId, resourceId }: { sessionId: string; resourceId: string }) {
await this.db.sessions.insert({
session_id: sessionId,
resource_id: resourceId,
created_at: new Date(),
});
}
async getSession(sessionId: string) {
const session = await this.db.sessions
.where({ session_id: sessionId })
.first();
return session ? { resourceId: session.resource_id } : null;
}
async createStream({ sessionId, streamId }: { sessionId: string; streamId: string }) {
await this.db.streams.insert({
stream_id: streamId,
session_id: sessionId,
created_at: new Date(),
});
}
async getSessionStreams(sessionId: string) {
const streams = await this.db.streams
.where({ session_id: sessionId })
.orderBy("created_at", "desc");
return streams.map(s => s.stream_id);
}
}
Performance Optimization
Caching Strategies
Implement caching for frequently accessed sessions:
class CachedMemory implements Memory {
private cache = new Map<string, UIMessage[]>();
private delegate: Memory;
constructor(delegate: Memory) {
this.delegate = delegate;
}
async getMessages(sessionId: string) {
// Check cache first
if (this.cache.has(sessionId)) {
return this.cache.get(sessionId)!;
}
// Fall back to delegate
const messages = await this.delegate.getMessages(sessionId);
this.cache.set(sessionId, messages);
// Expire cache after 5 minutes
setTimeout(() => {
this.cache.delete(sessionId);
}, 5 * 60 * 1000);
return messages;
}
async appendMessage(params: { sessionId: string; message: UIMessage }) {
// Invalidate cache on write
this.cache.delete(params.sessionId);
return this.delegate.appendMessage(params);
}
// Delegate other methods...
}
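One alternative to per-read timers is to store an expiry timestamp next to each cached entry and check it on read. The sketch below shows that variant under stated assumptions: TtlCachedMessages, MessageStore, and CachedMessage are hypothetical names, and the injectable clock exists only to make the behavior easy to test.

```typescript
type CachedMessage = { id: string; role: string; content: string };

// The two message operations this wrapper cares about.
interface MessageStore {
  getMessages(sessionId: string): Promise<CachedMessage[]>;
  appendMessage(params: { sessionId: string; message: CachedMessage }): Promise<void>;
}

class TtlCachedMessages implements MessageStore {
  private cache = new Map<string, { messages: CachedMessage[]; expiresAt: number }>();
  private delegate: MessageStore;
  private ttlMs: number;
  private now: () => number;

  constructor(
    delegate: MessageStore,
    ttlMs: number = 5 * 60 * 1000,
    now: () => number = () => Date.now(),
  ) {
    this.delegate = delegate;
    this.ttlMs = ttlMs;
    this.now = now;
  }

  async getMessages(sessionId: string): Promise<CachedMessage[]> {
    const entry = this.cache.get(sessionId);
    if (entry && entry.expiresAt > this.now()) {
      return entry.messages; // Fresh cache hit
    }
    // Missing or expired: reload from the delegate and restart the TTL.
    const messages = await this.delegate.getMessages(sessionId);
    this.cache.set(sessionId, { messages, expiresAt: this.now() + this.ttlMs });
    return messages;
  }

  async appendMessage(params: { sessionId: string; message: CachedMessage }): Promise<void> {
    await this.delegate.appendMessage(params);
    // Invalidate after the write so the next read sees the new message.
    this.cache.delete(params.sessionId);
  }
}
```

Because expiry is checked lazily, no timers are left pending, and a refreshed entry cannot be evicted early by a timer that belonged to an older entry.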
Batch Operations
Optimize for bulk operations when possible:
class BatchMemory implements Memory {
private pendingMessages: Map<string, UIMessage[]> = new Map();
async appendMessage({ sessionId, message }: { sessionId: string; message: UIMessage }) {
if (!this.pendingMessages.has(sessionId)) {
this.pendingMessages.set(sessionId, []);
}
this.pendingMessages.get(sessionId)!.push(message);
// Flush once the batch reaches the threshold (a delay-based flush is not shown)
if (this.pendingMessages.get(sessionId)!.length >= 10) {
await this.flush(sessionId);
}
}
private async flush(sessionId: string) {
const messages = this.pendingMessages.get(sessionId);
if (!messages || messages.length === 0) return;
// Batch insert (batchInsert is an adapter-specific bulk write, not shown here)
await this.batchInsert(sessionId, messages);
this.pendingMessages.delete(sessionId);
}
}
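A fuller version of the batching idea flushes on either a size threshold or a delay, whichever comes first. The following is a minimal sketch under assumptions: MessageBatcher and the BatchInsert callback are hypothetical, and the callback stands in for whatever bulk write your storage backend supports.

```typescript
type BatchedMessage = { id: string; role: string; content: string };

// Hypothetical bulk-write callback. A real adapter would issue one
// multi-row insert or pipelined write here.
type BatchInsert = (sessionId: string, messages: BatchedMessage[]) => Promise<void>;

class MessageBatcher {
  private pending = new Map<string, BatchedMessage[]>();
  private timers = new Map<string, ReturnType<typeof setTimeout>>();
  private batchInsert: BatchInsert;
  private maxBatch: number;
  private maxDelayMs: number;

  constructor(batchInsert: BatchInsert, maxBatch = 10, maxDelayMs = 1000) {
    this.batchInsert = batchInsert;
    this.maxBatch = maxBatch;
    this.maxDelayMs = maxDelayMs;
  }

  async append(sessionId: string, message: BatchedMessage): Promise<void> {
    const queue = this.pending.get(sessionId) ?? [];
    queue.push(message);
    this.pending.set(sessionId, queue);

    if (queue.length >= this.maxBatch) {
      await this.flush(sessionId); // Threshold reached: write immediately
    } else if (!this.timers.has(sessionId)) {
      // Schedule a delayed flush so small batches are not held forever.
      const timer = setTimeout(() => {
        // Errors from a timed flush are only logged in this sketch.
        this.flush(sessionId).catch((error) => console.error(error));
      }, this.maxDelayMs);
      this.timers.set(sessionId, timer);
    }
  }

  async flush(sessionId: string): Promise<void> {
    const timer = this.timers.get(sessionId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.timers.delete(sessionId);
    }
    const queue = this.pending.get(sessionId);
    if (!queue || queue.length === 0) return;
    // Detach the queue before awaiting so later appends start a new batch.
    this.pending.delete(sessionId);
    await this.batchInsert(sessionId, queue);
  }
}
```

Note the trade-off: buffered messages are not visible to readers until they are flushed, and they are lost if the process exits first, so batching suits workloads that can tolerate a short write delay.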
Error Handling
Memory operations can fail, for example when the storage backend is unreachable. The framework catches these failures and converts them into API errors:
// Automatic error conversion in runtime
try {
await memory.appendMessage({ sessionId, message });
} catch (error) {
// Converted to appropriate API error
if (error instanceof Error && error.message.includes("connection")) {
throw new ServiceUnavailableError("Memory service unavailable");
}
throw new InternalServerError("Failed to save message");
}
Best Practices
1. Choose the Right Adapter
- Development: Use InMemoryMemory for simplicity
- Production: Use RedisMemory for persistence
- High Scale: Consider custom adapters with your database
2. Session ID Strategy
// Good: Deterministic, user-scoped
const sessionId = `${userId}-${conversationId}`;
// Good: Random, secure
const sessionId = crypto.randomUUID();
// Avoid: Guessable
const sessionId = "session-1";
3. Message Validation
Always validate messages before storage:
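class ValidatingMemory implements Memory {
private delegate: Memory;
constructor(delegate: Memory) {
this.delegate = delegate;
}
async appendMessage({ message, ...params }: { sessionId: string; message: UIMessage }) {
// Validate message structure
if (!message.id || !message.role || !message.content) {
throw new Error("Invalid message structure");
}
// Sanitize content without mutating the caller's object
// (sanitizeHtml is assumed to come from an HTML sanitizer of your choice)
const sanitized = { ...message, content: sanitizeHtml(message.content) };
return this.delegate.appendMessage({ ...params, message: sanitized });
}
// Delegate other methods...
}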
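The structural check can also live in a pure function that is easy to unit test on its own. This is a sketch under assumptions: validateMessage, IncomingMessage, and ValidMessage are hypothetical names, and the accepted roles simply follow the UIMessage shape shown earlier in this document.

```typescript
// Untrusted input: every field may be missing or of the wrong type.
type IncomingMessage = { id?: unknown; role?: unknown; content?: unknown };
type ValidMessage = {
  id: string;
  role: "user" | "assistant" | "system";
  content: string;
};

const ALLOWED_ROLES = ["user", "assistant", "system"] as const;

// Returns a new, validated message object or throws with a specific reason.
function validateMessage(input: IncomingMessage): ValidMessage {
  const { id, content } = input;
  if (typeof id !== "string" || id.length === 0) {
    throw new Error("Invalid message: id must be a non-empty string");
  }
  const role = ALLOWED_ROLES.find((allowed) => allowed === input.role);
  if (role === undefined) {
    throw new Error("Invalid message: role must be user, assistant, or system");
  }
  if (typeof content !== "string" || content.length === 0) {
    throw new Error("Invalid message: content must be a non-empty string");
  }
  return { id, role, content };
}

const checked = validateMessage({ id: "msg-1", role: "user", content: "Hello" });
console.log(checked.role); // user
```

Keeping validation separate from storage means the same function can guard any adapter, and each failure reports which field was rejected.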
4. Monitor Memory Usage
Track memory operations for performance:
class MonitoredMemory implements Memory {
private delegate: Memory;
constructor(delegate: Memory) {
this.delegate = delegate;
}
async getMessages(sessionId: string) {
const start = Date.now();
try {
const messages = await this.delegate.getMessages(sessionId);
metrics.record("memory.get_messages", {
duration: Date.now() - start,
messageCount: messages.length,
sessionId,
});
return messages;
} catch (error) {
metrics.record("memory.error", {
operation: "getMessages",
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
}
5. Implement Cleanup
Remove old sessions periodically:
async function cleanupOldSessions(memory: Memory, daysOld = 30) {
const cutoff = Date.now() - (daysOld * 24 * 60 * 60 * 1000);
// Custom cleanup logic based on your adapter
// This would require extending the Memory interface
}
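One way the cleanup stub above might be filled in is to extend the adapter with listing and deletion methods. Everything in this sketch is an assumption: CleanableMemory, listSessions, deleteSession, and cleanupSessionsOlderThan are hypothetical names that do not exist on the Memory interface described in this document.

```typescript
// Hypothetical extension of an adapter; neither method is part of
// the Memory interface described in this document.
interface CleanableMemory {
  listSessions(): Promise<{ sessionId: string; createdAt: number }[]>;
  deleteSession(sessionId: string): Promise<void>;
}

// Deletes sessions created before the cutoff and returns their IDs.
async function cleanupSessionsOlderThan(
  memory: CleanableMemory,
  daysOld = 30,
  now: number = Date.now(),
): Promise<string[]> {
  const cutoff = now - daysOld * 24 * 60 * 60 * 1000;
  const sessions = await memory.listSessions();
  const removed: string[] = [];
  for (const session of sessions) {
    if (session.createdAt < cutoff) {
      await memory.deleteSession(session.sessionId);
      removed.push(session.sessionId);
    }
  }
  return removed;
}
```

A real adapter would likely filter by age in the storage layer rather than listing every session, and deleteSession would also need to remove the session's messages and streams.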
Next Steps
- Learn about Request Handlers for HTTP integration
- Explore Session Management patterns
- See Redis Memory for production setup