junyeokk
Blog
database·2025. 03. 05

Redis Pipeline

Redis는 기본적으로 요청-응답(Request-Response) 모델로 동작한다. 클라이언트가 커맨드를 보내면 서버가 처리하고 응답을 돌려주는데, 그 응답을 받아야 다음 커맨드를 보낼 수 있다. 커맨드 하나하나는 빠르지만, 실행할 커맨드가 수십~수백 개일 때는 네트워크 라운드트립(Round Trip Time, RTT)이 쌓이면서 전체 처리 시간이 급격히 늘어난다.

예를 들어 피드 100개의 데이터를 Redis에 각각 저장해야 한다고 하자. 커맨드 하나당 RTT가 1ms라면, 100개를 순차적으로 보내면 최소 100ms가 걸린다. 커맨드 실행 자체는 마이크로초 단위인데, 대부분의 시간을 네트워크 왕복에 쓰는 셈이다.

Pipeline은 이 문제를 해결한다. 여러 커맨드를 한 번에 묶어서 보내고, 응답도 한 번에 받는 방식이다. 각 커맨드의 응답을 기다리지 않고 연속으로 전송하기 때문에 RTT를 1회로 줄일 수 있다.

동작 원리

일반적인 Redis 통신은 이렇게 흘러간다:

Client: HSET key1 field1 value1 Server: OK Client: HSET key2 field2 value2 Server: OK Client: LPUSH list1 item1 Server: 1

매 커맨드마다 서버 응답을 기다린다. Pipeline을 쓰면:

Client: HSET key1 field1 value1 Client: HSET key2 field2 value2 Client: LPUSH list1 item1 Server: [OK, OK, 1]

클라이언트가 커맨드 3개를 연속으로 보내고, 서버가 결과 3개를 한꺼번에 돌려준다. 이게 가능한 이유는 Redis가 TCP 기반이고, 클라이언트 라이브러리가 보내기 버퍼에 커맨드를 쌓아뒀다가 한 번에 flush하기 때문이다.

중요한 점은 Pipeline은 트랜잭션이 아니라는 것이다. 중간에 하나가 실패해도 나머지는 그대로 실행된다. 원자성이 필요하면 MULTI/EXEC(트랜잭션)을 써야 하고, Pipeline 안에 MULTI/EXEC을 넣어서 둘을 조합할 수도 있다.

Node.js에서 ioredis Pipeline 사용

ioredis에서는 pipeline() 메서드로 파이프라인 인스턴스를 만들고, 체이닝으로 커맨드를 쌓은 뒤 exec()으로 실행한다.

typescript
Client: HSET key1 field1 value1
Server: OK
Client: HSET key2 field2 value2
Server: OK
Client: LPUSH list1 item1
Server: 1

exec()의 반환값은 [error, result] 튜플의 배열이다. 각 커맨드의 결과를 순서대로 돌려준다:

typescript
Client: HSET key1 field1 value1
Client: HSET key2 field2 value2
Client: LPUSH list1 item1
Server: [OK, OK, 1]

첫 번째 요소가 null이면 성공, 에러 객체면 해당 커맨드가 실패한 것이다. Pipeline은 트랜잭션이 아니므로 하나가 실패해도 나머지는 정상 실행된다는 걸 다시 강조한다.

유틸리티 메서드로 래핑하기

Pipeline을 매번 직접 만들면 코드가 반복된다. 콜백 패턴으로 래핑하면 깔끔하다:

typescript
import Redis from 'ioredis';

const redis = new Redis();

// pipeline 생성
const pipeline = redis.pipeline();

// 커맨드 쌓기
pipeline.hset('user:1', { name: 'Alice', age: '30' });
pipeline.hset('user:2', { name: 'Bob', age: '25' });
pipeline.lpush('recent-users', '1', '2');
pipeline.expire('user:1', 3600);
pipeline.expire('user:2', 3600);

// 한 번에 실행
const results = await pipeline.exec();

