junyeokk
Blog
JavaScript·2025. 11. 15

RxJS 기초

비동기 데이터를 다루는 건 프론트엔드든 백엔드든 피할 수 없는 문제다. HTTP 요청, 사용자 입력, 웹소켓 메시지, 타이머 — 이 모든 게 "시간에 걸쳐 발생하는 값"이다. JavaScript에는 이미 Promise, async/await, 콜백 같은 비동기 처리 도구가 있는데, 왜 RxJS가 필요할까?

Promise는 단일 비동기 값을 다루는 데 최적화되어 있다. API 요청 하나를 보내고 응답 하나를 받는 패턴에 완벽하다. 하지만 웹소켓처럼 여러 값이 시간에 걸쳐 연속으로 도착하는 경우, Promise로는 자연스럽게 표현할 수 없다. 매번 새 Promise를 만들거나, 콜백 기반으로 돌아가야 한다.

콜백은 여러 값을 받을 수 있지만, 취소가 어렵고, 에러 처리가 일관되지 않고, 여러 비동기 스트림을 조합하려면 코드가 순식간에 복잡해진다. 이른바 "콜백 지옥"이 발생한다.

RxJS는 이 문제를 Observable이라는 단일 추상화로 해결한다. "시간에 걸쳐 발생하는 0개 이상의 값"을 하나의 타입으로 표현하고, 이 스트림을 변환하고 조합하는 연산자(operator)를 제공한다. 마치 배열의 map, filter를 시간축으로 확장한 것과 같다.


Observable과 Observer

RxJS의 핵심은 Observable(관찰 가능한 스트림)과 Observer(관찰자) 두 가지다.

Observable은 "앞으로 어떤 값들이 올 거야"라는 선언이다. 아직 아무 일도 일어나지 않는다. subscribe()를 호출해야 비로소 값이 흘러나오기 시작한다. 이걸 lazy evaluation이라고 한다.

typescript
import { Observable } from 'rxjs';

const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

관례적으로 Observable 변수명 끝에 $를 붙인다. numbers$는 아직 아무 값도 생산하지 않는다. subscribe해야 실행된다.

Observer는 Observable이 보내는 값을 받는 쪽이다. 세 가지 콜백으로 구성된다.

typescript
numbers$.subscribe({
  next: (value) => console.log(value),     // 값이 도착할 때
  error: (err) => console.error(err),      // 에러 발생 시
  complete: () => console.log('완료'),      // 스트림 종료 시
});
// 출력: 1, 2, 3, 완료

next는 값이 올 때마다 호출되고, error는 에러가 발생하면 호출된 후 스트림이 종료되고, complete는 모든 값이 전달된 후 호출된다. errorcomplete는 둘 중 하나만 발생할 수 있고, 한 번 발생하면 스트림은 끝난다.

간단하게 함수만 전달할 수도 있다.

typescript
numbers$.subscribe((value) => console.log(value));

이 경우 첫 번째 인자가 next 콜백이 된다.

Promise와의 핵심 차이

특성PromiseObservable
값 개수1개0개 ~ 무한
실행 시점생성 즉시 (eager)subscribe 시 (lazy)
취소불가 (AbortController 별도)unsubscribe()
멀티캐스트항상 공유기본 유니캐스트, Subject로 멀티캐스트

Promise는 생성하는 순간 실행이 시작되고, 취소할 수 없다. Observable은 subscribe하기 전까지 아무 일도 안 하고, unsubscribe()로 언제든 중단할 수 있다.


Subscription과 메모리 누수 방지

subscribe()Subscription 객체를 반환한다. 이걸로 구독을 취소할 수 있다.

typescript
const subscription = interval(1000).subscribe((n) => {
  console.log(`${n}초 경과`);
});

// 5초 후 구독 취소
setTimeout(() => {
  subscription.unsubscribe();
}, 5000);

구독 취소를 빠뜨리면 메모리 누수가 발생한다. 특히 NestJS에서 모듈이 destroy될 때, 또는 React에서 컴포넌트가 언마운트될 때 반드시 정리해야 한다. 이건 setIntervalclearInterval과 같은 맥락이다.

여러 구독을 한 번에 관리하려면 Subscriptionadd 메서드를 사용한다.

typescript
const parent = new Subscription();

parent.add(stream1$.subscribe(handler1));
parent.add(stream2$.subscribe(handler2));

