Future, CompletableFuture, ListenableFuture

Future

Future는 비동기 처리 결과를 표현하기 위해서 사용된다. 비동기 처리가 완료되었는지 확인하고, 처리 완료를 기다리고, 처리 결과를 리턴하는 메소드를 제공한다. 비동기 결과는 처리가 완료되면 get 메소드를 사용해서 얻을 수 있다. get 메소드는 비동기 처리가 완료될 때까지 blocking 된다. 비동기 작업을 취소하려면 cancel 메소드를 사용하면 된다. 비동기 작업이 정상적으로 완료되었는지 혹은 취소되었는지 확인하기 위한 메소드도 제공된다. 또한 비동기 작업이 완료된 경우에는 작업을 취소하는 것은 불가능하다. 만약에 Future를 비동기 작업의 취소를 위해서만 사용하고자 한다면, (비동기 작업의 결과는 필요없음) Future<?> 형식으로 타입을 선언하면 된다. 그리고 비동기 작업의 결과로는 null을 리턴하도록 하면 된다.

Future는 ExecutorService에서 비동기 작업의 리턴값으로 많이 사용된다. 아래는 간단한 예제이다. ExecutorService#submit 메소드로 Callable 객체를 전달하고 있다. Callable 객체의 call() 메소드가 곧바로 실행되는 것은 아니기 때문에 리턴값을 바로 구할 수 없다. 이렇게 미래에 실행되는 Callable의 수행 결과값을 구할 때 사용되는 것이 Future이다.

 interface ArchiveSearcher { String search(String target); }
 class App {
   ExecutorService executor = ...
   ArchiveSearcher searcher = ...
   void showSearch(final String target)
       throws InterruptedException {
     Future<String> future
       = executor.submit(new Callable<String>() {
         public String call() {
             return searcher.search(target);
         }});
     displayOtherThings(); // do other things while searching
     try {
       displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
 }

Memory Consistency Effects

  • 자바독에는 이와 같은 내용이 있다. “Actions taken by the asynchronous computation happen-before actions following the corresponding Future.get() in another thread.” 해석해보면, 비동기 처리를 위한 액션은 다른 스레드에서 Future.get() 호출보다 먼저 발생한다는 것이다.

FutureTask

FutureTask는 Future의 구현체이다. 또한 Runnable의 구현체이기도하다. 따라서 Executor에 의해서 실행될 수 있다. 예를 들어 아래와 같이 코드를 작성할 수 있다.

 FutureTask<String> future =
       new FutureTask<String>(new Callable<String>() {
         public String call() {
           return searcher.search(target);
       }});
executor.execute(future);

String result = future.get();

Future에서 제공하는 주요 메소드

  • V get() : Callable 등 작업의 실행이 완료될 때 까지 블록킹 되며, 완료되면 그 결과값을 리턴한다.
    • CancellationException : 작업이 취소되는 경우에 발생
    • ExecutionException : 작업 도중에 예외가 발생한 경우에 발생
    • InterruptedException : 현재 스레드가 인터럽트된 경우에 발생
  • V get(long timeout, TimeUnit unit) : 지정한 시간 동안 작업의 실행 결과를 기다린다. 지정한 시간 내에 수행이 완료되면 그 결과값을 리턴한다. 대기 시간이 초과되면 TimeoutException을 발생시킨다.
    • CancellationException : 작업이 취소되는 경우에 발생
    • ExecutionException : 작업 도중에 예외가 발생한 경우에 발생
    • InterruptedException : 현재 스레드가 인터럽트된 경우에 발생
    • TimeoutException : 대기 시간이 초과된 경우 발생
  • boolean cancel(boolean mayInterruptIfRunning)
    • 이 작업을 취소하려고 시도한다. 작업이 이미 완료되었거나 혹인 이미 취소되었거나 혹은 다른 이유로 취소 할 수 없는 경우에는 실패할 수 있다. cancel() 메소드가 호출되기 전에 작업이 시작되지 않았던 경우에는, 작업이 실행되지 않는다. 작업이 이미 시작된 경우에는 mayInterruptIfRunning 파라미터로 작업을 실행중인 스레드를 인터럽트할지의 여부를 결정한다.
    • 이 메소드가 리턴된 이후에는 isDone() 메소드는 항상 true를 리턴한다.
    • 이 메소드가 true를 리턴한 경우에는, isCancelled() 메소드는 항상 true를 리턴한다.
    • 작업이 취소되지 않은 경우는 false를 리턴한다. 보통 이미 작업이 완료된 경우이다. 작업이 정상적으로 취소된 경우에는 true를 리턴한다.
    • mayInterruptIfRunning : 이 작업을 실행하는 스레드가 인터럽트되어야 하는 경우에는 true를 넘겨주면 된다. true를 넘겨주지 않으면 진행중인 작업이 완료될 수 있다.
  • boolean isCancelled()
    • 작업이 정상적으로 완료되기 전에 취소가 된 경우에 true를 리턴한다.
  • boolean isDone()
    • 작업이 완료된 경우에 true를 리턴한다. 작업은 정상 종료, 예외 혹은 취소로 인해 완료 될 수 있다. 이 모든 경우에도 isDone() 메소드는 true를 리턴한다.

CompletableFuture

CompletableFuture를 사용하면 Future에 대한 결과를 명시적으로 쓸 수 있다(명시적으로 Future를 완료할 수 있다). 만약에 2개 이상의 스레드에서 CompleteFuture에 결과를 쓰려고 하는 경우(complete, completeExceptionally, cancel)에는 그 중 1개의 스레드만 성공한다. CompletableFuture는 Future, CompletionState 인터페이스를 구현하고 있다. CompletableFuture는 CompletionStage 인터페이스를 다음과 같은 정책으로 구현하고 있다.

  • 비동기 처리가 완료되었을 때 수행되는 의존적인 작업들은 CompletableFuture를 완료한 스레드에 의해 실행될 수 있다. CompletableFuture가 이미 완료된 경우에는 콜백을 등록하는 스레드에서 해당 콜백을 실행한다.
  • 명시적으로 Executor 인자를 받지 않는 비동기 메소드들은 ForkJoinPool.commonPool()에서 스레드를 할당 받아 수행된다. AsynchronousCompletionTask는 async 메소드에 의해 생성된 비동기 task들을 식별하기 위한 마커 인터페이스이다.
  • 모든 CompletionStage 메소드들은 다른 public 메소드와 독립적으로 구현되어 있다. 따라서 서브 클래스에서 특정 메소드를 오버라이드 했다 하더라도 CompletionStage 메소드에는 영향을 주지 않는다.

CompletableFuture는 다음과 같은 정책으로 Future 인터페이스를 구현하고 있다.

  • Future 인터페이스는 비동기 작업을 완료를 시킬수 있는 메소드를 가지고 있지 않다. cancel() 메소드는 completeExceptionally와 같은 효과를 가진다. isCompletedExceptionally() 메소드는 CompleteableFuture가 예외적인 방식으로 완료되었는지 확인하는데 사용될 수 있다.
  • CompletionException에 의해 예외적으로 완료가 된 경우에, get() 또는 get(long, TimeUnit) 메소드는 ExecutionException을 던진다.

CompletableFuture에는 아래와 같은 메소드들이 있다.

  • public static < U > CompletableFuture< U > completedFuture(U value) : 이미 비동기 작업이 완료된 상태의 CompletableFuture를 반환한다.
  • public boolean complete(T value) : 이미 완료가 되지 않은 경우 비동기 결과를 value로 쓴다. get() 메소드를 호출하는 경우 get() 메소드의 결과로 value가 리턴된다. 만약 메소드의 호출로 인해 CompleteableFuture가 완료 상태로 변환된 경우에는 true를 리턴한다.
  • public boolean completeExceptionally(Throwable ex) : 이미 완료되지 않은 경우에는 예외적으로 완료되었음을 쓴다. get() 메소드를 호출하는 경우 예외가 발생한다. 만약 메소드의 호출로 인해 CompleteableFuture가 완료 상태로 변환된 경우에는 true를 리턴한다.

예제 코드

@Test
public void completedFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.completedFuture("test");
    assertThat(future.isDone()).isTrue();

    String result = future.get();
    assertThat(result).isEqualTo("test");
    assertThat(future.isCompletedExceptionally()).isFalse();
    assertThat(future.isDone()).isTrue();
}

@Test
public void complete() throws Exception {
    CompletableFuture<String> future = new CompletableFuture<>();
    assertThat(future.isDone()).isFalse();

    future.complete("test");

    assertThat(future.isDone()).isTrue();
    String result = future.get();
    assertThat(result).isEqualTo("test");
    assertThat(future.isCompletedExceptionally()).isFalse();
    assertThat(future.isDone()).isTrue();
}

@Test
public void completeExceptionally() throws Exception {
    CompletableFuture<String> future = new CompletableFuture<>();

    future.completeExceptionally(new IllegalStateException());

    Throwable throwable = Java6Assertions.catchThrowable(future::get);

    assertThat(throwable).isNotNull();
    assertThat(throwable).isInstanceOf(ExecutionException.class);
    assertThat(future.isCompletedExceptionally()).isTrue();

    ExecutionException exception = (ExecutionException) throwable;
    assertThat(exception.getCause()).isInstanceOf(IllegalStateException.class);
}
  • completedFuture 테스트 : CompletableFuture.completedFuture 메소드를 통해 이미 완료된 CompletableFuture 객체를 생성한다. 따라서 get() 메소드 호출시 바로 결과를 리턴한다.
  • complete 테스트 : 위의 테스트와 유사하지만 완료되지 않은 CompletableFuture 객체를 생성하고 complete 메소드를 통해서 완료시킨다.
  • completeExceptionally 테스트 : completeExceptionally 메소드를 통해서 CompletableFuture 객체를 비정상적으로 완료시킨다. 이 경우 get() 메소드는 ExecutionException를 발생시킨다.

CompletableFuture에는 스태틱 메소드를 통해 CompletableFuture 객체를 생성할 수 있는 메소드들이 있다.

  • public static CompletableFuture< Void > runAsync(Runnable runnable) : 비동기 작업을 수행하고, CompletableFuture 객체를 리턴한다. 리턴되는 CompletableFuture는 비동기 작업이 완료된 후에 완료된다. 비동기 작업은 ForkJoinPool.commonPool()에서 실행된다.
  • public static CompletableFuture< Void > runAsync(Runnable runnable, Executor executor) : 위에 메소드와 같지만 비동기 작업을 ForkJoinPool.commonPool()에서 실행하는게 아니라 Executor에 의해 실행된다.
  • public static < U > CompletableFuture< U > supplyAsync(Supplier< U > supplier) : runAsync(Runnable runnable)와 똑같다. 하지만 파라미터로 Supplier를 받기 때문에 비동기 수행 결과를 받을 수 있다.
  • public static < U > CompletableFuture< U > supplyAsync(Supplier< U > supplier, Executor executor) : runAsync(Runnable runnable, Executor executor)와 같다. 하지만 파라미터로 Supplier를 받기 때문에 비동기 수행결과를 받을 수 있다.

@Test
public void runAsync() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> sleep(1000));
    future.thenAccept(v -> log.info("thenAccept"));
    assertThat(future.isDone()).isFalse();
    Thread.sleep(1100);
}

