티스토리 뷰

Java

[Java] CompletableFuture

snail voyager 2023. 11. 27. 23:53
728x90
반응형

Future의 단순 활용

비동기 계산을 모델링하는데 사용

Future는 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공

시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 작업을 수행

Callable 객체 내부로 감싼 다음에 ExecutorService에 제출

ExecutorService executor = Executors.newCachedThreadPool();	//스레드 풀 생성
Future<Double> future = executor.submit(new Callable<Double>() {	//Callable을 스레드풀에 제출
	public Double call() {
    	return doSomeLongComputation();	//시간이 오래걸리는 작업을 다른 스레드로 비동기 실행
    }
});
doSomethingElse();	//비동기 작업을 수행하는 동안 다른 작업 수행

try {
	Double result = future.get(1, TimeUnit.SECONDS);	//비동기 작업 결과를 가져옴. 완료 전이라면 1초까지만 기다림
} catch (Exception e) {

}

Future 제한

Future 인터페이스가 비동기 계산이 끝났는지 확인할 수 있는 메서드, 끝나길 기다리는 메서드, 결과 회수 메서드 등

제공하지만 여러 Future의 결과가 있을 때 이들의 의존성은 표현하기 어려움

 

동기 API와 비동기 API

  • 동기 API에서는 메서드를 호출한 다음에 메서드가 계산을 완료할 때까지 기다렸다가
    메서드가 반환되면 호출자는 반환된 값으로 계속 다른 동작을 수행
    호출자와 피호출자가 각각 다른 스레드라도 호출자는 피호출자의 동작 완료를 기다림
    동기 API를 Blocking call
  • 비동기 API에서는 메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 다른 스레드에 할당
    다른 스레드에 할당된 계산 결과는 콜백 메서드를 호출해서 전달하거나
    계산 결과가 끝날 때까지 기다리는 메서드를 추가로 호출하면서 전달
    비동기 API를 Non Blocking call

 

비동기 API 구현

public Future<Double> getPriceAsync(String product) {
	CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread( () -> {
    		double price = calculatePrice(product);	//비동기로 수행
            futurePrice.complete(price);	//계산이 완료되면 Future에 설정
    }).start();
    return futurePrice;	//계산 결과를 기다리지 않고 Future 반환
}

비동기 API 사용

Shop shop = new Shop("Prada");
Future<Double> futurePrice = shop.getPriceAsync("prada back");	//비동기 작업 수행
doSomethingElse();	//다른 작업 수행
try {
	double price = futurePrice.get();	//가격정보가 반환될 때까지 Block
} catch (Exception e) {
	throw new RuntimeException(e);
}

에러 처리 방법

completeExceptionally 메서드를 이용해서 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달

public Future<Double> getPriceAsync(String product) {
	CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread( () -> {
           try {
    		 double price = calculatePrice(product);	//비동기로 수행
            	futurePrice.complete(price);	//계산이 완료되면 Future에 설정
            } catch (Exception e) {
            	futurePrice.completeExceptionally(e);	//에러를 포함시켜 Future를 종료
            }
    }).start();
    return futurePrice;	//계산 결과를 기다리지 않고 Future 반환
}

exceptionally()

CompletableFuture exceptionally() 메서드를 사용하여 예외를 처리하면 해당 CompletableFuture가 완료될 때 예외가 발생했을 경우 예외를 처리하고 대체 값을 반환할 수 있습니다. 그러나 exceptionally() 메서드는 예외를 처리하기 위한 콜백을 제공하는 것이지, CompletableFuture의 결과를 변경하지는 않습니다.

따라서 exceptionally() 메서드에서 예외를 처리한 후에도 get() 메서드를 호출하면 예외가 발생할 수 있습니다. 이는 get() 메서드가 CompletableFuture의 결과를 반환하는 것이 아니라, 작업이 완료될 때까지 대기하다가 예외가 발생했을 경우 해당 예외를 다시 던지기 때문입니다.

public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Error occurred in task");
        });

        CompletableFuture<String> exceptionHandledFuture = future.exceptionally(throwable -> {
            System.out.println("Exception handled: " + throwable.getMessage());
            return "Default value";
        });

        try {
            String result = exceptionHandledFuture.get();
            System.out.println("Result: " + result);
        } catch (Exception e) {
            System.out.println("Exception occurred: " + e.getMessage());
        }
    }
}

