JavaScript Architecture

JavaScript Microservices: Building Scalable Distributed Systems

Master microservices architecture with JavaScript. Learn service design, API communication, containerization, monitoring, and deployment strategies.

By JavaScript Document Team
microservicesdistributed-systemsapi-designcontainerizationscalability

Microservices architecture breaks down applications into small, independent services that communicate over well-defined APIs. This guide covers designing, building, and deploying JavaScript microservices for scalable, maintainable systems.

Microservices Fundamentals

Core Principles and Architecture

// Microservice Base Class
class MicroService {
  constructor(name, port, config = {}) {
    this.name = name;
    this.port = port;
    this.config = {
      healthCheckInterval: 30000,
      gracefulShutdownTimeout: 10000,
      maxRetries: 3,
      circuitBreakerThreshold: 5,
      ...config,
    };

    this.health = {
      status: 'starting',
      uptime: 0,
      version: process.env.npm_package_version || '1.0.0',
      dependencies: {},
      metrics: {
        requestCount: 0,
        errorCount: 0,
        responseTime: [],
      },
    };

    this.dependencies = new Map();
    this.circuitBreakers = new Map();
    this.middleware = [];

    this.setupHealthChecks();
    this.setupGracefulShutdown();
  }

  // Service discovery and registration
  async register(serviceRegistry) {
    const serviceInfo = {
      name: this.name,
      address: `http://localhost:${this.port}`,
      port: this.port,
      health: `http://localhost:${this.port}/health`,
      version: this.health.version,
      tags: this.config.tags || [],
      metadata: this.config.metadata || {},
    };

    try {
      await serviceRegistry.register(serviceInfo);
      console.log(`Service ${this.name} registered successfully`);
      this.health.status = 'healthy';
    } catch (error) {
      console.error(`Failed to register service ${this.name}:`, error);
      this.health.status = 'unhealthy';
    }
  }

  // Add service dependency
  addDependency(name, healthUrl, timeout = 5000) {
    this.dependencies.set(name, {
      name,
      healthUrl,
      timeout,
      status: 'unknown',
      lastCheck: null,
    });
  }

  // Health check implementation
  setupHealthChecks() {
    setInterval(async () => {
      await this.checkHealth();
    }, this.config.healthCheckInterval);
  }

  async checkHealth() {
    this.health.uptime = process.uptime();

    // Check dependencies
    for (const [name, dependency] of this.dependencies) {
      try {
        const startTime = Date.now();
        const response = await fetch(dependency.healthUrl, {
          timeout: dependency.timeout,
        });

        const responseTime = Date.now() - startTime;
        dependency.status = response.ok ? 'healthy' : 'unhealthy';
        dependency.lastCheck = new Date().toISOString();
        dependency.responseTime = responseTime;

        this.health.dependencies[name] = {
          status: dependency.status,
          responseTime,
          lastCheck: dependency.lastCheck,
        };
      } catch (error) {
        dependency.status = 'unhealthy';
        dependency.lastCheck = new Date().toISOString();
        dependency.error = error.message;

        this.health.dependencies[name] = {
          status: 'unhealthy',
          error: error.message,
          lastCheck: dependency.lastCheck,
        };
      }
    }

    // Determine overall health
    const dependencyStatuses = Array.from(this.dependencies.values()).map(
      (dep) => dep.status
    );

    const hasUnhealthyDependencies = dependencyStatuses.includes('unhealthy');
    const hasUnknownDependencies = dependencyStatuses.includes('unknown');

    if (hasUnhealthyDependencies) {
      this.health.status = 'degraded';
    } else if (hasUnknownDependencies) {
      this.health.status = 'checking';
    } else {
      this.health.status = 'healthy';
    }
  }

  // Graceful shutdown
  setupGracefulShutdown() {
    const gracefulShutdown = async (signal) => {
      console.log(`Received ${signal}, starting graceful shutdown...`);
      this.health.status = 'shutting-down';

      try {
        await this.shutdown();
        process.exit(0);
      } catch (error) {
        console.error('Error during shutdown:', error);
        process.exit(1);
      }
    };

    process.on('SIGTERM', gracefulShutdown);
    process.on('SIGINT', gracefulShutdown);
  }

  async shutdown() {
    // Override in subclasses
    console.log(`Service ${this.name} shutting down...`);
  }

  // Middleware system
  use(middleware) {
    this.middleware.push(middleware);
  }

  // Circuit breaker pattern
  createCircuitBreaker(serviceName, options = {}) {
    const breaker = new CircuitBreaker(serviceName, {
      failureThreshold:
        options.failureThreshold || this.config.circuitBreakerThreshold,
      resetTimeout: options.resetTimeout || 30000,
      ...options,
    });

    this.circuitBreakers.set(serviceName, breaker);
    return breaker;
  }