@Test
public void runAsync_addCallbackAfterComplete() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> sleep(1000));
    Thread.sleep(1100);
    assertThat(future.isDone()).isTrue();
    future.thenAccept(v -> log.info("thenAccept"));
}

@Test
public void supplyAsync() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "result";
    });
    future.thenAccept(v -> log.info("thenAccept"));
    assertThat(future.isDone()).isFalse();
    Thread.sleep(1100);
    assertThat(future.isDone()).isTrue();
    assertThat(future.get()).isEqualTo("result");
}

@Test
public void supplyAsync_addCallbackAfterComplete() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "result";
    });
    Thread.sleep(1100);
    assertThat(future.isDone()).isTrue();
    assertThat(future.get()).isEqualTo("result");
    future.thenAccept(v -> log.info("thenAccept"));
}

private void sleep(long time) {
    try {
        log.info("sleep : {}", time);
        Thread.sleep(time);
    } catch (InterruptedException ignored) {
    }
}
  • runAsync : 비동기 작업을 수행하고, 콜백으로 로그를 출력한다. 비동기 작업은 1초동안 스레드를 sleep 시킨다. 따라서 1100ms가 지난 이후에 CompletableFuture 객체는 완료된 상태가 된다. 아래는 로그 결과이다. 로그 결과를 보면 알 수 있듯이 콜백 메소드는 비동기 작업을 완료한 스레드에서 실행된다.
