junyeokk
Blog
Architecture·2025. 09. 03

Abstract Queue Worker 패턴

백그라운드에서 큐에 쌓인 작업을 하나씩 꺼내서 처리하는 워커를 만들어야 할 때, 처음에는 그냥 하나의 함수나 클래스로 작성하게 된다. AI 요약 요청을 처리하는 워커, 이메일을 발송하는 워커, 데이터를 크롤링하는 워커 — 각각 만들다 보면 금방 패턴이 보인다. 큐에서 메시지를 꺼내고, 파싱하고, 처리하고, 실패하면 재시도하거나 포기하는 흐름이 거의 동일하다는 것이다.

이 반복되는 뼈대를 추상 클래스로 뽑아내고, 각 워커는 "자기만의 로직"만 구현하게 만드는 것이 Abstract Queue Worker 패턴이다. GoF의 Template Method 패턴을 큐 처리에 특화시킨 형태라고 볼 수 있다.


왜 필요한가 — 중복의 문제

큐 워커를 여러 개 만들 때 중복되는 코드를 정리해보면 대략 이런 구조가 나온다.

typescript
// AI 요약 워커
class AiSummaryWorker {
  async start() {
    console.log("AI 워커 시작");
    const startTime = Date.now();
    try {
      await this.processQueue();
    } catch (error) {
      console.error("처리 중 오류:", error.message);
    }
    console.log(`실행 시간: ${(Date.now() - startTime) / 1000}s`);
    console.log("AI 워커 완료");
  }

  async processQueue() { /* AI 전용 로직 */ }
  async handleFailure(item, error) { /* 재시도 로직 */ }
}

// 크롤링 워커
class CrawlWorker {
  async start() {
    console.log("크롤러 시작");     // 똑같은 코드
    const startTime = Date.now();   // 똑같은 코드
    try {
      await this.processQueue();
    } catch (error) {
      console.error("처리 중 오류:", error.message);
    }
    console.log(`실행 시간: ${(Date.now() - startTime) / 1000}s`);
    console.log("크롤러 완료");
  }

  async processQueue() { /* 크롤링 전용 로직 */ }
  async handleFailure(item, error) { /* 재시도 로직 */ }
}

start() 메서드가 완전히 동일하다. 로깅, 타이밍 측정, 에러 캐치 — 모든 워커에서 반복된다. 워커가 3개, 5개, 10개로 늘어나면 이 코드가 계속 복사된다. 로깅 포맷을 바꾸려면 모든 워커를 수정해야 한다.


추상 클래스로 뼈대 잡기

공통 흐름을 추상 클래스에 한 번만 정의하고, 각 워커는 달라지는 부분만 구현한다.

typescript
abstract class AbstractQueueWorker<T> {
  protected readonly nameTag: string;
  protected readonly redis: RedisClient;

  constructor(nameTag: string, redis: RedisClient) {
    this.nameTag = nameTag;
    this.redis = redis;
  }

  async start(): Promise<void> {
    console.log(`========== ${this.nameTag} 작업 시작 ==========`);
    const startTime = Date.now();

    try {
      await this.processQueue();
    } catch (error) {
      console.error(`${this.nameTag} 처리 중 오류: ${error.message}`);
    }

    const elapsed = (Date.now() - startTime) / 1000;
    console.log(`${this.nameTag} 실행 시간: ${elapsed}s`);
    console.log(`========== ${this.nameTag} 작업 완료 ==========`);
  }

  // 하위 클래스가 반드시 구현해야 하는 메서드들
  protected abstract processQueue(): Promise<void>;
  protected abstract getQueueKey(): string;
  protected abstract parseQueueMessage(message: string): T;
  protected abstract processItem(item: T): Promise<void>;
  protected abstract handleFailure(item: T, error: Error): Promise<void>;
}

핵심 설계 포인트

