DataBase/Redis

Redis Pub/Sub 기능 사용하기 (feat. NestJS)

왈왈디 2025. 3. 2. 23:51
728x90

Publish-Subscribe 패턴

Redis의 pub/sub을 살펴보기 전에 pub/sub이 어떤 기능인지부터 살펴보자.

 

publish-subscribe (pub/sub)은

메시지 발행자(publisher)는 특정 채널에 메시지를 보내고,

해당 채널을 구독한 모든 구독자(subscriber)들은 그 메시지를 수신하는 방식이다.

이는, 발행자가 구독자에게 직접 메시지를 보내는 일반적인 메시징 패턴과 구분된다.

구독자가 없다면 메시지는 사라지고 복구되지 않는다.

 

이러한 구조는 발행자와 구독자가 서로를 알 필요 없이 독립적으로 동작할 수 있게 하여(decoupling),

시스템의 확장성과 유연성을 높인다.

 

Redis Pub/Sub

"channel11" 과 "ch:00" 채널을 구독하려는 클라이언트는 아래와 같이 입력한다.

SUBSCRIBE channel11 ch:00

 

이후 다른 클라이언트가 이 채널들에 메시지를 발행하면

이 채널들을 구독한 모든 클라이언트들에게 메시지가 보내진다.

 

구독자 클라이언트들은 메시지가 발행된 순서대로 메시지를 받게 된다.

 

하나 이상의 채널을 구독한 클라이언트는 다른 명령을 실행할 수 없지만,

추가적인 구독이나 구독 취소는 가능하다.

 

구독 및 구독 취소에 대한 응답은 메시지 형태로 전달되어,

클라이언트가 일관된 메시지 스트림을 읽을 수 있도록 한다.

그리고 응답의 첫번째 요소는 메시지의 타입을 의미한다.

 

RESP2(레디스 통신 프로토콜) 클라이언트에게 구독 중 허용되는 커맨드는 아래와 같다.

그러나 RESP3 에서는 구독 중 모든 커맨드가 허용된다.

 

Redis의 Pub/Sub은 최대 1회의 메시지 전달 보장(at-most-once delivery) 구조다.

즉, 메시지는 1회 전달되거나 전달이 되지 않을 수 있다(메시지 손실).

메시지가 다시 보내지는 경우는 없다.

메시지의 안정적인 전달이 중요한 경우에는 다른 메시징 시스템이나 패턴을 고려해야 한다.

 

Pub/Sub 기능은 주로 실시간 메시지 방송, 이벤트 알림, 채팅 시스템 등에서 활용된다.

 

 NestJS로 Redis Pub/Sub 기반 마이크로 서비스 만들기

먼저 필요한 패키지를 설치한다.

npm i --save ioredis

 

그리고 Redis transporter를 사용하기 위해

main.ts에서 createMicroservice() 메서드 옵션을 아래와 같이 설정한다.

// main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.REDIS,
  options: {
    host: 'localhost',
    port: 6379,
  },
});

(Transport enum은 @nestjs/microservices 패키지에 포함되어 있다.)

 

options의 프로퍼티는 선택한 transporter에 따라 달라진다.

Redis 트랜스포터의 옵션은 아래와 같다.

host 커넥션 url
port 커넥션 포트
retryAttempts 메시지 재시도 횟수 (default: 0)
retryDelay 메시지 재시도 간 지연시간 (ms) (default: 0)
wildcards Redis 와일드 카드 구독을 허용하여, 트랜스포터가 psubscribe/pmessage 사용할 지 여부 (default: false)

 

클라이언트

Redis ClientProxy 인스턴스를 생성하는 방법은 여러가지다.

그 중 한 가지는 ClientsModule를 사용하는 것이다.

 

ClientsModule로 클라이언트를 생성하기 위해서는 

import하고 register() 메서드를 사용해서 위 createMicroservice() 메서드와 동일한 옵션을 적용해야 한다.

