Services Architecture
Grid's services architecture follows a layered design pattern, organizing functionality from atomic building blocks to complex orchestration systems. This architecture ensures modularity, reusability, and clear separation of concerns.
Architecture Overview
Grid organizes services into three distinct layers:
┌─────────────────────────────────────────┐
│ Organism Level Services │
│ (ConversationLoop, AgentFlow) │
└─────────────────────────────────────────┘
↑
┌─────────────────────────────────────────┐
│ Composed Level Services │
│ (ConversationManager) │
└─────────────────────────────────────────┘
↑
┌──────────────────────────────────────────────┐
│ Atomic Level Services │
│ (History, Context, ToolExecutor, LLM, Voice) │
└──────────────────────────────────────────────┘
Atomic Level Services
These are the fundamental building blocks that handle specific, focused responsibilities. All services use a closure-based pattern: no classes, just functions returning objects with methods.
baseLLMService
The core service for LLM interactions:
const llmService = baseLLMService({
provider: "openai",
apiKey: process.env.OPENAI_API_KEY,
});
// Usage
const response = await llmService.generateText({
model: "gpt-4",
messages: [...],
tools: [...],
});
Key features:
- Provider abstraction (OpenAI, Anthropic)
- Automatic retry logic
- Token counting and limits
- Response streaming support
createConversationHistory
Manages message history using closure-based pattern:
export const createConversationHistory = (systemPrompt?: string) => {
// Private state in closure
const messages: Message[] = [];
if (systemPrompt) {
messages.push({ role: "system", content: systemPrompt });
}
// Public methods
return {
    addMessage: (message: Message) => {
      messages.push(message);
    },
    addToolResponse: (toolCallId: string, toolName: string, result: unknown) => {
      // Record a tool result as a tool-role message
      messages.push({
        role: "tool",
        content: JSON.stringify(result),
        toolCallId,
        toolName,
      } as Message);
    },
getMessages: () => [...messages], // Return copy
getNonSystemMessages: () =>
messages.filter(m => m.role !== "system"),
clear: () => {
const systemMsg = messages.find(m => m.role === "system");
messages.length = 0;
if (systemMsg) messages.push(systemMsg);
},
getLastMessageByRole: (role: MessageRole) =>
messages.findLast(m => m.role === role),
};
};
Features:
- Message storage and retrieval
- System message preservation
- Tool response handling
- Message filtering by role
createConversationContext
Maintains conversation state:
export const createConversationContext = () => {
  // Private state
  const state: Record<string, any> = {};
  const metadata: ConversationMetadata = {
    startTime: new Date(),
    messageCount: 0,
    toolCallCount: 0,
  };
  // Public API
  return {
    updateState: (key: string, value: any) => {
      state[key] = value;
    },
    getState: () => ({ ...state }), // Return copy
    resetState: () => {
      for (const key of Object.keys(state)) delete state[key];
    },
    updateMetadata: (key: keyof ConversationMetadata, value: any) => {
      metadata[key] = value;
    },
    getMetadata: () => ({ ...metadata }), // Return copy
    incrementMessageCount: () => {
      metadata.messageCount++;
    },
    incrementToolCallCount: (count = 1) => {
      metadata.toolCallCount += count;
    },
    getMetrics: () => ({
      duration: Date.now() - metadata.startTime.getTime(),
      messages: metadata.messageCount,
      toolCalls: metadata.toolCallCount,
    }),
  };
};
Use cases:
- User preferences
- Session state
- Tool results caching
- Conversation metrics
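A short usage sketch (again inlining a trimmed copy so it runs standalone) shows state and metrics side by side:

```typescript
// Trimmed copy of createConversationContext (see above)
const createConversationContext = () => {
  const state: Record<string, unknown> = {};
  const metadata = { startTime: new Date(), messageCount: 0, toolCallCount: 0 };
  return {
    updateState: (key: string, value: unknown) => { state[key] = value; },
    getState: () => ({ ...state }),
    incrementMessageCount: () => { metadata.messageCount++; },
    getMetrics: () => ({
      duration: Date.now() - metadata.startTime.getTime(),
      messages: metadata.messageCount,
      toolCalls: metadata.toolCallCount,
    }),
  };
};

const context = createConversationContext();
context.updateState("preferredLanguage", "en"); // a user preference
context.incrementMessageCount();

const metrics = context.getMetrics();
```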
createToolExecutor
Executes tools safely and efficiently:
export const createToolExecutor = (tools: Record<string, CoreTool>) => {
// Private helper functions
const validateParams = async (tool: CoreTool, params: any) => {
return tool.parameters.parseAsync(params);
};
// Public methods
return {
execute: async (toolCall: ToolCall) => {
const tool = tools[toolCall.name];
if (!tool) throw new Error(`Tool not found: ${toolCall.name}`);
try {
const validated = await validateParams(tool, toolCall.args);
const result = await tool.execute(validated);
return { success: true, result };
} catch (error) {
        return { success: false, error: (error as Error).message };
}
},
getAvailableTools: () => Object.keys(tools),
};
};
Features:
- Parameter validation with Zod
- Error boundaries
- Tool discovery
- Result formatting
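The executor is easiest to see end to end with a toy tool. In this standalone sketch the `parameters` schema is a hand-rolled stand-in exposing `parseAsync` (in Grid this would be a Zod schema):

```typescript
interface ToolCall { name: string; args: unknown }
interface CoreTool {
  // Stand-in for a Zod schema: anything exposing parseAsync
  parameters: { parseAsync: (params: unknown) => Promise<any> };
  execute: (params: any) => Promise<unknown>;
}

const createToolExecutor = (tools: Record<string, CoreTool>) => ({
  execute: async (toolCall: ToolCall) => {
    const tool = tools[toolCall.name];
    if (!tool) throw new Error(`Tool not found: ${toolCall.name}`);
    try {
      const validated = await tool.parameters.parseAsync(toolCall.args);
      return { success: true as const, result: await tool.execute(validated) };
    } catch (error) {
      return { success: false as const, error: (error as Error).message };
    }
  },
  getAvailableTools: () => Object.keys(tools),
});

// Hypothetical tool: adds two numbers after validating its params
const addTool: CoreTool = {
  parameters: {
    parseAsync: async (p: any) => {
      if (typeof p?.a !== "number" || typeof p?.b !== "number") {
        throw new Error("Invalid params");
      }
      return p;
    },
  },
  execute: async ({ a, b }) => a + b,
};

const executor = createToolExecutor({ add: addTool });
const ok = await executor.execute({ name: "add", args: { a: 2, b: 3 } });
const bad = await executor.execute({ name: "add", args: { a: "x" } });
```

Validation failures are caught by the error boundary and returned as a failed result rather than thrown at the caller.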
baseVoiceService
Provides voice capabilities for agents using the closure pattern:
export const baseVoiceService = (implementation: VoiceServiceImplementation): VoiceService => {
// Private state
let defaultVoice: Voice | null = null;
const voiceCache = new Map<string, Voice>();
// Private helper functions
const validateVoiceId = async (voiceId: string) => {
if (!voiceCache.has(voiceId)) {
const voice = await implementation.getVoice?.(voiceId);
if (voice) voiceCache.set(voiceId, voice);
}
return voiceCache.get(voiceId);
};
// Public API
return {
    synthesize: async (text: string, options?: VoiceOptions) => {
      const voiceId = options?.voiceId || defaultVoice?.id;
      if (!voiceId) throw new Error("No voice specified");
      await validateVoiceId(voiceId); // warm the cache for this voice
      return implementation.synthesize(text, { ...options, voiceId });
    },
transcribe: async (audio: AudioData) => {
return implementation.transcribe(audio);
},
streamSynthesize: implementation.streamSynthesize || async function* (text, options) {
// Fallback to non-streaming
const result = await implementation.synthesize(text, options);
yield result.audio;
},
listVoices: async () => {
const voices = await implementation.listVoices();
// Cache voices for quick lookup
voices.forEach(v => voiceCache.set(v.id, v));
return voices;
},
getDefaultVoice: () => defaultVoice,
setDefaultVoice: (voiceId: string) => {
const voice = voiceCache.get(voiceId);
if (voice) defaultVoice = voice;
},
};
};
Features:
- Text-to-speech synthesis
- Speech-to-text transcription
- Voice management and caching
- Streaming support
- Provider abstraction
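One detail worth isolating is the streaming fallback: a provider without native streaming gets wrapped in a one-chunk async generator. A standalone sketch with a stub provider (the stub is an assumption, not a real Grid provider):

```typescript
type StreamFn = (text: string) => AsyncGenerator<string>;

// Stub provider that only supports batch synthesis
const implementation: {
  synthesize: (text: string) => Promise<{ audio: string }>;
  streamSynthesize?: StreamFn;
} = {
  synthesize: async (text) => ({ audio: `<audio:${text}>` }),
};

// Same fallback as baseVoiceService: wrap the batch output in a generator
const streamSynthesize: StreamFn =
  implementation.streamSynthesize ??
  (async function* (text) {
    const result = await implementation.synthesize(text);
    yield result.audio;
  });

const chunks: string[] = [];
for await (const chunk of streamSynthesize("hello")) {
  chunks.push(chunk);
}
```

Callers always get an async iterable, regardless of what the underlying provider supports.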
Composed Level Services
These services combine atomic services to provide higher-level functionality.
createConversationManager
Orchestrates history and context using the closure pattern:
export const createConversationManager = (options?: ConversationManagerOptions) => {
// Compose atomic services
  const history = createConversationHistory(options?.systemPrompt);
  const context = createConversationContext();
// Public API combining both services
return {
// Unified message handling
addUserMessage: (content: string) => {
history.addMessage({ role: "user", content });
context.incrementMessageCount();
},
processAgentResponse: (response: AgentResponse) => {
// Add assistant message
history.addMessage({
role: "assistant",
content: response.content,
toolCalls: response.toolCalls,
});
// Update context metrics
context.incrementMessageCount();
if (response.toolCalls?.length) {
context.incrementToolCallCount(response.toolCalls.length);
}
},
// Combined state access
getConversationState: () => ({
messages: history.getMessages(),
context: context.getState(),
metrics: context.getMetrics(),
}),
    // Delegate to underlying services
    ...history, // All history methods
    updateState: context.updateState,
    getState: context.getState,
    getMetadata: context.getMetadata,
    getMetrics: context.getMetrics,
reset: () => {
history.clear();
context.resetState();
},
};
};
Responsibilities:
- Message flow coordination
- State synchronization
- Metrics tracking
- Unified interface
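The spread-based delegation (`...history`) is the load-bearing trick here: the manager re-exports the atomic methods unchanged and overrides only where the two services must stay in sync. A reduced, standalone sketch:

```typescript
// Minimal stand-ins for the two atomic services
const createHistory = () => {
  const messages: string[] = [];
  return {
    addMessage: (m: string) => { messages.push(m); },
    getMessages: () => [...messages],
  };
};
const createContext = () => {
  let messageCount = 0;
  return {
    incrementMessageCount: () => { messageCount++; },
    getMetrics: () => ({ messages: messageCount }),
  };
};

const createManager = () => {
  const history = createHistory();
  const context = createContext();
  return {
    ...history, // delegate history methods unchanged
    // Override addMessage so history and metrics stay in sync;
    // later keys win, so this shadows the spread-in version
    addMessage: (m: string) => {
      history.addMessage(m);
      context.incrementMessageCount();
    },
    getMetrics: context.getMetrics,
  };
};

const manager = createManager();
manager.addMessage("hello");
```

Because object-literal keys later in the literal win, overrides must appear after the spread.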
Organism Level Services
The highest level services that implement complete workflows and autonomous behaviors.
createConversationLoop
Manages complete conversation lifecycles with agent integration:
export const createConversationLoop = (options: ConversationLoopOptions) => {
// Private state
const manager = createConversationManager(options.managerOptions);
const agent = options.agent;
let isActive = true;
let turnCount = 0;
// Private helper functions
const resolveToolCalls = async (toolCalls: ToolCall[], maxRounds = 5) => {
let rounds = 0;
let currentToolCalls = toolCalls;
while (currentToolCalls.length > 0 && rounds < maxRounds) {
// Execute tools and collect results
for (const toolCall of currentToolCalls) {
const result = await options.toolExecutor.execute(toolCall);
manager.addToolResponse(toolCall.id, toolCall.name, result);
}
      // Get next response with tool results
      const response = await agent.act({
        messages: manager.getMessages(),
      });
      manager.processAgentResponse(response);
      currentToolCalls = response.toolCalls || [];
      rounds++;
}
};
// Public API
return {
sendMessage: async (userMessage: string) => {
if (!isActive) throw new Error("Conversation has ended");
manager.addUserMessage(userMessage);
turnCount++;
const response = await agent.act({
messages: manager.getMessages(),
context: manager.getState(),
});
manager.processAgentResponse(response);
if (response.toolCalls?.length) {
await resolveToolCalls(response.toolCalls);
}
return response;
},
endConversation: () => {
isActive = false;
},
resetConversation: () => {
manager.reset();
isActive = true;
turnCount = 0;
},
getAnalytics: () => ({
...manager.getMetrics(),
turnCount,
isActive,
}),
exportConversation: () => ({
messages: manager.getMessages(),
context: manager.getState(),
metadata: {
turnCount,
exported: new Date().toISOString(),
},
}),
// Delegate manager methods
getMessages: manager.getMessages,
getConversationState: manager.getConversationState,
};
};
createAgentFlow
Enables autonomous agent execution:
export const createAgentFlow = (options: AgentFlowOptions) => {
  const { agent, goal, maxIterations = 5 } = options;
  // Private state for autonomous execution
  const executionHistory: ExecutionStep[] = [];
  let isRunning = false;
  // Private helpers
  const executeStep = async (iteration: number): Promise<ExecutionStep> => {
    // One unit of work toward the goal; assumes the agent signals
    // completion on its response (adapt to your agent's interface)
    const response = await agent.act({
      messages: [{ role: "user", content: goal }],
      context: { iteration, previousSteps: executionHistory },
    });
    return { iteration, response, isComplete: response.isComplete ?? false };
  };
  const synthesizeResults = (steps: ExecutionStep[]) => ({
    goal,
    steps,
    completed: steps.at(-1)?.isComplete ?? false,
  });
  return {
    runAutonomous: async () => {
      if (isRunning) throw new Error("Already running");
      isRunning = true;
      try {
        for (let i = 0; i < maxIterations; i++) {
          const step = await executeStep(i);
          executionHistory.push(step);
          if (step.isComplete) break;
        }
        return synthesizeResults(executionHistory);
      } finally {
        isRunning = false;
      }
    },
    getExecutionHistory: () => [...executionHistory],
  };
};
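The bounded loop at the heart of `runAutonomous` can be sketched on its own. The stub `agent.act` below (an assumption, not Grid's real agent interface) declares itself done on the third call:

```typescript
interface Step { iteration: number; output: string; isComplete: boolean }

// Stub agent: pretends to work toward a goal, finishing after 3 calls
const agent = {
  act: async (iteration: number) => ({
    output: `step ${iteration}`,
    done: iteration >= 2,
  }),
};

const runAutonomous = async (maxIterations = 5): Promise<Step[]> => {
  const history: Step[] = [];
  for (let i = 0; i < maxIterations; i++) {
    const { output, done } = await agent.act(i);
    history.push({ iteration: i, output, isComplete: done });
    if (done) break; // stop early once the agent reports completion
  }
  return history;
};

const steps = await runAutonomous();
```

The `maxIterations` cap guarantees termination even if the agent never reports completion.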
Service Integration
Composition Pattern
Services are composed together using function composition:
// Create services
const llmService = baseLLMService({
langfuse: { enabled: false }
});
const toolExecutor = createToolExecutor({
  weather: weatherTool,
  calculator: calculatorTool,
});
const voiceService = elevenlabsVoiceService({
apiKey: process.env.ELEVENLABS_API_KEY,
});
// Create an agent
const agent = createConfigurableAgent({
llmService,
toolExecutor,
voiceService, // Optional - enables voice capabilities
config: {
id: "example-agent",
type: "general",
version: "1.0.0",
prompts: {
system: "You are a helpful assistant"
},
metadata: {
id: "example-agent",
type: "general",
name: "Example Agent",
description: "An example agent",
capabilities: ["general"],
version: "1.0.0"
},
tools: {
builtin: [],
custom: [weatherTool, calculatorTool],
mcp: []
},
behavior: {
maxRetries: 3,
responseFormat: "text"
},
voice: {
enabled: true,
voiceId: "21m00Tcm4TlvDq8ikWAM",
autoSpeak: true,
}
}
});
// Create conversation loop with all features
const conversation = createConversationLoop({
  agent,
  toolExecutor,
  onProgress: (update) => {
    console.log(`[${update.type}] ${update.content}`);
  },
});
// Use the composed service
const response = await conversation.sendMessage("What's the weather?");
Service Communication
Services communicate through:
- Direct method calls for synchronous operations
- Events for asynchronous notifications
- Shared state through context service
- Message passing for loosely coupled components
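For the event-based path, a minimal emitter in the same closure style might look like this (a sketch; Grid's actual event mechanism may differ):

```typescript
type Listener = (payload: unknown) => void;

const createEventBus = () => {
  // Private listener registry in the closure
  const listeners = new Map<string, Listener[]>();
  return {
    on: (event: string, fn: Listener) => {
      listeners.set(event, [...(listeners.get(event) ?? []), fn]);
    },
    emit: (event: string, payload: unknown) => {
      for (const fn of listeners.get(event) ?? []) fn(payload);
    },
  };
};

const bus = createEventBus();
const seen: unknown[] = [];
bus.on("tool:executed", (p) => seen.push(p));
bus.emit("tool:executed", { name: "weather" });
```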
Observability Integration
All services integrate with Langfuse for comprehensive observability:
export const createObservableService = (trace: LangfuseTrace) => ({
  execute: async <T>(operation: string, fn: () => Promise<T>): Promise<T> => {
    const span = trace.span({
      name: operation,
      startTime: new Date(),
    });
    try {
      const result = await fn();
      span.end({ output: result });
      return result;
    } catch (error) {
      span.end({ level: "ERROR", statusMessage: (error as Error).message });
      throw error;
    }
  },
});
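A wrapper like this is straightforward to verify with a fake trace. The standalone sketch below assumes only that a trace exposes `span({ name })` returning an object with `end()`:

```typescript
interface Span { end: (data?: unknown) => void }
interface Trace { span: (opts: { name: string }) => Span }

// Fake trace that records which spans were opened and closed
const spans: Array<{ name: string; ended: boolean }> = [];
const fakeTrace: Trace = {
  span: ({ name }) => {
    const record = { name, ended: false };
    spans.push(record);
    return { end: () => { record.ended = true; } };
  },
};

// Span-per-operation wrapper, mirroring the pattern above
const execute = async <T>(operation: string, fn: () => Promise<T>): Promise<T> => {
  const span = fakeTrace.span({ name: operation });
  try {
    const result = await fn();
    span.end({ output: result });
    return result;
  } catch (error) {
    span.end({ level: "ERROR" });
    throw error;
  }
};

const result = await execute("fetch-user", async () => ({ id: 1 }));
```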
Service Patterns
Error Handling
Consistent error handling using closure pattern:
export const createServiceWithErrorHandling = (options: ServiceOptions) => {
// Private error handling logic
const handleError = async (error: Error, context: string) => {
console.error(`[${context}] Error:`, error.message);
if (options.onError) {
await options.onError(error, context);
}
if (options.throwErrors) {
throw error;
}
return options.fallbackValue;
};
// Public API with error boundaries
return {
safeExecute: async <T>(
operation: () => Promise<T>,
context: string
): Promise<T> => {
try {
return await operation();
      } catch (error) {
        return handleError(error as Error, context) as T;
      }
},
};
};
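Usage looks like this; the condensed inline version below mirrors the factory above so the sketch runs standalone:

```typescript
interface ServiceOptions { throwErrors?: boolean; fallbackValue?: unknown }

// Condensed version of createServiceWithErrorHandling (see above)
const createServiceWithErrorHandling = (options: ServiceOptions) => {
  const handleError = (error: Error, context: string): unknown => {
    if (options.throwErrors) throw error;
    return options.fallbackValue; // swallow and fall back
  };
  return {
    safeExecute: async <T>(
      operation: () => Promise<T>,
      context: string
    ): Promise<T> => {
      try {
        return await operation();
      } catch (error) {
        return handleError(error as Error, context) as T;
      }
    },
  };
};

const service = createServiceWithErrorHandling({ fallbackValue: "default" });
const ok = await service.safeExecute(async () => "value", "demo");
const fellBack = await service.safeExecute(async () => {
  throw new Error("boom");
}, "demo");
```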
Caching
Services implement caching for performance:
export const createCachedService = () => {
// Private cache state
const cache = new Map<string, { value: any; expiry: number }>();
// Cache management functions
const isExpired = (expiry: number) => Date.now() > expiry;
const cleanup = () => {
for (const [key, entry] of cache.entries()) {
if (isExpired(entry.expiry)) {
cache.delete(key);
}
}
};
// Public API
return {
getCached: async <T>(
key: string,
factory: () => Promise<T>,
ttl: number = 3600000
): Promise<T> => {
const cached = cache.get(key);
if (cached && !isExpired(cached.expiry)) {
return cached.value as T;
}
const value = await factory();
cache.set(key, {
value,
expiry: Date.now() + ttl,
});
// Periodic cleanup
if (cache.size > 100) cleanup();
return value;
},
invalidate: (key: string) => {
cache.delete(key);
},
clear: () => {
cache.clear();
},
};
};
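The cache-or-compute path can be seen in miniature: the factory runs once and the second call is served from cache. This sketch condenses `getCached` from the service above:

```typescript
const cache = new Map<string, { value: unknown; expiry: number }>();

// Condensed getCached (see createCachedService above)
const getCached = async <T>(
  key: string,
  factory: () => Promise<T>,
  ttl: number = 3600000
): Promise<T> => {
  const hit = cache.get(key);
  if (hit && Date.now() <= hit.expiry) return hit.value as T;
  const value = await factory();
  cache.set(key, { value, expiry: Date.now() + ttl });
  return value;
};

let factoryCalls = 0;
const load = async () => {
  factoryCalls++; // counts how often the expensive work actually runs
  return "expensive result";
};

const first = await getCached("report", load);
const second = await getCached("report", load); // cache hit, factory skipped
```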
Rate Limiting
Protect external resources:
export const createRateLimitedService = (config: RateLimitConfig) => {
// Private rate limit state
let tokens = config.tokensPerInterval;
let lastRefill = Date.now();
// Token bucket algorithm
const refillTokens = () => {
const now = Date.now();
const timePassed = now - lastRefill;
    const intervalsPassed = timePassed / config.intervalMs;
    tokens = Math.min(
      config.tokensPerInterval,
      tokens + intervalsPassed * config.tokensPerInterval
);
lastRefill = now;
};
// Public API
return {
executeWithLimit: async <T>(
operation: () => Promise<T>
): Promise<T> => {
refillTokens();
if (tokens < 1) {
throw new Error("Rate limit exceeded");
}
tokens -= 1;
return operation();
},
getTokensRemaining: () => {
refillTokens();
return Math.floor(tokens);
},
};
};
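With a budget of 2 tokens per interval, two immediate calls succeed and the third is rejected; this standalone sketch condenses the token-bucket logic above:

```typescript
const config = { tokensPerInterval: 2, intervalMs: 1000 };

let tokens = config.tokensPerInterval;
let lastRefill = Date.now();

const refillTokens = () => {
  const now = Date.now();
  const intervalsPassed = (now - lastRefill) / config.intervalMs;
  tokens = Math.min(
    config.tokensPerInterval,
    tokens + intervalsPassed * config.tokensPerInterval
  );
  lastRefill = now;
};

const tryAcquire = (): boolean => {
  refillTokens();
  if (tokens < 1) return false;
  tokens -= 1;
  return true;
};

// Two immediate calls succeed, the third is rate-limited
const results = [tryAcquire(), tryAcquire(), tryAcquire()];
```

Refilling lazily on each call (rather than on a timer) keeps the bucket stateless between requests.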
Best Practices
1. Single Responsibility
Each service should have one clear purpose.
2. Interface Segregation
Services expose only necessary methods.
3. Dependency Inversion
Depend on abstractions, not concrete implementations.
4. Testability
Services should be easily testable in isolation.
5. Observability
All services should emit telemetry data.
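Practices 3 and 4 reinforce each other: because services receive their dependencies as plain objects, tests can inject deterministic fakes. A sketch (the `summarizer` service is hypothetical, used only to illustrate the pattern):

```typescript
interface LLMService {
  generateText: (prompt: string) => Promise<string>;
}

// Depends on the LLMService abstraction, not on a concrete provider
const createSummarizer = (llm: LLMService) => ({
  summarize: async (text: string) => llm.generateText(`Summarize: ${text}`),
});

// In tests, inject a deterministic fake instead of a real provider
const fakeLLM: LLMService = {
  generateText: async (prompt) => `fake(${prompt})`,
};

const summarizer = createSummarizer(fakeLLM);
const summary = await summarizer.summarize("long document");
```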
Next Steps
- Conversation Management - Deep dive into conversation handling
- Observability - Monitor your services
- API Reference - Detailed service API documentation