  // Metrics collection
  recordMetric(type, value) {
    this.health.metrics.requestCount++;

    if (type === 'error') {
      this.health.metrics.errorCount++;
    }

    if (type === 'responseTime') {
      this.health.metrics.responseTime.push(value);
      // Keep only last 100 response times
      if (this.health.metrics.responseTime.length > 100) {
        this.health.metrics.responseTime.shift();
      }
    }
  }

  // Get service metrics
  getMetrics() {
    const responseTimes = this.health.metrics.responseTime;
    const avgResponseTime =
      responseTimes.length > 0
        ? responseTimes.reduce((a, b) => a + b, 0) / responseTimes.length
        : 0;

    return {
      ...this.health.metrics,
      averageResponseTime: avgResponseTime,
      errorRate:
        this.health.metrics.requestCount > 0
          ? (this.health.metrics.errorCount /
              this.health.metrics.requestCount) *
            100
          : 0,
    };
  }
}

// Circuit Breaker Implementation
class CircuitBreaker {
  constructor(name, options = {}) {
    this.name = name;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.failureThreshold = options.failureThreshold || 5;
    this.resetTimeout = options.resetTimeout || 30000;
    this.nextAttempt = 0;
    this.metrics = {
      requests: 0,
      failures: 0,
      successes: 0,
      timeouts: 0,
    };
  }

  async execute(fn, timeout = 5000) {
    this.metrics.requests++;

    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error(`Circuit breaker ${this.name} is OPEN`);
      } else {
        this.state = 'HALF_OPEN';
      }
    }

    try {
      const result = await Promise.race([
        fn(),
        new Promise((_, reject) =>
          setTimeout(() => reject(new Error('Timeout')), timeout)
        ),
      ]);

      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure(error);
      throw error;
    }
  }

  onSuccess() {
    this.metrics.successes++;
    this.failureCount = 0;

    if (this.state === 'HALF_OPEN') {
      this.state = 'CLOSED';
    }
  }

  onFailure(error) {
    if (error.message === 'Timeout') {
      this.metrics.timeouts++;
    } else {
      this.metrics.failures++;
    }

    this.failureCount++;

    if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.resetTimeout;
    }
  }

  getState() {
    return {
      name: this.name,
      state: this.state,
      failureCount: this.failureCount,
      metrics: this.metrics,
    };
  }
}

// Service Discovery
class ServiceRegistry {
  constructor() {
    this.services = new Map();
    this.watchers = [];
  }

  async register(serviceInfo) {
    const { name, address, port, health, version, tags, metadata } =
      serviceInfo;

    const service = {
      ...serviceInfo,
      id: `${name}-${port}`,
      registeredAt: new Date().toISOString(),
      lastHeartbeat: new Date().toISOString(),
      status: 'healthy',
    };

    this.services.set(service.id, service);

    // Notify watchers
    this.notifyWatchers('register', service);

    console.log(`Service registered: ${service.id}`);
    return service.id;
  }

  async deregister(serviceId) {
    const service = this.services.get(serviceId);
    if (service) {
      this.services.delete(serviceId);
      this.notifyWatchers('deregister', service);
      console.log(`Service deregistered: ${serviceId}`);
    }
  }

  async discover(serviceName, tags = []) {
    const services = Array.from(this.services.values())
      .filter((service) => {
        if (service.name !== serviceName) return false;
        if (tags.length > 0) {
          return tags.every((tag) => service.tags.includes(tag));
        }
        return true;
      })
      .filter((service) => service.status === 'healthy');

    return services;
  }

  async getService(serviceName, loadBalancer = 'round-robin') {
    const services = await this.discover(serviceName);

    if (services.length === 0) {
      throw new Error(`No healthy instances of service ${serviceName} found`);
    }

    switch (loadBalancer) {
      case 'round-robin':
        return this.roundRobinSelect(services);
      case 'random':
        return services[Math.floor(Math.random() * services.length)];
      case 'least-connections':
        return this.leastConnectionsSelect(services);
      default:
        return services[0];
    }
  }

  roundRobinSelect(services) {
    // Simple round-robin implementation
    const serviceName = services[0].name;
    let index = this.roundRobinCounters?.get(serviceName) || 0;
    index = (index + 1) % services.length;

    if (!this.roundRobinCounters) {
      this.roundRobinCounters = new Map();
    }
    this.roundRobinCounters.set(serviceName, index);

    return services[index];
  }

  leastConnectionsSelect(services) {
    // Return service with least active connections
    return services.reduce((least, current) => {
      const leastConnections = least.metadata?.activeConnections || 0;
      const currentConnections = current.metadata?.activeConnections || 0;
      return currentConnections < leastConnections ? current : least;
    });
  }

