junyeokk
Blog
Architecture·2025. 10. 20

RabbitMQ

서버 애플리케이션에서 이메일 발송, 이미지 처리, 외부 API 호출 같은 작업은 시간이 오래 걸린다. 이런 작업을 HTTP 요청 핸들러 안에서 동기적으로 처리하면 응답이 늦어지고, 그 사이에 외부 서비스가 장애를 일으키면 요청 자체가 실패한다.

[클라이언트] → [서버: 이메일 발송 3초] → 응답 ↑ 이 3초 동안 클라이언트는 대기

이 문제를 해결하는 가장 일반적인 방법이 메시지 큐다. 서버는 "이메일 보내줘"라는 메시지만 큐에 넣고 즉시 응답한다. 별도의 워커 프로세스가 큐에서 메시지를 꺼내서 실제 작업을 처리한다.

[클라이언트] → [서버: 큐에 메시지 전송] → 즉시 응답 ↓ [메시지 큐 (RabbitMQ)] ↓ [워커: 이메일 실제 발송]

RabbitMQ는 이런 메시지 큐를 구현하는 오픈소스 메시지 브로커다. AMQP(Advanced Message Queuing Protocol)를 기반으로 동작하며, Erlang으로 작성되어 높은 안정성과 동시성을 제공한다.


핵심 개념: Exchange, Queue, Binding

RabbitMQ에서 메시지는 프로듀서 → Exchange → Queue → 컨슈머 순서로 흐른다. 여기서 중요한 점은 프로듀서가 큐에 직접 메시지를 보내지 않는다는 것이다. 항상 Exchange를 거친다.

[Producer] → [Exchange] --routing--> [Queue] → [Consumer]

Exchange

Exchange는 메시지를 받아서 적절한 큐로 라우팅하는 역할을 한다. 우체국의 분류 센터와 비슷하다. 메시지를 받으면 라우팅 규칙에 따라 하나 이상의 큐로 전달한다.

Exchange에는 네 가지 타입이 있다:

Direct Exchange — 라우팅 키가 정확히 일치하는 큐에만 메시지를 전달한다. 가장 단순하고 많이 쓰이는 타입이다.

Exchange(direct) --routing_key="email.send"--> Queue(email.send.queue) --routing_key="crawl.full"--> Queue(crawl.full.queue)

Topic Exchange — 라우팅 키에 와일드카드 패턴 매칭을 지원한다. *는 단어 하나, #는 0개 이상의 단어를 매칭한다.

Exchange(topic) --"order.*.notification"--> Queue A (order.create.notification ✅) --"order.#"---------------> Queue B (order.create.notification ✅, order.cancel ✅)

Fanout Exchange — 라우팅 키를 무시하고 바인딩된 모든 큐에 메시지를 브로드캐스트한다. 같은 이벤트를 여러 서비스가 동시에 처리해야 할 때 유용하다.

Headers Exchange — 라우팅 키 대신 메시지 헤더 속성으로 라우팅한다. 잘 쓰이지 않는다.

Queue

메시지가 실제로 저장되는 버퍼다. 컨슈머가 메시지를 꺼내갈 때까지 큐에 보관된다. FIFO(First In, First Out) 순서로 메시지가 처리된다.

Binding

Exchange와 Queue를 연결하는 규칙이다. "이 Exchange에서 이 라우팅 키를 가진 메시지는 이 Queue로 보내라"는 매핑을 정의한다.

javascript
[클라이언트] → [서버: 이메일 발송 3초] → 응답
                  ↑ 이 3초 동안 클라이언트는 대기

Node.js에서 사용하기: amqplib

Node.js에서 RabbitMQ를 사용하려면 amqplib 패키지를 사용한다.

bash
[클라이언트] → [서버: 큐에 메시지 전송] → 즉시 응답

               [메시지 큐 (RabbitMQ)]

               [워커: 이메일 실제 발송]

연결과 채널

RabbitMQ와 통신하려면 먼저 Connection을 생성하고, 그 위에 Channel을 생성한다. Connection은 TCP 연결이고, Channel은 그 위에서 동작하는 가상의 경량 연결이다.

typescript
[Producer] → [Exchange] --routing--> [Queue] → [Consumer]