제네릭 타입 <T>: 각 워커가 처리하는 메시지의 타입이 다르다. AI 워커는 { id, content, summary } 형태일 수 있고, 크롤링 워커는 { rssId, url } 형태일 수 있다. 제네릭으로 선언해두면 하위 클래스에서 구체적인 타입을 지정할 수 있고, processItem이나 handleFailure에서 타입 안전하게 작업할 수 있다.

start()는 구현 완료: 이 메서드는 abstract가 아니다. 공통 흐름(로깅, 타이밍, 에러 캐치)이 여기에 고정되어 있고, 하위 클래스는 이것을 건드리지 않는다. Template Method 패턴의 핵심이다.

5개의 추상 메서드: 각각의 역할이 명확하게 분리되어 있다.

메서드역할
processQueue()큐에서 메시지를 꺼내는 전체 흐름
getQueueKey()Redis 큐의 키 이름 반환
parseQueueMessage()문자열 메시지를 타입 T로 변환
processItem()개별 아이템 처리 로직
handleFailure()실패 시 재시도/포기 결정

구체 워커 구현

AI 요약 워커

typescript
interface AiQueueItem {
  id: number;
  content: string;
  summary?: string;
  tags?: string[];
  deathCount: number;
}

class AiSummaryWorker extends AbstractQueueWorker<AiQueueItem> {
  private readonly aiClient: AiClient;

  constructor(redis: RedisClient, aiClient: AiClient) {
    super("[AI Service]", redis);
    this.aiClient = aiClient;
  }

  protected async processQueue(): Promise<void> {
    const items = await this.loadBatch();
    await Promise.all(items.map((item) => this.processItem(item)));
  }

  protected getQueueKey(): string {
    return "queue:ai";
  }

  protected parseQueueMessage(message: string): AiQueueItem {
    return JSON.parse(message);
  }

  protected async processItem(item: AiQueueItem): Promise<void> {
    try {
      const result = await this.aiClient.summarize(item.content);
      item.summary = result.summary;
      item.tags = result.tags;
      await this.saveResult(item);
    } catch (error) {
      await this.handleFailure(item, error);
    }
  }

  protected async handleFailure(
    item: AiQueueItem,
    error: Error
  ): Promise<void> {
    const retryable = this.isRetryableError(error);

    if (retryable && item.deathCount < 3) {
      item.deathCount++;
      await this.redis.rpush(this.getQueueKey(), JSON.stringify(item));
      console.warn(`${this.nameTag} ${item.id} 재시도 (${item.deathCount}/3)`);
    } else {
      console.error(`${this.nameTag} ${item.id} 영구 실패`);
      await this.markAsFailed(item.id);
    }
  }

  private isRetryableError(error: Error): boolean {
    const msg = error.message.toLowerCase();
    if (msg.includes("invalid") || msg.includes("401")) return false;
    if (msg.includes("json") || msg.includes("parse")) return false;
    if (msg.includes("rate limit") || msg.includes("429")) return true;
    if (msg.includes("timeout") || msg.includes("503")) return true;
    return true;
  }

  private async loadBatch(): Promise<AiQueueItem[]> {
    const results = await this.redis.executePipeline((pipeline) => {
      for (let i = 0; i < 5; i++) {
        pipeline.rpop(this.getQueueKey());
      }
    });
    return results
      .map((r) => JSON.parse(r[1] as string))
      .filter(Boolean);
  }

  private async saveResult(item: AiQueueItem): Promise<void> {
    // DB에 요약 저장, Redis 캐시 업데이트 등
  }

  private async markAsFailed(id: number): Promise<void> {
    // 실패 상태로 DB 업데이트
  }
}

주목할 점은 processQueue()의 구현 방식이다. AI 워커는 rate limit 때문에 한 번에 여러 개를 배치로 꺼내서 Promise.all로 병렬 처리한다. 반면 다른 워커는 하나씩 꺼내서 처리할 수도 있다. 이런 전략 차이를 processQueue() 수준에서 자유롭게 결정할 수 있다.

크롤링 워커

typescript
interface CrawlMessage {
  rssId: number;
  deathCount: number;
}