  watch(callback) {
    this.watchers.push(callback);
    return () => {
      const index = this.watchers.indexOf(callback);
      if (index > -1) {
        this.watchers.splice(index, 1);
      }
    };
  }

  notifyWatchers(event, service) {
    this.watchers.forEach((callback) => {
      try {
        callback(event, service);
      } catch (error) {
        console.error('Error notifying watcher:', error);
      }
    });
  }

  // Health checking for registered services
  async startHealthChecking(interval = 30000) {
    setInterval(async () => {
      for (const [serviceId, service] of this.services) {
        try {
          const response = await fetch(service.health, { timeout: 5000 });
          const previousStatus = service.status;
          service.status = response.ok ? 'healthy' : 'unhealthy';
          service.lastHeartbeat = new Date().toISOString();

          if (previousStatus !== service.status) {
            this.notifyWatchers('status-change', service);
          }
        } catch (error) {
          const previousStatus = service.status;
          service.status = 'unhealthy';
          service.lastHeartbeat = new Date().toISOString();

          if (previousStatus !== service.status) {
            this.notifyWatchers('status-change', service);
          }
        }
      }
    }, interval);
  }

  getServices() {
    return Array.from(this.services.values());
  }
}

HTTP-based Microservice Implementation

// HTTP Microservice with Express-like functionality
class HTTPMicroService extends MicroService {
  constructor(name, port, config = {}) {
    super(name, port, config);
    this.routes = [];
    this.server = null;
    this.corsOptions = config.cors || {
      origin: '*',
      methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
      headers: ['Content-Type', 'Authorization'],
    };
  }

  // Route definitions
  get(path, ...handlers) {
    this.addRoute('GET', path, handlers);
  }

  post(path, ...handlers) {
    this.addRoute('POST', path, handlers);
  }

  put(path, ...handlers) {
    this.addRoute('PUT', path, handlers);
  }

  delete(path, ...handlers) {
    this.addRoute('DELETE', path, handlers);
  }

  addRoute(method, path, handlers) {
    this.routes.push({
      method,
      path: this.pathToRegexp(path),
      handlers,
      originalPath: path,
    });
  }

  pathToRegexp(path) {
    // Simple path to regex conversion
    const pattern = path
      .replace(/:[^/]+/g, '([^/]+)') // Named parameters
      .replace(/\*/g, '(.*)'); // Wildcards

    return new RegExp(`^${pattern}$`);
  }

  // Middleware for common functionality
  cors() {
    return (req, res, next) => {
      const { origin, methods, headers } = this.corsOptions;

      res.setHeader('Access-Control-Allow-Origin', origin);
      res.setHeader('Access-Control-Allow-Methods', methods.join(', '));
      res.setHeader('Access-Control-Allow-Headers', headers.join(', '));

      if (req.method === 'OPTIONS') {
        res.statusCode = 200;
        res.end();
        return;
      }

      next();
    };
  }

  json() {
    return (req, res, next) => {
      if (req.method === 'POST' || req.method === 'PUT') {
        let body = '';

        req.on('data', (chunk) => {
          body += chunk.toString();
        });

        req.on('end', () => {
          try {
            req.body = body ? JSON.parse(body) : {};
            next();
          } catch (error) {
            res.statusCode = 400;
            res.end(JSON.stringify({ error: 'Invalid JSON' }));
          }
        });
      } else {
        next();
      }
    };
  }

  requestLogger() {
    return (req, res, next) => {
      const start = Date.now();
      const originalEnd = res.end;

      res.end = function (...args) {
        const responseTime = Date.now() - start;
        console.log(
          `${req.method} ${req.url} - ${res.statusCode} - ${responseTime}ms`
        );

        // Record metrics
        this.recordMetric('responseTime', responseTime);
        if (res.statusCode >= 400) {
          this.recordMetric('error');
        }

        originalEnd.apply(res, args);
      }.bind(this);

      next();
    };
  }

  errorHandler() {
    return (error, req, res, next) => {
      console.error('Error:', error);
      this.recordMetric('error');

      if (res.headersSent) {
        return next(error);
      }

      res.statusCode = error.statusCode || 500;
      res.setHeader('Content-Type', 'application/json');
      res.end(
        JSON.stringify({
          error: {
            message: error.message,
            ...(process.env.NODE_ENV === 'development' && {
              stack: error.stack,
            }),
          },
        })
      );
    };
  }