name 값으로는 injection token으로 사용될 값을 넣어줘야 한다.

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.REDIS,
        options: {
          host: 'localhost',
          port: 6379,
        }
      },
    ]),
  ]
  ...
})

 

그 외에도 ClientProxyFactory 또는 @Client()를 사용하는 방법이 있다. (참고 자료)

 

컨텍스트

복잡한 경우에는 요청에 대한 추가 정보를 확인해야 한다.

레디스 트랜스포터를 사용하는 경우, RedisContext에 접근할 수 있다.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RedisContext) {
  console.log(`Channel: ${context.getChannel()}`);
}

(@Payload(), @Ctx(), RedisContext는 @nestjs/microservices 패키지에 포함되어 있다.)

 

와일드 카드

트랜스포터가 psubscribe, pmessage와 같은 기능을 사용하도록 하려면

와일드 카드 기능을 활성화하기 위해 wildcards 옵션을 true로 설정해야 한다.

const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.REDIS,
  options: {
    // Other options
    wildcards: true,
  },
});

클라이언트 인스턴스 생성 시에도 놓치지 않게 주의해야 한다.

 

이 옵션이 활성화되면 메시지와 이벤트 패턴에서 와일드 카드를 사용할 수 있다.

예를 들어, noticiations로 시작하는 모든 채널을 구독하려면 아래와 같이 할 수 있다.

@EventPattern('notifications.*')

 

인스턴스 상태 업데이트

커넥션이나 드라이버 인스턴스의 상태에 대한 실시간 업데이트를 받기 위해

status stream을 구독할 수 있다.

이 stream은 선택된 특정 드라이버의 상태 업데이트를 제공한다.

레디스의 경우 connected, disconnected, reconnecting 이벤트가 발생한다.

this.client.status.subscribe((status: RedisStatus) => {
  console.log(status);
});

(RedisStatus는 @nestjs/microservices 패키지에 포함되어 있다.)

 

유사하게 서버의 status stream을 구독할 수도 있다.

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RedisStatus) => {
  console.log(status);
});

 

레디스 이벤트 수신

마이크로 서비스의 내부 이벤트를 수신하고 싶을 수 있다.

예를 들어, 에러가 발생했을 때 추가 작업을 트리거하기 위해

아래와 같이 on() 메서드를 사용하여 에러 이벤트를 수신할 수 있다.

this.client.on('error', (err) => {
  console.error(err);
});

 

유사하게 서버의 내부 이벤트도 수신할 수 있다.

server.on<RedisEvents>('error', (err) => {
  console.error(err);
});

(RedisEvents는 @nestjs/microservices 패키지에 포함되어 있다.)

 

내부 드라이버 접근

좀 더 고도화된 작업을 위해, 내부 드라이버 인스턴스에 접근해야 할 때가 있다.

직접 수동으로 커넥션을 종료하거나, 드라이버별 메서드를 사용할 때 유용하다.

하지만 대부분의 경우 직접 드라이버에 접근할 필요가 없음을 유의해야 한다.

 

NestJS에서 기본 드라이버 인스턴스에 접근하려면 unwrap() 메서드를 사용할 수 있다.

이 메서드는 기본 드라이버 인스턴스를 반환하며,

제네릭 타입 매개변수로 예상되는 드라이버 인스턴스의 타입을 지정해야 한다.

const [pub, sub] =
  this.client.unwrap<[import('ioredis').Redis, import('ioredis').Redis]>();

 

마찬가지로, 서버의 기본 드라이버 인스턴스에 접근하는 방식은 아래와 같다.

const [pub, sub] =
  server.unwrap<[import('ioredis').Redis, import('ioredis').Redis]>();

 

Redis 트랜스포터는 다른 트랜스포터와 달리 두 개의 ioredis 인스턴스로 구성된 튜플을 반환한다:

첫 번째는 메시지 발행에 사용되며, 두 번째는 메시지 구독에 사용된다.

 

참고 자료

728x90

'DataBase > Redis' 카테고리의 다른 글

[Redis] Redis란? 설치 방법과 활용법  (0) 2023.09.12