13:54:41.605 [ForkJoinPool.commonPool-worker-1] INFO learning.CompletableFutureLearningTest - sleep : 1000
13:54:42.614 [ForkJoinPool.commonPool-worker-1] INFO learning.CompletableFutureLearningTest - thenAccept
  • runAsync_addCallbackAfterComplete : 비동기 작업을 수행한다. 그리고 콜백 메소드로 로그를 출력하도록 한다. 하지만 콜백을 비동기 작업이 끝난 후 등록한다. 즉 CompletableFuture 객체가 이미 완료된 상태에 콜백 메소드를 등록했다. 이 경우 콜백 메소드는 콜백 메소드를 등록하는 스레드에서 실행 된다. 아래 로그 결과를 보면 알 수 있듯이 thenAccept로 전달한 콜백이 main 스레드에서 실행되었음을 알 수 있다.
13:56:53.757 [ForkJoinPool.commonPool-worker-1] INFO learning.CompletableFutureLearningTest - sleep : 1000
13:56:54.920 [main] INFO learning.CompletableFutureLearningTest - thenAccept

나머지 테스트는 코드만 보더라도 쉽게 이해가 될 거 같으니깐 설명은 생략하도록 하겠다.

@Test
public void complete_addCallback() throws Exception {
    CompletableFuture<String> future = new CompletableFuture<>();
    future.thenAccept(s -> {
        log.info("thenAccept");
        assertThat(s).isEqualTo("result");
    });


    Thread thread = new Thread(() -> {
        sleep(1000);
        future.complete("result");
    }, "async test thread");
    thread.start();
    thread.join();
}