  // Request handling
  async handleRequest(req, res) {
    const url = new URL(req.url, `http://localhost:${this.port}`);
    req.path = url.pathname;
    req.query = Object.fromEntries(url.searchParams);

    // Find matching route
    const route = this.routes.find(
      (r) => r.method === req.method && r.path.test(req.path)
    );

    if (!route) {
      res.statusCode = 404;
      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify({ error: 'Route not found' }));
      return;
    }

    // Extract path parameters
    const matches = req.path.match(route.path);
    req.params = {};

    if (matches && matches.length > 1) {
      const paramNames = route.originalPath.match(/:([^/]+)/g) || [];
      paramNames.forEach((param, index) => {
        const paramName = param.slice(1); // Remove ':'
        req.params[paramName] = matches[index + 1];
      });
    }

    // Execute middleware and handlers
    let index = 0;
    const handlers = [...this.middleware, ...route.handlers];

    const next = (error) => {
      if (error) {
        const errorHandler = this.errorHandler();
        return errorHandler(error, req, res, () => {});
      }

      if (index >= handlers.length) {
        res.statusCode = 404;
        res.end('Not Found');
        return;
      }

      const handler = handlers[index++];

      try {
        const result = handler(req, res, next);

        // Handle async handlers
        if (result && typeof result.catch === 'function') {
          result.catch(next);
        }
      } catch (err) {
        next(err);
      }
    };

    next();
  }

  // Start the service
  async start() {
    const http = await import('http');

    this.server = http.createServer((req, res) => {
      this.handleRequest(req, res);
    });

    return new Promise((resolve, reject) => {
      this.server.listen(this.port, (error) => {
        if (error) {
          reject(error);
        } else {
          console.log(`${this.name} service listening on port ${this.port}`);
          resolve();
        }
      });
    });
  }

  // Stop the service
  async shutdown() {
    if (this.server) {
      return new Promise((resolve) => {
        this.server.close(() => {
          console.log(`${this.name} service stopped`);
          resolve();
        });
      });
    }
  }
}

// API Gateway Implementation
class APIGateway extends HTTPMicroService {
  constructor(port, config = {}) {
    super('api-gateway', port, config);
    this.routes = [];
    this.rateLimiter = new Map();
    this.authProvider = config.authProvider;

    this.setupDefaultMiddleware();
    this.setupDefaultRoutes();
  }

  setupDefaultMiddleware() {
    this.use(this.cors());
    this.use(this.json());
    this.use(this.requestLogger());
    this.use(this.rateLimit());
    this.use(this.authentication());
  }

  setupDefaultRoutes() {
    // Health check endpoint
    this.get('/health', (req, res) => {
      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify(this.health));
    });

    // Service discovery endpoint
    this.get('/services', async (req, res) => {
      try {
        const services = this.serviceRegistry?.getServices() || [];
        res.setHeader('Content-Type', 'application/json');
        res.end(JSON.stringify(services));
      } catch (error) {
        res.statusCode = 500;
        res.end(JSON.stringify({ error: error.message }));
      }
    });
  }

  // Rate limiting middleware
  rateLimit(windowMs = 60000, maxRequests = 100) {
    return (req, res, next) => {
      const clientId =
        req.headers['x-forwarded-for'] || req.connection.remoteAddress;
      const now = Date.now();
      const windowStart = now - windowMs;

      if (!this.rateLimiter.has(clientId)) {
        this.rateLimiter.set(clientId, []);
      }

      const requests = this.rateLimiter.get(clientId);

      // Remove old requests outside window
      const validRequests = requests.filter(
        (timestamp) => timestamp > windowStart
      );

      if (validRequests.length >= maxRequests) {
        res.statusCode = 429;
        res.setHeader('Retry-After', Math.ceil(windowMs / 1000));
        res.end(JSON.stringify({ error: 'Rate limit exceeded' }));
        return;
      }

      validRequests.push(now);
      this.rateLimiter.set(clientId, validRequests);

      next();
    };
  }

  // Authentication middleware
  authentication() {
    return async (req, res, next) => {
      if (req.path === '/health' || req.path === '/services') {
        return next();
      }

      const authHeader = req.headers.authorization;

      if (!authHeader || !authHeader.startsWith('Bearer ')) {
        res.statusCode = 401;
        res.end(
          JSON.stringify({ error: 'Missing or invalid authorization header' })
        );
        return;
      }

      const token = authHeader.slice(7);

      try {
        if (this.authProvider) {
          const user = await this.authProvider.validateToken(token);
          req.user = user;
        }
        next();
      } catch (error) {
        res.statusCode = 401;
        res.end(JSON.stringify({ error: 'Invalid token' }));
      }
    };
  }

  // Proxy requests to services
  proxy(serviceName, options = {}) {
    return async (req, res, next) => {
      try {
        const service = await this.serviceRegistry.getService(serviceName);
        const targetUrl = `${service.address}${req.path}`;

        // Create proxy request
        const proxyOptions = {
          method: req.method,
          headers: { ...req.headers },
          ...(req.body && { body: JSON.stringify(req.body) }),
        };

        // Add service discovery headers
        proxyOptions.headers['X-Gateway-Request-ID'] = Date.now().toString();
        proxyOptions.headers['X-Service-Name'] = serviceName;

        const startTime = Date.now();
        const response = await fetch(targetUrl, proxyOptions);
        const responseTime = Date.now() - startTime;

        // Copy response headers
        for (const [key, value] of response.headers) {
          res.setHeader(key, value);
        }

        res.statusCode = response.status;

        // Stream response body
        const body = await response.text();
        res.end(body);

        // Record metrics
        this.recordMetric('responseTime', responseTime);
        if (!response.ok) {
          this.recordMetric('error');
        }
      } catch (error) {
        console.error(`Proxy error for service ${serviceName}:`, error);
        res.statusCode = 502;
        res.end(JSON.stringify({ error: 'Service unavailable' }));
      }
    };
  }

  // Route registration with service proxy
  route(path, serviceName, options = {}) {
    const methods = options.methods || ['GET', 'POST', 'PUT', 'DELETE'];

    methods.forEach((method) => {
      this.addRoute(method.toLowerCase(), path, [
        this.proxy(serviceName, options),
      ]);
    });
  }

  setServiceRegistry(serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }
}

