AI Queue Worker
외부 AI API를 호출해서 대량의 데이터를 처리해야 하는 상황을 생각해보자. 예를 들어 수백 개의 글을 AI로 요약하거나 태깅하는 작업이다. 가장 단순한 접근은 for 루프로 하나씩 API를 호출하는 것이다.
for (const item of items) {
const result = await aiClient.process(item);
await saveResult(result);
}
이 방식은 몇 가지 심각한 문제가 있다.
- Rate Limit: 대부분의 AI API는 분당 요청 수 제한이 있다. 빠르게 연속 호출하면 429 에러가 터진다.
- 일시적 장애: 네트워크 타임아웃, 서버 과부하(503) 같은 일시적 에러가 발생하면 전체 루프가 중단된다.
- 재시도 관리: 실패한 항목만 골라서 재시도하는 로직이 없다.
- 스케일: 처리할 데이터가 쌓이는 속도가 처리 속도보다 빠르면 밀린다.
이런 문제들을 해결하려면 큐 기반 워커 패턴이 필요하다.
큐 기반 워커의 핵심 아이디어
AI 요청을 바로 처리하지 않고, Redis 같은 큐에 작업 항목을 넣어둔다. 별도의 워커 프로세스가 큐에서 항목을 꺼내서 처리한다. 실패하면 다시 큐에 넣는다.
[데이터 수집] → [Redis Queue] → [AI Worker] → [결과 저장]
↑ |
└── 실패 시 재삽입 ──┘
이 구조의 장점:
- 속도 제어: 워커가 한 번에 꺼내는 항목 수를 제한해서 rate limit을 준수할 수 있다.
- 내결함성: 특정 항목이 실패해도 다른 항목 처리에 영향을 주지 않는다.
- 비동기 분리: 데이터를 수집하는 쪽과 AI 처리하는 쪽이 독립적으로 동작한다.
추상 큐 워커 설계
여러 종류의 큐 작업(AI 요약, 태깅, 이미지 분석 등)이 있을 수 있으니, 공통 흐름을 추상 클래스로 정의한다.
[데이터 수집] → [Redis Queue] → [AI Worker] → [결과 저장]
↑ |
└── 실패 시 재삽입 ──┘
이 구조가 Template Method 패턴이다. start() 메서드가 전체 흐름(시작 로깅 → 큐 처리 → 에러 핸들링 → 종료 로깅)을 고정하고, 실제 처리 로직은 하위 클래스에 위임한다.
각 추상 메서드의 역할:
| 메서드 | 역할 |
|---|---|
processQueue() | 큐에서 항목을 꺼내서 처리하는 메인 루프 |
getQueueKey() | Redis 큐의 키 이름 반환 |
parseQueueMessage() | 큐에서 꺼낸 문자열을 타입 T로 파싱 |
processItem() | 개별 항목 처리 (API 호출 + 결과 저장) |
handleFailure() | 실패 시 재시도/포기 판단 |
AI 워커 구현
추상 클래스를 상속받아 AI 요약 워커를 구현한다.
abstract class AbstractQueueWorker<T> {
protected readonly nameTag: string;
protected readonly redis: RedisClient;
constructor(nameTag: string, redis: RedisClient) {
this.nameTag = nameTag;
this.redis = redis;
}
async start(): Promise<void> {
console.log(`[${this.nameTag}] 작업 시작`);
const startTime = Date.now();
try {
await this.processQueue();
} catch (error) {
console.error(`[${this.nameTag}] 처리 중 오류: ${error.message}`);
}
const elapsed = (Date.now() - startTime) / 1000;
console.log(`[${this.nameTag}] 완료 (${elapsed}s)`);
}
// 하위 클래스가 구현할 메서드들
protected abstract processQueue(): Promise<void>;
protected abstract getQueueKey(): string;
protected abstract parseQueueMessage(message: string): T;
protected abstract processItem(item: T): Promise<void>;
protected abstract handleFailure(item: T, error: Error): Promise<void>;
}
Rate Limit 대응: 배치 로딩
큐에 100개가 쌓여 있어도, 한 번에 전부 꺼내면 API rate limit에 걸린다. rpop을 N번만 실행해서 한 사이클에 처리할 양을 제한한다.
interface AIQueueItem {
id: number;
content: string;
summary?: string;
tagList?: string[];
deathCount: number; // 실패 횟수 추적
}
class AIWorker extends AbstractQueueWorker<AIQueueItem> {
private readonly aiClient: AIClient;
constructor(redis: RedisClient, aiClient: AIClient) {
super('[AI Service]', redis);
this.aiClient = aiClient;
}
protected async processQueue(): Promise<void> {
const items = await this.loadFromQueue();
await Promise.all(items.map((item) => this.processItem(item)));
}
protected getQueueKey(): string {
return 'feed:ai:queue';
}
protected parseQueueMessage(message: string): AIQueueItem {
return JSON.parse(message);
}
protected async processItem(item: AIQueueItem): Promise<void> {
try {
const result = await this.requestAI(item);
await this.saveResult(result);
} catch (error) {
await this.handleFailure(item, error);
}
}
// ... (아래에서 계속)
}
여기서 Redis Pipeline을 사용하는 이유는, rpop을 N번 개별 호출하면 네트워크 라운드트립이 N번 발생하지만, Pipeline으로 묶으면 1번의 라운드트립으로 처리할 수 있기 때문이다.
BATCH_SIZE를 환경 변수로 관리하면, API 제공자의 rate limit이 바뀌었을 때 코드 수정 없이 조절할 수 있다.
에러 분류와 재시도 전략
AI API 호출에서 발생하는 에러는 크게 두 종류로 나뉜다.
일시적 에러 (재시도 가능)
- 429 Too Many Requests: Rate limit 초과. 시간이 지나면 해결된다.
- 503 Service Unavailable: 서버 일시 과부하.
- Timeout: 네트워크 지연.
영구적 에러 (재시도 불가)
- 401 Unauthorized: API 키가 잘못됨. 몇 번을 재시도해도 똑같다.
- JSON Parse Error: AI 응답이 잘못된 형식. 같은 입력이면 같은 결과.
- 400 Bad Request: 요청 자체가 잘못됨.
이 구분이 중요한 이유는, 영구적 에러를 재시도하면 큐에 무한히 돌면서 리소스만 낭비하기 때문이다.
private async loadFromQueue(): Promise<AIQueueItem[]> {
const BATCH_SIZE = parseInt(process.env.AI_RATE_LIMIT_COUNT);
const results = await this.redis.executePipeline((pipeline) => {
for (let i = 0; i < BATCH_SIZE; i++) {
pipeline.rpop(this.getQueueKey());
}
});
return results
.map(([err, value]) => JSON.parse(value as string))
.filter((item) => item !== null);
}
기본값을 true로 둔 이유는, 예상치 못한 에러가 일시적일 가능성이 더 높기 때문이다. 네트워크 관련 에러의 대부분은 재시도하면 해결된다.
Death Count 기반 재시도 제한
재시도 가능한 에러라도 무한히 재시도하면 안 된다. 서버가 장시간 다운되어 있으면 큐에서 빠져나오지 못하는 항목이 생긴다. deathCount 필드로 실패 횟수를 추적하고, 임계값을 초과하면 포기한다.
private isRetryableError(error: Error): boolean {
const message = error.message.toLowerCase();
// 영구적 에러 → 재시도 안 함
if (message.includes('invalid') || message.includes('401')) return false;
if (message.includes('json') || message.includes('parse')) return false;
// 일시적 에러 → 재시도
if (message.includes('rate limit') || message.includes('429')) return true;
if (message.includes('timeout') || message.includes('503')) return true;
// 알 수 없는 에러는 일단 재시도 (보수적 접근)
return true;
}
이 흐름을 다이어그램으로 보면:
에러 발생
│
├─ 영구적 에러? ──→ 즉시 포기 → 기본값 저장
│
└─ 일시적 에러?
│
├─ deathCount < 3? ──→ deathCount++ → 큐에 재삽입
│
└─ deathCount >= 3? ──→ 포기 → 기본값 저장
deathCount를 항목 자체에 포함시키는 것이 핵심이다. 별도의 상태 저장소 없이 큐 메시지 자체가 재시도 상태를 들고 다닌다. 큐에서 꺼내고 다시 넣을 때 deathCount가 증가된 상태로 들어가니까, 다음 사이클에서 꺼냈을 때 이전 실패 이력을 알 수 있다.
AI 요청과 결과 저장
실제 AI API 호출 부분이다.
protected async handleFailure(
item: AIQueueItem,
error: Error,
): Promise<void> {
const shouldRetry = this.isRetryableError(error);
console.error(
`[${this.nameTag}] ${item.id} 처리 실패:`,
`에러: ${error.name} - ${error.message}`,
`재시도 가능: ${shouldRetry}`,
`현재 deathCount: ${item.deathCount}`,
);
if (shouldRetry && item.deathCount < 3) {
// 재시도: deathCount 증가시키고 큐 뒤에 다시 삽입
item.deathCount++;
await this.redis.rpush(this.getQueueKey(), [JSON.stringify(item)]);
console.warn(`[${this.nameTag}] ${item.id} 재시도 예약 (${item.deathCount}/3)`);
} else {
// 영구 실패: 기본값으로 저장하고 넘어감
const reason = shouldRetry
? 'Death Count 3회 초과'
: `재시도 불가능한 에러 (${error.name})`;
console.error(`[${this.nameTag}] ${item.id} 영구 실패 - ${reason}`);
await this.saveDefaultResult(item.id);
}
}
AI 응답을 JSON으로 파싱하는 부분에서 주의할 점:
- AI 모델의 응답에는 줄바꿈, 탭 등이 포함될 수 있어서
replace로 정리한 뒤 파싱한다. - 파싱 실패 시
isRetryableError에서false를 반환하므로 재시도하지 않는다. AI 모델이 같은 입력에 대해 다른 형식으로 응답할 가능성은 있지만, 보통 같은 잘못된 형식을 반복하므로 재시도 효용이 낮다.
결과 저장은 DB와 Redis 양쪽에 한다.
에러 발생
│
├─ 영구적 에러? ──→ 즉시 포기 → 기본값 저장
│
└─ 일시적 에러?
│
├─ deathCount < 3? ──→ deathCount++ → 큐에 재삽입
│
└─ deathCount >= 3? ──→ 포기 → 기본값 저장
Redis와 DB에 이중 저장하는 이유는, 최신 피드는 Redis에서 빠르게 읽고, 오래된 데이터는 DB에서 조회하기 위함이다.
워커 실행 스케줄링
큐 워커는 한 번 실행하고 끝나는 게 아니라, 주기적으로 실행되어야 한다. 큐에 새 항목이 들어올 때마다 처리해야 하기 때문이다.
private async requestAI(item: AIQueueItem): Promise<AIQueueItem> {
const params = {
max_tokens: 8192,
system: SYSTEM_PROMPT,
messages: [{ role: 'user', content: item.content }],
model: 'claude-3-5-haiku-latest',
};
const response = await this.aiClient.messages.create(params);
const text = response.content[0].text.replace(/[\n\r\t\s]+/g, ' ');
const parsed = JSON.parse(text);
item.summary = parsed.summary;
item.tagList = Object.keys(parsed.tags);
return item;
}
start()가 호출되면 현재 큐에 있는 항목을 배치 크기만큼 꺼내서 처리하고 종료한다. 큐에 남은 항목이 있으면 다음 스케줄에서 처리된다.
이 방식의 장점은:
- 자연스러운 rate limit 준수: 1분 간격으로 N개씩 처리하면 분당 N개로 자동 제한된다.
- 에러 격리: 한 사이클에서 에러가 나도 다음 사이클은 정상 실행된다.
- 모니터링 용이: 각 사이클의 실행 시간과 처리량을 로깅할 수 있다.
실무에서 고려할 점
Backpressure
큐에 항목이 쌓이는 속도가 처리 속도보다 빠르면 큐가 계속 커진다. 이 상황을 감지하려면 큐 길이를 모니터링해야 한다.
private async saveResult(item: AIQueueItem): Promise<void> {
// DB에 태그 관계 저장
await this.tagRepository.insertTags(item.id, item.tagList);
// Redis 캐시에 태그 저장 (빠른 조회용)
await this.redis.hset(`feed:recent:${item.id}`, 'tag', item.tagList.join(','));
// DB에 요약 저장
await this.feedRepository.updateSummary(item.id, item.summary);
}
Poison Message
어떤 항목이 파싱 자체가 안 되면 (parseQueueMessage에서 에러), deathCount를 증가시킬 수도 없다. 이런 경우를 대비해서 파싱 실패한 메시지는 별도의 Dead Letter Queue에 넣어서 나중에 수동으로 확인할 수 있게 하는 게 좋다.
import * as schedule from 'node-schedule';
// 매 분마다 워커 실행
schedule.scheduleJob('*/1 * * * *', async () => {
await aiWorker.start();
});
멱등성
같은 항목이 두 번 처리될 수 있다. 워커가 처리 중에 크래시되면 결과 저장 전에 큐에서는 이미 빠져나간 상태다. 이걸 완벽하게 막으려면 트랜잭션이나 중복 체크가 필요하지만, AI 요약처럼 결과가 덮어써져도 문제없는 경우에는 크게 신경 쓰지 않아도 된다.
동시성 제어
Promise.all로 배치 내 항목을 병렬 처리하고 있는데, 이러면 배치 크기가 곧 동시 요청 수가 된다. API의 동시 요청 제한이 있다면 Promise.all 대신 p-limit 같은 라이브러리로 동시성을 제어해야 한다.
const queueLength = await redis.llen('feed:ai:queue');
if (queueLength > THRESHOLD) {
// 알림 발송 또는 배치 크기 증가
}
핵심 정리
| 개념 | 설명 |
|---|---|
| 큐 기반 비동기 처리 | 즉시 처리 대신 큐에 넣고 워커가 꺼내서 처리 |
| 추상 큐 워커 | Template Method 패턴으로 공통 흐름 정의 |
| 배치 로딩 | Pipeline으로 N개씩 꺼내서 rate limit 준수 |
| 에러 분류 | 일시적/영구적 에러를 구분해서 재시도 여부 결정 |
| Death Count | 항목 자체에 실패 횟수를 포함시켜 재시도 제한 |
| 이중 저장 | Redis(캐시) + DB(영속) 병행 |
큐 기반 워커 패턴은 AI API뿐 아니라 이메일 발송, 이미지 처리, 외부 API 연동 등 rate limit이 있거나 실패 가능성이 높은 모든 비동기 작업에 적용할 수 있다. 핵심은 "실패를 예상하고, 실패에서 복구하는 구조를 미리 설계한다"는 것이다.
관련 문서
- Anthropic Claude API - Claude SDK 초기화, 호출, 응답 파싱
- Redis Pipeline - 다중 커맨드 배치 실행으로 네트워크 라운드트립 절감
- Producer-Consumer 패턴 - 큐 기반 비동기 처리의 기본 구조