Thumbnail image

Table of Contents

[스프링인액션] 비동기 메시지 전송하기 - Kafka

개요


스프링인액션 8장을 읽고 스프링이 제공하는 비동기 메시징을 공부하였다.

  • Kafka 구조 및 설정
  • Kafka 이용한 메시지 전송
  • Kafka 이용한 메시지 수신

에 대해 알 수 있다.

카프카 사용


아파치 카프카는 가장 새로운 메시징 시스템이다.
카프카는 높은 확장성을 제공하는 클러스터로 실행되도록 설계되었는데, 클러스터의 모든 카프카 인스턴스에 걸쳐 토픽을 파티션으로 분할하여 메시지를 관리한다.

→ RabbitMQ가 거래소와 큐를 사용해서 메시지를 처리하는 반면, 카프카는 토픽만 사용한다.

카프카의 구조

각 브로커는 토픽의 파티션의 리더로 동작

각 브로커는 토픽의 파티션의 리더로 동작

카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제되는 형태이다.
클러스터의 각 노드는 하나 이상의 토픽에 대한 리더(leader)로 동작하며, 토픽 데이터를 관리하고 클러스터의 다른 노드로 데이터를 복제한다.
각 토픽은 여러 개의 파티션으로 분할될 수 있다.

카프카 설정

카프카를 프로젝트 빌드에 추가한다.
JMS나 RabbitMQ와 달리 카프카는 스프링 부트 스타터가 없다.

1implementation 'org.springframework.kafka:spring-kafka:2.1.8.RELEASE'

이를 통해 KafkaTemplate을 사용할 수 있다.

실무환경에서의 카프카 설정

카프카는 기본적으로 localhost의 9092 포트를 사용한다. 로컬에서 실행되는 개발 환경일 경우 로컬의 카프카 브로커를 사용하면 좋다.
하지만 실무 환경에서는 카프카의 호스트나 포트를 application.yaml 파일에 구성해야 한다.

카프카 서버 설정

1spring:
2  kafka:
3    bootstrap-servers:
4      - kafka.tacocloud.com:9092
5      - kafka.tacocloud.com:9092
6      - kafka.tacocloud.com:9092

spring.kafka.bootstrap-servers 속성을 통해 카프카 클러스터로의 초기 연결에 사용되는 하나 이상의 카프카 서버 위치를 설정할 수 있다. 이 속성은 복수형이며, 여러 서버를 지정할 수 있다.

KafkaTemplate을 사용하여 메시지 전송


KafkaTemplate 메서드들은 JMS나 RabbitMQ의 메서드들과 유사하지만 다른 부분도 존재한다.

  1. convertAndSend() 메서드 존재 여부
    KafkaTemplate은 제너릭 타입을 사용하고 메시지를 전송할 때 직접 도메인 타입을 처리할 수 있기 때문에 convertAndSend() 메서드가 없다.

  2. send()와 sendDefault() 메서드의 매개변수
    카프카에서 메시지를 전송할 때는 (1) 메시지가 전송될 토픽, (2) 토픽 데이터를 쓰는 파티션, (3) 레코드 전송 키, (4) 타임스탬프(기본값 System.currentTimeMillis()), (5) 페이로드를 매개변수로 지정할 수 있다.

여기서 필수값은 메시지가 전송될 토픽과 페이로드이다.

send() 메서드로 메시지 전송

1private KafkaTemplate<String, Order> kafkaTemplate;
2
3public void sendOrder(Order order) {
4  kafkaTemplate.send("tacocloud.orders.topic", order);
5}

send() 메서드를 이용하여 “tacocloud.orders.topic"이라는 이름의 토픽으로 Order 객체를 전송한다.

sendDefault() 메서드로 메시지 전송

1private KafkaTemplate<String, Order> kafkaTemplate;
2
3public void sendOrder(Order order) {
4  kafkaTemplate.sendDefault(order);
5}

기본 토픽으로 Order 객체를 전송하고 싶으면 토픽 이름을 인자로 전달하지 않고 send() 메소드 대신 sendDefault() 메서드를 사용한다.

기본 토픽 설정

1spring:
2  kafka:
3    template:
4      default-topic: tacocloud.orders.topic

기본 토픽은 spring.kafka.template.default-topic 속성을 이용하면 된다.

카프카 리스너 작성


KafkaTemplate는 메시지를 수신하는 메서드를 일체 제공하지 않는다.
따라서 스프링을 사용하여 카프카 토픽의 메시지를 가져오는 유일한 방법은 메시지 리스너를 작성하는 것이다.

리스너를 이용하여 메시지 수신

1private KitchenUI ui;
2
3@KafkaListener(topics="tacocloud.orders.topic")
4public void handle(Order order) {
5  ui.displayOrder(order);
6}

@KafkaListener 어노테이션을 지정하여 “tacocloud.orders.topic” 이름의 토픽에 메시지가 도착할 때 자동 호출되도록 설정하였다.

추가적인 메타데이터가 필요한 경우

ConsumerRecord나 Message 객체를 인자로 받아 메시지의 메타데이터를 받을 수 있다.

ConsumerRecord 사용

1private KitchenUI ui;
2
3@KafkaListener(topics="tacocloud.orders.topic")
4public void handle(Order order, ConsumerRecord<String, Order> record) {
5  log.info("파티션 : {}, timestamp : {}", record.partition(), record.timestamp());
6  
7  ui.displayOrder(order);
8}

ConsumerRecord를 통해 수신된 메시지의 파티션과 타입스탬프를 로깅하는 코드이다.

Message 객체 사용

 1private KitchenUI ui;
 2
 3@KafkaListener(topics="tacocloud.orders.topic")
 4public void handle(Order order, Message<Order> message) {
 5  MessageHeaders headers = message.getHeaders();
 6  log.info("Received from partition {} with timestamp {}",
 7      headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
 8      headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
 9  ui.displayOrder(order);
10}

Message 객체를 통해 메시지의 파티션과 타입스탬프를 로깅하는 코드이다.

Posts in this Series