Thumbnail image

Table of Contents

[스프링인액션] 리액터 개요

개요


스프링인액션 10장을 읽고 리액터에 대한 기초 개념을 공부하였다.

  • 리액티브 프로그래밍 이해
  • 프로젝트 리액터
  • 리액티브 데이터 오퍼레이션

에 대해 작성하였다.

리액티브 프로그래밍


어플리케이션을 개발할 때 두 가지 형태의 코드를 작성할 수 있다.

  • 명령형 : 순차적으로 진행되는 작업. 각 작업은 한 번에 하나씩 처리된다.
  • 리액티브 : 일련의 작업들이 정의되지만, 병렬로 실행 가능하다.

대부분의 프로그래밍 언어는 스레드를 통해 동시 작업을 처리하지만, 리액티브 프로그래밍은 사용 가능한 데이터가 존재할 때마다 처리할 수 있다.
확장성과 성능 관점에서 리액티브 프로그래밍이 유리하다.

리액티브 스트림 정의

리액티브 스트림은 차단되지 않는 백 프레셔(backpressure)를 갖는 비동기 스트림 처리의 표준을 제공하는 것이 목적이다.
어떤 크기의 데이터셋이건 비동기 처리를 지원하고, 실시간으로 데이터를 처리하며, 백 프레셔를 사용하여 데이터 전달 폭주를 막는다.

리액티브 스트림 인터페이스

  • Publisher : 발행자
  • Subscriber : 구독자
  • Subscription : 구독
  • Processor : 프로세서

Publisher(발행자)

하나의 Subscription(구독)당 하나의 Subscriber(구독자)에 전송하는 데이터를 생성한다.

1public interface Publisher<T> {
2    void subscribe(Subscriber<? super T> subscriber);
3}
  • subscribe() : Subscriber가 Publisher를 구독 신청할 수 있는 메서드

Subscriber(구독자)

Subscriber(구독자)가 구독 신청되면 Publisher(발행자)로부터 이벤트를 수신할 수 있다.

1public interface Subscriber<T> {
2    void onSubscribe(Subscription sub);
3    void onNext(T item);
4    void onError(Throwable ex);
5    void onComplete();
6}
  • onSubscribe() : Publisher가 subscribe를 호출한 뒤에 호출되는 메서드이다. 이 메서드의 인자인 Subscription 객체를 Subscriber에 전달한다. - 이때 Subscription 객체는 Subscription.request() 메서드를 통해 받아온 객체이다.
  • onNext() : Subscription.request() 에서 요청하는 데이터를 Publisher에게 전달하는 메서드

Subscription(구독)

데이터를 어떻게 전송하는 지에 대해 정의한다.

1public interface Subscription {
2    void request(long n);
3    void cancel();
4}
  • request() : 전송되는 데이터를 요청한다. 해당 메서드의 인자는 Subscriber가 받고자 하는 데이터 항목의 수이다. - Subscriber가 처리할 수 있는 것보다 많은 데이터를 Publisher가 전송하는 것을 막아준다. (백프레셔)
  • cancel() : 구독을 취소한다는 것을 나타낸다.

Processor(프로세서)

Subscriber 인터페이스와 Publisher 인터페이스를 결합한 것이다.

1public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

processor는 Subscriber 역할로 데이터를 수신, 처리하고, Publisher 역할로 처리 결과를 Subscriber에게 발행한다.

전반적인 플로우

출처 : https://engineering.linecorp.com/ko/blog/reactive-streams-with-armeria-1/

출처 : https://engineering.linecorp.com/ko/blog/reactive-streams-with-armeria-1/

  1. Publisher.subscribe() 메서드를 통해 Subscriber가 Publisher에게 구독을 요청한다.
  2. Publisher가 onSubscribe(subscription) 을 호출하여 Subscriber에게 Subscription을 전달한다.
  3. Subscriber는 Subscription.request()를 통해 데이터 항목 수를 요청한다.
  4. Publisher는 Subscription을 통해 Subscriber.onNext()로 데이터를 전달한다.
  5. 전달이 성공적이면 onComplete, 오류가 발생한다면 onError를 호출한다.

리액터 기본 개념 및 구성


