JavaScript Microservices: Building Scalable Distributed Systems
Master microservices architecture with JavaScript. Learn service design, API communication, containerization, monitoring, and deployment strategies.
Microservices architecture breaks down applications into small, independent services that communicate over well-defined APIs. This guide covers designing, building, and deploying JavaScript microservices for scalable, maintainable systems.
Microservices Fundamentals
Core Principles and Architecture
// Microservice Base Class
class MicroService {
constructor(name, port, config = {}) {
this.name = name;
this.port = port;
this.config = {
healthCheckInterval: 30000,
gracefulShutdownTimeout: 10000,
maxRetries: 3,
circuitBreakerThreshold: 5,
...config,
};
this.health = {
status: 'starting',
uptime: 0,
version: process.env.npm_package_version || '1.0.0',
dependencies: {},
metrics: {
requestCount: 0,
errorCount: 0,
responseTime: [],
},
};
this.dependencies = new Map();
this.circuitBreakers = new Map();
this.middleware = [];
this.setupHealthChecks();
this.setupGracefulShutdown();
}
// Service discovery and registration
async register(serviceRegistry) {
const serviceInfo = {
name: this.name,
address: `http://localhost:${this.port}`,
port: this.port,
health: `http://localhost:${this.port}/health`,
version: this.health.version,
tags: this.config.tags || [],
metadata: this.config.metadata || {},
};
try {
await serviceRegistry.register(serviceInfo);
console.log(`Service ${this.name} registered successfully`);
this.health.status = 'healthy';
} catch (error) {
console.error(`Failed to register service ${this.name}:`, error);
this.health.status = 'unhealthy';
}
}
// Add service dependency
addDependency(name, healthUrl, timeout = 5000) {
this.dependencies.set(name, {
name,
healthUrl,
timeout,
status: 'unknown',
lastCheck: null,
});
}
// Health check implementation
setupHealthChecks() {
setInterval(async () => {
await this.checkHealth();
}, this.config.healthCheckInterval);
}
async checkHealth() {
this.health.uptime = process.uptime();
// Check dependencies
for (const [name, dependency] of this.dependencies) {
try {
const startTime = Date.now();
const response = await fetch(dependency.healthUrl, {
timeout: dependency.timeout,
});
const responseTime = Date.now() - startTime;
dependency.status = response.ok ? 'healthy' : 'unhealthy';
dependency.lastCheck = new Date().toISOString();
dependency.responseTime = responseTime;
this.health.dependencies[name] = {
status: dependency.status,
responseTime,
lastCheck: dependency.lastCheck,
};
} catch (error) {
dependency.status = 'unhealthy';
dependency.lastCheck = new Date().toISOString();
dependency.error = error.message;
this.health.dependencies[name] = {
status: 'unhealthy',
error: error.message,
lastCheck: dependency.lastCheck,
};
}
}
// Determine overall health
const dependencyStatuses = Array.from(this.dependencies.values()).map(
(dep) => dep.status
);
const hasUnhealthyDependencies = dependencyStatuses.includes('unhealthy');
const hasUnknownDependencies = dependencyStatuses.includes('unknown');
if (hasUnhealthyDependencies) {
this.health.status = 'degraded';
} else if (hasUnknownDependencies) {
this.health.status = 'checking';
} else {
this.health.status = 'healthy';
}
}
// Graceful shutdown
setupGracefulShutdown() {
const gracefulShutdown = async (signal) => {
console.log(`Received ${signal}, starting graceful shutdown...`);
this.health.status = 'shutting-down';
try {
await this.shutdown();
process.exit(0);
} catch (error) {
console.error('Error during shutdown:', error);
process.exit(1);
}
};
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
}
async shutdown() {
// Override in subclasses
console.log(`Service ${this.name} shutting down...`);
}
// Middleware system
use(middleware) {
this.middleware.push(middleware);
}
// Circuit breaker pattern
createCircuitBreaker(serviceName, options = {}) {
const breaker = new CircuitBreaker(serviceName, {
failureThreshold:
options.failureThreshold || this.config.circuitBreakerThreshold,
resetTimeout: options.resetTimeout || 30000,
...options,
});
this.circuitBreakers.set(serviceName, breaker);
return breaker;
}
// Metrics collection
recordMetric(type, value) {
this.health.metrics.requestCount++;
if (type === 'error') {
this.health.metrics.errorCount++;
}
if (type === 'responseTime') {
this.health.metrics.responseTime.push(value);
// Keep only last 100 response times
if (this.health.metrics.responseTime.length > 100) {
this.health.metrics.responseTime.shift();
}
}
}
// Get service metrics
getMetrics() {
const responseTimes = this.health.metrics.responseTime;
const avgResponseTime =
responseTimes.length > 0
? responseTimes.reduce((a, b) => a + b, 0) / responseTimes.length
: 0;
return {
...this.health.metrics,
averageResponseTime: avgResponseTime,
errorRate:
this.health.metrics.requestCount > 0
? (this.health.metrics.errorCount /
this.health.metrics.requestCount) *
100
: 0,
};
}
}
// Circuit Breaker Implementation
class CircuitBreaker {
constructor(name, options = {}) {
this.name = name;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.failureCount = 0;
this.failureThreshold = options.failureThreshold || 5;
this.resetTimeout = options.resetTimeout || 30000;
this.nextAttempt = 0;
this.metrics = {
requests: 0,
failures: 0,
successes: 0,
timeouts: 0,
};
}
async execute(fn, timeout = 5000) {
this.metrics.requests++;
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error(`Circuit breaker ${this.name} is OPEN`);
} else {
this.state = 'HALF_OPEN';
}
}
try {
const result = await Promise.race([
fn(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), timeout)
),
]);
this.onSuccess();
return result;
} catch (error) {
this.onFailure(error);
throw error;
}
}
onSuccess() {
this.metrics.successes++;
this.failureCount = 0;
if (this.state === 'HALF_OPEN') {
this.state = 'CLOSED';
}
}
onFailure(error) {
if (error.message === 'Timeout') {
this.metrics.timeouts++;
} else {
this.metrics.failures++;
}
this.failureCount++;
if (this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.resetTimeout;
}
}
getState() {
return {
name: this.name,
state: this.state,
failureCount: this.failureCount,
metrics: this.metrics,
};
}
}
// Service Discovery
class ServiceRegistry {
constructor() {
this.services = new Map();
this.watchers = [];
}
async register(serviceInfo) {
const { name, address, port, health, version, tags, metadata } =
serviceInfo;
const service = {
...serviceInfo,
id: `${name}-${port}`,
registeredAt: new Date().toISOString(),
lastHeartbeat: new Date().toISOString(),
status: 'healthy',
};
this.services.set(service.id, service);
// Notify watchers
this.notifyWatchers('register', service);
console.log(`Service registered: ${service.id}`);
return service.id;
}
async deregister(serviceId) {
const service = this.services.get(serviceId);
if (service) {
this.services.delete(serviceId);
this.notifyWatchers('deregister', service);
console.log(`Service deregistered: ${serviceId}`);
}
}
async discover(serviceName, tags = []) {
const services = Array.from(this.services.values())
.filter((service) => {
if (service.name !== serviceName) return false;
if (tags.length > 0) {
return tags.every((tag) => service.tags.includes(tag));
}
return true;
})
.filter((service) => service.status === 'healthy');
return services;
}
async getService(serviceName, loadBalancer = 'round-robin') {
const services = await this.discover(serviceName);
if (services.length === 0) {
throw new Error(`No healthy instances of service ${serviceName} found`);
}
switch (loadBalancer) {
case 'round-robin':
return this.roundRobinSelect(services);
case 'random':
return services[Math.floor(Math.random() * services.length)];
case 'least-connections':
return this.leastConnectionsSelect(services);
default:
return services[0];
}
}
roundRobinSelect(services) {
// Simple round-robin implementation
const serviceName = services[0].name;
let index = this.roundRobinCounters?.get(serviceName) || 0;
index = (index + 1) % services.length;
if (!this.roundRobinCounters) {
this.roundRobinCounters = new Map();
}
this.roundRobinCounters.set(serviceName, index);
return services[index];
}
leastConnectionsSelect(services) {
// Return service with least active connections
return services.reduce((least, current) => {
const leastConnections = least.metadata?.activeConnections || 0;
const currentConnections = current.metadata?.activeConnections || 0;
return currentConnections < leastConnections ? current : least;
});
}
watch(callback) {
this.watchers.push(callback);
return () => {
const index = this.watchers.indexOf(callback);
if (index > -1) {
this.watchers.splice(index, 1);
}
};
}
notifyWatchers(event, service) {
this.watchers.forEach((callback) => {
try {
callback(event, service);
} catch (error) {
console.error('Error notifying watcher:', error);
}
});
}
// Health checking for registered services
async startHealthChecking(interval = 30000) {
setInterval(async () => {
for (const [serviceId, service] of this.services) {
try {
const response = await fetch(service.health, { timeout: 5000 });
const previousStatus = service.status;
service.status = response.ok ? 'healthy' : 'unhealthy';
service.lastHeartbeat = new Date().toISOString();
if (previousStatus !== service.status) {
this.notifyWatchers('status-change', service);
}
} catch (error) {
const previousStatus = service.status;
service.status = 'unhealthy';
service.lastHeartbeat = new Date().toISOString();
if (previousStatus !== service.status) {
this.notifyWatchers('status-change', service);
}
}
}
}, interval);
}
getServices() {
return Array.from(this.services.values());
}
}
HTTP-based Microservice Implementation
// HTTP Microservice with Express-like functionality
class HTTPMicroService extends MicroService {
constructor(name, port, config = {}) {
super(name, port, config);
this.routes = [];
this.server = null;
this.corsOptions = config.cors || {
origin: '*',
methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
headers: ['Content-Type', 'Authorization'],
};
}
// Route definitions
get(path, ...handlers) {
this.addRoute('GET', path, handlers);
}
post(path, ...handlers) {
this.addRoute('POST', path, handlers);
}
put(path, ...handlers) {
this.addRoute('PUT', path, handlers);
}
delete(path, ...handlers) {
this.addRoute('DELETE', path, handlers);
}
addRoute(method, path, handlers) {
this.routes.push({
method,
path: this.pathToRegexp(path),
handlers,
originalPath: path,
});
}
pathToRegexp(path) {
// Simple path to regex conversion
const pattern = path
.replace(/:[^/]+/g, '([^/]+)') // Named parameters
.replace(/\*/g, '(.*)'); // Wildcards
return new RegExp(`^${pattern}$`);
}
// Middleware for common functionality
cors() {
return (req, res, next) => {
const { origin, methods, headers } = this.corsOptions;
res.setHeader('Access-Control-Allow-Origin', origin);
res.setHeader('Access-Control-Allow-Methods', methods.join(', '));
res.setHeader('Access-Control-Allow-Headers', headers.join(', '));
if (req.method === 'OPTIONS') {
res.statusCode = 200;
res.end();
return;
}
next();
};
}
json() {
return (req, res, next) => {
if (req.method === 'POST' || req.method === 'PUT') {
let body = '';
req.on('data', (chunk) => {
body += chunk.toString();
});
req.on('end', () => {
try {
req.body = body ? JSON.parse(body) : {};
next();
} catch (error) {
res.statusCode = 400;
res.end(JSON.stringify({ error: 'Invalid JSON' }));
}
});
} else {
next();
}
};
}
requestLogger() {
return (req, res, next) => {
const start = Date.now();
const originalEnd = res.end;
res.end = function (...args) {
const responseTime = Date.now() - start;
console.log(
`${req.method} ${req.url} - ${res.statusCode} - ${responseTime}ms`
);
// Record metrics
this.recordMetric('responseTime', responseTime);
if (res.statusCode >= 400) {
this.recordMetric('error');
}
originalEnd.apply(res, args);
}.bind(this);
next();
};
}
errorHandler() {
return (error, req, res, next) => {
console.error('Error:', error);
this.recordMetric('error');
if (res.headersSent) {
return next(error);
}
res.statusCode = error.statusCode || 500;
res.setHeader('Content-Type', 'application/json');
res.end(
JSON.stringify({
error: {
message: error.message,
...(process.env.NODE_ENV === 'development' && {
stack: error.stack,
}),
},
})
);
};
}
// Request handling
async handleRequest(req, res) {
const url = new URL(req.url, `http://localhost:${this.port}`);
req.path = url.pathname;
req.query = Object.fromEntries(url.searchParams);
// Find matching route
const route = this.routes.find(
(r) => r.method === req.method && r.path.test(req.path)
);
if (!route) {
res.statusCode = 404;
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({ error: 'Route not found' }));
return;
}
// Extract path parameters
const matches = req.path.match(route.path);
req.params = {};
if (matches && matches.length > 1) {
const paramNames = route.originalPath.match(/:([^/]+)/g) || [];
paramNames.forEach((param, index) => {
const paramName = param.slice(1); // Remove ':'
req.params[paramName] = matches[index + 1];
});
}
// Execute middleware and handlers
let index = 0;
const handlers = [...this.middleware, ...route.handlers];
const next = (error) => {
if (error) {
const errorHandler = this.errorHandler();
return errorHandler(error, req, res, () => {});
}
if (index >= handlers.length) {
res.statusCode = 404;
res.end('Not Found');
return;
}
const handler = handlers[index++];
try {
const result = handler(req, res, next);
// Handle async handlers
if (result && typeof result.catch === 'function') {
result.catch(next);
}
} catch (err) {
next(err);
}
};
next();
}
// Start the service
async start() {
const http = await import('http');
this.server = http.createServer((req, res) => {
this.handleRequest(req, res);
});
return new Promise((resolve, reject) => {
this.server.listen(this.port, (error) => {
if (error) {
reject(error);
} else {
console.log(`${this.name} service listening on port ${this.port}`);
resolve();
}
});
});
}
// Stop the service
async shutdown() {
if (this.server) {
return new Promise((resolve) => {
this.server.close(() => {
console.log(`${this.name} service stopped`);
resolve();
});
});
}
}
}
// API Gateway Implementation
class APIGateway extends HTTPMicroService {
constructor(port, config = {}) {
super('api-gateway', port, config);
this.routes = [];
this.rateLimiter = new Map();
this.authProvider = config.authProvider;
this.setupDefaultMiddleware();
this.setupDefaultRoutes();
}
setupDefaultMiddleware() {
this.use(this.cors());
this.use(this.json());
this.use(this.requestLogger());
this.use(this.rateLimit());
this.use(this.authentication());
}
setupDefaultRoutes() {
// Health check endpoint
this.get('/health', (req, res) => {
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(this.health));
});
// Service discovery endpoint
this.get('/services', async (req, res) => {
try {
const services = this.serviceRegistry?.getServices() || [];
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(services));
} catch (error) {
res.statusCode = 500;
res.end(JSON.stringify({ error: error.message }));
}
});
}
// Rate limiting middleware
rateLimit(windowMs = 60000, maxRequests = 100) {
return (req, res, next) => {
const clientId =
req.headers['x-forwarded-for'] || req.connection.remoteAddress;
const now = Date.now();
const windowStart = now - windowMs;
if (!this.rateLimiter.has(clientId)) {
this.rateLimiter.set(clientId, []);
}
const requests = this.rateLimiter.get(clientId);
// Remove old requests outside window
const validRequests = requests.filter(
(timestamp) => timestamp > windowStart
);
if (validRequests.length >= maxRequests) {
res.statusCode = 429;
res.setHeader('Retry-After', Math.ceil(windowMs / 1000));
res.end(JSON.stringify({ error: 'Rate limit exceeded' }));
return;
}
validRequests.push(now);
this.rateLimiter.set(clientId, validRequests);
next();
};
}
// Authentication middleware
authentication() {
return async (req, res, next) => {
if (req.path === '/health' || req.path === '/services') {
return next();
}
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
res.statusCode = 401;
res.end(
JSON.stringify({ error: 'Missing or invalid authorization header' })
);
return;
}
const token = authHeader.slice(7);
try {
if (this.authProvider) {
const user = await this.authProvider.validateToken(token);
req.user = user;
}
next();
} catch (error) {
res.statusCode = 401;
res.end(JSON.stringify({ error: 'Invalid token' }));
}
};
}
// Proxy requests to services
proxy(serviceName, options = {}) {
return async (req, res, next) => {
try {
const service = await this.serviceRegistry.getService(serviceName);
const targetUrl = `${service.address}${req.path}`;
// Create proxy request
const proxyOptions = {
method: req.method,
headers: { ...req.headers },
...(req.body && { body: JSON.stringify(req.body) }),
};
// Add service discovery headers
proxyOptions.headers['X-Gateway-Request-ID'] = Date.now().toString();
proxyOptions.headers['X-Service-Name'] = serviceName;
const startTime = Date.now();
const response = await fetch(targetUrl, proxyOptions);
const responseTime = Date.now() - startTime;
// Copy response headers
for (const [key, value] of response.headers) {
res.setHeader(key, value);
}
res.statusCode = response.status;
// Stream response body
const body = await response.text();
res.end(body);
// Record metrics
this.recordMetric('responseTime', responseTime);
if (!response.ok) {
this.recordMetric('error');
}
} catch (error) {
console.error(`Proxy error for service ${serviceName}:`, error);
res.statusCode = 502;
res.end(JSON.stringify({ error: 'Service unavailable' }));
}
};
}
// Route registration with service proxy
route(path, serviceName, options = {}) {
const methods = options.methods || ['GET', 'POST', 'PUT', 'DELETE'];
methods.forEach((method) => {
this.addRoute(method.toLowerCase(), path, [
this.proxy(serviceName, options),
]);
});
}
setServiceRegistry(serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}
}
// Example User Service
class UserService extends HTTPMicroService {
constructor(port) {
super('user-service', port);
this.users = new Map();
this.setupRoutes();
this.setupMiddleware();
}
setupMiddleware() {
this.use(this.cors());
this.use(this.json());
this.use(this.requestLogger());
}
setupRoutes() {
// Health check
this.get('/health', (req, res) => {
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(this.health));
});
// Get all users
this.get('/users', (req, res) => {
const users = Array.from(this.users.values());
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(users));
});
// Get user by ID
this.get('/users/:id', (req, res) => {
const user = this.users.get(req.params.id);
if (!user) {
res.statusCode = 404;
res.end(JSON.stringify({ error: 'User not found' }));
return;
}
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(user));
});
// Create user
this.post('/users', (req, res) => {
const { name, email } = req.body;
if (!name || !email) {
res.statusCode = 400;
res.end(JSON.stringify({ error: 'Name and email are required' }));
return;
}
const user = {
id: Date.now().toString(),
name,
email,
createdAt: new Date().toISOString(),
};
this.users.set(user.id, user);
res.statusCode = 201;
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(user));
});
// Update user
this.put('/users/:id', (req, res) => {
const user = this.users.get(req.params.id);
if (!user) {
res.statusCode = 404;
res.end(JSON.stringify({ error: 'User not found' }));
return;
}
const updatedUser = {
...user,
...req.body,
updatedAt: new Date().toISOString(),
};
this.users.set(req.params.id, updatedUser);
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(updatedUser));
});
// Delete user
this.delete('/users/:id', (req, res) => {
if (!this.users.has(req.params.id)) {
res.statusCode = 404;
res.end(JSON.stringify({ error: 'User not found' }));
return;
}
this.users.delete(req.params.id);
res.statusCode = 204;
res.end();
});
}
}
// Usage Example
async function startMicroservicesSystem() {
// Create service registry
const serviceRegistry = new ServiceRegistry();
serviceRegistry.startHealthChecking();
// Create services
const userService = new UserService(3001);
const apiGateway = new APIGateway(3000);
// Set up API Gateway
apiGateway.setServiceRegistry(serviceRegistry);
apiGateway.route('/api/users/*', 'user-service');
try {
// Start services
await userService.start();
await apiGateway.start();
// Register services
await userService.register(serviceRegistry);
console.log('Microservices system started successfully');
console.log('API Gateway: http://localhost:3000');
console.log('User Service: http://localhost:3001');
} catch (error) {
console.error('Failed to start microservices system:', error);
process.exit(1);
}
}
// Start the system
if (typeof module !== 'undefined' && require.main === module) {
startMicroservicesSystem();
}
Inter-Service Communication
Event-Driven Architecture
// Event Bus for Service Communication
class EventBus {
constructor() {
this.subscribers = new Map();
this.middleware = [];
this.deadLetterQueue = [];
this.config = {
maxRetries: 3,
retryDelay: 1000,
deadLetterThreshold: 5,
};
}
// Subscribe to events
subscribe(eventType, handler, options = {}) {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, []);
}
const subscription = {
id: Date.now() + Math.random(),
handler,
options: {
once: false,
priority: 0,
filter: null,
...options,
},
stats: {
processed: 0,
errors: 0,
lastProcessed: null,
},
};
this.subscribers.get(eventType).push(subscription);
// Sort by priority
this.subscribers
.get(eventType)
.sort((a, b) => b.options.priority - a.options.priority);
return {
unsubscribe: () => {
const subs = this.subscribers.get(eventType);
const index = subs.findIndex((s) => s.id === subscription.id);
if (index > -1) {
subs.splice(index, 1);
}
},
};
}
// Publish events
async publish(eventType, data, metadata = {}) {
const event = {
id: Date.now() + Math.random(),
type: eventType,
data,
metadata: {
timestamp: new Date().toISOString(),
source: metadata.source || 'unknown',
correlationId: metadata.correlationId || this.generateCorrelationId(),
...metadata,
},
attempts: 0,
};
// Apply middleware
for (const middleware of this.middleware) {
try {
await middleware(event);
} catch (error) {
console.error('Middleware error:', error);
}
}
await this.processEvent(event);
}
async processEvent(event) {
const subscribers = this.subscribers.get(event.type) || [];
if (subscribers.length === 0) {
console.warn(`No subscribers for event type: ${event.type}`);
return;
}
const promises = subscribers.map((subscription) =>
this.processSubscription(subscription, event)
);
await Promise.allSettled(promises);
}
async processSubscription(subscription, event) {
// Apply filter if present
if (subscription.options.filter && !subscription.options.filter(event)) {
return;
}
try {
await this.executeHandler(subscription, event);
subscription.stats.processed++;
subscription.stats.lastProcessed = new Date().toISOString();
// Remove once subscription
if (subscription.options.once) {
const subs = this.subscribers.get(event.type);
const index = subs.findIndex((s) => s.id === subscription.id);
if (index > -1) {
subs.splice(index, 1);
}
}
} catch (error) {
subscription.stats.errors++;
console.error(`Error processing event ${event.id}:`, error);
await this.handleEventError(event, subscription, error);
}
}
async executeHandler(subscription, event) {
const result = subscription.handler(event.data, event.metadata);
// Handle async handlers
if (result && typeof result.then === 'function') {
await result;
}
}
async handleEventError(event, subscription, error) {
event.attempts++;
if (event.attempts < this.config.maxRetries) {
// Retry with delay
setTimeout(() => {
this.processSubscription(subscription, event);
}, this.config.retryDelay * event.attempts);
} else {
// Send to dead letter queue
this.deadLetterQueue.push({
event,
subscription: subscription.id,
error: error.message,
timestamp: new Date().toISOString(),
});
// Trigger dead letter event
this.publish(
'event.deadletter',
{
originalEvent: event,
error: error.message,
},
{ source: 'event-bus' }
);
}
}
// Middleware support
use(middleware) {
this.middleware.push(middleware);
}
// Utilities
generateCorrelationId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
getStats() {
const stats = {
totalSubscribers: 0,
eventTypes: [],
deadLetterCount: this.deadLetterQueue.length,
};
for (const [eventType, subscribers] of this.subscribers) {
stats.totalSubscribers += subscribers.length;
stats.eventTypes.push({
type: eventType,
subscriberCount: subscribers.length,
totalProcessed: subscribers.reduce(
(sum, sub) => sum + sub.stats.processed,
0
),
totalErrors: subscribers.reduce(
(sum, sub) => sum + sub.stats.errors,
0
),
});
}
return stats;
}
}
// Message Queue Implementation
class MessageQueue {
constructor(name, options = {}) {
this.name = name;
this.messages = [];
this.consumers = [];
this.config = {
maxSize: options.maxSize || 1000,
persistence: options.persistence || false,
deadLetterQueue: options.deadLetterQueue || null,
...options,
};
this.stats = {
enqueued: 0,
dequeued: 0,
failed: 0,
currentSize: 0,
};
}
// Add message to queue
async enqueue(message, priority = 0, delay = 0) {
if (this.messages.length >= this.config.maxSize) {
throw new Error(`Queue ${this.name} is full`);
}
const queueMessage = {
id: Date.now() + Math.random(),
data: message,
priority,
delay,
enqueueTime: Date.now(),
attempts: 0,
maxAttempts: this.config.maxAttempts || 3,
};
if (delay > 0) {
queueMessage.availableAt = Date.now() + delay;
}
// Insert based on priority
const insertIndex = this.messages.findIndex((m) => m.priority < priority);
if (insertIndex === -1) {
this.messages.push(queueMessage);
} else {
this.messages.splice(insertIndex, 0, queueMessage);
}
this.stats.enqueued++;
this.stats.currentSize = this.messages.length;
// Notify consumers
this.notifyConsumers();
return queueMessage.id;
}
// Remove message from queue
async dequeue() {
const now = Date.now();
// Find first available message
const messageIndex = this.messages.findIndex(
(m) => !m.availableAt || m.availableAt <= now
);
if (messageIndex === -1) {
return null;
}
const message = this.messages.splice(messageIndex, 1)[0];
this.stats.dequeued++;
this.stats.currentSize = this.messages.length;
return message;
}
// Register consumer
consume(handler, options = {}) {
const consumer = {
id: Date.now() + Math.random(),
handler,
options: {
concurrency: 1,
autoAck: true,
...options,
},
stats: {
processed: 0,
errors: 0,
},
};
this.consumers.push(consumer);
this.startConsumer(consumer);
return {
stop: () => {
const index = this.consumers.findIndex((c) => c.id === consumer.id);
if (index > -1) {
this.consumers.splice(index, 1);
}
},
};
}
async startConsumer(consumer) {
const processMessage = async () => {
try {
const message = await this.dequeue();
if (!message) {
// No messages available, wait and try again
setTimeout(processMessage, 100);
return;
}
try {
await consumer.handler(message.data, {
messageId: message.id,
attempts: message.attempts,
enqueueTime: message.enqueueTime,
});
consumer.stats.processed++;
if (!consumer.options.autoAck) {
// Message needs manual acknowledgment
message.acked = true;
}
} catch (error) {
message.attempts++;
consumer.stats.errors++;
if (message.attempts >= message.maxAttempts) {
// Send to dead letter queue
if (this.config.deadLetterQueue) {
await this.config.deadLetterQueue.enqueue({
originalMessage: message,
error: error.message,
failedAt: new Date().toISOString(),
});
}
this.stats.failed++;
} else {
// Retry with exponential backoff
const retryDelay = Math.pow(2, message.attempts) * 1000;
await this.enqueue(message.data, message.priority, retryDelay);
}
}
// Continue processing
setImmediate(processMessage);
} catch (error) {
console.error(`Consumer error in queue ${this.name}:`, error);
setTimeout(processMessage, 1000);
}
};
// Start concurrent workers
for (let i = 0; i < consumer.options.concurrency; i++) {
processMessage();
}
}
notifyConsumers() {
// Implementation depends on specific requirements
// Could use events, polling, or push notifications
}
getStats() {
return {
...this.stats,
consumers: this.consumers.length,
averageWaitTime: this.calculateAverageWaitTime(),
};
}
calculateAverageWaitTime() {
if (this.messages.length === 0) return 0;
const now = Date.now();
const waitTimes = this.messages.map((m) => now - m.enqueueTime);
return waitTimes.reduce((sum, time) => sum + time, 0) / waitTimes.length;
}
}
// Saga Pattern for Distributed Transactions
class SagaOrchestrator {
constructor(eventBus) {
this.eventBus = eventBus;
this.sagas = new Map();
this.setupEventHandlers();
}
setupEventHandlers() {
this.eventBus.subscribe('saga.start', (data) => {
this.startSaga(data);
});
this.eventBus.subscribe('saga.step.completed', (data) => {
this.handleStepCompleted(data);
});
this.eventBus.subscribe('saga.step.failed', (data) => {
this.handleStepFailed(data);
});
}
defineSaga(name, steps) {
const saga = {
name,
steps,
compensations: [],
};
// Generate compensation steps
steps.forEach((step, index) => {
if (step.compensation) {
saga.compensations.unshift({
...step.compensation,
originalStepIndex: index,
});
}
});
this.sagaDefinitions.set(name, saga);
}
async startSaga(sagaData) {
const { sagaName, sagaId, data } = sagaData;
const sagaDefinition = this.sagaDefinitions.get(sagaName);
if (!sagaDefinition) {
throw new Error(`Saga ${sagaName} not found`);
}
const sagaInstance = {
id: sagaId,
name: sagaName,
data,
currentStep: 0,
completedSteps: [],
status: 'running',
startTime: Date.now(),
error: null,
};
this.sagas.set(sagaId, sagaInstance);
await this.executeNextStep(sagaInstance);
}
async executeNextStep(sagaInstance) {
const sagaDefinition = this.sagaDefinitions.get(sagaInstance.name);
const step = sagaDefinition.steps[sagaInstance.currentStep];
if (!step) {
// Saga completed successfully
sagaInstance.status = 'completed';
await this.eventBus.publish('saga.completed', {
sagaId: sagaInstance.id,
sagaName: sagaInstance.name,
data: sagaInstance.data,
});
return;
}
try {
await this.eventBus.publish(step.command, {
sagaId: sagaInstance.id,
stepIndex: sagaInstance.currentStep,
...sagaInstance.data,
...step.data,
});
} catch (error) {
await this.handleStepFailed({
sagaId: sagaInstance.id,
stepIndex: sagaInstance.currentStep,
error: error.message,
});
}
}
async handleStepCompleted(data) {
const { sagaId, stepIndex, result } = data;
const sagaInstance = this.sagas.get(sagaId);
if (!sagaInstance) return;
sagaInstance.completedSteps.push({
stepIndex,
result,
completedAt: Date.now(),
});
sagaInstance.currentStep++;
// Merge result data
if (result) {
sagaInstance.data = { ...sagaInstance.data, ...result };
}
await this.executeNextStep(sagaInstance);
}
async handleStepFailed(data) {
const { sagaId, stepIndex, error } = data;
const sagaInstance = this.sagas.get(sagaId);
if (!sagaInstance) return;
sagaInstance.status = 'compensating';
sagaInstance.error = error;
await this.startCompensation(sagaInstance);
}
async startCompensation(sagaInstance) {
const sagaDefinition = this.sagaDefinitions.get(sagaInstance.name);
// Execute compensations in reverse order
for (const completedStep of sagaInstance.completedSteps.reverse()) {
const compensation = sagaDefinition.compensations.find(
(c) => c.originalStepIndex === completedStep.stepIndex
);
if (compensation) {
try {
await this.eventBus.publish(compensation.command, {
sagaId: sagaInstance.id,
originalStepIndex: completedStep.stepIndex,
originalResult: completedStep.result,
...sagaInstance.data,
});
} catch (error) {
console.error(
`Compensation failed for saga ${sagaInstance.id}:`,
error
);
}
}
}
sagaInstance.status = 'failed';
await this.eventBus.publish('saga.failed', {
sagaId: sagaInstance.id,
sagaName: sagaInstance.name,
error: sagaInstance.error,
data: sagaInstance.data,
});
}
getSagaStatus(sagaId) {
return this.sagas.get(sagaId);
}
}
// Usage Example
const eventBus = new EventBus();
const sagaOrchestrator = new SagaOrchestrator(eventBus);
// Define a saga for order processing
sagaOrchestrator.defineSaga('process-order', [
{
command: 'inventory.reserve',
data: { timeout: 30000 },
compensation: {
command: 'inventory.release',
},
},
{
command: 'payment.charge',
compensation: {
command: 'payment.refund',
},
},
{
command: 'shipping.create',
compensation: {
command: 'shipping.cancel',
},
},
]);
// Start a saga
eventBus.publish('saga.start', {
sagaName: 'process-order',
sagaId: 'order-123',
data: {
orderId: '123',
customerId: '456',
items: [{ productId: 'p1', quantity: 2 }],
amount: 99.99,
},
});
Conclusion
Microservices architecture offers significant benefits for scalable, maintainable systems but comes with increased complexity in areas like service communication, data consistency, and operational overhead. Start with a monolithic architecture and extract services as your system grows and requirements become clearer. Focus on proper service boundaries, robust communication patterns, comprehensive monitoring, and automated deployment pipelines. Remember that microservices are a tool for solving specific problems - use them when the benefits outweigh the complexity they introduce.