junyeokk
Blog
NestJS·2024. 11. 20

NestJS EventEmitter

NestJS 애플리케이션을 개발하다 보면 하나의 동작이 여러 후속 작업을 트리거해야 하는 상황이 자주 생긴다. 예를 들어 피드 랭킹이 변경되면 SSE로 클라이언트에 알림을 보내야 하고, 사용자가 회원가입하면 이메일을 보내야 한다. 이런 작업들을 하나의 서비스 메서드에 직접 넣으면 코드가 뒤엉키고, 서비스 간 순환 의존성이 생기기 쉽다.

이벤트 기반 아키텍처는 이 문제를 해결한다. "무언가 발생했다"는 사실만 알리고, 그에 대한 반응은 각자 알아서 처리하게 만드는 것이다. 발행자(publisher)와 구독자(subscriber)가 서로를 직접 참조하지 않기 때문에 모듈 간 결합도가 낮아진다.

NestJS에서는 @nestjs/event-emitter 패키지가 이 역할을 담당한다. 내부적으로 Node.js의 EventEmitter2 라이브러리를 사용하며, NestJS의 DI 시스템과 자연스럽게 통합된다.

EventEmitter2 vs Node.js 기본 EventEmitter

Node.js에는 기본 events 모듈의 EventEmitter가 있다. 그런데 왜 별도 라이브러리를 쓸까?

기능Node.js EventEmitterEventEmitter2
와일드카드 이벤트✓ (order.* 패턴 매칭)
네임스페이스✓ (점으로 구분된 계층 구조)
비동기 이벤트✗ (동기만)✓ (emitAsync)
타임아웃✓ (리스너 실행 타임아웃)
prepend 리스너

기본 EventEmitter는 단순한 이벤트 발행/구독만 지원한다. EventEmitter2는 와일드카드 매칭, 비동기 처리, 네임스페이스 같은 실전에서 필요한 기능들을 추가로 제공한다. 대규모 애플리케이션에서 이벤트가 많아지면 이런 기능들이 중요해진다.

설치 및 설정

bash
npm install @nestjs/event-emitter

루트 모듈에서 EventEmitterModule.forRoot()를 import하면 된다.