// 한 번에 모두 정리
parent.unsubscribe();

생성 함수 (Creation Functions)

매번 new Observable()로 만들기보다, RxJS가 제공하는 생성 함수를 사용하는 게 일반적이다.

of — 고정된 값들을 순서대로 방출

typescript
import { of } from 'rxjs';

of(1, 2, 3).subscribe(console.log);
// 1, 2, 3 → complete

동기적으로 즉시 모든 값을 방출하고 완료한다. 테스트나 기본값 제공에 유용하다.

from — 배열, Promise, Iterable을 Observable로 변환

typescript
import { from } from 'rxjs';

// 배열
from([10, 20, 30]).subscribe(console.log);
// 10, 20, 30

// Promise
from(fetch('/api/users')).subscribe(console.log);
// Response 객체

기존 비동기 코드를 Observable 세계로 가져올 때 가장 많이 사용한다. Promise를 from으로 감싸면 Observable의 연산자를 적용할 수 있게 된다.

interval / timer — 시간 기반 스트림

typescript
import { interval, timer } from 'rxjs';

// 1초마다 0, 1, 2, 3... 방출
interval(1000).subscribe(console.log);

// 3초 후에 한 번 방출하고 완료
timer(3000).subscribe(() => console.log('3초 지남'));

// 2초 후 시작, 이후 1초 간격
timer(2000, 1000).subscribe(console.log);

intervalsetInterval의 Observable 버전이다. 차이점은 unsubscribe()로 깔끔하게 정리된다는 것.

fromEvent — DOM 이벤트를 스트림으로

typescript
import { fromEvent } from 'rxjs';

const clicks$ = fromEvent(document, 'click');
clicks$.subscribe((event) => console.log(event.clientX, event.clientY));

addEventListener를 Observable로 감싸서, 다른 연산자와 조합할 수 있게 만든다. 검색 입력의 디바운스 처리 같은 패턴을 선언적으로 구현할 수 있다.

throwError / EMPTY / NEVER — 특수 Observable

typescript
import { throwError, EMPTY, NEVER } from 'rxjs';

// 즉시 에러 방출
throwError(() => new Error('문제 발생'));

// 값 없이 즉시 complete
EMPTY.subscribe({ complete: () => console.log('끝') });

// 아무것도 방출하지 않고, 완료도 하지 않음
NEVER.subscribe(); // 영원히 대기

throwError는 에러 흐름을 만들 때, EMPTY는 "아무것도 하지 않고 끝내기"에, NEVER는 테스트에서 주로 사용한다.


pipe와 연산자 (Operators)

RxJS의 진짜 힘은 연산자에 있다. Observable 스트림을 변환하고 필터링하고 조합하는 함수들이다. pipe() 메서드 안에 연산자를 체이닝해서 사용한다.

typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    filter((n) => n % 2 === 0),  // 짝수만
    map((n) => n * 10),           // 10을 곱함
  )
  .subscribe(console.log);
// 20, 40

pipe는 왼쪽에서 오른쪽으로 순서대로 적용된다. 배열의 filter().map()과 비슷하지만, 차이점은 값이 하나씩 흘러간다는 것이다. 배열은 filter가 전체를 처리한 뒤 map이 시작되지만, Observable은 값 하나가 filtermap을 모두 통과한 뒤 다음 값이 들어온다.

변환 연산자

map — 값 변환

typescript
import { map } from 'rxjs/operators';

from([1, 2, 3]).pipe(
  map((n) => n * 2),
).subscribe(console.log);
// 2, 4, 6

가장 기본적인 연산자. 배열의 Array.map()과 동일한 역할이다.

switchMap — 내부 Observable로 전환

typescript
import { fromEvent, switchMap } from 'rxjs';

fromEvent(input, 'input').pipe(
  switchMap((event) => fetch(`/api/search?q=${event.target.value}`)),
).subscribe((results) => console.log(results));

switchMap은 새 값이 들어오면 이전 내부 Observable을 취소하고 새 것으로 전환한다. 검색 자동완성에서 핵심적인 역할을 한다. 사용자가 빠르게 타이핑하면 이전 요청이 자동으로 취소되기 때문에, 오래된 응답이 최신 입력을 덮어쓰는 문제(race condition)가 발생하지 않는다.

mergeMap — 모든 내부 Observable을 병렬로 실행

typescript
import { from, mergeMap } from 'rxjs';