위에 테스트에서는 CompletableFuture 객체를 생성하고 콜백을 등록했다. 콜백에서는 단순히 로깅을 출력했다. 그리고 별도의 쓰레드에서 CompletableFuture 객체를 완료시켰다. (complete 메소드 사용) 그러면 콜백 메소드는 어떤 스레드에서 실행이 될까? 정답은 CompletableFuture를 완료시키는 스레드에서 실행이된다. 즉 CompletableFuture#complete 메소드를 호출한 스레드에서 콜백이 실행된다. 아래는 테스트를 실행시켰을 때 출력되는 로그이다.

14:04:21.394 [async test thread] INFO learning.CompletableFutureLearningTest - sleep : 1000
14:04:22.401 [async test thread] INFO learning.CompletableFutureLearningTest - thenAccept

CompletionState 인터페이스에는 아래와 같은 메소드들이 있다.

  • public CompletableFuture< Void > thenRun(Runnable action) : action은 비동기 작업이 정상적으로 끝난 뒤에 수행된다. action은 비동기 작업을 수행한 스레드에서 실행된다. CompletableFuture가 예외적으로 완료된 경우에는 action이 실행되지 않는다.
@Test
public void thenRun() throws Exception {
    Runnable runnable = Mockito.mock(Runnable.class);

    CompletableFuture<String> future = new CompletableFuture<>();
    future.thenRun(runnable);

    future.complete("result");

    verify(runnable).run();
}

