
Mercury: Migrating to Event-Driven Architecture

Max Kaido
Architect

Mercury, our trading system, has moved from direct service-to-service calls to an event-driven architecture. The migration has made the system easier to maintain, test, and extend.

Why Event-Driven Architecture?

Before the migration, Mercury components were tightly coupled. Services directly called methods on other services, creating a web of dependencies that made testing difficult and changes risky.

The event-driven approach decouples components by allowing them to communicate through events. Services publish events when important things happen, and interested services subscribe to those events and react accordingly.
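To make the publish/subscribe idea concrete, here is a minimal sketch of an in-memory bus. It is a stand-in written for illustration, not Mercury's event service; `TinyBus`, the `PositionClosed` shape, and the two recording arrays are hypothetical names.

```typescript
// A minimal in-memory bus for illustration; not Mercury's EventService.
type Handler<E> = (event: E) => void;

class TinyBus<E extends { type: string }> {
  private handlers: Record<string, Handler<E>[]> = {};

  // Register a handler and return a function that removes it again.
  subscribe(type: string, handler: Handler<E>): () => void {
    if (!this.handlers[type]) {
      this.handlers[type] = [];
    }
    const list = this.handlers[type];
    list.push(handler);
    return () => {
      const index = list.indexOf(handler);
      if (index >= 0) list.splice(index, 1);
    };
  }

  // Deliver the event to every handler registered for its type.
  publish(event: E): void {
    // Copy the list so a handler may unsubscribe while we iterate.
    (this.handlers[event.type] || []).slice().forEach((h) => h(event));
  }
}

// Hypothetical event shape.
interface PositionClosed {
  type: 'position.closed';
  positionId: string;
}

const bus = new TinyBus<PositionClosed>();
const cancelledFor: string[] = [];
const settledFor: string[] = [];

// Two independent subscribers; the publisher knows about neither.
const stopCancelling = bus.subscribe('position.closed', (e) => {
  cancelledFor.push(e.positionId);
});
bus.subscribe('position.closed', (e) => {
  settledFor.push(e.positionId);
});

bus.publish({ type: 'position.closed', positionId: 'pos-1' });

// After unsubscribing, only the remaining subscriber sees the next event.
stopCancelling();
bus.publish({ type: 'position.closed', positionId: 'pos-2' });
```

The publishing code never names its consumers, which is the decoupling described above: subscribers can be added or removed without touching the publisher.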

Benefits We've Seen

  1. Improved Decoupling: Services no longer need direct references to each other
  2. Better Testability: We can test components in isolation by mocking event inputs
  3. Easier Extensibility: Adding new functionality often just means subscribing to existing events
  4. Clearer Domain Boundaries: Event definitions serve as contracts between services
  5. Better Resilience: Services can handle event processing failures independently
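As a rough illustration of the resilience point, the sketch below shows one way a subscriber could contain its own failures. It assumes a simple synchronous dispatcher in which an uncaught throw would stop later handlers; `isolate`, `dispatch`, and the sample handlers are hypothetical and not taken from Mercury.

```typescript
type Handler<E> = (event: E) => void;

// Wrap a handler so a thrown error is reported through onError instead of
// propagating to the dispatcher and the remaining handlers.
function isolate<E>(
  handler: Handler<E>,
  onError: (error: unknown, event: E) => void,
): Handler<E> {
  return (event) => {
    try {
      handler(event);
    } catch (error) {
      onError(error, event);
    }
  };
}

// A deliberately naive dispatcher: an uncaught throw would abort the loop.
function dispatch<E>(handlers: Handler<E>[], event: E): void {
  handlers.forEach((handler) => handler(event));
}

const failures: string[] = [];
const processed: string[] = [];

const handlers: Handler<{ positionId: string }>[] = [
  // This handler always fails; the wrapper records the failure.
  isolate(
    () => {
      throw new Error('balance service unavailable');
    },
    (error) => {
      failures.push(String(error));
    },
  ),
  // This handler still runs because the first failure was contained.
  isolate(
    (event) => {
      processed.push(event.positionId);
    },
    (error) => {
      failures.push(String(error));
    },
  ),
];

dispatch(handlers, { positionId: 'pos-7' });
```

Async handlers would need the same guard around the awaited call.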

Key Components of Our Implementation

1. Event Infrastructure

The core of our event system is quite simple:

import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { v4 as uuidv4 } from 'uuid';

// Base event interface
export interface BaseEvent {
  id: string;
  timestamp: Date;
  type: string;
  version: string;
  correlationId?: string;
  sourceId?: string;
}

// Event handler type
export type EventHandler<T extends BaseEvent> = (event: T) => Promise<void> | void;

// Event service (wrapper around NestJS EventEmitter2)
@Injectable()
export class EventService {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  // Builds an event with a generated id and timestamp (remaining fields elided)
  createEvent<T extends BaseEvent>(type, payload, options): T {
    return { id: uuidv4(), timestamp: new Date(), type, ... } as T;
  }

  // Emits the event under its type name.
  // Note: emit() does not wait for promises returned by async handlers.
  async publish<T extends BaseEvent>(event: T): Promise<void> {
    this.eventEmitter.emit(event.type, event);
  }

  // Registers a handler and returns a function that removes it again
  subscribe<T extends BaseEvent>(
    eventType: string,
    handler: EventHandler<T>,
  ): () => void {
    this.eventEmitter.on(eventType, handler);
    return () => this.eventEmitter.removeListener(eventType, handler);
  }
}
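The `createEvent` body above is abbreviated. As a sketch of one possible shape, the standalone function below spreads the event-specific fields onto generated base fields. The `EventOptions` type, the counter-based id (a stand-in for `uuidv4()`), and the default version `'1'` are assumptions made for illustration, not details from Mercury.

```typescript
interface BaseEvent {
  id: string;
  timestamp: Date;
  type: string;
  version: string;
  correlationId?: string;
  sourceId?: string;
}

// Fields a caller may override; everything else is generated.
type EventOptions = Partial<Pick<BaseEvent, 'version' | 'correlationId' | 'sourceId'>>;

let nextId = 0; // Stand-in for uuidv4() so the sketch has no dependencies.

function createEvent<T extends BaseEvent>(
  type: T['type'],
  fields: Omit<T, keyof BaseEvent>,
  options: EventOptions = {},
): T {
  const base: BaseEvent = {
    id: `evt-${++nextId}`,
    timestamp: new Date(),
    type,
    version: options.version ?? '1', // Assumed default version.
    correlationId: options.correlationId,
    sourceId: options.sourceId,
  };
  // The cast is needed because TypeScript cannot prove base + fields equals T.
  return { ...base, ...fields } as unknown as T;
}

// Hypothetical concrete event; the payload field is an assumption.
interface PositionCreatedEvent extends BaseEvent {
  type: 'position.created';
  payload: { positionId: string };
}

const created = createEvent<PositionCreatedEvent>(
  'position.created',
  { payload: { positionId: 'pos-1' } },
  { sourceId: 'pos-1' },
);
```

Typing the second parameter as `Omit<T, keyof BaseEvent>` means callers supply only the event-specific fields, and the compiler rejects a call that forgets `payload`.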

2. Domain-Specific Events

We've defined domain-specific events for each part of the system:

  • Position Events: Created, Closed, StatusChanged, PartiallyClosed
  • Order Events: Created, StatusChanged, Filled, Cancelled
  • Transaction Events: Created, AccountBalanceChanged
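One way such events could be typed is as a discriminated union keyed on `type`, so a handler gets the right payload shape for each case. This is only a sketch: `'position.created'` appears in this post, while the other type strings, the payload fields, and the `summarize` helper are hypothetical.

```typescript
// Metadata shared by every event in this sketch.
interface EventMeta {
  id: string;
  timestamp: Date;
  version: string;
}

// Payload fields below are assumptions; the post does not list them.
interface PositionCreated extends EventMeta {
  type: 'position.created';
  payload: { positionId: string };
}

interface PositionClosed extends EventMeta {
  type: 'position.closed';
  payload: { positionId: string; exitReason: string };
}

interface OrderFilled extends EventMeta {
  type: 'order.filled';
  payload: { orderId: string; positionId: string };
}

type DomainEvent = PositionCreated | PositionClosed | OrderFilled;

// The switch narrows event.payload to the matching shape in each branch.
function summarize(event: DomainEvent): string {
  switch (event.type) {
    case 'position.created':
      return `position ${event.payload.positionId} opened`;
    case 'position.closed':
      return `position ${event.payload.positionId} closed (${event.payload.exitReason})`;
    case 'order.filled':
      return `order ${event.payload.orderId} filled`;
    default: {
      // Compile-time check that every event type is handled.
      const unreachable: never = event;
      return unreachable;
    }
  }
}

const summary = summarize({
  id: 'evt-1',
  timestamp: new Date(0),
  version: '1',
  type: 'position.closed',
  payload: { positionId: 'pos-1', exitReason: 'stop-loss' },
});
```

Adding a new event to `DomainEvent` without a matching `case` then fails to compile, which keeps the event definitions honest as contracts.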

3. Event Publishers

Each domain has its own publisher service that creates and publishes events:

@Injectable()
export class PositionEventPublisherService {
  constructor(private readonly eventService: EventService) {}

  async publishPositionCreated(position: ShadowPosition): Promise<void> {
    const event = this.eventService.createEvent<PositionCreatedEvent>(
      'position.created',
      { payload: { positionId: position.id, ... } },
      { sourceId: position.id },
    );
    await this.eventService.publish(event);
  }
  // ... other publishing methods
}

4. Event Listeners

Services that need to react to events have listener implementations:

@Injectable()
export class OrderListenerService implements OnModuleInit, OnModuleDestroy {
  private unsubscribeFunctions: Array<() => void> = [];

  constructor(
    private readonly eventService: EventService,
    private readonly orderService: OrderService,
  ) {}

  onModuleInit() {
    const subscriptions = [
      this.eventService.subscribe<PositionCreatedEvent>(
        'position.created',
        this.handlePositionCreated.bind(this),
      ),
      // ... other subscriptions
    ];
    this.unsubscribeFunctions.push(...subscriptions);
  }

  private async handlePositionCreated(
    event: PositionCreatedEvent,
  ): Promise<void> {
    // Create orders based on the new position
  }

  // ... other handlers and cleanup
}
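The cleanup step is elided in the listener above. A plausible sketch of it, without NestJS so it runs on its own, is to call every stored unsubscribe function when the module shuts down. `Subscribable`, `ListenerLifecycle`, `CountingEvents`, and the `'position.closed'` event name are hypothetical names used only for this example.

```typescript
type Unsubscribe = () => void;

// Minimal contract assumed for this sketch: subscribe returns an unsubscribe function.
interface Subscribable {
  subscribe(eventType: string, handler: (event: unknown) => void): Unsubscribe;
}

class ListenerLifecycle {
  private unsubscribeFunctions: Unsubscribe[] = [];

  constructor(private readonly events: Subscribable) {}

  onModuleInit(): void {
    this.unsubscribeFunctions.push(
      this.events.subscribe('position.created', () => {
        /* handler body omitted */
      }),
      this.events.subscribe('position.closed', () => {
        /* handler body omitted */
      }),
    );
  }

  onModuleDestroy(): void {
    // Call every stored unsubscribe function, then drop the references.
    this.unsubscribeFunctions.forEach((unsubscribe) => unsubscribe());
    this.unsubscribeFunctions = [];
  }
}

// A counting fake that tracks how many subscriptions are currently active.
class CountingEvents implements Subscribable {
  active = 0;

  subscribe(): Unsubscribe {
    this.active += 1;
    let done = false;
    return () => {
      // Guard against double unsubscription.
      if (!done) {
        done = true;
        this.active -= 1;
      }
    };
  }
}

const events = new CountingEvents();
const lifecycle = new ListenerLifecycle(events);

lifecycle.onModuleInit();
const activeAfterInit = events.active;

lifecycle.onModuleDestroy();
const activeAfterDestroy = events.active;
```

Without this step, handlers bound to a destroyed service instance would stay registered on the emitter and keep receiving events.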

Migration Strategy

We migrated incrementally, following these steps:

  1. Initial Infrastructure: Set up event service and core event types
  2. Event Publishers: Created domain-specific publishers for each entity
  3. Event Listeners: Built listeners for services that need to react to events
  4. Service Refactoring: Updated services to publish events and removed direct calls
  5. Testing: Added tests for event publishers and listeners
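To illustrate the testing step, here is a sketch of checking a listener in isolation: build the event by hand, pass it to the handler, and inspect a recording fake. The `OrderCreator` interface, the `FakeOrderCreator` class, and the payload shape are assumptions for the example rather than Mercury's real types, and the handler is synchronous here for brevity.

```typescript
// Hypothetical event shape.
interface PositionCreatedEvent {
  id: string;
  timestamp: Date;
  type: 'position.created';
  version: string;
  payload: { positionId: string };
}

// Hypothetical dependency the listener delegates to.
interface OrderCreator {
  createOrdersForPosition(positionId: string): void;
}

// The unit under test: it depends only on the event shape and an interface.
class OrderListener {
  constructor(private readonly orders: OrderCreator) {}

  handlePositionCreated(event: PositionCreatedEvent): void {
    this.orders.createOrdersForPosition(event.payload.positionId);
  }
}

// A hand-rolled fake that records calls instead of touching real services.
class FakeOrderCreator implements OrderCreator {
  readonly calls: string[] = [];

  createOrdersForPosition(positionId: string): void {
    this.calls.push(positionId);
  }
}

// Build the event input by hand; no event bus or publisher is involved.
const fakeOrders = new FakeOrderCreator();
const listener = new OrderListener(fakeOrders);

listener.handlePositionCreated({
  id: 'evt-1',
  timestamp: new Date(0),
  type: 'position.created',
  version: '1',
  payload: { positionId: 'pos-42' },
});
```

A publisher can be tested the same way in reverse: inject a fake that records published events and assert on their type and payload.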

Before vs After

Before Migration

async closePosition(positionId, exitReason, exitPrice) {
  // Update position
  position.status = PositionStatus.CLOSED;

  // Direct service call
  await this.shadowAccountService.processPositionClosure(position);

  // Direct service call
  await this.orderService.cancelOrdersByPosition(positionId);
}

After Migration

async closePosition(positionId, exitReason, exitPrice) {
  // Capture the old status before overwriting it
  const previousStatus = position.status;

  // Update position
  position.status = PositionStatus.CLOSED;

  // Publish events instead of direct calls
  await this.positionEventPublisher.publishPositionStatusChanged(
    position, previousStatus, exitPrice
  );

  await this.positionEventPublisher.publishPositionClosed(
    position, exitReason
  );
}

The ShadowAccountService and OrderService now listen for these events and react accordingly.

Conclusion

Migrating to an event-driven architecture has significantly improved Mercury's maintainability and flexibility. Services are now more focused and easier to test, and the system is more resilient to change.

The migration process itself was incremental, allowing us to gradually reap the benefits of event-driven design while minimizing risk. Our test coverage also improved as we added specific tests for event publishers and listeners.

As we move forward, the event history gives us useful insight for debugging and monitoring the system. Adding new functionality is also much simpler, because it often requires only subscribing to existing events.