// Example User Service
class UserService extends HTTPMicroService {
  constructor(port) {
    super('user-service', port);
    this.users = new Map();
    this.setupRoutes();
    this.setupMiddleware();
  }

  setupMiddleware() {
    this.use(this.cors());
    this.use(this.json());
    this.use(this.requestLogger());
  }

  setupRoutes() {
    // Health check
    this.get('/health', (req, res) => {
      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify(this.health));
    });

    // Get all users
    this.get('/users', (req, res) => {
      const users = Array.from(this.users.values());
      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify(users));
    });

    // Get user by ID
    this.get('/users/:id', (req, res) => {
      const user = this.users.get(req.params.id);

      if (!user) {
        res.statusCode = 404;
        res.end(JSON.stringify({ error: 'User not found' }));
        return;
      }

      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify(user));
    });

    // Create user
    this.post('/users', (req, res) => {
      const { name, email } = req.body;

      if (!name || !email) {
        res.statusCode = 400;
        res.end(JSON.stringify({ error: 'Name and email are required' }));
        return;
      }

      const user = {
        id: Date.now().toString(),
        name,
        email,
        createdAt: new Date().toISOString(),
      };

      this.users.set(user.id, user);

      res.statusCode = 201;
      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify(user));
    });

    // Update user
    this.put('/users/:id', (req, res) => {
      const user = this.users.get(req.params.id);

      if (!user) {
        res.statusCode = 404;
        res.end(JSON.stringify({ error: 'User not found' }));
        return;
      }

      const updatedUser = {
        ...user,
        ...req.body,
        updatedAt: new Date().toISOString(),
      };
      this.users.set(req.params.id, updatedUser);

      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify(updatedUser));
    });

    // Delete user
    this.delete('/users/:id', (req, res) => {
      if (!this.users.has(req.params.id)) {
        res.statusCode = 404;
        res.end(JSON.stringify({ error: 'User not found' }));
        return;
      }

      this.users.delete(req.params.id);
      res.statusCode = 204;
      res.end();
    });
  }
}

// Usage Example
async function startMicroservicesSystem() {
  // Create service registry
  const serviceRegistry = new ServiceRegistry();
  serviceRegistry.startHealthChecking();

  // Create services
  const userService = new UserService(3001);
  const apiGateway = new APIGateway(3000);

  // Set up API Gateway
  apiGateway.setServiceRegistry(serviceRegistry);
  apiGateway.route('/api/users/*', 'user-service');

  try {
    // Start services
    await userService.start();
    await apiGateway.start();

    // Register services
    await userService.register(serviceRegistry);

    console.log('Microservices system started successfully');
    console.log('API Gateway: http://localhost:3000');
    console.log('User Service: http://localhost:3001');
  } catch (error) {
    console.error('Failed to start microservices system:', error);
    process.exit(1);
  }
}

// Start the system
if (typeof module !== 'undefined' && require.main === module) {
  startMicroservicesSystem();
}

Inter-Service Communication

Event-Driven Architecture

// Event Bus for Service Communication
class EventBus {
  constructor() {
    this.subscribers = new Map();
    this.middleware = [];
    this.deadLetterQueue = [];
    this.config = {
      maxRetries: 3,
      retryDelay: 1000,
      deadLetterThreshold: 5,
    };
  }