리액터의 두 가지 핵심 타입으로는 Mono와 Flux가 있는데, 모두 리액티브 스트림의 Publisher 인터페이스를 구현한 것이다.

  • Mono : 하나의 데이터 항목만 가지는 데이터셋에 최적화된 리액티브 타입
  • Flux : 0, 1 또는 다수의 데이터를 가지는 파이프라인

리액티브 플로우

리액티브 플로우는 마블 다이어그램으로 나타낸다.

Mono의 기본적인 플로우를 보여주는 마블 다이어그램



Flux의 기본적인 플로우를 보여주는 마블 다이어그램

Mono는 0 또는 하나의 데이터 항목과 에러를 가지지만 Flux는 다수의 데이터를 가지고 있는 것을 알 수 있다.

리액터 빌드 명세 추가

1implementation 'io.projectreactor:reactor-core'
2testImplementation 'io.projectreactor:reactor-test'

스프링 부트가 dependency 관리를 자동으로 해주기 때문에 버전을 명시할 필요는 없다.
해당 dependency를 추가함으로써 Mono와 Flux를 사용하여 리액티브 파이프라인을 생성할 수 있다.

리액티브 오퍼레이션 적용


Flux와 Mono가 제공하는 오퍼레이션은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.

500 개의 오퍼레이션이 존재하는데, 다음과 같이 분류할 수 있다.

  • 생성 오퍼레이션 (creation)
  • 조합 오퍼레이션 (combination)
  • 변환 오퍼레이션 (transformation)
  • 로직 오퍼레이션 (logic)

리액티브 타입 생성

생성 오퍼레이션을 통해 데이터를 발행하는 리액티브 publisher를 생성할 수 있다.

객체로부터 생성

1@Test
2public void createAFlux_just() {
3    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana");
4    fruitFlux.subscribe(
5      f -> System.out.println("Here's some fruit: " + f)
6    );
7}

Flux나 Mono의 just() 오퍼레이션을 사용하여 리액티브 타입을 생성할 수 있다.
Subscriber를 추가하기 위해 subscribe() 메서드를 호출한다. - 호출 즉시 데이터가 전달

컬렉션으로부터 생성

1@Test
2public void createAFlux_fromArray() {
3    String[] fruits = new String[]{"Apple", "Orange", "Grape", "Banana"};
4    Flux<String> fruitFlux = Flux.fromArray(fruits);
5}

배열로부터 Flux를 생성하려면 fromArray() 오퍼레이션을 호출한다.

1@Test
2public void createAFlux_fromIterable() {
3    List<String> fruitList = new ArrayList<>();
4    fruitList.add("Apple");
5    fruitList.add("Orange");
6    fruitList.add("Grape");
7    fruitList.add("Banana");
8    Flux<String> fruitFlux = Flux.fromIterable(fruitList);
9}

List, Set, Iterable의 다른 구현 컬렉션으로부터 Flux를 생성하려면 fromIterable() 오퍼레이션을 호출한다.

1@Test
2public void createAFlux_fromStream() {
3    Stream<String> fruitStream = Stream.of("Apple", "Orange", "Grape", "Banana");
4    Flux<String> fruitFlux = Flux.fromStream(fruitStream);
5}

자바 Stream 객체를 사용하여 Flux를 생성하려면 fromStream() 오퍼레이션을 호출한다.

테스트 수행

1StepVerifier.create(fruitFlux)
2            .expectNext("Apple")
3            .expectNext("Orange")
4            .expectNext("Grape")
5            .expectNext("Banana")
6            .verifyComplete();

Flux가 지정되면 StepVerifier는 해당 리액티브 타입(fruitFlux)을 구독한 후 전달되는 데이터에 대해 assertion을 적용한다. 그리고 마지막으로 fruitFlux가 완전한지 검사를 수행한다. 해당 테스트코드는 상단의 모든 Flux 데이터를 검사할 수 있다.

카운터 생성

 1@Test
 2public void createAFlux_range() {
 3    Flux<Integer> intervalFlux = Flux.range(1, 5);
 4
 5    StepVerifier.create(intervalFlux)
 6                .expectNext(1)
 7                .expectNext(2)
 8                .expectNext(3)
 9                .expectNext(4)
10                .expectNext(5)
11                .verifyComplete();
12}

