Java8. CompletableFuture
BackEnd/Java, 2022. 1. 23. 22:20
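CompletableFuture is a class that enables asynchronous programming in Java. It implements both the Future and CompletionStage interfaces.
Future alone made this possible to some extent, but it had the following problems:
- A Future cannot be completed from the outside. You can only cancel it or set a timeout on get().
- You cannot run a callback when the task finishes without using blocking code (get()).
- Multiple Futures cannot be combined.
- It provides no API for exception handling.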
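As a rough illustration of the first two points, the sketch below creates a CompletableFuture by hand, registers a callback without blocking, and then completes the future from the outside with complete(). The class and method names (ManualCompletionSketch, completeFromOutside) are hypothetical and exist only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ManualCompletionSketch {

    // Hypothetical helper: registers a callback, then completes the future "from outside".
    static String completeFromOutside(String value) {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> seenByCallback = new AtomicReference<>();

        // The callback is registered without blocking; nothing runs yet.
        future.thenAccept(seenByCallback::set);

        // Any code holding the reference can complete the future.
        // The non-async callback above then runs on the completing thread.
        future.complete(value);

        return seenByCallback.get();
    }

    public static void main(String[] args) {
        System.out.println(completeFromOutside("done")); // prints "done"
    }
}
```

With a plain Future, neither step is available: there is no complete() method, and the result can only be observed by calling get().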
CompletableFuture API
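- runAsync(): runs a task asynchronously (when there is no return value).
- supplyAsync(): runs a task asynchronously (when there is a return value).
You can pass the Executor (thread pool) you want, but by default both use ForkJoinPool.commonPool().
- thenApply(Function): a callback that receives the result and transforms it into another value.
- thenAccept(Consumer): a callback that receives the result and performs further work without returning a value.
- thenRun(Runnable): a callback that performs further work without receiving the result.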
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureClass {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // No return value
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
        }).thenRun(() -> {
            // Performs further work without receiving the result.
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
        });
        future.get();

        // With a return value
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
            return "CompletableFuture";
        }).thenApply((s) -> {
            // Receives the result and transforms it into another value.
            // [Output] main or a pool worker, depending on whether the previous stage has already finished
            System.out.println(Thread.currentThread().getName());
            return s.toUpperCase();
        });
        System.out.println(completableFuture.get()); // [Output]: COMPLETABLEFUTURE

        // Use the Executor (thread pool) of your choice.
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> callback = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. pool-1-thread-1
            return "CompletableFuture";
        }, executorService).thenAcceptAsync((s) -> {
            // Receives the result and performs further work.
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. pool-1-thread-2
            System.out.println(s + " Callback"); // [Output]: CompletableFuture Callback
        }, executorService);

        // Wait for the callback first; otherwise shutdown() may reject it before it is submitted.
        callback.get();
        executorService.shutdown();
    }
}
- thenCompose(): chains two tasks so that the second one runs after the first, using its result.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureClass {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
            return "Hello";
        });

        // getWorld() is started only after hello has produced its result.
        CompletableFuture<String> future = hello.thenCompose(CompletableFutureClass::getWorld);
        System.out.println(future.get()); // [Output]: Hello World!
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-5
            return message + " World!";
        });
    }
}
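To see why thenCompose() is used here rather than thenApply(), the following sketch applies the same future-returning function both ways. The names ComposeSketch, appendWorld and nestedThenFlat are hypothetical and only serve the illustration.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {

    // Hypothetical async step that itself returns a future.
    static CompletableFuture<String> appendWorld(String s) {
        return CompletableFuture.supplyAsync(() -> s + " World!");
    }

    static String nestedThenFlat() {
        CompletableFuture<String> hello = CompletableFuture.completedFuture("Hello");

        // thenApply wraps the returned future again, producing a nested type.
        CompletableFuture<CompletableFuture<String>> nested = hello.thenApply(ComposeSketch::appendWorld);

        // thenCompose flattens it into a single future.
        CompletableFuture<String> flat = hello.thenCompose(ComposeSketch::appendWorld);

        return nested.join().join() + " | " + flat.join();
    }

    public static void main(String[] args) {
        System.out.println(nestedThenFlat()); // prints "Hello World! | Hello World!"
    }
}
```

Both variants reach the same value, but the nested one needs two join() calls, which is the practical reason to prefer thenCompose() when the callback returns a future.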
- thenCombine(): runs two tasks independently and invokes a callback once both have finished.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureClass {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-5
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
            return "World";
        });

        // The BiFunction receives both results once both futures have completed.
        CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
        System.out.println(future.get()); // [Output]: Hello World
    }
}
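- allOf(): runs several tasks and invokes a callback once all of them have finished. allOf() itself returns CompletableFuture<Void>, so the results are collected from the original futures.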
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class CompletableFutureClass {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-5
            return "World";
        });

        List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
        CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

        // join() does not block here because allOf() guarantees every future is already complete.
        CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray)
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        results.get().forEach(System.out::println);
        /*
         [Output]
         Hello
         World
         */
    }
}
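The allOf() pattern above can be wrapped in a small generic helper. This is only a sketch: sequence() is a hypothetical name, not part of the CompletableFuture API, and the helper assumes every future completes normally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Hypothetical helper: turns a list of futures into one future holding a list of results.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<?>[] array = futures.toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(array)
                // All futures are complete at this point, so join() returns immediately.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<String> helloWorld() {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> "World");
        List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
        return sequence(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(helloWorld()); // prints [Hello, World]
    }
}
```

The result list follows the order of the input list, not the order in which the tasks happened to finish.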
- anyOf(): invokes a callback with the result of whichever task finishes first.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureClass {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-5
            return "World";
        });

        CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
        future.get(); // [Output]: Hello (or World, whichever task finishes first)
    }
}
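Because the winner in the example above depends on scheduling, the sketch below makes the outcome fixed by using one future that is already complete and one that is never completed. It also shows that anyOf() returns CompletableFuture<Object>, so the result needs a cast. AnyOfSketch and firstResult are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfSketch {

    static String firstResult() {
        // Never completed in this sketch, so it can never win.
        CompletableFuture<String> slow = new CompletableFuture<>();
        // Already completed when anyOf() is called.
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");

        // anyOf() loses the element type and yields Object.
        CompletableFuture<Object> first = CompletableFuture.anyOf(slow, fast);
        return (String) first.join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints "fast"
    }
}
```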
- exceptionally(Function): handles an exception and supplies a fallback value.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureClass {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
            if (true) throw new IllegalArgumentException();
            return "Hello";
        }).exceptionally(ex -> {
            System.out.println(ex); // [Output]: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
            return "Error!";
        });

        System.out.println(hello.get()); // [Output]: Error!
    }
}
- handle(BiFunction): handles both the normal result and the exception case.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureClass {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()); // [Output] e.g. ForkJoinPool.commonPool-worker-19
            if (true) throw new IllegalArgumentException();
            return "Hello";
        }).handle((result, ex) -> {
            // Exactly one of result and ex carries information: ex is null on success.
            if (ex != null) {
                System.out.println(ex); // [Output]: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
                return "Error!";
            }
            return result;
        });

        System.out.println(hello.get()); // [Output]: Error!
    }
}
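The example above only exercises the failure branch. As a minimal sketch, the version below runs handle() on both paths and unwraps the CompletionException to reach the original exception. HandleSketch, greet and the "bad input" message are hypothetical and chosen only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleSketch {

    // Hypothetical helper: fails or succeeds depending on the flag.
    static String greet(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) throw new IllegalArgumentException("bad input");
            return "Hello";
        }).handle((result, ex) -> {
            if (ex == null) {
                return result; // normal path
            }
            // The exception typically arrives wrapped in CompletionException; unwrap it if so.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause()
                    : ex;
            return "Error: " + cause.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(greet(false)); // prints "Hello"
        System.out.println(greet(true));  // prints "Error: bad input"
    }
}
```

The same unwrapping applies inside exceptionally(), since it receives the same wrapped exception.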