  // Subscribe to events
  subscribe(eventType, handler, options = {}) {
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, []);
    }

    const subscription = {
      id: Date.now() + Math.random(),
      handler,
      options: {
        once: false,
        priority: 0,
        filter: null,
        ...options,
      },
      stats: {
        processed: 0,
        errors: 0,
        lastProcessed: null,
      },
    };

    this.subscribers.get(eventType).push(subscription);

    // Sort by priority
    this.subscribers
      .get(eventType)
      .sort((a, b) => b.options.priority - a.options.priority);

    return {
      unsubscribe: () => {
        const subs = this.subscribers.get(eventType);
        const index = subs.findIndex((s) => s.id === subscription.id);
        if (index > -1) {
          subs.splice(index, 1);
        }
      },
    };
  }

  // Publish events
  async publish(eventType, data, metadata = {}) {
    const event = {
      id: Date.now() + Math.random(),
      type: eventType,
      data,
      metadata: {
        timestamp: new Date().toISOString(),
        source: metadata.source || 'unknown',
        correlationId: metadata.correlationId || this.generateCorrelationId(),
        ...metadata,
      },
      attempts: 0,
    };

    // Apply middleware
    for (const middleware of this.middleware) {
      try {
        await middleware(event);
      } catch (error) {
        console.error('Middleware error:', error);
      }
    }

    await this.processEvent(event);
  }

  async processEvent(event) {
    const subscribers = this.subscribers.get(event.type) || [];

    if (subscribers.length === 0) {
      console.warn(`No subscribers for event type: ${event.type}`);
      return;
    }

    const promises = subscribers.map((subscription) =>
      this.processSubscription(subscription, event)
    );

    await Promise.allSettled(promises);
  }

  async processSubscription(subscription, event) {
    // Apply filter if present
    if (subscription.options.filter && !subscription.options.filter(event)) {
      return;
    }

    try {
      await this.executeHandler(subscription, event);

      subscription.stats.processed++;
      subscription.stats.lastProcessed = new Date().toISOString();

      // Remove once subscription
      if (subscription.options.once) {
        const subs = this.subscribers.get(event.type);
        const index = subs.findIndex((s) => s.id === subscription.id);
        if (index > -1) {
          subs.splice(index, 1);
        }
      }
    } catch (error) {
      subscription.stats.errors++;
      console.error(`Error processing event ${event.id}:`, error);

      await this.handleEventError(event, subscription, error);
    }
  }

  async executeHandler(subscription, event) {
    const result = subscription.handler(event.data, event.metadata);

    // Handle async handlers
    if (result && typeof result.then === 'function') {
      await result;
    }
  }

  async handleEventError(event, subscription, error) {
    event.attempts++;

    if (event.attempts < this.config.maxRetries) {
      // Retry with delay
      setTimeout(() => {
        this.processSubscription(subscription, event);
      }, this.config.retryDelay * event.attempts);
    } else {
      // Send to dead letter queue
      this.deadLetterQueue.push({
        event,
        subscription: subscription.id,
        error: error.message,
        timestamp: new Date().toISOString(),
      });

      // Trigger dead letter event
      this.publish(
        'event.deadletter',
        {
          originalEvent: event,
          error: error.message,
        },
        { source: 'event-bus' }
      );
    }
  }

  // Middleware support
  use(middleware) {
    this.middleware.push(middleware);
  }

  // Utilities
  generateCorrelationId() {
    return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
  }

  getStats() {
    const stats = {
      totalSubscribers: 0,
      eventTypes: [],
      deadLetterCount: this.deadLetterQueue.length,
    };

    for (const [eventType, subscribers] of this.subscribers) {
      stats.totalSubscribers += subscribers.length;
      stats.eventTypes.push({
        type: eventType,
        subscriberCount: subscribers.length,
        totalProcessed: subscribers.reduce(
          (sum, sub) => sum + sub.stats.processed,
          0
        ),
        totalErrors: subscribers.reduce(
          (sum, sub) => sum + sub.stats.errors,
          0
        ),
      });
    }

    return stats;
  }
}

// Message Queue Implementation
class MessageQueue {
  constructor(name, options = {}) {
    this.name = name;
    this.messages = [];
    this.consumers = [];
    this.config = {
      maxSize: options.maxSize || 1000,
      persistence: options.persistence || false,
      deadLetterQueue: options.deadLetterQueue || null,
      ...options,
    };

    this.stats = {
      enqueued: 0,
      dequeued: 0,
      failed: 0,
      currentSize: 0,
    };
  }