from([1, 2, 3]).pipe(
  mergeMap((id) => fetch(`/api/users/${id}`)),
).subscribe(console.log);

switchMap과 달리 이전 것을 취소하지 않는다. 모든 내부 Observable이 동시에 실행되고, 완료되는 순서대로 값이 나온다. 순서가 보장되지 않는다는 점에 주의.

concatMap — 순서 보장, 하나씩 실행

typescript
import { from, concatMap, delay } from 'rxjs';

from([1, 2, 3]).pipe(
  concatMap((n) => of(n).pipe(delay(1000))),
).subscribe(console.log);
// 1초 후 1, 2초 후 2, 3초 후 3

이전 내부 Observable이 완료된 후에 다음 것을 시작한다. 순서가 중요한 작업(파일 순차 업로드 등)에 적합하다.

exhaustMap — 진행 중이면 무시

typescript
import { fromEvent, exhaustMap } from 'rxjs';

fromEvent(button, 'click').pipe(
  exhaustMap(() => saveData()),
).subscribe();

현재 내부 Observable이 진행 중이면 새로 들어오는 값을 무시한다. 폼 제출 버튼의 중복 클릭 방지에 유용하다.

이 네 가지 "Higher-Order Mapping" 연산자의 차이를 정리하면:

연산자새 값이 들어왔을 때용도
switchMap이전 것 취소, 새 것 시작검색, 자동완성
mergeMap이전 것 유지, 새 것도 시작병렬 요청
concatMap이전 것 완료 후 새 것 시작순차 실행
exhaustMap이전 것 진행 중이면 새 것 무시중복 방지

필터링 연산자

filter — 조건에 맞는 값만 통과

typescript
import { filter } from 'rxjs/operators';

from([1, 2, 3, 4, 5]).pipe(
  filter((n) => n > 3),
).subscribe(console.log);
// 4, 5

distinctUntilChanged — 연속 중복 제거

typescript
import { distinctUntilChanged } from 'rxjs/operators';

from([1, 1, 2, 2, 3, 1]).pipe(
  distinctUntilChanged(),
).subscribe(console.log);
// 1, 2, 3, 1

연속으로 같은 값이 들어오면 무시한다. 배열의 [...new Set()]과 다르게, 연속이 아닌 중복(마지막 1)은 통과시킨다. 상태 변화 감지에서 "실제로 값이 바뀌었을 때만" 로직을 실행하는 데 유용하다.

take / takeUntil — 스트림 제한

typescript
import { take, takeUntil, interval, fromEvent } from 'rxjs';

// 처음 3개만 받고 완료
interval(1000).pipe(take(3)).subscribe(console.log);
// 0, 1, 2 → complete

// 클릭할 때까지만 받기
const stop$ = fromEvent(button, 'click');
interval(1000).pipe(takeUntil(stop$)).subscribe(console.log);