handle()

handle() 메서드를 사용하여 예외를 처리하고 최종 결과를 반환합니다. 이를 통해 get() 메서드를 호출하는 시점에서는 예외가 발생해도 원래의 예외가 아닌 처리된 예외가 발생하도록 할 수 있습니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 비동기 작업: 예외 발생
    throw new RuntimeException("Error occurred in async task");
});

CompletableFuture<String> resultFuture = future.handle((result, throwable) -> {
    if (throwable != null) {
        // 예외 발생 시 처리
        System.out.println("Error occurred: " + throwable.getMessage());
        return "Default value"; // 예외가 발생했을 때 대체 값 반환
    } else {
        // 예외가 발생하지 않은 경우
        return result; // 정상적인 결과 반환
    }
});

// 최종 결과 사용
String result = resultFuture.get(); // 예외가 발생해도 이 시점에서는 발생하지 않음
System.out.println("Final result: " + result);

팩토리 메서드 supplyAsync

supplyAsync 메서드는 Supplier를 인수로 받아서 CompletableFuture를 반환

CompletableFuture는 Supplier를 실행해서 비동기적으로 결과를 생성

ForkJoinPool의 Executor 중 하나가 Supplier를 실행

두번째 인수를 받는 오버로드 버전 메서드를 이용해서 다른 Executor를 지정

에러 관리는 completeExceptionally와 동일

public Future<Double> getPriceAsync(String product) {
	return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

 

비블록 코드 만들기

4개 상점에 순차적으로 정보를 요청하는 메서드 (4032 msecs)

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
        new Shop("LetsSaveBig"),
        new Shop("MyFavoriteShop"),
        new Shop("BuyItAll"));