range() 오퍼레이션을 통해 1~5까지의 카운터 Flux를 생성하고 StepVerifier로 검사를 수행한다.

 1@Test
 2public void createAFlux_interval() {
 3    Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
 4
 5    StepVerifier.create(intervalFlux)
 6                .expectNext(0L)
 7                .expectNext(1L)
 8                .expectNext(2L)
 9                .expectNext(3L)
10                .expectNext(4L)
11                .verifyComplete();
12}

interval() 오퍼레이션을 통해 매초마다 값을 증가시키는 카운터를 생성한다.
interval() 는 최대값이 지정되지 않으면 무한으로 실행되기 때문에 take() 오퍼레이션을 사용하여 5개의 항목까지만 실행되도록 제한하였다.

리액티브 타입 조합

조합 오퍼레이션을 통해 두 개의 리액티브 타입을 결합하거나 하나의 Flux를 두 개 이상의 리액티브 타입으로 분할할 수 있다.

리액티브 타입 결합

 1@Test
 2public void mergeFluxes() {
 3  Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
 4                                   .delayElements(Duration.ofMillis(500));
 5  Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
 6                              .delaySubscription(Duration.ofMillis(250))
 7                              .delayElements(Duration.ofMillis(500));
 8  
 9  Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
10
11  StepVerifier.create(mergedFlux)
12      .expectNext("Garfield")
13      .expectNext("Lasagna")
14      .expectNext("Kojak")
15      .expectNext("Lollipops")
16      .expectNext("Barbossa")
17      .expectNext("Apples")
18      .verifyComplete();
19}

하나의 Flux를 다른 Flux와 결합하기 위해 mergeWith() 오퍼레이션을 사용한다.
delayElements() 오퍼레이션을 사용하여 500 밀리초마다 일정한 속도로 방출되도록 설정했기 때문에 두 Flux값은 번갈아가며 mergeFlux에 넣어진다. 또한 delaySubscription() 오퍼레이션으로 foodFlux가 250 밀리초 뒤에 구독을 시작했기 때문에 characterFlux 데이터가 먼저 방출되었다.

→ mergeWith() 는 소스 Flux 값들이 완벽하게 번갈아 방출되게 보장할 수 없다.

 1@Test
 2public void zipFluxes() {
 3  Flux<String> characterFlux = Flux
 4      .just("Garfield", "Kojak", "Barbossa");
 5  Flux<String> foodFlux = Flux
 6      .just("Lasagna", "Lollipops", "Apples");
 7  
 8  Flux<Tuple2<String, String>> zippedFlux = 
 9      Flux.zip(characterFlux, foodFlux);
10  
11  StepVerifier.create(zippedFlux)
12        .expectNextMatches(p -> 
13            p.getT1().equals("Garfield") && 
14            p.getT2().equals("Lasagna"))
15        .expectNextMatches(p -> 
16            p.getT1().equals("Kojak") && 
17            p.getT2().equals("Lollipops"))
18        .expectNextMatches(p -> 
19            p.getT1().equals("Barbossa") && 
20            p.getT2().equals("Apples"))
21        .verifyComplete();
22}

zip() 오퍼레이션을 사용한다면 각 Flux 소스로부터 한 항목씩 번갈아 가져와 새로운 Flux를 생성한다.
순서대로 방출되기 때문에 Tuple2라는 두 개의 다른 객체를 전달하는 컨테이너 객체를 이용했다.



리액티브 타입 방출 순서 변경

 1@Test
 2public void firstFlux() {
 3  Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
 4        .delaySubscription(Duration.ofMillis(100));
 5  Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
 6  
 7  Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
 8  
 9  StepVerifier.create(firstFlux)
10      .expectNext("hare")
11      .expectNext("cheetah")
12      .expectNext("squirrel")
13      .verifyComplete();
14}

first() 오퍼레이션을 통해 먼저 값을 방출하는 Flux 값을 선택할 수 있다.

리액티브 타입 변환

변환 오퍼레이션을 통해 리액티브 스트림을 통해 전달되는 데이터를 변환하거나 필터링할 수 있다.

리액티브 타입 필터링

 1@Test
 2public void skipAFew() {
 3  Flux<String> countFlux = Flux.just(
 4      "one", "two", "skip a few", "ninety nine", "one hundred")
 5      .skip(3);
 6 
 7  StepVerifier.create(countFlux)
 8      .expectNext("ninety nine", "one hundred")
 9      .verifyComplete();
10}