takeUntil은 특히 Angular/NestJS에서 컴포넌트/모듈 정리할 때 패턴처럼 사용된다. destroySubject를만들어서takeUntil(this.destroy Subject를 만들어서 `takeUntil(this.destroy)`로 모든 구독을 자동 정리한다.

debounceTime / throttleTime — 시간 기반 필터

typescript
import { debounceTime, throttleTime } from 'rxjs/operators';

// 300ms 동안 새 입력 없으면 마지막 값 방출
fromEvent(input, 'input').pipe(
  debounceTime(300),
).subscribe(handleSearch);

// 1초에 한 번만 방출
fromEvent(window, 'scroll').pipe(
  throttleTime(1000),
).subscribe(handleScroll);

debounceTime은 "조용해진 후" 방출한다. 검색 입력에서 사용자가 타이핑을 멈춘 후에 API를 호출하는 데 적합하다. throttleTime은 "일정 간격마다" 방출한다. 스크롤 이벤트처럼 빈번한 이벤트의 처리 빈도를 제한하는 데 적합하다.

조합 연산자

combineLatest — 모든 스트림의 최신 값 조합

typescript
import { combineLatest } from 'rxjs';

const name$ = fromEvent(nameInput, 'input').pipe(map((e) => e.target.value));
const age$ = fromEvent(ageInput, 'input').pipe(map((e) => e.target.value));

combineLatest([name$, age$]).subscribe(([name, age]) => {
  console.log(`이름: ${name}, 나이: ${age}`);
});

어느 하나라도 새 값을 방출하면, 모든 스트림의 최신 값을 배열로 묶어서 방출한다. 단, 모든 스트림이 최소 한 번은 값을 방출한 후에야 동작한다. 여러 필터 조건을 조합해서 데이터를 요청하는 패턴에 유용하다.

merge — 여러 스트림을 하나로 합치기

typescript
import { merge, fromEvent } from 'rxjs';

const clicks$ = fromEvent(document, 'click');
const keys$ = fromEvent(document, 'keydown');

merge(clicks$, keys$).subscribe(() => {
  console.log('사용자 활동 감지');
});

어느 스트림에서든 값이 나오면 그대로 통과시킨다. 여러 이벤트 소스를 하나로 합치는 데 사용한다.

forkJoin — 모든 스트림이 완료된 후 마지막 값

typescript
import { forkJoin } from 'rxjs';

forkJoin({
  users: from(fetch('/api/users').then((r) => r.json())),
  posts: from(fetch('/api/posts').then((r) => r.json())),
}).subscribe(({ users, posts }) => {
  console.log(users, posts);
});

Promise.all()과 유사하다. 모든 Observable이 complete된 후에 각각의 마지막 값을 객체(또는 배열)로 묶어서 방출한다. 여러 API를 병렬로 호출하고 모든 응답이 도착한 후에 처리하는 패턴에 적합하다.

유틸리티 연산자

tap — 사이드 이펙트 (값을 변경하지 않음)

typescript
import { tap } from 'rxjs/operators';

from([1, 2, 3]).pipe(
  tap((n) => console.log('디버깅:', n)),
  map((n) => n * 2),
).subscribe(console.log);
// 디버깅: 1 → 2
// 디버깅: 2 → 4
// 디버깅: 3 → 6

스트림을 통과하는 값에 영향을 주지 않으면서 로깅이나 디버깅에 사용한다. 파이프라인 중간에 끼워 넣어서 "여기까지 어떤 값이 왔는지" 확인하는 용도로 자주 쓴다.

catchError — 에러 핸들링

typescript
import { catchError, of } from 'rxjs';

from(fetch('/api/data')).pipe(
  catchError((err) => {
    console.error('에러 발생:', err);
    return of([]); // 대체 값으로 복구
  }),
).subscribe(console.log);

에러가 발생하면 새로운 Observable로 대체해서 스트림이 죽지 않도록 한다. try/catch의 Observable 버전이라고 보면 된다. 반드시 새 Observable을 반환해야 한다.

retry — 자동 재시도

typescript
import { retry, timer } from 'rxjs';

from(fetch('/api/unstable')).pipe(
  retry({ count: 3, delay: (_, retryCount) => timer(retryCount * 1000) }),
).subscribe(console.log);

에러가 발생하면 자동으로 다시 구독한다. count로 최대 재시도 횟수를, delay로 재시도 간격을 설정할 수 있다. 불안정한 네트워크 환경에서 API 호출의 안정성을 높이는 데 유용하다.


Subject

일반 Observable은 유니캐스트다. subscribe할 때마다 독립적인 실행이 만들어진다. 이걸 일상적으로 비유하면, 유니캐스트는 "전화 통화"와 같다 — 상대방마다 별도의 통화가 필요하다.

Subject는 멀티캐스트 Observable이다. 하나의 실행을 여러 Observer가 공유한다. "라디오 방송"과 같아서, 한 번 송출하면 여러 청취자가 동시에 듣는다.

typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();

// 구독자 A
subject.subscribe((v) => console.log('A:', v));

// 구독자 B
subject.subscribe((v) => console.log('B:', v));

// 값 방출
subject.next(1);
// A: 1
// B: 1

subject.next(2);
// A: 2
// B: 2

Subject는 Observable이면서 동시에 Observer다. next()로 값을 밀어넣을 수 있고, subscribe()로 값을 받을 수 있다. 이벤트 버스나 상태 공유에 자주 사용된다.

BehaviorSubject — 현재 값을 가진 Subject

typescript
import { BehaviorSubject } from 'rxjs';

const state$ = new BehaviorSubject<string>('초기값');

// 구독하면 즉시 현재 값을 받음
state$.subscribe((v) => console.log('구독자:', v));
// 구독자: 초기값

state$.next('변경됨');
// 구독자: 변경됨

// 현재 값 동기적으로 접근
console.log(state$.getValue()); // '변경됨'

일반 Subject와 달리 항상 현재 값을 가지고 있다. 새로 구독하면 마지막 값을 즉시 받는다. 상태 관리에 적합하다. getValue()로 subscribe 없이 현재 값에 접근할 수도 있지만, 리액티브 패턴에서는 권장하지 않는다.

ReplaySubject — N개의 과거 값을 기억

typescript
import { ReplaySubject } from 'rxjs';

const replay$ = new ReplaySubject<number>(3); // 최근 3개 기억

replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);