호출하는 쪽에서는 콜백 안에서 커맨드만 쌓으면 된다:

typescript
// results 구조
[
  [null, 'OK'],      // hset user:1
  [null, 'OK'],      // hset user:2
  [null, 2],         // lpush
  [null, 1],         // expire user:1
  [null, 1],         // expire user:2
]
typescript
import Redis, { ChainableCommander } from 'ioredis';

class RedisService {
  private redisClient: Redis;

  constructor() {
    this.redisClient = new Redis();
  }

  async executePipeline(
    commands: (pipeline: ChainableCommander) => void,
  ) {
    const pipeline = this.redisClient.pipeline();
    commands(pipeline);
    const results = await pipeline.exec();
    return results;
  }
}
typescript
// 여러 해시 데이터를 한 번에 저장
await redisService.executePipeline((pipeline) => {
  for (const item of itemList) {
    pipeline.hset(`item:${item.id}`, {
      id: item.id,
      title: item.title,
      createdAt: item.createdAt,
    });
  }
});

이 패턴의 장점은 pipeline 생성과 실행 로직을 한 곳에 캡슐화하고, 사용자는 "어떤 커맨드를 쌓을지"만 신경 쓰면 된다는 것이다. ChainableCommander 타입 덕분에 커맨드 자동완성도 된다.

Pipeline vs 개별 호출 성능 차이

실제로 어느 정도 차이가 나는지 간단한 벤치마크로 확인할 수 있다:

typescript
// 여러 키의 데이터를 한 번에 조회
const results = await redisService.executePipeline((pipeline) => {
  for (const key of keys) {
    pipeline.hgetall(key);
  }
});

// 결과 파싱
const items = results.map(([err, data]) => {
  if (err) throw err;
  return data;
});

로컬 환경에서도 차이가 나지만, Redis 서버가 네트워크를 거쳐야 하는 환경(클라우드 등)에서는 차이가 극적이다. RTT가 1ms인 환경에서 1000개 커맨드를 실행하면:

  • 개별 호출: ~1000ms (1000 × 1ms RTT)
  • Pipeline: ~1ms RTT + 커맨드 처리 시간

공식 문서에서도 Pipeline 사용 시 처리량(throughput)이 5~10배 이상 증가한다고 언급한다.

Pipeline vs MULTI/EXEC (트랜잭션)

Pipeline과 트랜잭션은 다른 목적을 가진다:

PipelineMULTI/EXEC
목적네트워크 RTT 최적화원자적 실행 보장
원자성없음 — 각 커맨드가 독립 실행있음 — 모든 커맨드가 한 번에 실행
격리성없음 — 다른 클라이언트 커맨드가 끼어들 수 있음있음 — 트랜잭션 중 다른 커맨드 차단
실패 처리개별 커맨드 실패 시 나머지 계속 실행EXEC 전에 에러 발생하면 전체 취소
성능빠름 (배치 전송)Pipeline보다 약간 느림 (큐잉 오버헤드)

둘을 조합할 수도 있다:

typescript
// 삭제 후 새 데이터 삽입을 한 번에
await redisService.executePipeline((pipeline) => {
  pipeline.del('ranking-snapshot');
  pipeline.rpush('ranking-snapshot', ...newRankingIds);
});

대부분의 경우 원자성이 필요 없다면 Pipeline만으로 충분하다. 캐시 저장, 배치 조회, 로그 기록 등 "실패해도 다른 커맨드에 영향 없는" 작업에 적합하다.

관련 문서

Pipeline 사용 시 주의점

메모리 사용량

Pipeline에 쌓는 커맨드가 너무 많으면 클라이언트와 서버 양쪽에서 메모리를 많이 쓴다. 서버 측에서는 응답을 버퍼에 쌓아두기 때문이다. 수만 개 이상의 커맨드를 파이프라이닝해야 한다면 적절한 크기(예: 1000개)로 나눠서 여러 번 실행하는 게 좋다:

