Publish-Subscribe 패턴
Redis의 pub/sub을 살펴보기 전에 pub/sub이 어떤 기능인지부터 살펴보자.
publish-subscribe (pub/sub)은
메시지 발행자(publisher)는 특정 채널에 메시지를 보내고,
해당 채널을 구독한 모든 구독자(subscriber)들은 그 메시지를 수신하는 방식이다.
이는, 발행자가 구독자에게 직접 메시지를 보내는 일반적인 메시징 패턴과 구분된다.
구독자가 없다면 메시지는 사라지고 복구되지 않는다.
이러한 구조는 발행자와 구독자가 서로를 알 필요 없이 독립적으로 동작할 수 있게 하여(decoupling),
시스템의 확장성과 유연성을 높인다.
Redis Pub/Sub
"channel11" 과 "ch:00" 채널을 구독하려는 클라이언트는 아래와 같이 입력한다.
SUBSCRIBE channel11 ch:00
이후 다른 클라이언트가 이 채널들에 메시지를 발행하면
이 채널들을 구독한 모든 클라이언트들에게 메시지가 보내진다.
구독자 클라이언트들은 메시지가 발행된 순서대로 메시지를 받게 된다.
하나 이상의 채널을 구독한 클라이언트는 다른 명령을 실행할 수 없지만,
추가적인 구독이나 구독 취소는 가능하다.
구독 및 구독 취소에 대한 응답은 메시지 형태로 전달되어,
클라이언트가 일관된 메시지 스트림을 읽을 수 있도록 한다.
그리고 응답의 첫번째 요소는 메시지의 타입을 의미한다.
RESP2(레디스 통신 프로토콜) 클라이언트에게 구독 중 허용되는 커맨드는 아래와 같다.
그러나 RESP3 에서는 구독 중 모든 커맨드가 허용된다.
Redis의 Pub/Sub은 최대 1회의 메시지 전달 보장(at-most-once delivery) 구조다.
즉, 메시지는 1회 전달되거나 전달이 되지 않을 수 있다(메시지 손실).
메시지가 다시 보내지는 경우는 없다.
메시지의 안정적인 전달이 중요한 경우에는 다른 메시징 시스템이나 패턴을 고려해야 한다.
Pub/Sub 기능은 주로 실시간 메시지 방송, 이벤트 알림, 채팅 시스템 등에서 활용된다.
NestJS로 Redis Pub/Sub 기반 마이크로 서비스 만들기
먼저 필요한 패키지를 설치한다.
npm i --save ioredis
그리고 Redis transporter를 사용하기 위해
main.ts에서 createMicroservice() 메서드 옵션을 아래와 같이 설정한다.
// main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.REDIS,
options: {
host: 'localhost',
port: 6379,
},
});
(Transport enum은 @nestjs/microservices 패키지에 포함되어 있다.)
options의 프로퍼티는 선택한 transporter에 따라 달라진다.
Redis 트랜스포터의 옵션은 아래와 같다.
host | 커넥션 url |
port | 커넥션 포트 |
retryAttempts | 메시지 재시도 횟수 (default: 0) |
retryDelay | 메시지 재시도 간 지연시간 (ms) (default: 0) |
wildcards | Redis 와일드 카드 구독을 허용하여, 트랜스포터가 psubscribe/pmessage 사용할 지 여부 (default: false) |
클라이언트
Redis ClientProxy 인스턴스를 생성하는 방법은 여러가지다.
그 중 한 가지는 ClientsModule를 사용하는 것이다.
ClientsModule로 클라이언트를 생성하기 위해서는
import하고 register() 메서드를 사용해서 위 createMicroservice() 메서드와 동일한 옵션을 적용해야 한다.
name 값으로는 injection token으로 사용될 값을 넣어줘야 한다.
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.REDIS,
options: {
host: 'localhost',
port: 6379,
}
},
]),
]
...
})
그 외에도 ClientProxyFactory 또는 @Client()를 사용하는 방법이 있다. (참고 자료)
컨텍스트
복잡한 경우에는 요청에 대한 추가 정보를 확인해야 한다.
레디스 트랜스포터를 사용하는 경우, RedisContext에 접근할 수 있다.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RedisContext) {
console.log(`Channel: ${context.getChannel()}`);
}
(@Payload(), @Ctx(), RedisContext는 @nestjs/microservices 패키지에 포함되어 있다.)
와일드 카드
트랜스포터가 psubscribe, pmessage와 같은 기능을 사용하도록 하려면
와일드 카드 기능을 활성화하기 위해 wildcards 옵션을 true로 설정해야 한다.
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.REDIS,
options: {
// Other options
wildcards: true,
},
});
클라이언트 인스턴스 생성 시에도 놓치지 않게 주의해야 한다.
이 옵션이 활성화되면 메시지와 이벤트 패턴에서 와일드 카드를 사용할 수 있다.
예를 들어, noticiations로 시작하는 모든 채널을 구독하려면 아래와 같이 할 수 있다.
@EventPattern('notifications.*')
인스턴스 상태 업데이트
커넥션이나 드라이버 인스턴스의 상태에 대한 실시간 업데이트를 받기 위해
status stream을 구독할 수 있다.
이 stream은 선택된 특정 드라이버의 상태 업데이트를 제공한다.
레디스의 경우 connected, disconnected, reconnecting 이벤트가 발생한다.
this.client.status.subscribe((status: RedisStatus) => {
console.log(status);
});
(RedisStatus는 @nestjs/microservices 패키지에 포함되어 있다.)
유사하게 서버의 status stream을 구독할 수도 있다.
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RedisStatus) => {
console.log(status);
});
레디스 이벤트 수신
마이크로 서비스의 내부 이벤트를 수신하고 싶을 수 있다.
예를 들어, 에러가 발생했을 때 추가 작업을 트리거하기 위해
아래와 같이 on() 메서드를 사용하여 에러 이벤트를 수신할 수 있다.
this.client.on('error', (err) => {
console.error(err);
});
유사하게 서버의 내부 이벤트도 수신할 수 있다.
server.on<RedisEvents>('error', (err) => {
console.error(err);
});
(RedisEvents는 @nestjs/microservices 패키지에 포함되어 있다.)
내부 드라이버 접근
좀 더 고도화된 작업을 위해, 내부 드라이버 인스턴스에 접근해야 할 때가 있다.
직접 수동으로 커넥션을 종료하거나, 드라이버별 메서드를 사용할 때 유용하다.
하지만 대부분의 경우 직접 드라이버에 접근할 필요가 없음을 유의해야 한다.
NestJS에서 기본 드라이버 인스턴스에 접근하려면 unwrap() 메서드를 사용할 수 있다.
이 메서드는 기본 드라이버 인스턴스를 반환하며,
제네릭 타입 매개변수로 예상되는 드라이버 인스턴스의 타입을 지정해야 한다.
const [pub, sub] =
this.client.unwrap<[import('ioredis').Redis, import('ioredis').Redis]>();
마찬가지로, 서버의 기본 드라이버 인스턴스에 접근하는 방식은 아래와 같다.
const [pub, sub] =
server.unwrap<[import('ioredis').Redis, import('ioredis').Redis]>();
Redis 트랜스포터는 다른 트랜스포터와 달리 두 개의 ioredis 인스턴스로 구성된 튜플을 반환한다:
첫 번째는 메시지 발행에 사용되며, 두 번째는 메시지 구독에 사용된다.
참고 자료
'DataBase > Redis' 카테고리의 다른 글
[Redis] Redis란? 설치 방법과 활용법 (0) | 2023.09.12 |
---|