class FullCrawlWorker extends AbstractQueueWorker<CrawlMessage> {
  private readonly crawler: FeedCrawler;
  private readonly repository: RssRepository;

  constructor(
    redis: RedisClient,
    crawler: FeedCrawler,
    repository: RssRepository
  ) {
    super("[Full Crawler]", redis);
    this.crawler = crawler;
    this.repository = repository;
  }

  protected async processQueue(): Promise<void> {
    const message = await this.redis.rpop(this.getQueueKey());
    if (!message) {
      console.log("처리할 크롤링 요청이 없습니다.");
      return;
    }
    const item = this.parseQueueMessage(message);
    await this.processItem(item);
  }

  protected getQueueKey(): string {
    return "queue:full-crawl";
  }

  protected parseQueueMessage(message: string): CrawlMessage {
    return JSON.parse(message);
  }

  protected async processItem(item: CrawlMessage): Promise<void> {
    const rss = await this.repository.findById(item.rssId);
    if (!rss) {
      console.warn(`RSS ID ${item.rssId}를 찾을 수 없습니다.`);
      return;
    }

    try {
      const feeds = await this.crawler.crawlAll(rss);
      console.log(`${feeds.length}개 피드 처리 완료`);
    } catch (error) {
      await this.handleFailure(item, error);
    }
  }

  protected async handleFailure(
    item: CrawlMessage,
    error: Error
  ): Promise<void> {
    if (item.deathCount < 3) {
      item.deathCount++;
      await this.redis.rpush(this.getQueueKey(), JSON.stringify(item));
    } else {
      console.error(`RSS ID ${item.rssId} 영구 실패`);
    }
  }
}

이 워커는 AI 워커와 달리 processQueue()에서 하나만 꺼낸다. 전체 크롤링은 무거운 작업이라 하나씩 처리하는 게 적절하기 때문이다. 같은 추상 클래스를 상속받지만 처리 전략이 완전히 다르다.


패턴의 구조 — Template Method와의 관계

Abstract Queue Worker는 Template Method 패턴의 구체적인 적용이다. Template Method는 알고리즘의 뼈대를 상위 클래스에 정의하고, 일부 단계를 하위 클래스가 재정의하게 하는 패턴이다.

AbstractQueueWorker (추상 클래스) ├── start() ← 고정된 뼈대 (Template Method) │ ├── 로깅 시작 │ ├── processQueue() ← 하위 클래스가 구현 │ ├── 에러 캐치 │ └── 로깅 완료 │ ├── processQueue() ← abstract ├── getQueueKey() ← abstract ├── parseQueueMessage() ← abstract ├── processItem() ← abstract └── handleFailure() ← abstract

start()가 Template Method다. 이 메서드 안에서 processQueue()를 호출하는데, 이 메서드의 실제 구현은 하위 클래스에 위임된다. 상위 클래스는 "큐 처리 작업이 시작-실행-종료되는 흐름"만 알고, "큐에서 어떻게 꺼내고 어떻게 처리하는지"는 모른다.


제네릭의 역할

제네릭 없이 만들면 어떻게 될까?

typescript
AbstractQueueWorker (추상 클래스)
├── start()              ← 고정된 뼈대 (Template Method)
│   ├── 로깅 시작
│   ├── processQueue()   ← 하위 클래스가 구현
│   ├── 에러 캐치
│   └── 로깅 완료

├── processQueue()       ← abstract
├── getQueueKey()        ← abstract
├── parseQueueMessage()  ← abstract
├── processItem()        ← abstract
└── handleFailure()      ← abstract

any를 쓰면 하위 클래스에서 매번 타입 캐스팅을 해야 한다.

typescript
// 제네릭 없는 버전
abstract class AbstractQueueWorker {
  protected abstract processItem(item: any): Promise<void>;
  protected abstract handleFailure(item: any, error: Error): Promise<void>;
}

제네릭을 쓰면 하위 클래스에서 타입을 선언하는 순간 모든 추상 메서드에 자동으로 타입이 적용된다.