typescript
const COUNT = 1000;

// 개별 호출
console.time('individual');
for (let i = 0; i < COUNT; i++) {
  await redis.set(`key:${i}`, `value:${i}`);
}
console.timeEnd('individual');

// Pipeline
console.time('pipeline');
const pipeline = redis.pipeline();
for (let i = 0; i < COUNT; i++) {
  pipeline.set(`key:${i}`, `value:${i}`);
}
await pipeline.exec();
console.timeEnd('pipeline');

에러 핸들링

Pipeline의 exec() 자체가 reject되는 경우는 네트워크 단절 등 치명적 상황뿐이다. 개별 커맨드 에러는 결과 배열의 첫 번째 요소에 담기므로, 결과를 반드시 확인해야 한다:

typescript
const pipeline = redis.pipeline();
pipeline.multi();
pipeline.set('key1', 'value1');
pipeline.set('key2', 'value2');
pipeline.exec(); // MULTI/EXEC의 exec
const results = await pipeline.exec(); // pipeline의 exec

의존성 있는 커맨드

Pipeline 안에서 이전 커맨드의 결과를 사용할 수 없다. 모든 커맨드가 한 번에 보내지기 때문이다:

typescript
const BATCH_SIZE = 1000;

for (let i = 0; i < items.length; i += BATCH_SIZE) {
  const batch = items.slice(i, i + BATCH_SIZE);
  await redisService.executePipeline((pipeline) => {
    for (const item of batch) {
      pipeline.hset(`item:${item.id}`, item);
    }
  });
}

실제 활용 패턴

배치 저장

새로운 데이터 목록을 한 번에 캐시에 저장할 때:

typescript
const results = await redisService.executePipeline((pipeline) => {
  pipeline.hset('valid-key', { field: 'value' });
  pipeline.incr('string-key'); // 타입 불일치 시 에러
});

for (const [err, result] of results) {
  if (err) {
    console.error('커맨드 실패:', err.message);
  }
}

배치 조회

여러 키의 데이터를 한 번에 읽어올 때:

typescript
// ❌ 이렇게 하면 안 됨
pipeline.get('counter');
pipeline.set('counter', previousValue + 1); // previousValue를 모름

// ✅ 의존성 있으면 분리
const value = await redis.get('counter');
await redis.set('counter', Number(value) + 1);

// ✅ 또는 Lua 스크립트로 서버 사이드에서 처리
await redis.eval(
  `local v = redis.call('GET', KEYS[1])
   return redis.call('SET', KEYS[1], tonumber(v) + 1)`,
  1,
  'counter'
);

삭제 + 재설정

기존 데이터를 지우고 새 데이터로 교체할 때:

typescript
async function cacheItems(items: Item[]) {
  await redisService.executePipeline((pipeline) => {
    for (const item of items) {
      pipeline.hset(`item:recent:${item.id}`, {
        id: item.id,
        title: item.title,
        thumbnail: item.imageUrl,
        createdAt: item.createdAt,
      });
    }
  });
}

큐 삽입

작업 큐에 여러 아이템을 한 번에 넣을 때:

typescript
async function getItems(keys: string[]) {
  const results = await redisService.executePipeline((pipeline) => {
    for (const key of keys) {
      pipeline.hgetall(key);
    }
  });

  return results
    .filter(([err]) => !err)
    .map(([, data]) => data);
}

정리

Redis Pipeline은 "같은 일을 여러 번 할 때 네트워크를 아끼자"는 단순한 아이디어지만, 실제 성능 차이는 극적이다. 트랜잭션과 달리 원자성을 보장하지 않으므로, 원자성이 필요하면 MULTI/EXEC을 조합하거나 Lua 스크립트를 써야 한다. 대부분의 배치 작업(캐시 저장, 대량 조회, 큐 삽입)에서는 Pipeline만으로 충분하고, 이걸 유틸리티 메서드로 래핑해두면 코드 중복 없이 깔끔하게 쓸 수 있다.