public List<String> findPrices(String product) {
    return shops.stream()
            .map(shop -> String.format("%s price is $.2f",
                    shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

병렬 스트림으로 요청 (1180 msecs)

public List<String> findPricesAsParallel(String product) {
    return shops.parallelStream()
            .map(shop -> String.format("%s price is %.2f",
                    shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

CompletableFuture로 비동기 호출 (2005 msecs)

  • List<CompletableFuture<String>> 얻은 후 메서드의 반환 형식 List<String> 에 맞도록 두번째 스트림으로 추출
  • CompletableFuture 에 join을 호출해서 모든 동작이 끝나기를 기다림
  • join 메서드는 Future 인터페이스의 get 메서드와 같은 의미를 갖지만 join은 아무 예외도 발생하지 않는다
  • 스트림 연산은 게으른 특성이 있으므로 하나의 파이프라인으로 처리했다면 모든 요청 동작이 순차적으로 작동됨
  • 두가지 버전 모두 Runtime.getRuntime().availableProcessors()가 반환하는 스레드수를 사용하면서 비슷한 결과
  • CompletableFuture는 다양한 Executor를 지정할 수 있다는 장점
  • Executor로 스레드 풀 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있음
public List<String> findPricesAsCompletableFuture(String product) {
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + "price is " + shop.getPrice(product)))
            .collect(Collectors.toList());
    return priceFutures.stream()	//두개의 파이프라인으로 나눈 것이 핵심
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

커스텀 Executor 사용하기

  • 상점 수보다 많은 스레드를 갖는 것은 낭비
  • 정보를 검색하려는 상점 수만큼 스레드를 갖도록 Executor를 설정
  • 스레드 수가 너무 많으면 오히려 서버가 크래시될 수 있음
  • 하나의 Executor에서 사용할 스레드의 최대 개수는 100이하로 설정하는 것이 바람직
  • 일반 스레드가 실행 중이면 자바 프로그램은 종료되지 않음
  • 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있음
  • 두 스레드의 성능은 같음
  • supplyAsync 두번째 인수로 Executor를 전달
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }
});

CompletableFuture.supplyAsync(() -> shop.getName() + "price is " + shop.getPrice(product), executor);
  • I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 구현하기 간단하고 효율적일 수 있다
  • 작업이 I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공하며
    대기/계산(W/C)의 비율에 적합한 스레드 수를 설정할 수 있다.
    스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할지 예측하기 어려운 문제도 있다.

 

비동기 작업 파이프라인 만들기

Discount 서비스를 이용하는 메서드 (5개 상점 가격정보 5s + 할인 서비스 5s = 10028msecs)

public List<String> findPricesAsSync() {
    return shops.stream()
            .map(shop -> shop.getPrice(product))	//각 상점에서 할인 전 가격 얻기
            .map(Quote::parse)		//상점에서 반환한 문자열을 Quote 객체로 변환
            .map(Discount::applyDiscount)	//Discount 서비스를 이용해서 각 Quote에 할인을 적용
            .collect(Collectors.toList());
}

CompletableFuture 비동기적으로 구현한 메서드 (5개 상점 가격정보 1s + 할인 서비스 1s = 2035msecs)

  • thenApply 메서드는 CompletableFuture가 끝날 때까지 블록하지 않는다
  • thenApply 메서드는 CompletableFuture의 결과를 변환(transform)하는 메서드
  • CompletableFuture가 완료될 때 해당 결과를 받아서 다른 형태로 변환한 후에 그 변환된 결과를 반환
  • CompletableFuture가 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있다
  • 세번째 변환 과정에서 두가지 CompletableFuture로 이루어진 연쇄적으로 수행되는 두개의 비동기 동작
  • thenCompose 메서드는 첫번째 연산의 결과를 두번째 연산으로 전달
  • 첫번째 CompletableFuture에 thenCompose 메서드를 호출하고 Function에 넘겨주는 식으로
    두 CompletableFuture를 조합
  • thenCompose 메서드도 Async로 끝나는 버전이 존재 (다른 작업이 다른 스레드에서 실행되도록 스레드 풀로 작업 제출)
  • Async로 끝나지 않는 메서드는 이전 작업을 수행한 스레드와 같은 스레드에서 작업을 실행함
  • 여기서는 첫번째 CompletableFuture에 의존하므로 두 CompletableFuture를 하나로 조합하든 Async 메서드를 사용하든 실행시간에는 영향을 미치지 않음
  • 스레드 전환 오버헤드가 적게 발생하면서 효율성이 좀 더 좋은 thenCompose 사용
public List<String> findPricesAsAsync() {
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))	//Stream<CompletableFuture<String>> 반환
            .map(future -> future.thenApply(Quote::parse))	//CompletableFuture<Quote>로 변환
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))	//Stream<CompletableFuture<String>> 반환
            .collect(Collectors.toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

독립 CompletableFuture와 비독립 CompletableFuture 합치기

  • 독립적으로 실행된 두개의 CompletableFuture 결과를 합쳐야하는 상황
  • thenCombine 메서드를 사용
  • BiFunction을 두번째 인수로 받는다
  • thenCombineAsync 메서드는 BiFunction이 정의하는 조합 동작이 스레드 풀로 제출되면서 별도의 태스크에서 비동기적으로 수행
  • 아래 예제에서 합치는 연산은 단순한 곱셈으로 별도의 태스크에서 수행하지 않고 thenCombie 메서드 사용
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))   //제품가격 정보를 요청하는 첫번째 태스크 생성
        .thenCombine(CompletableFuture.supplyAsync(
                () -> exchangeService.getRate(Money.EUR, Money.USD)),   //환율 정보를 요청하는 독립적인 두번째 태스크 생성
                (price, rate) -> price * rate);     //두 결과를 곱해서 정보를 합치기

Future, CompletableFuture

CompletableFuture는 람다 표현식을 사용

람다 덕분에 다양한 동기 태스크, 비동기 태스크를 활용해서 복잡한 연산 수행 방법을 효과적으로 쉽게 정의할 수 있는
선언형 API를 만들 수 있다

Timeout 사용하기

onTimeout 메서드는 지정된 시간이 지난 후에 CompletableFuture를 TimeoutException으로 완료하면서

또다른 CompletableFuture를 반환할 수 있도록 내부적으로 ScheduledThreadExecutor를 활용