typescript
// 매번 이런 코드가 필요
protected async processItem(item: any): Promise<void> {
  const typedItem = item as AiQueueItem;  // 매번 캐스팅
  // ...
}

컴파일 시점에 타입 불일치를 잡아주니까 런타임 에러를 예방할 수 있다.


deathCount 기반 재시도 전략

대부분의 큐 워커에서 공통으로 필요한 것이 재시도 로직이다. 이 패턴에서는 메시지 자체에 deathCount 필드를 넣어서 몇 번 실패했는지 추적한다.

typescript
class AiWorker extends AbstractQueueWorker<AiQueueItem> {
  // processItem의 시그니처가 자동으로
  // processItem(item: AiQueueItem): Promise<void>가 됨
}

처리 흐름은 이렇다.

1. 큐에서 메시지를 꺼냄 (deathCount: 0) 2. 처리 시도 → 실패 3. 에러가 재시도 가능한지 판단 ├── 재시도 불가 (잘못된 데이터, 인증 오류 등) → 즉시 포기 └── 재시도 가능 (타임아웃, rate limit 등) ├── deathCount < 3 → deathCount++ 후 큐에 다시 넣음 └── deathCount >= 3 → 영구 실패 처리

재시도 가능 여부 판단

모든 에러를 재시도하면 안 된다. JSON 파싱 에러나 인증 실패는 몇 번을 재시도해도 성공할 수 없다. 반면 네트워크 타임아웃이나 rate limit은 잠시 후에 다시 시도하면 성공할 가능성이 있다.

typescript
interface QueueMessage {
  // ... 실제 데이터 필드
  deathCount: number;  // 실패 횟수
}

이 판단 로직도 각 워커마다 다를 수 있다. 크롤링 워커는 네트워크 관련 에러에 더 관대하고, AI 워커는 파싱 에러에 엄격할 수 있다. handleFailure()가 추상 메서드인 이유다.


Dead Letter 처리

deathCount가 임계값을 넘으면 "Dead Letter"로 취급한다. 메시지 큐 시스템에서 Dead Letter Queue(DLQ)는 처리할 수 없는 메시지를 별도로 보관하는 큐를 말한다. RabbitMQ나 AWS SQS에는 DLQ가 내장되어 있지만, Redis 기반 구현에서는 직접 만들어야 한다.

typescript
1. 큐에서 메시지를 꺼냄 (deathCount: 0)
2. 처리 시도 → 실패
3. 에러가 재시도 가능한지 판단
   ├── 재시도 불가 (잘못된 데이터, 인증 오류 등) → 즉시 포기
   └── 재시도 가능 (타임아웃, rate limit 등)
       ├── deathCount < 3 → deathCount++ 후 큐에 다시 넣음
       └── deathCount >= 3 → 영구 실패 처리

Dead Letter를 어떻게 처리할지는 서비스 요구사항에 따라 다르다. 그냥 로그만 남기고 무시할 수도 있고, 별도 큐에 보관했다가 운영자가 수동으로 확인할 수도 있고, 알림을 보낼 수도 있다.


DI(의존성 주입)와의 결합

TypeScript에서 DI 컨테이너를 쓴다면 추상 클래스의 하위 워커들을 깔끔하게 등록할 수 있다.

typescript
private isRetryableError(error: Error): boolean {
  const msg = error.message.toLowerCase();

  // 영구적 에러 — 재시도 무의미
  if (msg.includes("invalid")) return false;   // 잘못된 요청
  if (msg.includes("401")) return false;        // 인증 실패
  if (msg.includes("json")) return false;       // 파싱 불가

  // 일시적 에러 — 재시도 의미 있음
  if (msg.includes("rate limit")) return true;  // API 호출 제한
  if (msg.includes("429")) return true;          // Too Many Requests
  if (msg.includes("timeout")) return true;      // 타임아웃
  if (msg.includes("503")) return true;          // 서버 일시 장애

  // 모르는 에러는 일단 재시도
  return true;
}