replay$.subscribe((v) => console.log(v));
// 2, 3, 4 (최근 3개)

버퍼 크기를 지정해서 과거 N개의 값을 새 구독자에게 재생해준다. 채팅 기록이나 로그 스트림에서 "최근 N개 메시지"를 보여주는 패턴에 유용하다.

AsyncSubject — 완료 시 마지막 값만

typescript
import { AsyncSubject } from 'rxjs';

const async$ = new AsyncSubject<number>();

async$.subscribe((v) => console.log(v));

async$.next(1);
async$.next(2);
async$.next(3);
async$.complete();
// 3 (complete 후 마지막 값만 방출)

complete()가 호출될 때까지 아무것도 방출하지 않다가, 완료되면 마지막 값 하나만 방출한다. Promise와 가장 비슷한 동작이다.


NestJS에서의 RxJS

NestJS는 내부적으로 RxJS를 사용한다. HTTP 요청의 인터셉터, 가드, 파이프 등의 실행 파이프라인이 Observable 기반으로 동작한다.

인터셉터에서 Observable 활용

NestJS 인터셉터의 intercept 메서드는 Observable을 반환한다. 요청-응답 흐름을 Observable 파이프라인으로 다룰 수 있다.

typescript
import { Injectable, NestInterceptor, ExecutionContext, CallHandler } from '@nestjs/common';
import { Observable } from 'rxjs';
import { tap, map } from 'rxjs/operators';

@Injectable()
export class LoggingInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const now = Date.now();
    
    return next.handle().pipe(
      tap(() => console.log(`요청 처리 시간: ${Date.now() - now}ms`)),
    );
  }
}

next.handle()이 컨트롤러 핸들러를 실행하고, 그 결과를 Observable로 반환한다. pipe로 연산자를 적용해서 응답을 변환하거나, 로깅하거나, 캐싱할 수 있다.

응답 변환 인터셉터

typescript
@Injectable()
export class TransformInterceptor<T> implements NestInterceptor<T, Response<T>> {
  intercept(context: ExecutionContext, next: CallHandler): Observable<Response<T>> {
    return next.handle().pipe(
      map((data) => ({
        success: true,
        data,
        timestamp: new Date().toISOString(),
      })),
    );
  }
}

모든 응답을 통일된 형식으로 감싸는 패턴이다. map 연산자로 컨트롤러가 반환한 데이터를 변환한다.

타임아웃 인터셉터

typescript
import { timeout, catchError } from 'rxjs/operators';
import { throwError, TimeoutError } from 'rxjs';

@Injectable()
export class TimeoutInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    return next.handle().pipe(
      timeout(5000),
      catchError((err) => {
        if (err instanceof TimeoutError) {
          return throwError(() => new RequestTimeoutException());
        }
        return throwError(() => err);
      }),
    );
  }
}

timeout 연산자로 5초 안에 응답이 없으면 TimeoutError를 발생시킨다. catchError로 이 에러를 잡아서 HTTP 408 응답으로 변환한다.

컨트롤러에서 Observable 반환

NestJS 컨트롤러는 Promise뿐 아니라 Observable도 반환할 수 있다. NestJS가 자동으로 subscribe해서 값을 응답으로 보낸다.

typescript
@Get('users')
getUsers(): Observable<User[]> {
  return this.httpService.get('/users').pipe(
    map((response) => response.data),
    catchError(() => of([])),
  );
}

NestJS의 HttpService(@nestjs/axios)는 Axios를 Observable로 감싸서 제공한다. 이렇게 하면 RxJS 연산자로 응답을 변환하고 에러를 처리할 수 있다.


Hot Observable vs Cold Observable

이 개념은 RxJS를 이해하는 데 중요하다.

Cold Observable은 subscribe할 때마다 새로운 실행이 만들어진다. of(1, 2, 3)이나 from(fetch(...))가 대표적이다. DVD 플레이어와 같다 — 재생 버튼을 누르면 처음부터 시작한다.