typescript
import { Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';

@Module({
  imports: [
    EventEmitterModule.forRoot(),
  ],
})
export class AppModule {}

forRoot()에 옵션을 넘길 수 있다. EventEmitter2의 생성자 옵션이 그대로 전달된다.

typescript
EventEmitterModule.forRoot({
  // 와일드카드 이벤트 사용
  wildcard: true,
  // 네임스페이스 구분자 (기본값: '.')
  delimiter: '.',
  // 에러 발생 시 newListener 이벤트 발행
  newListener: false,
  // 에러 발생 시 removeListener 이벤트 발행
  removeListener: false,
  // 최대 리스너 수 (메모리 누수 감지용)
  maxListeners: 10,
  // 에러 이벤트 이름이 'error'일 때 에러를 throw할지
  verboseMemoryLeak: false,
  // Promise 거부 시 에러를 무시할지
  ignoreErrors: false,
})

대부분의 경우 기본 설정으로 충분하다. wildcard: true는 이벤트가 많아져서 패턴 매칭이 필요할 때 켜면 된다.

이벤트 발행하기

이벤트를 발행하려면 EventEmitter2를 주입받아서 emit() 메서드를 호출한다.

typescript
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';

@Injectable()
export class FeedScheduler {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  async analyzeTrend() {
    const trendFeeds = await this.calculateTrends();
    
    // 이벤트 발행
    this.eventEmitter.emit('ranking-update', trendFeeds);
  }
}

emit()의 첫 번째 인자는 이벤트 이름(문자열), 두 번째부터는 리스너에 전달할 데이터다. 여러 개의 인자를 넘길 수도 있다.

typescript
// 여러 인자 전달
this.eventEmitter.emit('order.created', order, user, timestamp);

// 객체로 묶어서 전달 (이 방식이 더 명확함)
this.eventEmitter.emit('order.created', {
  order,
  user,
  timestamp,
});

비동기 이벤트 발행

emit()은 동기적으로 리스너를 호출한다. 리스너가 비동기 작업을 수행하더라도 그 완료를 기다리지 않는다. 리스너의 완료를 보장해야 한다면 emitAsync()를 사용한다.

typescript
// 모든 리스너의 실행 완료를 기다림
await this.eventEmitter.emitAsync('critical-event', data);

emitAsync()는 모든 리스너가 반환한 Promise를 Promise.all()로 묶어서 기다린다. 리스너 중 하나라도 에러를 던지면 전체가 실패한다. 일반적인 경우에는 emit()으로 fire-and-forget 방식을 쓰는 게 성능상 유리하다.

이벤트 구독하기

이벤트를 구독하는 방법은 두 가지다. 데코레이터 기반과 직접 등록 방식이다.

@OnEvent 데코레이터

NestJS답게 데코레이터로 리스너를 등록하는 방식이다. 가장 깔끔하고 권장되는 방법이다.

typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

@Injectable()
export class NotificationListener {
  @OnEvent('ranking-update')
  handleRankingUpdate(trendFeeds: TrendFeed[]) {
    // 트렌드 변경 시 처리 로직
    console.log('트렌드 업데이트:', trendFeeds.length);
  }

  @OnEvent('user.created')
  async handleUserCreated(payload: UserCreatedEvent) {
    // 비동기 처리도 가능
    await this.sendWelcomeEmail(payload.email);
  }
}

@OnEvent에 옵션을 넘길 수 있다.

typescript
@OnEvent('order.created', {
  // 비동기 리스너일 때, emitAsync()가 이 리스너의 완료를 기다릴지
  async: true,
  // 리스너에서 에러가 발생해도 다른 리스너 실행을 중단하지 않음
  promisify: true,
  // 에러 발생 시 조용히 넘어감
  suppressErrors: false,
})
handleOrderCreated(event: OrderCreatedEvent) {
  // ...
}

리스너 클래스는 반드시 어떤 모듈의 providers에 등록되어 있어야 한다. NestJS의 DI 컨테이너가 인스턴스를 관리하고 있어야 데코레이터가 동작하기 때문이다.

typescript
@Module({
  providers: [NotificationListener],
})
export class NotificationModule {}

직접 등록 방식 (on/once)

EventEmitter2를 직접 사용해서 리스너를 등록할 수도 있다. 런타임에 동적으로 리스너를 추가/제거해야 할 때 유용하다.

typescript
@Injectable()
export class FeedController {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  @Sse('trend/sse')
  streamTrendUpdates() {
    return new Observable((observer) => {
      // 초기 데이터 전송
      this.loadInitialData().then(data => observer.next({ data }));

      // 이벤트 리스너 등록
      this.eventEmitter.on('ranking-update', (trendData) => {
        observer.next({
          data: {
            message: '새로운 트렌드 피드 수신 완료',
            data: trendData,
          },
        });
      });
    });
  }
}

이 패턴은 SSE(Server-Sent Events)와 조합할 때 특히 유용하다. 클라이언트가 SSE 연결을 맺으면 Observable을 생성하고, 이벤트가 발생할 때마다 데이터를 push한다. 연결이 끊기면 Observable이 자동으로 정리된다.

once()를 사용하면 이벤트를 딱 한 번만 수신한다.

typescript
this.eventEmitter.once('initialization-complete', () => {
  console.log('초기화 완료!');
});

이벤트 페이로드 타입 정의

타입 안전성을 위해 이벤트 페이로드를 클래스나 인터페이스로 정의하는 것이 좋다.

typescript
// events/ranking-update.event.ts
export class RankingUpdateEvent {
  constructor(
    public readonly feeds: TrendFeed[],
    public readonly timestamp: Date = new Date(),
  ) {}
}

// 발행 측
this.eventEmitter.emit(
  'ranking-update',
  new RankingUpdateEvent(trendFeeds),
);

// 구독 측
@OnEvent('ranking-update')
handleRankingUpdate(event: RankingUpdateEvent) {
  console.log(event.feeds, event.timestamp);
}

이벤트 이름도 상수로 관리하면 오타를 방지할 수 있다.

typescript
export const EVENTS = {
  RANKING_UPDATE: 'ranking-update',
  USER_CREATED: 'user.created',
  FEED_CRAWLED: 'feed.crawled',
} as const;

// 사용
this.eventEmitter.emit(EVENTS.RANKING_UPDATE, payload);

@OnEvent(EVENTS.RANKING_UPDATE)
handleRankingUpdate(payload: RankingUpdateEvent) {}

와일드카드 이벤트

forRoot()에서 wildcard: true를 설정하면 패턴 매칭이 가능하다.

typescript
// 'order.created', 'order.cancelled', 'order.shipped' 등 모두 매칭
@OnEvent('order.*')
handleAllOrderEvents(payload: any) {
  console.log('주문 관련 이벤트 발생');
}

// 모든 이벤트 매칭
@OnEvent('**')
handleAllEvents(payload: any) {
  console.log('어떤 이벤트든 발생');
}

*는 한 레벨의 네임스페이스를 매칭하고, **는 모든 레벨을 매칭한다. 로깅이나 모니터링 용도로 모든 이벤트를 감지할 때 **를 쓸 수 있다.

실전 패턴: 스케줄러 + 이벤트 + SSE

Denamu 프로젝트에서 실제로 사용된 패턴을 살펴보자. 트렌드 피드의 랭킹 변동을 감지해서 SSE로 클라이언트에 실시간 전달하는 구조다.

text
┌──────────────┐     emit      ┌──────────────┐    SSE push    ┌──────────┐
│ FeedScheduler│ ──────────▶   │FeedController│ ──────────▶    │  Client  │
│  (@Cron 30s) │  'ranking-   │  (@Sse)      │               │          │
│              │   update'     │              │               │          │
└──────────────┘               └──────────────┘               └──────────┘

스케줄러가 30초마다 Redis에서 현재 트렌드와 이전 트렌드를 비교한다. 변동이 있으면 이벤트를 발행한다.

typescript
@Injectable()
export class FeedScheduler {
  constructor(
    private readonly redisService: RedisService,
    private readonly eventEmitter: EventEmitter2,
    private readonly feedService: FeedService,
  ) {}

  @Cron(CronExpression.EVERY_30_SECONDS)
  async analyzeTrend() {
    const [originTrend, nowTrend] = await Promise.all([
      this.redisService.lrange('feed:origin_trend', 0, 3),
      this.redisService.zrevrange('feed:trend', 0, 3),
    ]);

    if (!_.isEqual(originTrend, nowTrend)) {
      // Redis 업데이트
      await this.redisService.executePipeline((pipeline) => {
        pipeline.del('feed:origin_trend');
        pipeline.rpush('feed:origin_trend', ...nowTrend);
      });
      
      const trendFeeds = await this.feedService.readTrendFeedList();
      this.eventEmitter.emit('ranking-update', trendFeeds);
    }
  }
}

컨트롤러는 SSE 엔드포인트에서 이벤트를 기다리다가, 이벤트가 발행되면 클라이언트에 데이터를 push한다.

typescript
@Sse('trend/sse')
readTrendFeedList() {
  return new Observable((observer) => {
    // 연결 즉시 현재 데이터 전송
    this.feedService.readTrendFeedList()
      .then(data => observer.next({ data }));

    // 이후 변경사항은 이벤트로 수신
    this.eventEmitter.on('ranking-update', (trendData) => {
      observer.next({ data: trendData });
    });
  });
}

이 구조의 핵심은 스케줄러가 컨트롤러를 전혀 알 필요가 없다는 것이다. 스케줄러는 "랭킹이 바뀌었다"는 사실만 알리고, 그걸 SSE로 보내든 웹소켓으로 보내든 로그로 남기든 관심이 없다. 나중에 웹소켓 알림을 추가하고 싶으면 새 리스너만 만들면 된다. 기존 코드를 건드릴 필요가 없다.

주의할 점

메모리 누수

on()으로 직접 등록한 리스너는 해제하지 않으면 메모리 누수가 발생할 수 있다. 특히 요청마다 리스너를 등록하는 SSE 패턴에서 주의해야 한다.

typescript
@Sse('trend/sse')
readTrendFeedList() {
  return new Observable((observer) => {
    const handler = (data) => observer.next({ data });
    this.eventEmitter.on('ranking-update', handler);

    // 연결 종료 시 리스너 제거
    return () => {
      this.eventEmitter.off('ranking-update', handler);
    };
  });
}

Observable의 teardown 함수에서 off()로 리스너를 제거해야 한다. @OnEvent 데코레이터를 사용하면 NestJS가 애플리케이션 종료 시 자동으로 정리해주기 때문에 이 문제가 없다.

에러 처리

emit()으로 발행된 이벤트의 리스너에서 에러가 발생하면, 다른 리스너의 실행이 중단될 수 있다. 리스너에서 try-catch로 에러를 잡거나, @OnEventsuppressErrors: true 옵션을 사용해야 한다.

typescript
@OnEvent('ranking-update')
handleRankingUpdate(data: TrendFeed[]) {
  try {
    // 처리 로직
  } catch (error) {
    // 에러를 잡아서 다른 리스너에 영향을 주지 않도록
    this.logger.error('랭킹 업데이트 처리 실패', error);
  }
}

순환 의존성 회피

이벤트 기반 아키텍처의 가장 큰 장점이 바로 이것이다. 모듈 A가 모듈 B의 기능을 호출해야 할 때, 직접 의존하는 대신 이벤트를 발행하면 순환 의존성을 피할 수 있다.

text
// 직접 의존 (순환 위험)
FeedModule → NotificationModule → FeedModule (순환!)

// 이벤트 기반 (순환 없음)
FeedModule --emit--> EventEmitter <--listen-- NotificationModule

테스트

이벤트 기반 코드를 테스트할 때는 EventEmitter2를 목(mock)으로 대체하거나, 실제 인스턴스를 만들어서 이벤트 발행 여부를 확인한다.

typescript
describe('FeedScheduler', () => {
  let scheduler: FeedScheduler;
  let eventEmitter: EventEmitter2;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [
        FeedScheduler,
        EventEmitter2,
        // ... 다른 의존성
      ],
    }).compile();

    scheduler = module.get(FeedScheduler);
    eventEmitter = module.get(EventEmitter2);
  });

  it('트렌드가 변경되면 이벤트를 발행해야 한다', async () => {
    const emitSpy = jest.spyOn(eventEmitter, 'emit');
    
    // 트렌드 변경 시뮬레이션
    await scheduler.analyzeTrend();

    expect(emitSpy).toHaveBeenCalledWith(
      'ranking-update',
      expect.any(Array),
    );
  });
});