@injectable()로 DI 컨테이너에 등록하면 의존성이 자동으로 주입된다. 테스트할 때는 mock 객체를 주입하면 되니까 단위 테스트가 쉬워진다.

typescript
protected async handleFailure(item: T, error: Error): Promise<void> {
  if (retryable && item.deathCount < 3) {
    // 재시도
    item.deathCount++;
    await this.redis.rpush(this.getQueueKey(), JSON.stringify(item));
  } else {
    // Dead Letter 처리
    console.error(`${item.id} 영구 실패`);

    // 방법 1: DB에 실패 상태 기록
    await this.markAsFailed(item.id);

    // 방법 2: Dead Letter Queue에 보관 (나중에 수동 재처리)
    await this.redis.rpush("queue:dead-letter", JSON.stringify({
      ...item,
      error: error.message,
      failedAt: new Date().toISOString(),
    }));
  }
}

이 패턴을 쓸 때와 쓰지 말아야 할 때

적합한 경우

  • 큐 워커가 2개 이상이고 처리 흐름이 비슷할 때
  • 공통 로직(로깅, 타이밍, 에러 처리)을 한 곳에서 관리하고 싶을 때
  • 각 워커의 처리 전략(배치/단건, 재시도 정책)이 다를 때

부적합한 경우

  • 워커가 1개뿐일 때 — 추상화가 오히려 복잡성만 추가한다
  • 처리 흐름 자체가 워커마다 완전히 다를 때 — 공통 뼈대가 없으면 의미 없다
  • 이미 Bull, BullMQ 같은 큐 라이브러리를 쓰고 있을 때 — 라이브러리가 이런 추상화를 이미 제공한다

비교: Bull/BullMQ vs 직접 구현

Node.js에서 큐 처리를 할 때 가장 많이 쓰는 라이브러리는 Bull(또는 BullMQ)이다. Bull을 쓰면 재시도, Dead Letter, 동시성 제어, 지연 큐 등이 내장되어 있어서 직접 구현할 필요가 없다.

typescript
import { injectable, inject } from "tsyringe";

@injectable()
class AiSummaryWorker extends AbstractQueueWorker<AiQueueItem> {
  constructor(
    @inject("RedisClient") redis: RedisClient,
    @inject("AiClient") private readonly aiClient: AiClient,
    @inject("FeedRepository") private readonly feedRepo: FeedRepository
  ) {
    super("[AI Service]", redis);
  }

  // ...
}

그렇다면 왜 직접 Abstract Queue Worker를 만들까?

비교 항목Bull/BullMQ직접 구현
재시도내장 (attempts, backoff)직접 구현 (deathCount)
Dead Letter내장직접 구현
의존성bull + ioredisioredis만
유연성라이브러리 API에 맞춰야 함완전한 제어
학습 곡선Bull API 학습 필요팀 내부 코드만 이해하면 됨
적합한 규모중~대규모소~중규모

소규모 프로젝트에서는 Bull의 모든 기능이 필요하지 않고, 간단한 Redis rpush/rpop 기반 큐면 충분할 때가 많다. 이런 경우 Abstract Queue Worker 패턴으로 최소한의 구조만 잡아두면 코드가 깔끔하면서도 가볍다.


정리

Abstract Queue Worker 패턴은 결국 "공통 흐름은 한 곳에, 다른 부분은 각자"라는 단순한 원칙을 큐 처리에 적용한 것이다.

  • 추상 클래스로 실행 흐름(start → processQueue → 에러 처리 → 완료)을 고정
  • 추상 메서드로 큐 키, 메시지 파싱, 처리 로직, 실패 처리를 위임
  • 제네릭으로 메시지 타입 안전성 확보
  • deathCount로 재시도 횟수 추적, 에러 종류에 따라 재시도 여부 결정

워커가 늘어나도 뼈대 코드는 한 곳에만 있으니 유지보수가 쉽고, 새 워커를 추가할 때도 추상 메서드만 구현하면 되니까 빠르게 만들 수 있다.


관련 문서