ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Java8. CompletableFuture
    BackEnd/Java 2022. 1. 23. 22:20
    반응형

      CompletableFuture는 자바에서 비동기(Asynchronous) 프로그래밍을 가능하게 하는 인터페이스입니다.

     

      Future를 사용해서도 어느정도는 가능했지만 다음과 같은 문제점이 있었습니다.

    • Future를 외부에서 완료 시킬 수 없다. 취소하거나 get()에 타임아웃을 설정할 수는 있다.
    • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    • 여러 Future를 조합할 수 없다.
    • 예외 처리용 API를 제공하지 앟는다.

     

    CompletableFuture API

    • runAsync(): 비동기로 작업을 실행합니다. (리턴 값이 없는 경우)
    • supplyAsync(): 비동기로 작업을 실행합니다. (리턴 값이 있는 경우)

     

      원하는 Executor(ThreadPool)를 사용해서 실행할 수도 있지만, 기본은 ForkJoinPool.commonPool()을 사용합니다.

    • thenApply(Function): 리턴 값을 받아서 다른 값으로 바꾸는 Callback입니다.
    • thenAccept(Consumer): 리턴 값을 받아서 또 다른 작업을 처리하는 Callback입니다.
    • thenRun(Runnable): 리턴 값을 받지 않고 다른 작업을 처리하는 Callback입니다.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureClass {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        // return 값이 없는 경우
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
        }).thenRun(() -> {
          // 리턴 값을 받지 않고 다른 작업을 처리합니다.
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
        });
        future.get();
    
        // return 값이 있는 경우
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
          return "CompletableFuture";
        }).thenApply((s) -> {
          // 리턴 값을 받아서 다른 값으로 변환합니다.
          System.out.println(Thread.currentThread().getName()); // [결과]: main
          return s.toUpperCase();
        });
        System.out.println(completableFuture.get()); // [결과]: COMPLETABLEFUTURE
    
        // 원하는 Executor(ThreadPool)을 사용합니다.
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: pool-1-thread-1
          return "CompletableFuture";
        }, executorService).thenAcceptAsync((s) -> {
          // 리턴 값을 받아서 또 다른 작업을 처리합니다.
          System.out.println(Thread.currentThread().getName()); // [결과]: pool-1-thread-2
          System.out.println(s + " Callback"); // [결과]: CompletableFuture Callback
        }, executorService);
        executorService.shutdown();
      }
    
    }
    • thenCompose(): 두 작업이 서로 이어서 실행하도록 조합합니다.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureClass {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
          return "Hello";
        });
    
        CompletableFuture<String> future = hello.thenCompose(CompletableFutureClass::getWorld);
        System.out.println(future.get()); // [결과]: Hello World!
    
      }
    
      private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-5
          return message + " World!";
        });
      }
    
    }
    • thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백을 실행합니다.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureClass {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-5
          return "Hello";
        });
    
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
          return "World";
        });
    
        CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
        System.out.println(future.get()); // Hello World
      }
    }
    • allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백을 실행합니다.
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;
    
    public class CompletableFutureClass {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
          return "Hello";
        });
    
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-5
          return "World";
        });
    
        List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
        CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
    
        CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray)
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    
        results.get().forEach(System.out::println);
        /* [결과]
           Hello
           World
        */
      }
    }
    • anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 대해 콜백을 실행합니다.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureClass {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
          return "Hello";
        });
    
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-5
          return "World";
        });
    
        CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
        future.get(); // [결과]: Hello
      }
    }
    • exceptionally(Function): 예외를 처리합니다.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureClass {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
          if(true) throw new IllegalArgumentException();
          return "Hello";
        }).exceptionally(ex -> {
          System.out.println(ex); // [결과]: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
          return "Error!";
        });
    
        System.out.println(hello.get()); // [결과]: Error!
      }
    }
    • handle(BiFunction): 정상인 경우와 예외인 경우 모두 처리합니다.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureClass {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName()); // [결과]: ForkJoinPool.commonPool-worker-19
          if(true) throw new IllegalArgumentException();
          return "Hello";
        }).handle((result, ex) -> {
          if(ex != null) {
            System.out.println(ex); // [결과]: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
            return "Error!";
          }
          return result;
        });
    
        System.out.println(hello.get()); // [결과]: Error!
      }
    }
    반응형

    'BackEnd > Java' 카테고리의 다른 글

    Java 9. JPMS  (0) 2024.12.02
    Java8. parallelSort  (0) 2022.01.25
    Callable과 Future  (0) 2022.01.23
    Multi-Thread Programming  (0) 2022.01.23
    Java8. Date-Time API  (0) 2022.01.22

    댓글

Designed by Tistory.