  // Add message to queue
  async enqueue(message, priority = 0, delay = 0) {
    if (this.messages.length >= this.config.maxSize) {
      throw new Error(`Queue ${this.name} is full`);
    }

    const queueMessage = {
      id: Date.now() + Math.random(),
      data: message,
      priority,
      delay,
      enqueueTime: Date.now(),
      attempts: 0,
      maxAttempts: this.config.maxAttempts || 3,
    };

    if (delay > 0) {
      queueMessage.availableAt = Date.now() + delay;
    }

    // Insert based on priority
    const insertIndex = this.messages.findIndex((m) => m.priority < priority);
    if (insertIndex === -1) {
      this.messages.push(queueMessage);
    } else {
      this.messages.splice(insertIndex, 0, queueMessage);
    }

    this.stats.enqueued++;
    this.stats.currentSize = this.messages.length;

    // Notify consumers
    this.notifyConsumers();

    return queueMessage.id;
  }

  // Remove message from queue
  async dequeue() {
    const now = Date.now();

    // Find first available message
    const messageIndex = this.messages.findIndex(
      (m) => !m.availableAt || m.availableAt <= now
    );

    if (messageIndex === -1) {
      return null;
    }

    const message = this.messages.splice(messageIndex, 1)[0];
    this.stats.dequeued++;
    this.stats.currentSize = this.messages.length;

    return message;
  }

  // Register consumer
  consume(handler, options = {}) {
    const consumer = {
      id: Date.now() + Math.random(),
      handler,
      options: {
        concurrency: 1,
        autoAck: true,
        ...options,
      },
      stats: {
        processed: 0,
        errors: 0,
      },
    };

    this.consumers.push(consumer);
    this.startConsumer(consumer);

    return {
      stop: () => {
        const index = this.consumers.findIndex((c) => c.id === consumer.id);
        if (index > -1) {
          this.consumers.splice(index, 1);
        }
      },
    };
  }

  async startConsumer(consumer) {
    const processMessage = async () => {
      try {
        const message = await this.dequeue();

        if (!message) {
          // No messages available, wait and try again
          setTimeout(processMessage, 100);
          return;
        }

        try {
          await consumer.handler(message.data, {
            messageId: message.id,
            attempts: message.attempts,
            enqueueTime: message.enqueueTime,
          });

          consumer.stats.processed++;

          if (!consumer.options.autoAck) {
            // Message needs manual acknowledgment
            message.acked = true;
          }
        } catch (error) {
          message.attempts++;
          consumer.stats.errors++;

          if (message.attempts >= message.maxAttempts) {
            // Send to dead letter queue
            if (this.config.deadLetterQueue) {
              await this.config.deadLetterQueue.enqueue({
                originalMessage: message,
                error: error.message,
                failedAt: new Date().toISOString(),
              });
            }

            this.stats.failed++;
          } else {
            // Retry with exponential backoff
            const retryDelay = Math.pow(2, message.attempts) * 1000;
            await this.enqueue(message.data, message.priority, retryDelay);
          }
        }

        // Continue processing
        setImmediate(processMessage);
      } catch (error) {
        console.error(`Consumer error in queue ${this.name}:`, error);
        setTimeout(processMessage, 1000);
      }
    };

    // Start concurrent workers
    for (let i = 0; i < consumer.options.concurrency; i++) {
      processMessage();
    }
  }

  notifyConsumers() {
    // Implementation depends on specific requirements
    // Could use events, polling, or push notifications
  }

  getStats() {
    return {
      ...this.stats,
      consumers: this.consumers.length,
      averageWaitTime: this.calculateAverageWaitTime(),
    };
  }

  calculateAverageWaitTime() {
    if (this.messages.length === 0) return 0;

    const now = Date.now();
    const waitTimes = this.messages.map((m) => now - m.enqueueTime);
    return waitTimes.reduce((sum, time) => sum + time, 0) / waitTimes.length;
  }
}

// Saga Pattern for Distributed Transactions
class SagaOrchestrator {
  constructor(eventBus) {
    this.eventBus = eventBus;
    this.sagas = new Map();
    this.setupEventHandlers();
  }

  setupEventHandlers() {
    this.eventBus.subscribe('saga.start', (data) => {
      this.startSaga(data);
    });

    this.eventBus.subscribe('saga.step.completed', (data) => {
      this.handleStepCompleted(data);
    });

    this.eventBus.subscribe('saga.step.failed', (data) => {
      this.handleStepFailed(data);
    });
  }

  defineSaga(name, steps) {
    const saga = {
      name,
      steps,
      compensations: [],
    };

    // Generate compensation steps
    steps.forEach((step, index) => {
      if (step.compensation) {
        saga.compensations.unshift({
          ...step.compensation,
          originalStepIndex: index,
        });
      }
    });

    this.sagaDefinitions.set(name, saga);
  }

