Microservices Communication Patterns: Building Resilient Distributed Systems

Introduction

In our Modern Development Practices series, we’ve explored test-driven development, code quality gates, and API design patterns. Today, we’re diving into microservices communication patterns – the backbone of any successful distributed system. Effective communication between services determines the resilience, scalability, and maintainability of your entire architecture.

Synchronous Communication Patterns

HTTP/REST with Circuit Breaker Pattern

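The most common synchronous pattern uses HTTP/REST calls wrapped in a resilience mechanism. A circuit breaker counts failed calls; once the count reaches a threshold it opens and rejects further calls immediately, and after a reset timeout it lets a trial call through (the half-open state) to check whether the downstream service has recovered: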

interface CircuitBreakerConfig {
  failureThreshold: number;
  resetTimeout: number;
  monitoringPeriod: number;
}

class CircuitBreaker {
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  private failureCount = 0;
  private lastFailureTime?: Date;

  constructor(private config: CircuitBreakerConfig) {}

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (this.shouldAttemptReset()) {
        this.state = 'HALF_OPEN';
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    this.failureCount = 0;
    this.state = 'CLOSED';
  }

  private onFailure(): void {
    this.failureCount++;
    this.lastFailureTime = new Date();
    
    if (this.failureCount >= this.config.failureThreshold) {
      this.state = 'OPEN';
    }
  }

  private shouldAttemptReset(): boolean {
    const now = new Date();
    const timeSinceLastFailure = now.getTime() - (this.lastFailureTime?.getTime() || 0);
    return timeSinceLastFailure >= this.config.resetTimeout;
  }
}

// Usage in a service client
class UserServiceClient {
  private circuitBreaker: CircuitBreaker;

  constructor(private baseUrl: string, private authToken?: string) {
    this.circuitBreaker = new CircuitBreaker({
      failureThreshold: 5,
      resetTimeout: 60000, // 1 minute
      monitoringPeriod: 10000 // 10 seconds
    });
  }

  async getUser(userId: string): Promise<User> {
    return this.circuitBreaker.execute(async () => {
      const headers: Record<string, string> = { 'Accept': 'application/json' };
      if (this.authToken) {
        headers['Authorization'] = `Bearer ${this.authToken}`;
      }

      // fetch has no `timeout` option; abort the request after 5 seconds instead
      // (AbortSignal.timeout needs Node 17.3+ or a current browser)
      const response = await fetch(`${this.baseUrl}/users/${encodeURIComponent(userId)}`, {
        headers,
        signal: AbortSignal.timeout(5000)
      });

      if (!response.ok) {
        throw new Error(`Failed to fetch user: ${response.status} ${response.statusText}`);
      }

      return response.json();
    });
  }
}
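
The configuration above includes a monitoringPeriod that the CircuitBreaker class never reads. One plausible reading is that only failures inside the last monitoringPeriod milliseconds should count toward the threshold. The sketch below shows that idea in isolation; FailureWindow and its method names are our own illustration, not part of the class above.

```typescript
// Hypothetical helper: counts failures inside a sliding time window.
class FailureWindow {
  private failureTimes: number[] = [];

  constructor(private readonly windowMs: number) {}

  // Record a failure and return how many failures fall inside the window.
  recordFailure(now: number = Date.now()): number {
    this.failureTimes.push(now);
    return this.countRecent(now);
  }

  // Drop timestamps older than the window, then count what remains.
  countRecent(now: number = Date.now()): number {
    const cutoff = now - this.windowMs;
    this.failureTimes = this.failureTimes.filter(t => t > cutoff);
    return this.failureTimes.length;
  }
}

// Example with a 10-second window: by t = 12s the failure at t = 0s has expired.
const failures = new FailureWindow(10000);
failures.recordFailure(0);
failures.recordFailure(4000);
const recentCount = failures.recordFailure(12000); // counts t = 4s and t = 12s
```

A breaker could compare the returned count with failureThreshold instead of keeping a counter that only resets on success, so a slow trickle of unrelated failures does not eventually trip it.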

Request-Response with Retry and Timeout

Implement robust retry mechanisms with exponential backoff:

interface RetryConfig {
  maxAttempts: number;
  baseDelay: number;
  maxDelay: number;
  backoffMultiplier: number;
}

class RetryableHttpClient {
  constructor(private retryConfig: RetryConfig) {}

  async request<T>(
    url: string, 
    options: RequestInit & { timeout?: number } = {}
  ): Promise<T> {
    const { timeout = 10000, ...fetchOptions } = options;
    
    for (let attempt = 1; attempt <= this.retryConfig.maxAttempts; attempt++) {
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), timeout);
      let retryable = true;

      try {
        const response = await fetch(url, {
          ...fetchOptions,
          signal: controller.signal
        });

        if (response.ok) {
          return await response.json();
        }

        // Don't retry on client errors (4xx); the same request would fail again
        if (response.status >= 400 && response.status < 500) {
          retryable = false;
          throw new Error(`Client error: ${response.status} ${response.statusText}`);
        }

        throw new Error(`Server error: ${response.status} ${response.statusText}`);
      } catch (error) {
        // Network failures, timeouts, and 5xx responses are retried until attempts run out
        if (!retryable || attempt === this.retryConfig.maxAttempts) {
          throw error;
        }
      } finally {
        // Always clear the timer, including when fetch itself rejects
        clearTimeout(timeoutId);
      }

      const delay = Math.min(
        this.retryConfig.baseDelay * Math.pow(this.retryConfig.backoffMultiplier, attempt - 1),
        this.retryConfig.maxDelay
      );

      await this.sleep(delay);
    }

    throw new Error('Max retry attempts exceeded');
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
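
To make the backoff arithmetic concrete, here is the delay formula from the client above pulled out into a standalone function, with a worked schedule. The config values are example numbers we picked, and the jittered variant is an optional addition that the client above does not use.

```typescript
interface BackoffConfig {
  baseDelay: number;
  maxDelay: number;
  backoffMultiplier: number;
}

// Delay in milliseconds before the retry that follows a failed attempt (1-based).
function backoffDelay(attempt: number, config: BackoffConfig): number {
  const exponential = config.baseDelay * Math.pow(config.backoffMultiplier, attempt - 1);
  return Math.min(exponential, config.maxDelay);
}

// "Full jitter" variant: a random delay between 0 and the capped value,
// so that many clients do not retry in lockstep.
function jitteredDelay(
  attempt: number,
  config: BackoffConfig,
  random: () => number = Math.random
): number {
  return Math.floor(random() * backoffDelay(attempt, config));
}

const exampleConfig: BackoffConfig = { baseDelay: 100, maxDelay: 1000, backoffMultiplier: 2 };
const schedule = [1, 2, 3, 4, 5].map(attempt => backoffDelay(attempt, exampleConfig));
// schedule is [100, 200, 400, 800, 1000]: the fifth delay is capped by maxDelay
```

Passing the random source as a parameter keeps jitteredDelay deterministic in tests.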

Asynchronous Communication Patterns

Event-Driven Architecture with AWS SNS/SQS

Implement robust event-driven communication using AWS services:

import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';

interface DomainEvent {
  eventType: string;
  aggregateId: string;
  version: number;
  timestamp: Date;
  data: Record<string, any>;
  metadata?: Record<string, any>;
}

class EventPublisher {
  private snsClient: SNSClient;

  constructor() {
    this.snsClient = new SNSClient({ region: process.env.AWS_REGION });
  }

  async publishEvent(event: DomainEvent, topicArn: string): Promise<void> {
    const message = {
      ...event,
      timestamp: event.timestamp.toISOString()
    };

    const command = new PublishCommand({
      TopicArn: topicArn,
      Message: JSON.stringify(message),
      MessageAttributes: {
        eventType: {
          DataType: 'String',
          StringValue: event.eventType
        },
        aggregateId: {
          DataType: 'String',
          StringValue: event.aggregateId
        }
      }
    });

    await this.snsClient.send(command);
  }
}

class EventHandler {
  private sqsClient: SQSClient;

  constructor() {
    this.sqsClient = new SQSClient({ region: process.env.AWS_REGION });
  }

  async processMessages(queueUrl: string, handler: (event: DomainEvent) => Promise<void>): Promise<void> {
    while (true) {
      try {
        const command = new ReceiveMessageCommand({
          QueueUrl: queueUrl,
          MaxNumberOfMessages: 10,
          WaitTimeSeconds: 20, // Long polling
          VisibilityTimeout: 60 // seconds
        });

        const response = await this.sqsClient.send(command);

        if (!response.Messages || response.Messages.length === 0) {
          continue;
        }

        for (const message of response.Messages) {
          try {
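            // Assumes the SNS subscription does not use raw message delivery,
            // so the event is a JSON string inside the envelope's Message field
            const snsMessage = JSON.parse(message.Body || '{}');
            const payload = JSON.parse(snsMessage.Message);
            // JSON carries the timestamp as an ISO string, so restore the Date
            const event: DomainEvent = { ...payload, timestamp: new Date(payload.timestamp) };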
            
            await handler(event);

            // Delete message only after successful processing
            await this.sqsClient.send(new DeleteMessageCommand({
              QueueUrl: queueUrl,
              ReceiptHandle: message.ReceiptHandle
            }));
          } catch (error) {
            console.error('Failed to process message:', error);
            // Message will become visible again after visibility timeout
          }
        }
      } catch (error) {
        console.error('Error receiving messages:', error);
        await this.sleep(5000); // Wait before retrying
      }
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
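
The handler above assumes every SQS body is an SNS envelope. If a subscription is later switched to raw message delivery, the body is the event itself. A small parsing function can accept both shapes and validate the result before it reaches business logic. This is a sketch: parseEventBody and ParsedEvent are names we made up, and the validation is deliberately minimal.

```typescript
interface ParsedEvent {
  eventType: string;
  aggregateId: string;
  version: number;
  timestamp: Date;
  data: Record<string, unknown>;
}

// Unwraps an SQS message body that may or may not carry the SNS envelope.
function parseEventBody(body: string): ParsedEvent {
  const outer = JSON.parse(body);
  // With the SNS envelope, the event is a JSON string under "Message";
  // with raw message delivery, the body is the event itself.
  const raw = typeof outer.Message === 'string' ? JSON.parse(outer.Message) : outer;

  if (typeof raw.eventType !== 'string' || typeof raw.aggregateId !== 'string') {
    throw new Error('Message body is not a domain event');
  }

  return {
    eventType: raw.eventType,
    aggregateId: raw.aggregateId,
    version: Number(raw.version),
    timestamp: new Date(raw.timestamp), // JSON carries an ISO string, not a Date
    data: raw.data || {}
  };
}

// Example: the same event, wrapped in an envelope and delivered raw.
const rawBody = JSON.stringify({
  eventType: 'OrderCreated',
  aggregateId: 'order-1',
  version: 1,
  timestamp: '2024-01-01T00:00:00.000Z',
  data: { items: [] }
});
const wrappedBody = JSON.stringify({ Type: 'Notification', Message: rawBody });

const fromEnvelope = parseEventBody(wrappedBody);
const fromRaw = parseEventBody(rawBody);
```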

Saga Pattern for Distributed Transactions

Implement the Saga pattern for managing distributed transactions:

interface SagaStep {
  execute(): Promise<any>;
  compensate(): Promise<void>;
}

class OrderSaga {
  private steps: SagaStep[] = [];
  private completedSteps: SagaStep[] = [];

  constructor(
    private orderService: OrderService,
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService
  ) {}

  async execute(orderData: CreateOrderRequest): Promise<void> {
    this.steps = [
      new CreateOrderStep(this.orderService, orderData),
      new ReserveInventoryStep(this.inventoryService, orderData.items),
      new ProcessPaymentStep(this.paymentService, orderData.payment),
      new CreateShipmentStep(this.shippingService, orderData)
    ];

    try {
      for (const step of this.steps) {
        await step.execute();
        this.completedSteps.push(step);
      }
    } catch (error) {
      await this.compensate();
      throw error;
    }
  }

  private async compensate(): Promise<void> {
    // Execute compensation in reverse order
    for (let i = this.completedSteps.length - 1; i >= 0; i--) {
      try {
        await this.completedSteps[i].compensate();
      } catch (compensationError) {
        console.error('Compensation failed:', compensationError);
        // Log for manual intervention
      }
    }
  }
}

class CreateOrderStep implements SagaStep {
  private orderId?: string;

  constructor(
    private orderService: OrderService,
    private orderData: CreateOrderRequest
  ) {}

  async execute(): Promise<string> {
    this.orderId = await this.orderService.createOrder(this.orderData);
    return this.orderId;
  }

  async compensate(): Promise<void> {
    if (this.orderId) {
      await this.orderService.cancelOrder(this.orderId);
    }
  }
}

class ReserveInventoryStep implements SagaStep {
  private reservationId?: string;

  constructor(
    private inventoryService: InventoryService,
    private items: OrderItem[]
  ) {}

  async execute(): Promise<string> {
    this.reservationId = await this.inventoryService.reserveItems(this.items);
    return this.reservationId;
  }

  async compensate(): Promise<void> {
    if (this.reservationId) {
      await this.inventoryService.releaseReservation(this.reservationId);
    }
  }
}
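
To see the compensation order in action without real services, the sketch below runs three in-memory steps where the last one fails. DemoStep, runSaga, and demoStep are illustrative stand-ins for the classes above, and the log array exists only so the order of operations can be inspected.

```typescript
interface DemoStep {
  name: string;
  execute(): Promise<void>;
  compensate(): Promise<void>;
}

// Runs steps in order; on failure, compensates completed steps in reverse.
async function runSaga(steps: DemoStep[], log: string[]): Promise<boolean> {
  const completed: DemoStep[] = [];
  try {
    for (const step of steps) {
      await step.execute();
      log.push(`done:${step.name}`);
      completed.push(step);
    }
    return true;
  } catch (error) {
    for (const step of completed.reverse()) {
      await step.compensate();
      log.push(`undo:${step.name}`);
    }
    return false;
  }
}

// Hypothetical in-memory step; `fail` makes execute() reject.
function demoStep(name: string, fail = false): DemoStep {
  return {
    name,
    execute: async () => {
      if (fail) throw new Error(`${name} failed`);
    },
    compensate: async () => {}
  };
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  await runSaga([demoStep('order'), demoStep('inventory'), demoStep('payment', true)], log);
  return log; // done:order, done:inventory, undo:inventory, undo:order
}
```

The failed payment step is never compensated because it never completed; only the order and inventory steps are undone, newest first.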

Event Sourcing with CQRS

Implement event sourcing for audit trails and eventual consistency:

interface Event {
  id: string;
  aggregateId: string;
  eventType: string;
  data: any;
  version: number;
  timestamp: Date;
}

class EventStore {
  private events: Map<string, Event[]> = new Map();

  async appendEvents(aggregateId: string, events: Event[], expectedVersion: number): Promise<void> {
    const existingEvents = this.events.get(aggregateId) || [];
    
    if (existingEvents.length !== expectedVersion) {
      throw new Error('Concurrency conflict');
    }

    this.events.set(aggregateId, [...existingEvents, ...events]);
  }

  async getEvents(aggregateId: string, fromVersion?: number): Promise<Event[]> {
    const events = this.events.get(aggregateId) || [];
    return fromVersion ? events.filter(e => e.version > fromVersion) : events;
  }
}

abstract class AggregateRoot {
  protected uncommittedEvents: Event[] = [];
  protected version = 0;

  constructor(protected id: string) {}

  getUncommittedEvents(): Event[] {
    return this.uncommittedEvents;
  }

  markEventsAsCommitted(): void {
    this.uncommittedEvents = [];
  }

  loadFromHistory(events: Event[]): void {
    events.forEach(event => {
      this.applyEvent(event);
      this.version = event.version;
    });
  }

  protected addEvent(eventType: string, data: any): void {
    const event: Event = {
      id: this.generateId(),
      aggregateId: this.id,
      eventType,
      data,
      version: this.version + 1,
      timestamp: new Date()
    };

    this.uncommittedEvents.push(event);
    this.applyEvent(event);
    this.version = event.version;
  }

  protected abstract applyEvent(event: Event): void;
  
  private generateId(): string {
    // Short random id for illustration only; use a UUID in production
    return Math.random().toString(36).slice(2, 11);
  }
}

class Order extends AggregateRoot {
  private status: 'PENDING' | 'CONFIRMED' | 'SHIPPED' | 'CANCELLED' = 'PENDING';
  private items: OrderItem[] = [];

  static create(id: string, items: OrderItem[]): Order {
    const order = new Order(id);
    order.addEvent('OrderCreated', { items });
    return order;
  }

  confirm(): void {
    if (this.status !== 'PENDING') {
      throw new Error('Order cannot be confirmed');
    }
    this.addEvent('OrderConfirmed', {});
  }

  protected applyEvent(event: Event): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.items = event.data.items;
        this.status = 'PENDING';
        break;
      case 'OrderConfirmed':
        this.status = 'CONFIRMED';
        break;
      // ... other events
    }
  }
}
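
The code above covers the write side. On the query side of CQRS, a projection consumes the same events and maintains a read model shaped for lookups. The following is a minimal in-memory sketch; OrderSummaryProjection, OrderSummary, and StoredEvent are names we invented for the example, and a real projection would persist its state.

```typescript
interface StoredEvent {
  aggregateId: string;
  eventType: string;
  data: any;
  version: number;
}

interface OrderSummary {
  orderId: string;
  status: string;
  itemCount: number;
  version: number;
}

// Hypothetical read model: one summary per order, updated from events.
class OrderSummaryProjection {
  private summaries = new Map<string, OrderSummary>();

  apply(event: StoredEvent): void {
    const current = this.summaries.get(event.aggregateId);
    // Skip events that were already applied, so redelivery is harmless.
    if (current && event.version <= current.version) return;

    switch (event.eventType) {
      case 'OrderCreated':
        this.summaries.set(event.aggregateId, {
          orderId: event.aggregateId,
          status: 'PENDING',
          itemCount: event.data.items.length,
          version: event.version
        });
        break;
      case 'OrderConfirmed':
        if (current) {
          this.summaries.set(event.aggregateId, {
            ...current,
            status: 'CONFIRMED',
            version: event.version
          });
        }
        break;
    }
  }

  get(orderId: string): OrderSummary | undefined {
    return this.summaries.get(orderId);
  }
}

const projection = new OrderSummaryProjection();
projection.apply({ aggregateId: 'order-1', eventType: 'OrderCreated', data: { items: ['a', 'b'] }, version: 1 });
projection.apply({ aggregateId: 'order-1', eventType: 'OrderConfirmed', data: {}, version: 2 });
const summary = projection.get('order-1');
```

Tracking the last applied version per order makes the projection tolerant of at-least-once delivery, which matters if events arrive through a queue.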

Service Mesh and API Gateway Patterns

API Gateway with Rate Limiting

interface RateLimitConfig {
  windowSize: number; // in seconds
  maxRequests: number;
}

class RateLimiter {
  private requests: Map<string, number[]> = new Map();

  constructor(private config: RateLimitConfig) {}

  isAllowed(clientId: string): boolean {
    const now = Date.now();
    const windowStart = now - (this.config.windowSize * 1000);
    
    const clientRequests = this.requests.get(clientId) || [];
    const validRequests = clientRequests.filter(timestamp => timestamp > windowStart);
    
    if (validRequests.length >= this.config.maxRequests) {
      return false;
    }

    validRequests.push(now);
    this.requests.set(clientId, validRequests);
    return true;
  }
}

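// AWS Lambda API Gateway integration
// (types come from the @types/aws-lambda package)
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';

// Create the limiter once at module scope so its state survives across warm
// invocations. Each Lambda container still keeps its own counters, so a limit
// that must hold across containers needs a shared store.
const rateLimiter = new RateLimiter({
  windowSize: 60, // 1 minute
  maxRequests: 100
});

export const handler = async (event: APIGatewayProxyEvent): Promise<APIGatewayProxyResult> => {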

  const clientId = event.requestContext.identity.sourceIp;

  if (!rateLimiter.isAllowed(clientId)) {
    return {
      statusCode: 429,
      headers: {
        'Content-Type': 'application/json',
        'Retry-After': '60'
      },
      body: JSON.stringify({ error: 'Rate limit exceeded' })
    };
  }

  // Process request...
  return {
    statusCode: 200,
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ message: 'Success' })
  };
};
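
The handler above always answers with Retry-After: 60, even when a slot frees up much sooner. With a sliding window, the wait is the time until the oldest request in the window expires. The function below sketches that calculation; retryAfterSeconds is a hypothetical helper, and using it would require the RateLimiter to expose a client's timestamps, which the class above does not do.

```typescript
// Seconds until the oldest request in the window expires and a slot frees up.
function retryAfterSeconds(
  timestamps: number[],
  now: number,
  windowSizeSeconds: number
): number {
  const windowMs = windowSizeSeconds * 1000;
  // Same rule as the limiter: only requests newer than the window start count.
  const recent = timestamps.filter(t => t > now - windowMs);
  if (recent.length === 0) return 0;

  const oldest = Math.min(...recent);
  return Math.max(0, Math.ceil((oldest + windowMs - now) / 1000));
}

// The oldest request was 45 s ago in a 60 s window, so a slot frees up in 15 s.
const waitSeconds = retryAfterSeconds([15000, 40000, 59000], 60000, 60);
```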

Monitoring and Observability

Distributed Tracing

import { trace, SpanStatusCode, SpanKind } from '@opentelemetry/api';

class TracedHttpClient {
  private tracer = trace.getTracer('http-client');

  async request<T>(url: string, options: RequestInit = {}): Promise<T> {
    const span = this.tracer.startSpan('http.request', {
      kind: SpanKind.CLIENT,
      attributes: {
        'http.method': options.method || 'GET',
        'http.url': url,
        'http.user_agent': 'microservice-client'
      }
    });

    try {
      const response = await fetch(url, {
        ...options,
        headers: {
          ...options.headers,
          'traceparent': this.getTraceParent(span)
        }
      });

      span.setAttributes({
        'http.status_code': response.status,
        // content-length arrives as a string; record it as a number
        'http.response.size': Number(response.headers.get('content-length') ?? 0)
      });

      if (!response.ok) {
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: `HTTP ${response.status}: ${response.statusText}`
        });
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      const data = await response.json();
      span.setStatus({ code: SpanStatusCode.OK });
      return data;
    } catch (error) {
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: error instanceof Error ? error.message : 'Unknown error'
      });
      throw error;
    } finally {
      span.end();
    }
  }

  private getTraceParent(span: any): string {
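    // Build a W3C Trace Context header: version-traceId-spanId-flags
    const { traceId, spanId, traceFlags } = span.spanContext();
    return `00-${traceId}-${spanId}-${traceFlags.toString(16).padStart(2, '0')}`;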
  }
}
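
On the receiving side, the next service has to read the traceparent header to continue the trace. In practice the OpenTelemetry propagation API normally handles this, so the sketch below is only meant to show what the header contains. parseTraceParent and TraceContext are illustrative names, and the function accepts only version 00 of the W3C Trace Context format.

```typescript
interface TraceContext {
  traceId: string;
  parentSpanId: string;
  sampled: boolean;
}

// Layout: 2 hex version, 32 hex trace id, 16 hex parent span id, 2 hex flags.
const TRACEPARENT_PATTERN = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

// Parses a version-00 traceparent header; returns undefined if it is malformed.
function parseTraceParent(header: string): TraceContext | undefined {
  const match = TRACEPARENT_PATTERN.exec(header.trim());
  if (!match) return undefined;

  const [, version, traceId, parentSpanId, flags] = match;
  // This sketch only understands version 00.
  if (version !== '00') return undefined;
  // All-zero trace or parent ids are invalid.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentSpanId)) return undefined;

  // Bit 0 of the flags byte is the "sampled" flag.
  return { traceId, parentSpanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

const context = parseTraceParent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01');
```

Returning undefined for a malformed header lets the caller start a fresh trace instead of failing the request.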

Testing Communication Patterns

Contract Testing with Pact

import * as path from 'path';
import { Pact } from '@pact-foundation/pact';
import { UserServiceClient } from './user-service-client';

describe('User Service Contract', () => {
  const provider = new Pact({
    consumer: 'order-service',
    provider: 'user-service',
    port: 1234,
    log: path.resolve(process.cwd(), 'logs', 'pact.log'),
    dir: path.resolve(process.cwd(), 'pacts'),
    logLevel: 'INFO'
  });

  beforeAll(() => provider.setup());
  afterAll(() => provider.finalize());
  afterEach(() => provider.verify());

  it('should get user by id', async () => {
    await provider.addInteraction({
      state: 'user exists',
      uponReceiving: 'a request for user with id 123',
      withRequest: {
        method: 'GET',
        path: '/users/123',
        headers: {
          'Accept': 'application/json'
        }
      },
      willRespondWith: {
        status: 200,
        headers: {
          'Content-Type': 'application/json'
        },
        body: {
          id: '123',
          name: 'John Doe',
          email: 'john@example.com'
        }
      }
    });

    const client = new UserServiceClient('http://localhost:1234');
    const user = await client.getUser('123');

    expect(user.id).toBe('123');
    expect(user.name).toBe('John Doe');
  });
});

Best Practices and Anti-Patterns

Service Boundaries and Data Consistency

// Good: Services own their data
class OrderService {
  constructor(
    private eventPublisher: EventPublisher,
    private orderEventsTopicArn: string
  ) {}

  async createOrder(customerId: string, items: OrderItem[]): Promise<Order> {
    // Don't call customer service to validate - use eventual consistency
    const order = new Order(customerId, items);
    
    // Publish event for other services to react
    await this.eventPublisher.publishEvent({
      eventType: 'OrderCreated',
      aggregateId: order.id,
      data: { customerId, items },
      version: 1,
      timestamp: new Date()
    }, this.orderEventsTopicArn);

    return order;
  }
}

// Bad: Distributed transactions across services
class BadOrderService {
  async createOrder(customerId: string, items: OrderItem[]): Promise<Order> {
    // Anti-pattern: Synchronous calls to multiple services
    await this.customerService.validateCustomer(customerId);
    await this.inventoryService.reserveItems(items);
    await this.paymentService.authorizePayment(customerId, total);
    
    // If any of these fail, the entire transaction fails
    return this.orderRepository.save(new Order(customerId, items));
  }
}

Conclusion

Effective microservices communication requires careful consideration of patterns, resilience, and consistency models. The patterns we’ve explored, from circuit breakers, retries, and sagas to event sourcing, gateway rate limiting, and distributed tracing, provide the foundation for building robust distributed systems.

Key takeaways:

  • Use synchronous communication sparingly and always with resilience patterns
  • Embrace asynchronous, event-driven communication for loose coupling
  • Implement proper monitoring and distributed tracing
  • Design for failure and eventual consistency
  • Test communication patterns with contract testing

In our next post, “Database Design for Serverless Applications,” we’ll explore how data architecture patterns align with these communication strategies in cloud-native environments.