왜 Connection과 Channel을 분리했을까? TCP 연결을 생성하는 건 비용이 크다. 하나의 Connection 위에 여러 Channel을 만들면 TCP 연결 하나로 여러 작업을 병렬 처리할 수 있다. 채널은 스레드 세이프하지 않으므로 각 작업마다 별도 채널을 사용하거나, 하나의 채널을 순차적으로 사용해야 한다.

실제 프로젝트에서는 채널을 매번 새로 만들지 않고, 한 번 생성한 채널을 재사용하는 매니저 클래스를 만드는 게 일반적이다:

typescript
Exchange(direct) --routing_key="email.send"--> Queue(email.send.queue)
                 --routing_key="crawl.full"--> Queue(crawl.full.queue)

channelPromise를 캐싱하는 이유는 동시성 문제를 방지하기 위해서다. 여러 곳에서 동시에 getChannel()을 호출하면 채널이 여러 개 생성될 수 있는데, Promise를 캐싱해두면 첫 번째 호출이 완료될 때까지 나머지는 같은 Promise를 기다린다.

Exchange, Queue, Binding 선언

채널을 얻었으면 Exchange와 Queue를 선언하고 바인딩한다:

typescript
Exchange(topic) --"order.*.notification"--> Queue A  (order.create.notification ✅)
                --"order.#"---------------> Queue B  (order.create.notification ✅, order.cancel ✅)

durable: true는 RabbitMQ 서버가 재시작되어도 Exchange와 Queue가 사라지지 않도록 한다. 운영 환경에서는 거의 항상 true로 설정한다.

메시지 발행 (Publish)

Exchange에 메시지를 발행하려면 publish() 메서드를 사용한다:

typescript
// Exchange 'EmailExchange'에서 라우팅 키 'email.send'인 메시지를
// 'email.send.queue'로 보내라
channel.bindQueue('email.send.queue', 'EmailExchange', 'email.send');

메시지는 반드시 Buffer로 변환해서 보내야 한다. 보통 JSON 문자열을 Buffer.from()으로 변환한다.

특정 큐에 직접 메시지를 보내려면 sendToQueue()를 사용할 수도 있다:

typescript
npm install amqplib

메시지 소비 (Consume)

큐에서 메시지를 소비하려면 consume() 메서드를 사용한다:

typescript
import amqplib, { Channel, ChannelModel } from 'amqplib';

// 1. 연결 생성
const connection: ChannelModel = await amqplib.connect('amqp://localhost:5672');

// 2. 채널 생성
const channel: Channel = await connection.createChannel();

여기서 가장 중요한 개념이 ack/nack 메커니즘이다.


ack / nack: 메시지 확인 메커니즘

RabbitMQ는 기본적으로 메시지를 큐에서 꺼내더라도 바로 삭제하지 않는다. 컨슈머가 "이 메시지 처리 완료했어"라고 명시적으로 알려줘야(ack) 비로소 큐에서 제거한다.

typescript
class MessageQueueManager {
  private channel: Channel | null = null;
  private channelPromise: Promise<Channel> | null = null;

  constructor(private readonly connection: ChannelModel) {}

  async getChannel(): Promise<Channel> {
    if (this.channel) return this.channel;
    if (this.channelPromise !== null) return this.channelPromise;

    // 동시에 여러 곳에서 getChannel()을 호출해도
    // 채널이 하나만 생성되도록 Promise를 캐싱
    this.channelPromise = this.connection.createChannel();
    this.channel = await this.channelPromise;
    this.channelPromise = null;

    return this.channel;
  }