  async startSaga(sagaData) {
    const { sagaName, sagaId, data } = sagaData;
    const sagaDefinition = this.sagaDefinitions.get(sagaName);

    if (!sagaDefinition) {
      throw new Error(`Saga ${sagaName} not found`);
    }

    const sagaInstance = {
      id: sagaId,
      name: sagaName,
      data,
      currentStep: 0,
      completedSteps: [],
      status: 'running',
      startTime: Date.now(),
      error: null,
    };

    this.sagas.set(sagaId, sagaInstance);

    await this.executeNextStep(sagaInstance);
  }

  async executeNextStep(sagaInstance) {
    const sagaDefinition = this.sagaDefinitions.get(sagaInstance.name);
    const step = sagaDefinition.steps[sagaInstance.currentStep];

    if (!step) {
      // Saga completed successfully
      sagaInstance.status = 'completed';
      await this.eventBus.publish('saga.completed', {
        sagaId: sagaInstance.id,
        sagaName: sagaInstance.name,
        data: sagaInstance.data,
      });
      return;
    }

    try {
      await this.eventBus.publish(step.command, {
        sagaId: sagaInstance.id,
        stepIndex: sagaInstance.currentStep,
        ...sagaInstance.data,
        ...step.data,
      });
    } catch (error) {
      await this.handleStepFailed({
        sagaId: sagaInstance.id,
        stepIndex: sagaInstance.currentStep,
        error: error.message,
      });
    }
  }

  async handleStepCompleted(data) {
    const { sagaId, stepIndex, result } = data;
    const sagaInstance = this.sagas.get(sagaId);

    if (!sagaInstance) return;

    sagaInstance.completedSteps.push({
      stepIndex,
      result,
      completedAt: Date.now(),
    });

    sagaInstance.currentStep++;

    // Merge result data
    if (result) {
      sagaInstance.data = { ...sagaInstance.data, ...result };
    }

    await this.executeNextStep(sagaInstance);
  }

  async handleStepFailed(data) {
    const { sagaId, stepIndex, error } = data;
    const sagaInstance = this.sagas.get(sagaId);

    if (!sagaInstance) return;

    sagaInstance.status = 'compensating';
    sagaInstance.error = error;

    await this.startCompensation(sagaInstance);
  }

  async startCompensation(sagaInstance) {
    const sagaDefinition = this.sagaDefinitions.get(sagaInstance.name);

    // Execute compensations in reverse order
    for (const completedStep of sagaInstance.completedSteps.reverse()) {
      const compensation = sagaDefinition.compensations.find(
        (c) => c.originalStepIndex === completedStep.stepIndex
      );

      if (compensation) {
        try {
          await this.eventBus.publish(compensation.command, {
            sagaId: sagaInstance.id,
            originalStepIndex: completedStep.stepIndex,
            originalResult: completedStep.result,
            ...sagaInstance.data,
          });
        } catch (error) {
          console.error(
            `Compensation failed for saga ${sagaInstance.id}:`,
            error
          );
        }
      }
    }

    sagaInstance.status = 'failed';

    await this.eventBus.publish('saga.failed', {
      sagaId: sagaInstance.id,
      sagaName: sagaInstance.name,
      error: sagaInstance.error,
      data: sagaInstance.data,
    });
  }

  getSagaStatus(sagaId) {
    return this.sagas.get(sagaId);
  }
}

// Usage Example
const eventBus = new EventBus();
const sagaOrchestrator = new SagaOrchestrator(eventBus);

// Define a saga for order processing
sagaOrchestrator.defineSaga('process-order', [
  {
    command: 'inventory.reserve',
    data: { timeout: 30000 },
    compensation: {
      command: 'inventory.release',
    },
  },
  {
    command: 'payment.charge',
    compensation: {
      command: 'payment.refund',
    },
  },
  {
    command: 'shipping.create',
    compensation: {
      command: 'shipping.cancel',
    },
  },
]);

// Start a saga
eventBus.publish('saga.start', {
  sagaName: 'process-order',
  sagaId: 'order-123',
  data: {
    orderId: '123',
    customerId: '456',
    items: [{ productId: 'p1', quantity: 2 }],
    amount: 99.99,
  },
});

Conclusion

Microservices architecture offers significant benefits for scalable, maintainable systems but comes with increased complexity in areas like service communication, data consistency, and operational overhead. Start with a monolithic architecture and extract services as your system grows and requirements become clearer. Focus on proper service boundaries, robust communication patterns, comprehensive monitoring, and automated deployment pipelines. Remember that microservices are a tool for solving specific problems - use them when the benefits outweigh the complexity they introduce.