@Test
public void thenRun_ShouldNotInvoked_WhenFutureIsExceptionallyCompleted() throws Exception {
    Runnable runnable = Mockito.mock(Runnable.class);

    CompletableFuture<String> future = new CompletableFuture<>();
    future.thenRun(runnable);

    future.completeExceptionally(new IllegalStateException());

    verify(runnable, never()).run();
}

위의 테스트 코드에서 알 수 있듯이 future가 완료되면 thenRun을 통해 전달한 콜백이 실행된다. 반면에 future가 예외적으로 완료된 경우에는 thenRun으로 전달한 콜백이 실행되지 않는다.

  • public CompletableFuture< Void > thenRunAsync(Runnable action) : thenRun과 유사하지만, action을 비동기 작업을 수행한 스레드에서 실행하지 않는다. ForJoinPool.commonPool에서 쓰레드를 할당 받아 action을 수행한다.
  • public < U > CompletionStage< U > thenApply(Function<? super T,? extends U> fn) : thenRun과 유사하지만 비동기 결과를 인자로 받을 수 있다. 비동기 결과를 받고 새로운 결과를 리턴하고자 할 때 사용할 수 있다.
  • public < U > CompletionStage< U > thenApplyAsyn(Function<? super T,? extends U> fn) : thenRunAsync과 유사하지만 비동기 결과를 인자로 받을 수 있다. 비동기 결과를 받고 새로운 결과를 리턴하고자 할 때 사용할 수 있다.
  • public CompletionStage< Void > thenAccept(Consumer<? super T> action) : thenRun과 유사하지만 비동기 결과를 인자로 받을 수 있다.
  • public CompletionStage< Void > thenAcceptAsync(Consumer<? super T> action) : thenRunAsync과 유사하지만 비동기 결과를 인자로 받을 수 있다.
  • public < U > CompletableFuture< U > thenCompose(Function<? super T, ? extends CompletionStage< U » fn) : thenApply와 유사하지만 콜백 메소드의 리턴 타입이 CompletionStage 하위 타입인 경우에 사용한다. (flatMap과 유사하다)
  • public < U > CompletableFuture< U > thenComposeAsync(Function<? super T, ? extends CompletionStage< U » fn) : thenApplyAsync와 유사하지만 콜백 메소드의 리턴 타입이 CompletionStage 하위 타입인 경우에 사용한다.
  • public CompletableFuture< T > exceptionally(Function<Throwable, ? extends T> fn) : CompletableFuture가 예외적으로 완료된 경우 실행될 콜백을 등록할 수 있다. 만약에 예외가 발생하지 않는 경우 콜백 메소드는 호출되지 않으며, 리턴되는 CompletableFuture는 원래 CompletableFuture와 같은 값을 가진다.
@Test
public void exceptionally() throws Exception {
    Function<Throwable, String> function = Mockito.mock(Function.class);
    IllegalStateException ex = new IllegalStateException();

    CompletableFuture<String> future = new CompletableFuture<>();
    future.exceptionally(function);
    future.completeExceptionally(ex);

    verify(function).apply(ex);
}

@Test
public void exceptionally_shouldNotInvoked_WhenFutureIsNormallyCompleted() throws Exception {
    Function<Throwable, String> function = Mockito.mock(Function.class);
    IllegalStateException ex = new IllegalStateException();

    CompletableFuture<String> future = new CompletableFuture<>();
    future.exceptionally(function);
    future.completeExceptionally(ex);

    verify(function, never()).apply(ex);
}

첫번째 테스트의 경우 CompletableFuture 객체가 completeExceptionally 메소드 호출로 완료되었기 때문에 exceptionally로 등록한 콜백 메소드가 실행된다. 반면에 두번째 테스트의 경우 CompletableFuture 객체가 complete 메소드 호출로 완료되었기 때문에 exceptionally로 등록한 콜백 메소드가 실행되지 않는다.