  async close() {
    if (this.channel) {
      await this.channel.close();
      this.channel = null;
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

nack의 두 번째 인자 allUpTo이 true면 이 메시지까지의 모든 미확인 메시지를 한꺼번에 nack한다. 세 번째 인자 requeue가 true면 메시지를 다시 큐에 넣고, false면 버린다 (Dead Letter Exchange가 설정되어 있으면 그쪽으로 이동).

이 메커니즘이 왜 중요할까? 만약 컨슈머가 메시지를 처리하다가 크래시되면 어떻게 될까? ack를 보내지 않았으므로 RabbitMQ는 메시지가 아직 처리되지 않았다고 판단하고, 다른 컨슈머에게 재전달한다. 이렇게 메시지 유실을 방지할 수 있다.

1. Consumer가 메시지 수신 2. 처리 중 크래시 발생 (ack 없음) 3. RabbitMQ가 연결 끊김 감지 4. 메시지를 다시 큐에 넣음 (또는 다른 Consumer에게 전달)

Auto-ack 모드

수동 ack 대신 자동 ack 모드를 사용할 수도 있다:

typescript
const channel = await manager.getChannel();

// Exchange 선언
await channel.assertExchange('EmailExchange', 'direct', {
  durable: true,  // RabbitMQ 재시작 후에도 Exchange 유지
});

// Queue 선언
await channel.assertQueue('email.send.queue', {
  durable: true,      // RabbitMQ 재시작 후에도 Queue 유지
  deadLetterExchange: 'DeadLetterExchange',     // 실패한 메시지를 보낼 Exchange
  deadLetterRoutingKey: 'email.deadLetter',      // Dead Letter 라우팅 키
});

// Binding: Exchange와 Queue 연결
await channel.bindQueue('email.send.queue', 'EmailExchange', 'email.send');

이 모드에서는 메시지를 전달하는 즉시 큐에서 제거한다. 처리 실패 시 메시지가 유실되므로, 메시지 유실이 허용되는 경우에만 사용한다 (로그 수집 등).


Dead Letter Exchange (DLX)

메시지 처리에 실패했을 때 그냥 버리면 문제를 추적하기 어렵다. Dead Letter Exchange는 실패한 메시지를 별도의 큐로 보내서 나중에 확인하거나 재처리할 수 있게 해주는 메커니즘이다.

메시지가 Dead Letter가 되는 조건:

  1. 컨슈머가 nack(message, false, false)로 거부 (requeue=false)
  2. 메시지 TTL(Time-To-Live) 만료
  3. 큐의 최대 길이 초과
typescript
async sendMessage(exchange: string, routingKey: string, message: string) {
  const channel = await this.manager.getChannel();
  channel.publish(exchange, routingKey, Buffer.from(message));
}

// 사용 예시
await service.sendMessage(
  'EmailExchange',
  'email.send',
  JSON.stringify({ to: 'user@example.com', subject: '가입 환영', body: '...' })
);

이렇게 설정하면 email.send.queue에서 nack된 메시지는 자동으로 email.deadLetter.queue로 이동한다. 관리자가 나중에 Dead Letter Queue를 확인하고 문제를 분석하거나, 별도의 워커가 Dead Letter Queue를 소비해서 재시도 로직을 구현할 수 있다.


재시도 패턴

실패한 메시지를 무조건 버리지 않고, 몇 번은 재시도하고 싶을 때가 있다. 메시지 헤더에 재시도 횟수를 저장하면 간단하게 구현할 수 있다:

typescript
channel.sendToQueue('email.send.queue', Buffer.from(message), {
  persistent: true,  // 메시지를 디스크에 저장 (RabbitMQ 재시작 시에도 유지)
});

프로듀서 쪽에서 메시지를 다시 보낼 때 x-retry-count 헤더를 증가시키면, 컨슈머는 재시도 횟수를 확인해서 최대 재시도 횟수를 초과하면 영구 실패로 처리할 수 있다:

typescript
async consumeMessage<T>(
  queue: string,
  onMessage: (payload: T) => void | Promise<void>,
) {
  const channel = await this.manager.getChannel();

  const { consumerTag } = await channel.consume(queue, (message) => {
    void (async () => {
      try {
        if (!message) return;

        const parsedMessage = JSON.parse(message.content.toString()) as T;
        await onMessage(parsedMessage);

        // 처리 완료 → ack
        channel.ack(message);
      } catch (error) {
        console.error('메시지 처리 중 오류:', error);
        // 처리 실패 → nack
        channel.nack(message, false, false);
      }
    })();
  });

  return consumerTag;
}

Graceful Shutdown

워커 프로세스가 종료될 때 현재 처리 중인 메시지가 있을 수 있다. 갑자기 종료하면 처리 중이던 메시지가 ack도 nack도 되지 않은 상태로 남는다.

Graceful shutdown은 종료 신호를 받으면 새 메시지 수신을 중단하고, 현재 처리 중인 메시지가 완료될 때까지 기다린 후 종료하는 패턴이다:

typescript
// 처리 성공 → 큐에서 메시지 제거
channel.ack(message);

// 처리 실패 → 큐에서 메시지 제거 (Dead Letter Queue로 이동)
channel.nack(message, false, false);
//                     ↑      ↑
//                allUpTo   requeue

처리 중인 메시지가 있는 상태에서 종료 요청이 오면, 해당 메시지를 nack(message, false, true)로 큐에 다시 넣어주는 방법도 있다:

typescript
1. Consumer가 메시지 수신
2. 처리 중 크래시 발생 (ack 없음)
3. RabbitMQ가 연결 끊김 감지
4. 메시지를 다시 큐에 넣음 (또는 다른 Consumer에게 전달)

Prefetch (QoS)

기본적으로 RabbitMQ는 컨슈머에게 메시지를 한꺼번에 밀어넣는다. 컨슈머가 처리 속도가 느리면 메모리가 넘칠 수 있다. prefetch를 설정하면 한 번에 처리할 수 있는 미확인 메시지 수를 제한할 수 있다:

typescript
channel.consume(queue, callback, { noAck: true });

이렇게 하면 컨슈머가 ack를 보내기 전까지 새로운 메시지를 받지 않는다. 외부 API 호출처럼 rate limit이 있는 작업에서 특히 유용하다.


RabbitMQ definitions.json

Exchange, Queue, Binding 설정을 코드가 아닌 JSON 파일로 선언적으로 관리할 수도 있다. RabbitMQ Management Plugin이 이 파일을 읽어서 자동으로 설정을 적용한다:

json
// Dead Letter Exchange 선언
await channel.assertExchange('DeadLetterExchange', 'direct', { durable: true });

// Dead Letter Queue 선언
await channel.assertQueue('email.deadLetter.queue', { durable: true });

// 바인딩
await channel.bindQueue(
  'email.deadLetter.queue',
  'DeadLetterExchange',
  'email.deadLetter'
);

// 원래 큐에 DLX 설정
await channel.assertQueue('email.send.queue', {
  durable: true,
  deadLetterExchange: 'DeadLetterExchange',
  deadLetterRoutingKey: 'email.deadLetter',
});

Docker Compose에서 이 파일을 마운트하면 컨테이너 시작 시 자동으로 적용된다:

yaml
async consumeMessage<T>(
  queue: string,
  onMessage: (payload: T, retryCount: number) => void | Promise<void>,
) {
  const channel = await this.manager.getChannel();

  await channel.consume(queue, (message) => {
    void (async () => {
      try {
        if (!message) return;

        const parsedMessage = JSON.parse(message.content.toString()) as T;
        const retryCount = message.properties.headers?.['x-retry-count'] || 0;
        await onMessage(parsedMessage, retryCount);

        channel.ack(message);
      } catch (error) {
        channel.nack(message, false, false);
      }
    })();
  });
}

이 방식의 장점은 인프라 설정을 코드와 분리할 수 있다는 것이다. 코드에서는 assertExchangeassertQueue를 호출할 필요 없이 바로 publish/consume만 하면 된다.


Redis Pub/Sub vs RabbitMQ

간단한 실시간 알림이라면 Redis Pub/Sub으로 충분하다. 하지만 메시지의 신뢰성이 중요한 경우에는 RabbitMQ가 적합하다.

비교 항목Redis Pub/SubRabbitMQ
메시지 지속성없음 (수신자 없으면 유실)있음 (디스크 저장 가능)
ack/nack없음있음
라우팅채널 이름 매칭만Exchange 타입별 다양한 라우팅
Dead Letter없음내장 지원
사용 사례실시간 알림, 캐시 무효화작업 큐, 이메일 발송, 비동기 처리

핵심 차이는 메시지 보장이다. Redis Pub/Sub은 구독자가 없으면 메시지가 사라진다. RabbitMQ는 큐에 저장해두고 컨슈머가 처리할 때까지 보관한다.


정리

  • Exchange→Queue→Consumer 흐름에서 Exchange 타입(Direct/Topic/Fanout)이 라우팅 전략을 결정하고, ack/nack이 메시지 유실을 방지한다
  • DLX로 실패 메시지를 격리하고, 헤더 기반 재시도 횟수 추적으로 영구 실패와 일시 실패를 구분한다
  • prefetch로 컨슈머 과부하를 막고, definitions.json으로 인프라 설정을 코드와 분리할 수 있다

관련 문서