계산 파이프라인을 연결하고 TimeoutException이 발생했을 때 사용자가 쉽게 이해할 수 있는 메시지를 제공

Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
        .thenCombine(CompletableFuture.supplyAsync(
                () -> exchangeService.getRate(Money.EUR, Money.USD)),  
                (price, rate) -> price * rate)
        ))
        .orTimeout(3, TimeUnit.SECONDS);	//3초 뒤에 작업이 완료되지 않으면 Future가 TimeoutException 발생

서비스가 1초 이내에 완료되어야 하지만 그렇지 않다고 전체 계산을 Exception으로 처리하진 않고

미리 정의한 계산을 이용해 연산을 이어갈 수 있음. completeOnTimeout 메서드 이용

completeOnTimeout 메서드는 CompletableFuture를 반환하므로 이 결과를 다른 CompletableFuture 메서드와 연결 가능

Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
        .thenCombine(CompletableFuture.supplyAsync(
                () -> exchangeService.getRate(Money.EUR, Money.USD)) 
                .completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),	//1초 안에 결과를 제공하지 않으면 기본 값 활용
                (price, rate) -> price * rate
        ))
        .orTimeout(3, TimeUnit.SECONDS);

 

CompletableFuture의 종료에 대응하는 방법

Future 스트림을 반환하도록 findPrices 메서드 리팩터링

public Stream<CompletableFuture<String>> findPricesStream(String product) {
	return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

thenAccept 메서드는 연산 결과를 소비하는 Consumer를 인수로 받는다

thenAcceptAsync 메서드는 CompletableFuture가 완료된 스레드가 아니라 새로운 스레드를 이용해서 Consumer를 실행

CompletableFuture가 완료되는 즉시 응답하기 위해 thenAcceptAsync를 사용하지 않는다

thenAccept 메서드는 CompletableFuture<Void>를 반환

가장 느린 상점에서 응답을 받아서 반환된 가격을 출력하려면 스트림의 모든 CompletableFuture<Void>를 배열로 추가하고 실행 결과를 기다림

CompletableFuture[] futures = findPricesStream("myPhone")
	.map(f -> f.thenAccept(System.out::println)
    	.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();

팩토리 메서드 allOf는 CompletableFuture 배열을 입력받아 CompletableFuture<Void>를 반환

여러 개의 CompletableFuture가 모두 완료될 때까지 대기하는 CompletableFuture를 생성

그 후에 thenRun() 또는 thenAccept() 등의 메서드를 사용하여 모든 CompletableFuture가 완료된 후에 실행할 작업을 정의

전달된 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료된다

allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 원래 스트림의 모든 CompletableFuture의 실행 완료를 기다릴 수 있다.

    public static List<String> getReturnValueAsync() {
        List<Integer> dataList = createDataList(800);
        List<List<Integer>> chunks = chunkData(dataList, 100);

        ExecutorService executor = Executors.newFixedThreadPool(16);

        //비동기로 API를 호출해서 List를 리턴
        List<CompletableFuture<List<String>>> futures = chunks.stream()
                .map(data -> CompletableFuture.supplyAsync(() -> processApiReturnValue(data), executor))
                .toList();

        //모든 CompletableFuture가 완료될 때까지 기다린 후 실행 결과를 반환
        //allOf는 모든 작업이 완료될 때까지 기다리는 CompletableFuture를 반환, 모든 CompletableFuture를 조합하여 결과를 수집
        //allOf가 반환하는 CompletableFuture에 join을 호출하면 원래 스트림의 모든 CompletableFuture의 실행 완료를 기다릴 수 있다
        CompletableFuture<List<String>> combinedFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .flatMap(List::stream)
                        .toList());
        try {
            return combinedFutures.get().stream().toList();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

 

배열의 CompletableFuture 중 하나의 작업이 끝나길 기다리는 상황을 원하면 anyOf 메서드 사용

anyOf 메서드는 CompletableFuture 배열을 입력으로 받아서 CompletableFuture<Object>를 반환

728x90
반응형

'Java' 카테고리의 다른 글

[Java] 리팩터링 테스트  (0) 2023.12.16
[Java] 리팩터링 원칙  (0) 2023.12.03
[Java] 리팩터링  (1) 2023.11.26
[Java] DateTimeFormatter uuuu vs yyyy  (1) 2023.11.23
[Java] Java 동시성  (0) 2023.11.20
반응형
300x250