@Test
public void exceptionally_test() throws Exception {
    CompletableFuture<String> future = new CompletableFuture<>();
    CompletableFuture<String> future2 = future.exceptionally(e -> "Catch Exception");

    future.completeExceptionally(new IllegalStateException());

    // future
    assertThat(future.isCompletedExceptionally()).isTrue();
    Throwable throwable = catchThrowable(future::get);
    assertThat(throwable).isNotNull();
    assertThat(throwable).isInstanceOf(ExecutionException.class);

    // future2
    assertThat(future2.isCompletedExceptionally()).isFalse();
    assertThat(future2.get()).isEqualTo("Catch Exception");
}

@Test
public void exceptionally_test2() throws Exception {
    CompletableFuture<String> future = new CompletableFuture<>();
    CompletableFuture<String> future2 = future.exceptionally(e -> "Catch Exception");

    future.complete("Normal Complete");

	// future2
    assertThat(future2.isCompletedExceptionally()).isFalse();
    assertThat(future2.get()).isEqualTo("Normal Complete");
}

위 테스트를 통해 알 수 있는 재밌는 사실은, CompletableFuture가 예외적으로 완료되어 exceptionally로 등록된 콜백이 실행되는 경우 리턴되는 CompletableFuture 객체는 정상적인 완료가 된다. 위에 테스트에서 future는 completeExceptionally 메소드를 통해 예외적으로 완료되었다. 따라서 future.isCompletedExceptionally()는 true로 리턴된다. 하지만 future.exceptionally를 통해 리턴된 future2는 정상적인 완료가 된다. 그렇기 때문에 future2.isCompletedExceptionally()는 false를 리턴한다.

이 외에도 더 다양한 메소드들있다. 더 자세한건 코드나 자바독을 참고하길 바란다. CompletionState 인터페이스는 하나의 비동기 처리가 완료되었을 때, 다른 의존적인 작업을 수행할 수 있도록 해주는 메소드들을 가지고 있다. 그리고 CompletionState 인터페이스의 메소드들은 함수형 스타일로 의존적인 비동기 작업들을 메소드 체인 형식으로 작성할 수 있다.

ListenableFuture

Spring에서 제공하는 클래스로 콜백 메소드를 추가할 수 있도록 Future를 확장한 인터페이스이다. 만약에 future가 이미 완료된 상태에서 콜백이 추가된다면 콜백은 즉시 호출된다.

ListenableFuture의 구현체로는 AsyncResult, CompletableToListenableFutureAdapter, ListenableFutureAdapter, ListenableFutureTask, SettableListenableFuture 등이 있다.

  • void addCallback(ListenableFutureCallback<? super T> callback) : ListenableFutureCallback를 등록한다.
  • void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) : 자바 8의 람다를 사용할 수 있도록 성공 콜백과 실패 콜백을 각각 등록한다.
  • completable() : ListenableFuture 객체를 CompletableFuture로 변환해서 리턴한다.

SettableListenableFuture는 ListenableFuture의 구현체로 set 메소드 또는 setException 메소드로 결과를 등록할 수 있다.

  • public boolean set(@Nullable T value) : Future의 결과를 등록한다. 만약에 결과가 성공적으로 등록되었다면 true를 리턴한다. 만약 결과가 이미 등록되었거나 cancell된 경우에는 false를 리턴한다.
  • public boolean setException(Throwable exception) : Future에 예외를 등록한다. 만약 예외가 성공적으로 등록되었다면 true를 리턴한다. 만약에 Future의 결과가 이미 등록되었거나 cancell된 경우에는 false를 리턴한다.
@Test
public void should_invoke_successCallback_whenSetResult() throws Exception {
    Runnable successCallback = Mockito.mock(Runnable.class);
    Runnable failCallback = Mockito.mock(Runnable.class);


    SettableListenableFuture<String> future = new SettableListenableFuture<>();

    future.addCallback(new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable ex) {
            failCallback.run();
        }

        @Override
        public void onSuccess(@Nullable String result) {
            successCallback.run();
        }
    });

    future.set("success");
    verify(successCallback).run();
    verify(failCallback, never()).run();
}

