![Thumbnail image](/images/spring-basic.png)
Table of Contents
[스프링인액션] 스프링 통합 플로우 사용
개요
스프링인액션 9장을 읽고 통합 패턴을 사용하는 법을 공부하였다.
- 실시간으로 데이터 처리
- 통합 플로우 정의
- 자바 DSL 정의 사용
- 다른 외부 시스템과 통합
에 대해 작성하였다.
통합 플로우 선언
어플리케이션은 통합 플로우를 통해 외부 리소스나 어플리케이션 자체에 데이터를 수신/전송할 수 있다.
스프링 통합 중 “채널 어뎁터”는 파일을 읽거나 쓰는 컴포넌트이다.
스프링 통합 빌드 명세
1implementation 'org.springframework.boot:spring-boot-starter-integration'
2implementation 'org.springframework.integration:spring-integration-file'
스프링 통합 플로우 개발을 위해 스프링 부트 스타터인 스프링 통합을 추가해준다.
또한 파일 시스템으로부터 통합 플로우로 파일을 읽거나, 데이터를 쓰기 위해 스프링 통합 파일 엔드포인트 모듈도 추가해준다.
게이트웨이 생성
파일에 데이터를 쓸 수 있도록 어플리케이션에서 통합 플로우로 데이터를 전송하는 게이트웨이를 생성한다.
메서드 호출을 메시지로 변환
1@MessagingGateway(defaultRequestChannel="textInChannel")
2public interface FileWriterGateway {
3 void writeToFile(@Header(FileHeaders.FILENAME) String filename,String data);
4}
- @MessagingGateway : 해당 인터페이스의 구현체를 런타임 시 생성하라고 스프링에게 전달
- defaultRequestChannel : 메서드 호출로 생성된 메시지가 이 속성의 채널로 전송
writeToFile() 메서드의 인자
- @Header : filename에 전달되는 값이 메시지 헤더에 있다는 것을 명시
- data : 해당 매개변수 값은 메시지 페이로드로 전달
통합 플로우 구성
통합 플로우 구성에는 3가지 방법이 존재한다.
- XML 구성
- 자바 구성
- DSL을 이용한 자바 구성
통합 플로우 구성 예시
파일-쓰기 통합 플로우
자바 구성
1@Configuration
2public class FileWriterIntegrationConfig {
3
4 @Bean
5 @Transformer(inputChannel="textInChannel", outputChannel="fileWriterChannel")
6 public GenericTransformer<String, String> upperCaseTransformer() {
7 return text -> text.toUpperCase();
8 }
9
10 @Bean
11 @ServiceActivator(inputChannel="fileWriterChannel")
12 public FileWritingMessageHandler fileWriter() {
13 FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("/tmp/sia5/files"));
14 handler.setExpectReply(false);
15 handler.setFileExistsMode(FileExistsMode.APPEND);
16 handler.setAppendNewLine(true);
17 return handler;
18 }
19}
GenericTransformer (변환기 역할) : @Transformer을 통해 textInChannel의 메시지를 받아서 fileWirterChannel로 쓰는 통합 플로우 변환기라는 것을 명시한다.
FileWritingMessageHandler (파일 아웃바운드 채널 어뎁터 역할) : @ServiceActivator을 통해 fileWriterChannel로부터 메시지를 받아 FileWritingMessageHandler의 인스턴스로 정의된 서비스에 넘겨 메시지 페이로드를 지정된 디렉터리의 파일에 쓴다.
DSL을 이용한 자바 구성
1@Configuration
2public class FileWriterIntegrationConfig {
3
4 @Bean
5 public IntegrationFlow fileWriterFlow() {
6 return IntegrationFlows
7 .from(MessageChannels.direct("textInChannel")) // 입력 채널
8 .<String, String>transform(t -> t.toUpperCase()) // 변환기 선언
9 .channel(MessageChannels.direct("fileWriterChannel")) // 파일-쓰기 채널
10 .handle(Files // 아웃바운드 채널 어뎁터
11 .outboundAdapter(new File("/tmp/sia5/files"))
12 .fileExistsMode(FileExistsMode.APPEND)
13 .appendNewLine(true))
14 .get();
15 }
16
17}
통합 플로우의 각 컴포넌트를 별도의 빈으로 선언하지 않고 전체 플로우를 하나의 빈으로 선언한다.
구조
- textInChannel이라는 이름의 채널로부터 메시지 수신
- 메시지 페이로드를 대문자로 바꾸는 변환기 실행
- 변환기의 메시지를 아웃바운드 채널 어댑터로 전달
- 변환된 아웃바운드 채널 어댑터에서 변환된 메시지 처리
- get() 메서드를 호출하여 IntegrationFlow 인스턴스를 가져온다.
DSL 구성 또한 채널을 별도로 구성하지 않는다면 채널이 자동으로 생성된다.
스프링 통합 컴포넌트
컴포넌트 역할
- 채널 : 한 요소로부터 다른 요소로 메시지 전달
- 필터 : 조건에 맞는 메시지만 플로우 통과
- 변환기 : 메시지 값을 변경하거나 메시지 페이로드의 타입 변환
- 라우터 : 여러 채널 중 하나로 메시지 전달 (주로 메시지 헤더 기반)
- 분배기 : 들어오는 메시지를 두 개 이상의 메시지로 분할
- 집적기 : 별개의 채널로부터 전달되는 다수의 메시지를 하나로 결합
- 서비스 액티베이터 : 자바 메서드에 메시지를 넘긴 후 반환값을 출력 채널로 전송
- 채널 어댑터 : 외부 시스템에 채널 연결. 외부 시스템으로부터 입력을 받거나 쓸 수 있다.
- 게이트웨이 : 인터페이스를 통해 통합 플로우로 데이터 전달.
메시지 채널
통합 파이프라인을 통해 메시지가 이동하는 수단.
→ “통합 플로우의 서로 다른 컴포넌트 간에 데이터를 전달하는 통로”
스프링 통합이 제공하는 채널 구현체
- PublishSubscribeChannel : 하나 이상의 컨슈머로 전달
- QueueChannel : FIFO 방식으로 컨슈머가 가져갈 때까지 큐에 저장
- PriorityChannel : QueueChannel와 비슷하지만, 메시지의 priority 헤더 기반으로 메시지 획득
- RendezvousChannel : QueueChannel와 비슷하지만, 컨슈머가 메시지를 수신할 때까지 전송자가 채널을 차단
- DirectChannel : PublishSubscribeChannel과 비슷하지만, 전송자와 동일한 스레드인 단일 컨슈머에게 메시지 전송 - 트랜잭션 지원
- ExecutorChannel : DirectChannel과 비슷하지만, TaskExecutor를 통해 메시지 전송(다른 스레드) - 트랜잭션 지원 X
- FluxMessageChannel : Flux를 기반으로 하는 Reactive Streams Publisher 채널
자바 구성과 DSL 구성 모두 입력 채널은 자동으로 생성되며, 기본적으로 DirectChannel이 사용된다.
필터
보통 통합 파이프라인 중간에 위치하며, 플로우의 전 단계로부터 다음 단계로의 메시지 전달을 허용하거나 불허한다.
자바 구성 예시
1@Filter(inputChannel="numberChannel", outputChannel="evenNumberChannel")
2public boolean evenNumberFilter(Integer number) {
3 return number % 2 == 0;
4}
@Filter 어노테이션을 통해 필터를 선언할 수 있다.
DSL 구성 예시
1@Bean
2public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
3 return IntegrationFlows
4 .<Integer>filter((p) -> p % 2 == 0)
5 .get();
6}
변환기
메시지 값의 변경이나 타입을 변환하는 일을 수행한다.
자바 구성 예시
1@Bean
2@Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel")
3public GenericTransformer<Integer, String> romanNumTransformer() {
4 return RomanNumbers::toRoman;
5}
@Transformer 어노테이션을 통해 변환기 임을 선언한다.
numberChannel 채널로부터 Integer 값을 수신하고 toRoman()을 사용하여 변환을 수행한다.
DSL 구성 예시
1@Bean
2public IntegrationFlow transformerFlow() {
3 return IntegrationFlows
4 .transform(RomanNumbers::toRoman)
5 .get();
6}
라우터
메시지에 적용된 전달 조건을 기반으로 서로 다른 채널로 메시지를 전달한다.
자바 구성 예시
1@Bean
2@Router(inputChannel="numberChannel")
3public AbstractMessageRouter evenOddRouter() {
4 return new AbstractMessageRouter() {
5 @Override
6 protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
7 Integer number = (Integer) message.getPayload();
8 if (number % 2 == 0) {
9 return Collections.singleton(evenChannel());
10 }
11 return Collections.singleton(oddChannel());
12 }
13 };
14}
numberChannel이름의 채널로부터 메시지를 받아 페이로드를 검사하여
짝수일 때는 evenChannel 채널을 반환, 홀수일 때는 oddChannel 채널이 반환된다.
DSL 구성 예시
1@Bean
2public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
3 return IntegrationFlows
4 .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping
5 .subFlowMapping("EVEN",
6 sf -> sf.<Integer, Integer>transform(n -> n * 10)
7 .handle((i, h) -> { ... })
8 )
9 .subFlowMapping("ODD",
10 sf -> sf.transform(RomanNumbers::toRoman)
11 .handle((i, h) -> { ... })
12 )
13 )
14 .get();
15}
람다식을 사용하여 짝수일 때는 “EVEN”, 홀수일 떄는 “ODD”값을 반환하고, 이 값들을 통해 메시지를 처리하는 하위 플로우를 결정한다.
분배기
통합 플로우에서 하나의 메시지를 여러개로 분할하여 독립적으로 처리한다.
자바 구성 예시
1@Bean
2@Splitter(inputChannel="poChannel" outputChannel="splitOrderChannel")
3public OrderSplitter orderSplitter() {
4 return new OrderSplitter();
5}
6
7@Bean
8@Router(inputChannel="splitOrderChannel")
9public MessageRouter splitOrderRouter() {
10 PayloadTypeRouter router = new PayloadTypeRouter();
11 router.setChannelMapping(BillingInfo.class.getName(), "billingInfoChannel");
12 router.setChannelMapping(List.class.getName(), "lineItemsChannel");
13 return router;
14}
poChannel 채널로 도착한 주문 메시지는 OrderSplitter로 분할된다.
분할된 메시지는 PayloadTypeRouter에 의해 각 페이로드 타입을 기반으로 서로 다른 채널에 메시지를 전달한다. BillingInfo 파입의 페이로드는 billingInfoChannel로, List 타입의 페이로드는 lineItemsChannel에 전달된다.
DSL 구성 예시
1@Bean
2public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
3 return IntegrationFlows
4 .split(orderSplitter())
5 .<Object, String>route(
6 p -> {
7 if (p.getClass().isAssignableFrom(BillingInfo.class)) {
8 return "BILLING_INFO";
9 } else {
10 return "LINE_ITEMS";
11 }
12 }, mapping -> mapping
13 .subFlowMapping("BILLING_INFO",
14 sf -> sf.<BillingInfo> handle((billingInfo, h) -> { ... }))
15 .subFlowMapping("LINE_ITEMS",
16 sf -> sf.split().<LineItem> handle((lineItem, h) -> { ... }))
17 )
18 .get();
19}
서비스 엑티베이터
입력 채널로부터 메시지를 수신하고 이 메시지를 MessageHandler 인터페이스를 구현한 클래스(빈)에 전달한다.
MessageHandler 빈 선언
1@Bean
2@ServiceActivator(inputChannel="someChannel")
3public MessageHandler sysoutHandler() {
4 return message -> {
5 System.out.println("Message Payload: " + message.getPayload());
6 };
7}
someChannel 채널로부터 받은 메시지를 처리하는 서비스 액티베이터를 지정한다.
메시지의 페이로드를 표준 출력 시스템으로 내보낸다.
새로운 페이로드로 반환 시
1@Bean
2@ServiceActivator(inputChannel="orderChannel", outputChannel="completeChannel")
3public GenericHandler<Order> orderHandler(OrderRepository orderRepo) {
4 return (payload, headers) -> {
5 return orderRepo.save(payload);
6 };
7}
새로운 페이로드를 반환하려면 GenericHandler를 구현해야 한다.
Order 타입의 메시지 페이로드를 처리하는 GenericHandler를 구현하여 메시지가 도착하면 repository에 저장한다.
MessageHandler 빈 선언(DSL)
1@Bean
2public IntegrationFlow someFlow() {
3 return IntegrationFlows
4 .handle(msg -> {
5 System.out.println("Message Payload: " + msg.getPayload());
6 })
7 .get();
8}
새로운 페이로드로 반환 시(DSL)
1@Bean
2public IntegrationFlow orderFlow(OrderRepository orderRepo) {
3 return IntegrationFlows
4 .<Order>handle((payload, headers) -> {
5 return orderRepo.save(payload);
6 })
7 .get();
8}
GenericHandler를 플로우 맨 끝에 사용할 경우에는 출력 채널이 없다는 에러가 발생하기 때문에 null을 반환해야 한다.
게이트웨이
어플리케이션이 통합 플로우로 데이터를 제출하고 선택적으로 플로우의 응답 결과를 받을 수 있다.
자바 구성 예시
1@Component
2@MessageGateway(defaultRequestChannel="inChannel",
3 defaultReplyChannel="outChannel")
4public interface UpperCaseGateway {
5 String uppercase(String in);
6}
스프링 통합이 런타임 시에 자동으로 구현체를 제공하기 때문에 인터페이스를 구현할 필요가 없다.
DSL 구성 예시
1@Bean
2public IntegrationFlow uppercaseFlow() {
3 return IntegrationFlows
4 .from("inChannel")
5 .<String, String>transform(s -> s.toUpperCase())
6 .channel("outChannel")
7 .get();
8}
채널 어댑터
통합플로우의 입구와 출구를 나타낸다. 데이터는 인바운드 채널 어댑터를 통해 들어오고, 아웃바운드 채널 어댑터를 통해 통합 플로우에서 나간다.
인바운드 채널 어댑터 구성
1@Bean
2@InboundChannelAdapter(poller=@Poller(fixedRate="1000"), channel="numberChannel")
3public MessageSource<Integer> numberSource(AtomicInteger source) {
4 return () -> {
5 return new GenericMessage<>(source.getAndIncrement());
6 };
7}
@InboundChannelAdapter 어노테이션을 통해 인바운드 채널 어댑터 빈으로 선언된다.
AtomicInteger로부터 numberChannel 채널로 매초마다 한번씩 데이터를 전달한다.
인바운드 채널 어댑터 구성(DSL)
1@Bean
2public IntegrationFlow someFlow(AtomicInteger integerSource) {
3 return IntegrationFlows
4 .from(integerSource, "getAndIncrement",
5 c -> c.poller(Pollers.fixedRate(1000)))
6 .get();
7}
DSL 구성에서는 from() 메서드를 통해 인바운드 채널 어댑터를 구성한다.
엔드포인트 모듈
스프링 통합은 커스텀 채널 어댑터를 생성할 수 있게 채널 어댑터가 포함된 엔드포인트 모듈을 제공한다.
Module | Inbound Adapter | Outbound Adapter | Inbound Gateway | Outbound Gateway |
---|---|---|---|---|
AMQP | Inbound Channel Adapter | Outbound Channel Adapter | Inbound Gateway | Outbound Gateway |
Events | Receiving Spring Application Events | Sending Spring Application Events | N | N |
Feed | Feed Inbound Channel Adapter | N | N | N |
File | Reading Files and ’tail’ing Files | Writing files | N | Writing files |
FTP(S) | FTP Inbound Channel Adapter | FTP Outbound Channel Adapter | N | FTP Outbound Gateway |
Gemfire | Inbound Channel Adapter and Continuous Query Inbound Channel Adapter | Outbound Channel Adapter | N | N |
HTTP | HTTP Namespace Support | HTTP Namespace Support | Http Inbound Components | HTTP Outbound Components |
JDBC | Inbound Channel Adapter and Stored Procedure Inbound Channel Adapter | Outbound Channel Adapter and Stored Procedure Outbound Channel Adapter | N | Outbound Gateway and Stored Procedure Outbound Gateway |
JMS | Inbound Channel Adapter and Message-driven Channel Adapter | Outbound Channel Adapter | Inbound Gateway | Outbound Gateway |
JMX | Notification-listening Channel Adapter and Attribute-polling Channel Adapter and Tree-polling Channel Adapter | Notification-publishing Channel Adapter and Operation-invoking Channel Adapter | N | Operation-invoking Outbound Gateway |
JPA | Inbound Channel Adapter | Outbound Channel Adapter | N | Updating Outbound Gateway and Retrieving Outbound Gateway |
Apache Kafka | Message Driven Channel Adapter and Inbound Channel Adapter | Outbound Channel Adapter | Inbound Gateway | Outbound Gateway |
Mail-receiving Channel Adapter | Mail-sending Channel Adapter | N | N | |
MongoDB | MongoDB Inbound Channel Adapter | MongoDB Outbound Channel Adapter | N | N |
MQTT | Inbound (Message-driven) Channel Adapter | Outbound Channel Adapter | N | N |
R2DBC | R2DBC Inbound Channel Adapter | R2DBC Outbound Channel Adapter | N | N |
Redis | Redis Inbound Channel Adapter, Redis Queue Inbound Channel Adapter, Redis Store Inbound Channel Adapter, Redis Stream Inbound Channel Adapter | Redis Outbound Channel Adapter, Redis Queue Outbound Channel Adapter, RedisStore Outbound Channel Adapter, Redis Stream Outbound Channel Adapter | Redis Queue Inbound Gateway | Redis Outbound Command Gateway and Redis Queue Outbound Gateway |
Resource | Resource Inbound Channel Adapter | N | N | N |
RMI | N | N | Inbound RMI | Outbound RMI |
RSocket | N | N | RSocket Inbound Gateway | RSocket Outbound Gateway |
SFTP | SFTP Inbound Channel Adapter | SFTP Outbound Channel Adapter | N | SFTP Outbound Gateway |
STOMP | STOMP Inbound Channel Adapter | STOMP Outbound Channel Adapter | N | N |
Stream | Reading from Streams | Writing to Streams | N | N |
Syslog | Syslog Inbound Channel Adapter | N | N | N |
TCP | TCP Adapters | TCP Adapters | TCP Gateways | TCP Gateways |
UDP | UDP Adapters | UDP Adapters | N | N |
WebFlux | WebFlux Inbound Channel Adapter | WebFlux Outbound Channel Adapter | Inbound WebFlux Gateway | Outbound WebFlux Gateway |
Web Services | N | N | Inbound Web Service Gateways | Outbound Web Service Gateways |
Web Sockets | WebSocket Inbound Channel Adapter | WebSocket Outbound Channel Adapter | N | N |
XMPP | XMPP Messages and XMPP Presence | XMPP Messages and XMPP Presence | N | N |
ZeroMQ | ZeroMQ Inbound Channel Adapter | ZeroMQ outbound Channel Adapter | N | N |
출처 : https://docs.spring.io/spring-integration/reference/html/endpoint-summary.html
Posts in this Series
- [스프링인액션] JMX로 스프링 모니터링
- [스프링인액션] 스프링 관리
- [스프링인액션] 스프링 액추에이터 사용
- [스프링인액션] 실패와 지연 처리
- [스프링인액션] 클라우드 구성 관리
- [스프링인액션] 마이크로서비스 이해
- [스프링인액션] 리액티브 데이터 퍼시스턴스
- [스프링인액션] 리액티브 API 개발
- [스프링인액션] 리액터 개요
- [스프링인액션] 스프링 통합 플로우 사용
- [스프링인액션] 비동기 메시지 전송하기 - Kafka
- [스프링인액션] 비동기 메시지 전송하기 - RabbitMQ
- [스프링인액션] 비동기 메시지 전송하기 - JMS
- [스프링인액션] REST API 사용하기
- [스프링인액션] REST API 생성하기
- [스프링인액션] 구성 속성 사용
- [스프링인액션] 스프링 시큐리티
- [스프링인액션] 데이터로 작업하기
- [스프링인액션] 스프링 초기 설정