typescript
const cold$ = new Observable((subscriber) => {
  console.log('실행 시작');
  subscriber.next(Math.random());
  subscriber.complete();
});

cold$.subscribe((v) => console.log('A:', v)); // 실행 시작, A: 0.123
cold$.subscribe((v) => console.log('B:', v)); // 실행 시작, B: 0.456
// A와 B는 다른 값을 받음 — 각각 독립 실행

Hot Observable은 구독 여부와 상관없이 값을 방출한다. Subject가 대표적이다. 라이브 방송과 같다 — 중간에 채널을 맞추면 그 시점부터 보기 시작한다.

typescript
const hot$ = new Subject<number>();

hot$.next(1); // 아무도 안 듣고 있음 — 사라짐

hot$.subscribe((v) => console.log('A:', v));
hot$.next(2); // A: 2

hot$.subscribe((v) => console.log('B:', v));
hot$.next(3); // A: 3, B: 3

Subject로 next(1)을 호출했을 때 구독자가 없으면 그 값은 사라진다. 구독 시점에 따라 받는 값이 달라진다.


실전 패턴: 검색 자동완성

RxJS의 여러 연산자를 조합하면 복잡한 비동기 로직을 선언적으로 표현할 수 있다. 검색 자동완성은 대표적인 예시다.

typescript
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, filter, catchError } from 'rxjs/operators';

const search$ = fromEvent<InputEvent>(searchInput, 'input').pipe(
  map((event) => (event.target as HTMLInputElement).value.trim()),
  filter((query) => query.length >= 2),       // 2글자 이상만
  debounceTime(300),                          // 300ms 대기
  distinctUntilChanged(),                     // 같은 검색어 무시
  switchMap((query) =>                        // 새 검색이면 이전 요청 취소
    from(fetch(`/api/search?q=${query}`).then((r) => r.json())).pipe(
      catchError(() => of([])),               // 에러 시 빈 배열
    ),
  ),
);

search$.subscribe((results) => renderResults(results));

이 코드가 하는 일을 풀어보면:

  1. 입력 이벤트에서 값을 추출하고 양쪽 공백을 제거한다
  2. 2글자 미만이면 무시한다
  3. 300ms 동안 추가 입력이 없을 때만 진행한다
  4. 이전 검색어와 같으면 무시한다
  5. API를 호출하되, 새 검색어가 들어오면 이전 요청을 취소한다
  6. 에러가 나면 빈 배열로 대체한다

이걸 콜백이나 Promise 체인으로 구현하려면 타이머 관리, 이전 요청 취소, 중복 검사를 모두 수동으로 해야 한다. RxJS는 이런 복잡한 비동기 흐름을 파이프라인 하나로 표현한다.


마블 다이어그램 읽는 법

RxJS 문서에서 자주 볼 수 있는 마블 다이어그램은 Observable의 동작을 시각적으로 표현한다.

text
source:  --1--2--3--4--5--|
filter:  --1-----3-----5--|
map:     --2-----6----10--|
  • --는 시간의 흐름
  • 숫자나 문자는 방출된 값
  • |는 complete
  • X는 error
  • ^는 subscribe 시점

위 다이어그램은 1~5를 방출하는 스트림에서 홀수만 필터링한 뒤 2를 곱하는 과정을 보여준다. 연산자의 동작을 직관적으로 이해하는 데 유용하니, 공식 문서에서 새 연산자를 볼 때 마블 다이어그램부터 확인하면 빠르게 이해할 수 있다.


정리

RxJS는 학습 곡선이 가파르다. 연산자가 100개가 넘고, Observable의 동작 방식이 Promise와 근본적으로 다르기 때문이다. 하지만 핵심 개념은 몇 가지로 요약된다.

  1. Observable은 시간에 걸쳐 값을 방출하는 스트림이다
  2. pipe + 연산자로 스트림을 변환, 필터링, 조합한다
  3. subscribe로 값을 받고, unsubscribe로 정리한다
  4. Subject는 값을 직접 밀어넣을 수 있는 멀티캐스트 Observable이다

실무에서 자주 쓰는 연산자는 map, filter, switchMap, tap, catchError, debounceTime, takeUntil 정도다. 이것만 익혀도 대부분의 비동기 패턴을 처리할 수 있다. 나머지는 필요할 때 찾아서 쓰면 된다.


관련 문서