@Test
public void should_invoke_failCallback_whenSetException() throws Exception {
    Runnable successCallback = Mockito.mock(Runnable.class);
    Runnable failCallback = Mockito.mock(Runnable.class);


    SettableListenableFuture<String> future = new SettableListenableFuture<>();

    future.addCallback(new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable ex) {
            failCallback.run();
        }

        @Override
        public void onSuccess(@Nullable String result) {
            successCallback.run();
        }
    });

    future.setException(new IllegalArgumentException());

    verify(successCallback, never()).run();
    verify(failCallback).run();
}

위에는 간단한 테스트 코드이다. 자세한 설명은 생략하도록 하겠다. (매우 쉬운 테스트라 쉽게 이해 가능할 것이다)

future.addCallback(result -> successCallback.run(), ex -> failCallback.run());

addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) 메소드를 사용하면 위와 같이 자바 8의 람다를 사용해서 콜백 메소드를 등록할 수 있다. 이처럼 콜백을 등록하면 훨씬 코드가 읽기 쉬워 진다.

콜백 메소드는 어떤 스레드에서 실행이 될까? ListenableFuture의 결과 혹은 예외를 등록하는 스레드에서 실행된다. 하지만 이미 완료된 ListenableFuture에 콜백 메소드를 추가하는 경우 addCallback 메소드를 호출한 스레드에서 즉시 콜백이 실행된다.

@Test
public void should_invoke_successCallback_whenSetResult2() throws Exception {
    Runnable successCallback = Mockito.mock(Runnable.class);
    Runnable failCallback = Mockito.mock(Runnable.class);

    SettableListenableFuture<String> future = new SettableListenableFuture<>();
    future.addCallback(result -> {
        log.info("success callback run");
        successCallback.run();
    }, ex -> {
        log.info("fail callback run");
        failCallback.run();
    });

    Thread thread = new Thread(() -> future.set("result"), "async thread");
    thread.start();
    thread.join();

    verify(successCallback).run();
    verify(failCallback, never()).run();
}

위처럼 테스트 코드를 작성하면 successCallback은 SettableListenableFuture에 결과를 등록하는 스레드에서 실행된다.

아래는 테스트 코드 실행 결과이다.

17:36:28.002 [async thread] INFO learning.ListenableFutureLeaningTest - success callback run

이미 완료된 future에 콜백 메소드를 등록하는 경우 addCallback 메소드를 호출한 스레드에서 콜백을 바로 실행한다. 테스트를 위해서 아래와 같이 코드를 작성해보았다. 실행해보니 콜백 메소드가 메인 스레드에서 바로 실행되었다. 즉 addCallback 메소드를 호출하는 순간 SettableListenableFuture가 이미 완료된 경우 바로 콜백 메소드가 호출된다.

@Test
public void should_invoke_successCallback_immediately_whenResultIsAlreadySet() throws Exception {
    Runnable successCallback = Mockito.mock(Runnable.class);
    Runnable failCallback = Mockito.mock(Runnable.class);

    SettableListenableFuture<String> future = new SettableListenableFuture<>();
    future.set("result");
    future.addCallback(result -> {
        log.info("success callback run");
        successCallback.run();
    }, ex -> {
        log.info("fail callback run");
        failCallback.run();
    });

    verify(successCallback).run();
    verify(failCallback, never()).run();
}

위 코드 실행 결과이다.

17:39:20.402 [main] INFO learning.ListenableFutureLeaningTest - success callback run

ListenableFuture를 CompletableFuture로 바꾸고 싶다면 completable() 메소드를 호출하면 된다. 아래는 completable() 코드이다.

default CompletableFuture<T> completable() {
    CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
    addCallback(completable::complete, completable::completeExceptionally);
    return completable;
}

참고