![Thumbnail image](/images/spring-basic.png)
Table of Contents
[스프링인액션] 비동기 메시지 전송하기 - RabbitMQ
개요
스프링인액션 8장을 읽고 RabbitMQ와 AMQP를 사용한 비동기 메시징을 공부하였다.
- RabbitMQ의 기본 개념 및 설정
- RabbitMQ를 이용한 메시지 전송
- RabbitMQ를 이용한 메시지 수신
에 대해 알 수 있다.
RabbitMQ 사용
RabbitMQ는 JMS보다 진보된 메시지 라우팅 전략을 제공한다.
AMQP(Advenced Message Queing Protocol)을 따르는 오픈소스 메시지 브로커 중 하나이다.
RabbitMQ 구조
거래소와 큐 간의 관계
RabbitMQ 거래소로 전송되는 메시지는 라우팅 키와 바인딩을 기반으로 하나 이상의 큐에 전달된다.
거래소 타입 | 설명 |
---|---|
기본(Default) | 브로커가 자동으로 생성. 라우팅 키와 이름이 같은 큐로 메시지를 전달한다. 모든 큐는 자동으로 기본 거래소와 연결된다. |
다이렉트(Direct) | 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달한다. |
토픽(Topic) | 바인딩 키가 해당 메시지의 라우팅 키와 일치하는 하나 이상의 큐에 메시지를 전달한다. |
팬아웃(Fanout) | 바인딩 키나 라우팅 키에 상관없이 모든 연결된 큐에 메시지를 전달한다. |
헤더(Header) | 토픽 거래소와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다. |
데드 레터(Dead Letter) | 전달 불가능한(거래소, 큐 바인딩과 일치하지 않는) 모든 메시지를 보관하는 거래소이다. |
RabbitMQ 설정
스프링 부트의 AMQP를 프로젝트 빌드에 추가한다.
1implementation 'org.springframework.boot:spring-boot-starter-amqp'
AMQP 연결 팩토리와 RabbitTemplate 빈을 자동으로 구성해준다.
실무환경에서의 RabbitMQ 설정
스프링은 RabbitMQ 브로커가 localhost의 5672 포트를 리스닝하고 인증 정보가 필요없기 때문에 로컬에서 실행되는 개발 환경일 경우 설정값을 따로 지정해 줄 필요가 없다.
하지만 실무 환경에서는 브로커를 어떻게 사용하는지 application.yml에 속성 값을 지정해 줄 필요가 있다.
ActiveMQ 브로커의 위치와 인증 정보를 구성하는 속성
속성 | 설명 |
---|---|
spring.rabbitmq.addresses | 쉼표로 구분된 리스트 형태의 RabbitMQ 브로커 주소 |
spring.rabbitmq.host | 브로커의 호스트(기본값 localhost) |
spring.rabbitmq.port | 브로커의 포트(기본값 5672) |
spring.rabbitmq.username | 브로커를 사용하기 위한 사용자(선택) |
spring.rabbitmq.password | 브로커를 사용하기 위한 암호(선택) |
RabbitTemplate을 사용하여 메시지 전송
RabbitTemplate는 JmsTemplate과 유사한 메서드를 제공하지만 미세한 차이가 존재한다.
- JmsTemplate : 지정된 큐나 토픽에만 메시지 전송
- RabbitTemplate : 거래소와 라우팅 키의 형태로 메시지 전송
RabbitTemplate는 다음과 같은 메서드들을 가지고 있다.
- send() : 원시 메시지 전송
- convertAndSend() : 객체로부터 변환된 메시지 전송하거나 인자에 MessagePostProcessor를 포함하여 전송에 앞서 후처리되는 메시지 전송 가능
send() 메서드로 메시지 전송
1private RabbitTemplate rabbit;
2
3public void sendOrder(Order order) {
4 MessageConverter converter = rabbit.getMessageConverter();
5 MessageProperties props = new MessageProperties();
6 Message message = converter.toMessage(order, props);
7 rabbit.send("tacocloud.order", message);
8}
MessageConverter로 Order 객체를 Message 객체로 변환한다.
메시지 속성 제공 : MessageProperties - 메시지 속성 설정하지 않을 경우에는 기본 인스턴스 전달
거래소 기본값 설정
1spring:
2 rabbitmq:
3 template:
4 exchange: tacocloud.orders
5 routing-key: kitchens.central
기본 거래소 이름은 빈 문자열인 “”이며, RabbitMQ 브로커가 자동으로 생성하는 기본 거래소이다.
기본 라우팅 키도 거래소와 동일하게 “”이다.
기본값을 변경할 경우 해당 yaml 파일처럼 지정해준다. (exchange, routing-key 속성)
메시지 변환기 구성
메시지 변환기 | 하는 일 |
---|---|
Jackson2JsonMessageConverter | Jackson2JSONProcessor를 사용해서 객체를 JSON으로 상호 변환한다. |
MarshallingMessageConverter | 스프링 Marshaller와 Unmarshaller를 사용해서 변환한다. |
SerializerMessageConverter | 스프링의 Serializer와 Deserializer를 사용해서 String과 객체를 변환한다. |
SimpleMessageConverter | String, byte 배열, Serializable 타입을 변환한다. |
ContentTypeDelegatingMessageConverter | contentType 헤더를 기반으로 다른 메시지 변환기에 변환을 위임한다. |
기본적으로는 SimpleMessageConverter가 사용되며, 메시지 변환기를 변경해야 할 경우에는 MessageConverter 타입의 빈을 구성하면 된다.
메시지 변환기 변경 예제
1@Bean
2public MessageConverter messageConverter() {
3 return new Jackson2JsonMessageConverter();
4}
메시지 속성 설정
send() 메서드에서 속성 설정
1public void sendOrder(Order order) {
2 MessageConverter converter = rabbit.getMessageConverter();
3 MessageProperties props = new MessageProperties();
4 props.setHeader("X_ORDER_SOURCE", "WEB");
5 Message message = converter.toMessage(order, props);
6 rabbit.send("tacocloud.order", message);
7}
MessageProperties 인스턴스를 통해 커스텀 헤더를 설정할 수 있다.
convertAndSend() 메서드에서 속성 설정
1public void sendOrder(Order order) {
2 rabbit.convertAndSend("tacocloud.order.queue", order,
3 message -> {
4 MessageProperties props = message.getMessageProperties();
5 props.setHeader("X_ORDER_SOURCE", "WEB");
6 return message;
7 });
8}
convertAndSend() 메서드에서는 MessagePostProcessor를 사용하여 구현한다.
Message 객체의 MessageProperties를 가져온 후 setHeader()를 호출하여 헤더를 설정한다.
RabbitMQ로부터 메시지 수신
RabbitMQ 큐로부터의 메시지 수신도 JMS과 다르지 않다.
- 풀 모델(pull model) 기반 : RabbitTemplate를 통해 큐로부터 메시지 수신
- 푸시 모델(push model) 기반 : @RabbitListener가 지정된 메서드로 메시지 푸시
RabbitTemplate을 사용하여 메시지 수신
RabbitTemplate는 다음과 같은 메서드들을 가지고 있다.
- receive() : 원시 메시지 수신
- receiveAndConvert() : 메시지를 수신한 후 메시지 변환기를 사용하여 도메인 객체로 변환하고 반환
receive() 사용
1private RabbitTemplate rabbit;
2private MessageConverter converter;
3
4public Order receiveOrder() {
5 Message message = rabbit.receive("tacocloud.orders", 30000);
6 return message != null ? (Order) converter.fromMessage(message) : null;
7}
타입아웃 값인 30초가 지났을 경우에 Message 객체 혹은 null이 반환된다. (30초간 대기)
Message 객체 반환되었을 경우 : MessageConverter를 사용하여 Message 객체를 Order 객체로 변환
구성을 통해 타임아웃 값 지정
1spring:
2 rabbitmq:
3 template:
4 receive-timeout: 30000
receive() 메서드의 타임아웃 값을 제거하고 구성 파일을 통해 타임아웃 값을 설정할 수 있다.
receiveAndConvert() 사용
1private RabbitTemplate rabbit;
2
3public Order receiveOrder() {
4 return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
5}
메시지 변환기 대신 receiveAndConvert() 메서드를 이용하면 손쉽게 메시지 변환이 가능하다.
캐스팅 대신 특정 객체 수신
1public Order receiveOrder() {
2 return (Order) rabbit.receiveAndConvert("tacocloud.order.queue",
3 new ParameterizedTypeReference<Order>() {});
4}
ParameterizedTypeReference를 인자로 전달하여 직접 Order 객체를 수신하게 할 수 있다.
※ 메시지 변환기가 SmartMessageConverter 인터페이스를 구현한 클래스이어야 한다.
리스너 사용
스프링은 RabbitListener라는 RabbitMQ 리스너를 제공한다.
메시지가 큐에 도착할 때 메서드가 자동 호출되도록 하기 위해서는 @RabbitListener 어노테이션을 RabbitMQ 빈의 메서드에 지정해야 한다.
리스너 선언
1private KitchenUI ui;
2
3@RabbitListener(queues = "tacocloud.order.queue")
4public void receiveOrder(Order order) {
5 ui.displayOrder(order);
6}
리스너 선언은 어노테이션을 RabbitListener로 지정했다는 것 이외에는 JmsListener와 동일하다.
Posts in this Series
- [스프링인액션] JMX로 스프링 모니터링
- [스프링인액션] 스프링 관리
- [스프링인액션] 스프링 액추에이터 사용
- [스프링인액션] 실패와 지연 처리
- [스프링인액션] 클라우드 구성 관리
- [스프링인액션] 마이크로서비스 이해
- [스프링인액션] 리액티브 데이터 퍼시스턴스
- [스프링인액션] 리액티브 API 개발
- [스프링인액션] 리액터 개요
- [스프링인액션] 스프링 통합 플로우 사용
- [스프링인액션] 비동기 메시지 전송하기 - Kafka
- [스프링인액션] 비동기 메시지 전송하기 - RabbitMQ
- [스프링인액션] 비동기 메시지 전송하기 - JMS
- [스프링인액션] REST API 사용하기
- [스프링인액션] REST API 생성하기
- [스프링인액션] 구성 속성 사용
- [스프링인액션] 스프링 시큐리티
- [스프링인액션] 데이터로 작업하기
- [스프링인액션] 스프링 초기 설정