Table of Contents
[스프링인액션] 리액터 개요
개요
스프링인액션 10장을 읽고 리액터에 대한 기초 개념을 공부하였다.
- 리액티브 프로그래밍 이해
- 프로젝트 리액터
- 리액티브 데이터 오퍼레이션
에 대해 작성하였다.
리액티브 프로그래밍
어플리케이션을 개발할 때 두 가지 형태의 코드를 작성할 수 있다.
- 명령형 : 순차적으로 진행되는 작업. 각 작업은 한 번에 하나씩 처리된다.
- 리액티브 : 일련의 작업들이 정의되지만, 병렬로 실행 가능하다.
대부분의 프로그래밍 언어는 스레드를 통해 동시 작업을 처리하지만, 리액티브 프로그래밍은 사용 가능한 데이터가 존재할 때마다 처리할 수 있다.
→ 확장성과 성능 관점에서 리액티브 프로그래밍이 유리하다.
리액티브 스트림 정의
리액티브 스트림은 차단되지 않는 백 프레셔(backpressure)를 갖는 비동기 스트림 처리의 표준을 제공하는 것이 목적이다.
어떤 크기의 데이터셋이건 비동기 처리를 지원하고, 실시간으로 데이터를 처리하며, 백 프레셔를 사용하여 데이터 전달 폭주를 막는다.
리액티브 스트림 인터페이스
- Publisher : 발행자
- Subscriber : 구독자
- Subscription : 구독
- Processor : 프로세서
Publisher(발행자)
하나의 Subscription(구독)당 하나의 Subscriber(구독자)에 전송하는 데이터를 생성한다.
1public interface Publisher<T> {
2 void subscribe(Subscriber<? super T> subscriber);
3}
- subscribe() : Subscriber가 Publisher를 구독 신청할 수 있는 메서드
Subscriber(구독자)
Subscriber(구독자)가 구독 신청되면 Publisher(발행자)로부터 이벤트를 수신할 수 있다.
1public interface Subscriber<T> {
2 void onSubscribe(Subscription sub);
3 void onNext(T item);
4 void onError(Throwable ex);
5 void onComplete();
6}
- onSubscribe() : Publisher가 subscribe를 호출한 뒤에 호출되는 메서드이다. 이 메서드의 인자인 Subscription 객체를 Subscriber에 전달한다. - 이때 Subscription 객체는 Subscription.request() 메서드를 통해 받아온 객체이다.
- onNext() : Subscription.request() 에서 요청하는 데이터를 Publisher에게 전달하는 메서드
Subscription(구독)
데이터를 어떻게 전송하는 지에 대해 정의한다.
1public interface Subscription {
2 void request(long n);
3 void cancel();
4}
- request() : 전송되는 데이터를 요청한다. 해당 메서드의 인자는 Subscriber가 받고자 하는 데이터 항목의 수이다. - Subscriber가 처리할 수 있는 것보다 많은 데이터를 Publisher가 전송하는 것을 막아준다. (백프레셔)
- cancel() : 구독을 취소한다는 것을 나타낸다.
Processor(프로세서)
Subscriber 인터페이스와 Publisher 인터페이스를 결합한 것이다.
1public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
processor는 Subscriber 역할로 데이터를 수신, 처리하고, Publisher 역할로 처리 결과를 Subscriber에게 발행한다.
전반적인 플로우
- Publisher.subscribe() 메서드를 통해 Subscriber가 Publisher에게 구독을 요청한다.
- Publisher가 onSubscribe(subscription) 을 호출하여 Subscriber에게 Subscription을 전달한다.
- Subscriber는 Subscription.request()를 통해 데이터 항목 수를 요청한다.
- Publisher는 Subscription을 통해 Subscriber.onNext()로 데이터를 전달한다.
- 전달이 성공적이면 onComplete, 오류가 발생한다면 onError를 호출한다.
리액터 기본 개념 및 구성
리액터의 두 가지 핵심 타입으로는 Mono와 Flux가 있는데, 모두 리액티브 스트림의 Publisher 인터페이스를 구현한 것이다.
- Mono : 하나의 데이터 항목만 가지는 데이터셋에 최적화된 리액티브 타입
- Flux : 0, 1 또는 다수의 데이터를 가지는 파이프라인
리액티브 플로우
리액티브 플로우는 마블 다이어그램으로 나타낸다.
Mono의 기본적인 플로우를 보여주는 마블 다이어그램
Flux의 기본적인 플로우를 보여주는 마블 다이어그램
Mono는 0 또는 하나의 데이터 항목과 에러를 가지지만 Flux는 다수의 데이터를 가지고 있는 것을 알 수 있다.
리액터 빌드 명세 추가
1implementation 'io.projectreactor:reactor-core'
2testImplementation 'io.projectreactor:reactor-test'
스프링 부트가 dependency 관리를 자동으로 해주기 때문에 버전을 명시할 필요는 없다.
해당 dependency를 추가함으로써 Mono와 Flux를 사용하여 리액티브 파이프라인을 생성할 수 있다.
리액티브 오퍼레이션 적용
Flux와 Mono가 제공하는 오퍼레이션은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.
500 개의 오퍼레이션이 존재하는데, 다음과 같이 분류할 수 있다.
- 생성 오퍼레이션 (creation)
- 조합 오퍼레이션 (combination)
- 변환 오퍼레이션 (transformation)
- 로직 오퍼레이션 (logic)
리액티브 타입 생성
생성 오퍼레이션을 통해 데이터를 발행하는 리액티브 publisher를 생성할 수 있다.
객체로부터 생성
1@Test
2public void createAFlux_just() {
3 Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana");
4 fruitFlux.subscribe(
5 f -> System.out.println("Here's some fruit: " + f)
6 );
7}
Flux나 Mono의 just() 오퍼레이션을 사용하여 리액티브 타입을 생성할 수 있다.
Subscriber를 추가하기 위해 subscribe() 메서드를 호출한다. - 호출 즉시 데이터가 전달
컬렉션으로부터 생성
1@Test
2public void createAFlux_fromArray() {
3 String[] fruits = new String[]{"Apple", "Orange", "Grape", "Banana"};
4 Flux<String> fruitFlux = Flux.fromArray(fruits);
5}
배열로부터 Flux를 생성하려면 fromArray() 오퍼레이션을 호출한다.
1@Test
2public void createAFlux_fromIterable() {
3 List<String> fruitList = new ArrayList<>();
4 fruitList.add("Apple");
5 fruitList.add("Orange");
6 fruitList.add("Grape");
7 fruitList.add("Banana");
8 Flux<String> fruitFlux = Flux.fromIterable(fruitList);
9}
List, Set, Iterable의 다른 구현 컬렉션으로부터 Flux를 생성하려면 fromIterable() 오퍼레이션을 호출한다.
1@Test
2public void createAFlux_fromStream() {
3 Stream<String> fruitStream = Stream.of("Apple", "Orange", "Grape", "Banana");
4 Flux<String> fruitFlux = Flux.fromStream(fruitStream);
5}
자바 Stream 객체를 사용하여 Flux를 생성하려면 fromStream() 오퍼레이션을 호출한다.
테스트 수행
1StepVerifier.create(fruitFlux)
2 .expectNext("Apple")
3 .expectNext("Orange")
4 .expectNext("Grape")
5 .expectNext("Banana")
6 .verifyComplete();
Flux가 지정되면 StepVerifier는 해당 리액티브 타입(fruitFlux)을 구독한 후 전달되는 데이터에 대해 assertion을 적용한다. 그리고 마지막으로 fruitFlux가 완전한지 검사를 수행한다. 해당 테스트코드는 상단의 모든 Flux 데이터를 검사할 수 있다.
카운터 생성
1@Test
2public void createAFlux_range() {
3 Flux<Integer> intervalFlux = Flux.range(1, 5);
4
5 StepVerifier.create(intervalFlux)
6 .expectNext(1)
7 .expectNext(2)
8 .expectNext(3)
9 .expectNext(4)
10 .expectNext(5)
11 .verifyComplete();
12}
range() 오퍼레이션을 통해 1~5까지의 카운터 Flux를 생성하고 StepVerifier로 검사를 수행한다.
1@Test
2public void createAFlux_interval() {
3 Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
4
5 StepVerifier.create(intervalFlux)
6 .expectNext(0L)
7 .expectNext(1L)
8 .expectNext(2L)
9 .expectNext(3L)
10 .expectNext(4L)
11 .verifyComplete();
12}
interval() 오퍼레이션을 통해 매초마다 값을 증가시키는 카운터를 생성한다.
interval() 는 최대값이 지정되지 않으면 무한으로 실행되기 때문에 take() 오퍼레이션을 사용하여 5개의 항목까지만 실행되도록 제한하였다.
리액티브 타입 조합
조합 오퍼레이션을 통해 두 개의 리액티브 타입을 결합하거나 하나의 Flux를 두 개 이상의 리액티브 타입으로 분할할 수 있다.
리액티브 타입 결합
1@Test
2public void mergeFluxes() {
3 Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
4 .delayElements(Duration.ofMillis(500));
5 Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
6 .delaySubscription(Duration.ofMillis(250))
7 .delayElements(Duration.ofMillis(500));
8
9 Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
10
11 StepVerifier.create(mergedFlux)
12 .expectNext("Garfield")
13 .expectNext("Lasagna")
14 .expectNext("Kojak")
15 .expectNext("Lollipops")
16 .expectNext("Barbossa")
17 .expectNext("Apples")
18 .verifyComplete();
19}
하나의 Flux를 다른 Flux와 결합하기 위해 mergeWith() 오퍼레이션을 사용한다.
delayElements() 오퍼레이션을 사용하여 500 밀리초마다 일정한 속도로 방출되도록 설정했기 때문에 두 Flux값은 번갈아가며 mergeFlux에 넣어진다. 또한 delaySubscription() 오퍼레이션으로 foodFlux가 250 밀리초 뒤에 구독을 시작했기 때문에 characterFlux 데이터가 먼저 방출되었다.
→ mergeWith() 는 소스 Flux 값들이 완벽하게 번갈아 방출되게 보장할 수 없다.
1@Test
2public void zipFluxes() {
3 Flux<String> characterFlux = Flux
4 .just("Garfield", "Kojak", "Barbossa");
5 Flux<String> foodFlux = Flux
6 .just("Lasagna", "Lollipops", "Apples");
7
8 Flux<Tuple2<String, String>> zippedFlux =
9 Flux.zip(characterFlux, foodFlux);
10
11 StepVerifier.create(zippedFlux)
12 .expectNextMatches(p ->
13 p.getT1().equals("Garfield") &&
14 p.getT2().equals("Lasagna"))
15 .expectNextMatches(p ->
16 p.getT1().equals("Kojak") &&
17 p.getT2().equals("Lollipops"))
18 .expectNextMatches(p ->
19 p.getT1().equals("Barbossa") &&
20 p.getT2().equals("Apples"))
21 .verifyComplete();
22}
zip() 오퍼레이션을 사용한다면 각 Flux 소스로부터 한 항목씩 번갈아 가져와 새로운 Flux를 생성한다.
순서대로 방출되기 때문에 Tuple2라는 두 개의 다른 객체를 전달하는 컨테이너 객체를 이용했다.
리액티브 타입 방출 순서 변경
1@Test
2public void firstFlux() {
3 Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
4 .delaySubscription(Duration.ofMillis(100));
5 Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
6
7 Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
8
9 StepVerifier.create(firstFlux)
10 .expectNext("hare")
11 .expectNext("cheetah")
12 .expectNext("squirrel")
13 .verifyComplete();
14}
first() 오퍼레이션을 통해 먼저 값을 방출하는 Flux 값을 선택할 수 있다.
리액티브 타입 변환
변환 오퍼레이션을 통해 리액티브 스트림을 통해 전달되는 데이터를 변환하거나 필터링할 수 있다.
리액티브 타입 필터링
1@Test
2public void skipAFew() {
3 Flux<String> countFlux = Flux.just(
4 "one", "two", "skip a few", "ninety nine", "one hundred")
5 .skip(3);
6
7 StepVerifier.create(countFlux)
8 .expectNext("ninety nine", "one hundred")
9 .verifyComplete();
10}
skip() 오퍼레이션을 통해 지정된 수(3)의 항목을 건너뛰고 나머지 항목을 방출할 수 있다.
특정 시간이 경과할 때까지 기다렸다가 Flux 항목을 방출하려면 skip(Duration.ofSeconds(4))
의 형태로 사용할 수 있다.
1@Test
2public void take() {
3 Flux<String> nationalParkFlux = Flux.just(
4 "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
5 .take(3);
6
7 StepVerifier.create(nationalParkFlux)
8 .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
9 .verifyComplete();
10}
take() 오퍼레이션을 사용하면 지정된 수(3)의 항목만을 방출할 수 있다. - take() 오퍼레이션의 반대
skip() 오퍼레이션과 마찬가지로 특정 시간이 경과할 때까지 기다렸다가 Flux 항목을 방출하려면 take(Duration.ofSeconds(4))
의 형태로 사용할 수 있다.
1@Test
2public void filter() {
3 Flux<String> nationalParkFlux = Flux.just(
4 "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
5 .filter(np -> !np.contains(" "));
6
7 StepVerifier.create(nationalParkFlux)
8 .expectNext("Yellowstone", "Yosemite", "Zion")
9 .verifyComplete();
10}
filter() 오퍼레이션을 통해 조건식(공백이 없는 문자)에 해당하는 항목만 선택적으로 방출할 수 있다.
1@Test
2public void distinct() {
3 Flux<String> animalFlux = Flux.just(
4 "dog", "cat", "bird", "dog", "bird", "anteater")
5 .distinct();
6
7 StepVerifier.create(animalFlux)
8 .expectNext("dog", "cat", "bird", "anteater")
9 .verifyComplete();
10}
distinct() 오퍼레이션을 통해 발행된 적이 없는(고유한) Flux 항목만 방출할 수 있다.
리액티브 데이터 매핑
1@Test
2public void map() {
3 Flux<Player> playerFlux = Flux
4 .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
5 .map(n -> {
6 String[] split = n.split("\\s");
7 return new Player(split[0], split[1]);
8 });
9
10 StepVerifier.create(playerFlux)
11 .expectNext(new Player("Michael", "Jordan"))
12 .expectNext(new Player("Scottie", "Pippen"))
13 .expectNext(new Player("Steve", "Kerr"))
14 .verifyComplete();
15}
map() 오퍼레이션을 통해 발행된 항목을 다른 타입으로 매핑(변환)시킬 수 있다.
각 항목이 Flux로부터 발행될 때 동기적으로(순차적으로) 매핑이 수행된다.
1@Test
2public void flatMap() {
3 Flux<Player> playerFlux = Flux
4 .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
5 .flatMap(n -> Mono.just(n)
6 .map(p -> {
7 String[] split = p.split("\\s");
8 return new Player(split[0], split[1]);
9 })
10 .subscribeOn(Schedulers.parallel())
11 );
12
13 List<Player> playerList = Arrays.asList(
14 new Player("Michael", "Jordan"),
15 new Player("Scottie", "Pippen"),
16 new Player("Steve", "Kerr"));
17
18 StepVerifier.create(playerFlux)
19 .expectNextMatches(p -> playerList.contains(p))
20 .expectNextMatches(p -> playerList.contains(p))
21 .expectNextMatches(p -> playerList.contains(p))
22 .verifyComplete();
23}
flatMap() 오퍼레이션을 subscribeOn()과 함께 사용하면 리액터 타입 변환을 비동기적으로 수행할 수 있다. 작업이 병행으로 수행되므로 어떤 작업이 먼저 끝날 지 보장이 되지 않는다.
※ flatMap() 오퍼레이션은 수행 도중 생성되는 임시 Flux를 사용하여 변환을 수행하므로 비동기 변환이 가능하다.
Schedulers의 동시성 모델 종류
Schedulers 메서드 | 설명 |
---|---|
.immediate() | 현재 스레드에서 구독 실행 |
.single() | 단일의 재사용 가능한 스레드에서 구독 실행. 모든 호출자에 대해 동일한 스레드를 재사용한다. |
.newSingle() | 매 호출마다 전용 스레드에서 구독 실행 |
.elastic() | 무한하고 신축성 있는 풀에서 가져온 작업 스레드에서 구독 실행. 필요 시 새로운 작업 스레드 생성 |
.parallel() | 고정된 크기의 풀에서 가져온 작업 스레드에서 구독 실행. CPU 코어의 개수가 크기가 된다. |
리액티브 데이터 버퍼링
1@Test
2public void bufferAndFlatMap() {
3 Flux.just(
4 "apple", "orange", "banana", "kiwi", "strawberry")
5 .buffer(3)
6 .flatMap(x ->
7 Flux.fromIterable(x)
8 .map(y -> y.toUpperCase())
9 .subscribeOn(Schedulers.parallel())
10 .log()
11 ).subscribe();
12
13 StepVerifier
14 .create(bufferedFlux)
15 .expectNext(Arrays.asList("apple", "orange", "banana"))
16 .expectNext(Arrays.asList("kiwi", "strawberry"))
17 .verifyComplete();
18}
buffer() 오퍼레이션을 통해 데이터 스트림을 분할하여 지정된 최대 크기의 리스트로 된 Flux를 생성한다.
flatMap()을 같이 사용하면 각 List 컬렉션을 별도의 스레드에서 병행으로 계속 처리할 수 있다.
❗ 모든 항목을 List로 모을 경우에는 인자 없이 buffer()
를 호출한다.
1@Test
2public void collectList() {
3 Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
4
5 Mono<List<String>> fruitListMono = fruitFlux.collectList();
6
7 StepVerifier.create(fruitListMono)
8 .expectNext(Arrays.asList(
9 "apple", "orange", "banana", "kiwi", "strawberry"))
10 .verifyComplete();
11}
collection() 오퍼레이션을 통해 입력한 Flux가 방출한 모든 메시지를 가지는 List 타입의 Mono를 생성할 수 있다.
1@Test
2public void collectMap() {
3 Flux<String> animalFlux = Flux.just(
4 "aardvark", "elephant", "koala", "eagle", "kangaroo");
5
6 Mono<Map<Character, String>> animalMapMono = animalFlux.collectMap(a -> a.charAt(0));
7
8 StepVerifier
9 .create(animalMapMono)
10 .expectNextMatches(map -> {
11 return
12 map.size() == 3 &&
13 map.get('a').equals("aardvark") &&
14 map.get('e').equals("eagle") &&
15 map.get('k').equals("kangaroo");
16 })
17 .verifyComplete();
18}
collectionMap() 오퍼레이션은 Map을 포함하는 Mono를 생성할 수 있다.
Flux가 방출된 메시지가 Map의 항목으로 지정되며, 각 항목의 키는 collectMap에서 설정한 값을 따른다.
로직 오퍼레이션 수행
Mono나 Flux가 발행한 항목이 어떤 조건과 일치하는지 알기 위해 사용한다.
모든 항목 확인
1@Test
2public void all() {
3 Flux<String> animalFlux = Flux.just(
4 "aardvark", "elephant", "koala", "eagle", "kangaroo");
5
6 Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
7 StepVerifier.create(hasAMono)
8 .expectNext(true)
9 .verifyComplete();
10
11 Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
12 StepVerifier.create(hasKMono)
13 .expectNext(false)
14 .verifyComplete();
15}
all() 오퍼레이션을 통해 모든 메시지가 조건에 충족하는지 확인할 수 있다.
결과는 Boolean 타입의 Mono로 생성되는데, 조건에 충족할 경우 true, 조건에 충족하지 않는다면 false를 방출한다.
최소한 하나의 항목만 확인
1@Test
2public void any() {
3 Flux<String> animalFlux = Flux.just(
4 "aardvark", "elephant", "koala", "eagle", "kangaroo");
5
6 Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("a"));
7
8 StepVerifier.create(hasAMono)
9 .expectNext(true)
10 .verifyComplete();
11
12 Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
13 StepVerifier.create(hasZMono)
14 .expectNext(false)
15 .verifyComplete();
16}
any() 오퍼레이션을 통해 최소한 하나의 메시지가 조건을 충족하는지 확인할 수 있다.
최소한 하나의 메시지가 조건에 충족한다면 true, 모두 조건에 충족하지 않는다면 false를 방출한다.
Posts in this Series
- [스프링인액션] JMX로 스프링 모니터링
- [스프링인액션] 스프링 관리
- [스프링인액션] 스프링 액추에이터 사용
- [스프링인액션] 실패와 지연 처리
- [스프링인액션] 클라우드 구성 관리
- [스프링인액션] 마이크로서비스 이해
- [스프링인액션] 리액티브 데이터 퍼시스턴스
- [스프링인액션] 리액티브 API 개발
- [스프링인액션] 리액터 개요
- [스프링인액션] 스프링 통합 플로우 사용
- [스프링인액션] 비동기 메시지 전송하기 - Kafka
- [스프링인액션] 비동기 메시지 전송하기 - RabbitMQ
- [스프링인액션] 비동기 메시지 전송하기 - JMS
- [스프링인액션] REST API 사용하기
- [스프링인액션] REST API 생성하기
- [스프링인액션] 구성 속성 사용
- [스프링인액션] 스프링 시큐리티
- [스프링인액션] 데이터로 작업하기
- [스프링인액션] 스프링 초기 설정