EventEmitter vs 메시지 큐

프로세스 내부에서의 이벤트 통신은 EventEmitter로 충분하다. 하지만 프로세스가 분리된 마이크로서비스 간 통신이 필요하다면 RabbitMQ 같은 메시지 큐를 써야 한다.

특성EventEmitter메시지 큐 (RabbitMQ)
범위단일 프로세스 내프로세스/서버 간
전달 보장✗ (프로세스 종료 시 유실)✓ (영속화 가능)
설정 복잡도낮음높음
지연 시간거의 0 (메모리 내)네트워크 지연 있음
재시도/DLQ직접 구현 필요내장 지원

같은 NestJS 프로세스 안에서 모듈 간 느슨한 결합이 목적이라면 EventEmitter, 다른 프로세스로 작업을 위임하거나 메시지 전달을 보장해야 한다면 메시지 큐를 선택하면 된다. 두 가지를 조합해서 쓰는 것도 일반적인 패턴이다.

정리

  • emit()은 fire-and-forget으로 발행자와 구독자를 분리하며, 완료 보장이 필요하면 emitAsync()를 사용한다
  • on()으로 직접 등록한 리스너는 teardown에서 off()로 해제해야 메모리 누수를 방지할 수 있고, @OnEvent는 자동 정리된다
  • 단일 프로세스 내 모듈 간 결합도를 낮추는 데 적합하며, 프로세스 간 통신이나 전달 보장이 필요하면 메시지 큐로 전환한다

관련 문서