skip() 오퍼레이션을 통해 지정된 수(3)의 항목을 건너뛰고 나머지 항목을 방출할 수 있다.
특정 시간이 경과할 때까지 기다렸다가 Flux 항목을 방출하려면 skip(Duration.ofSeconds(4))의 형태로 사용할 수 있다.

 1@Test
 2public void take() {
 3  Flux<String> nationalParkFlux = Flux.just(
 4      "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
 5      .take(3);
 6 
 7  StepVerifier.create(nationalParkFlux)
 8      .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
 9      .verifyComplete();
10}

take() 오퍼레이션을 사용하면 지정된 수(3)의 항목만을 방출할 수 있다. - take() 오퍼레이션의 반대
skip() 오퍼레이션과 마찬가지로 특정 시간이 경과할 때까지 기다렸다가 Flux 항목을 방출하려면 take(Duration.ofSeconds(4))의 형태로 사용할 수 있다.

 1@Test
 2public void filter() {
 3  Flux<String> nationalParkFlux = Flux.just(
 4      "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
 5      .filter(np -> !np.contains(" "));
 6 
 7  StepVerifier.create(nationalParkFlux)
 8      .expectNext("Yellowstone", "Yosemite", "Zion")
 9      .verifyComplete();
10}

filter() 오퍼레이션을 통해 조건식(공백이 없는 문자)에 해당하는 항목만 선택적으로 방출할 수 있다.

 1@Test
 2public void distinct() {
 3  Flux<String> animalFlux = Flux.just(
 4      "dog", "cat", "bird", "dog", "bird", "anteater")
 5      .distinct();
 6 
 7  StepVerifier.create(animalFlux)
 8      .expectNext("dog", "cat", "bird", "anteater")
 9      .verifyComplete();
10}

distinct() 오퍼레이션을 통해 발행된 적이 없는(고유한) Flux 항목만 방출할 수 있다.

리액티브 데이터 매핑

 1@Test
 2public void map() {
 3  Flux<Player> playerFlux = Flux
 4    .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
 5    .map(n -> {
 6      String[] split = n.split("\\s");
 7      return new Player(split[0], split[1]);
 8    });
 9  
10  StepVerifier.create(playerFlux)
11      .expectNext(new Player("Michael", "Jordan"))
12      .expectNext(new Player("Scottie", "Pippen"))
13      .expectNext(new Player("Steve", "Kerr"))
14      .verifyComplete();
15}

map() 오퍼레이션을 통해 발행된 항목을 다른 타입으로 매핑(변환)시킬 수 있다.
각 항목이 Flux로부터 발행될 때 동기적으로(순차적으로) 매핑이 수행된다.

 1@Test
 2public void flatMap() {
 3  Flux<Player> playerFlux = Flux
 4    .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
 5    .flatMap(n -> Mono.just(n)
 6        .map(p -> {
 7            String[] split = p.split("\\s");
 8            return new Player(split[0], split[1]);
 9          })
10        .subscribeOn(Schedulers.parallel())
11      );
12  
13  List<Player> playerList = Arrays.asList(
14      new Player("Michael", "Jordan"), 
15      new Player("Scottie", "Pippen"), 
16      new Player("Steve", "Kerr"));
17
18  StepVerifier.create(playerFlux)
19      .expectNextMatches(p -> playerList.contains(p))
20      .expectNextMatches(p -> playerList.contains(p))
21      .expectNextMatches(p -> playerList.contains(p))
22      .verifyComplete();
23}

flatMap() 오퍼레이션을 subscribeOn()과 함께 사용하면 리액터 타입 변환을 비동기적으로 수행할 수 있다. 작업이 병행으로 수행되므로 어떤 작업이 먼저 끝날 지 보장이 되지 않는다.
※ flatMap() 오퍼레이션은 수행 도중 생성되는 임시 Flux를 사용하여 변환을 수행하므로 비동기 변환이 가능하다.



Schedulers의 동시성 모델 종류

Schedulers 메서드 설명
.immediate() 현재 스레드에서 구독 실행
.single() 단일의 재사용 가능한 스레드에서 구독 실행. 모든 호출자에 대해 동일한 스레드를 재사용한다.
.newSingle() 매 호출마다 전용 스레드에서 구독 실행
.elastic() 무한하고 신축성 있는 풀에서 가져온 작업 스레드에서 구독 실행. 필요 시 새로운 작업 스레드 생성
.parallel() 고정된 크기의 풀에서 가져온 작업 스레드에서 구독 실행. CPU 코어의 개수가 크기가 된다.

리액티브 데이터 버퍼링

 1@Test
 2public void bufferAndFlatMap() {
 3    Flux.just(
 4        "apple", "orange", "banana", "kiwi", "strawberry")
 5        .buffer(3)
 6        .flatMap(x -> 
 7          Flux.fromIterable(x)
 8            .map(y -> y.toUpperCase())
 9            .subscribeOn(Schedulers.parallel())   
10            .log()
11        ).subscribe();
12
13    StepVerifier
14        .create(bufferedFlux)
15        .expectNext(Arrays.asList("apple", "orange", "banana"))
16        .expectNext(Arrays.asList("kiwi", "strawberry"))
17        .verifyComplete();
18}

buffer() 오퍼레이션을 통해 데이터 스트림을 분할하여 지정된 최대 크기의 리스트로 된 Flux를 생성한다.
flatMap()을 같이 사용하면 각 List 컬렉션을 별도의 스레드에서 병행으로 계속 처리할 수 있다.

❗ 모든 항목을 List로 모을 경우에는 인자 없이 buffer()를 호출한다.

 1@Test
 2public void collectList() {
 3  Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
 4  
 5  Mono<List<String>> fruitListMono = fruitFlux.collectList();
 6  
 7  StepVerifier.create(fruitListMono)
 8               .expectNext(Arrays.asList(
 9                            "apple", "orange", "banana", "kiwi", "strawberry"))
10               .verifyComplete();
11}

collection() 오퍼레이션을 통해 입력한 Flux가 방출한 모든 메시지를 가지는 List 타입의 Mono를 생성할 수 있다.

 1@Test
 2public void collectMap() {
 3  Flux<String> animalFlux = Flux.just(
 4      "aardvark", "elephant", "koala", "eagle", "kangaroo");
 5  
 6  Mono<Map<Character, String>> animalMapMono = animalFlux.collectMap(a -> a.charAt(0));
 7  
 8  StepVerifier
 9      .create(animalMapMono)
10      .expectNextMatches(map -> {
11        return
12            map.size() == 3 &&
13            map.get('a').equals("aardvark") &&
14            map.get('e').equals("eagle") &&
15            map.get('k').equals("kangaroo");
16      })
17      .verifyComplete();
18}

collectionMap() 오퍼레이션은 Map을 포함하는 Mono를 생성할 수 있다.
Flux가 방출된 메시지가 Map의 항목으로 지정되며, 각 항목의 키는 collectMap에서 설정한 값을 따른다.

로직 오퍼레이션 수행

Mono나 Flux가 발행한 항목이 어떤 조건과 일치하는지 알기 위해 사용한다.

모든 항목 확인

 1@Test
 2public void all() {
 3  Flux<String> animalFlux = Flux.just(
 4      "aardvark", "elephant", "koala", "eagle", "kangaroo");
 5  
 6  Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
 7  StepVerifier.create(hasAMono)
 8              .expectNext(true)
 9              .verifyComplete();
10  
11  Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
12  StepVerifier.create(hasKMono)
13              .expectNext(false)
14              .verifyComplete();
15}

all() 오퍼레이션을 통해 모든 메시지가 조건에 충족하는지 확인할 수 있다.
결과는 Boolean 타입의 Mono로 생성되는데, 조건에 충족할 경우 true, 조건에 충족하지 않는다면 false를 방출한다.

최소한 하나의 항목만 확인

 1@Test
 2public void any() {
 3  Flux<String> animalFlux = Flux.just(
 4      "aardvark", "elephant", "koala", "eagle", "kangaroo");
 5  
 6  Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("a"));
 7  
 8  StepVerifier.create(hasAMono)
 9              .expectNext(true)
10              .verifyComplete();
11  
12  Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
13  StepVerifier.create(hasZMono)
14              .expectNext(false)
15              .verifyComplete();
16}

any() 오퍼레이션을 통해 최소한 하나의 메시지가 조건을 충족하는지 확인할 수 있다.
최소한 하나의 메시지가 조건에 충족한다면 true, 모두 조건에 충족하지 않는다면 false를 방출한다.

Posts in this Series