A Tutorial on Completable Future in Java

Motivation for Completable Future

Remember Future?

It runs a task for us on a separate thread and hands back a Future object from which we can later fetch the result.

Let’s refresh our memory with a small code snippet showing Future at work.

ExecutorService executorService = Executors.newFixedThreadPool(5);

Callable<String> customer = () -> {
return findCustomerDetails();
};

Future<String> customerFuture = executorService.submit(customer);

String customerStr = customerFuture.get();

Here, the call to get() blocks until findCustomerDetails() finishes and returns its result. A timeout applies only if we use the get(long, TimeUnit) overload. There is no way to explicitly complete this future.
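To make the blocking behaviour concrete, here is a minimal sketch that bounds the wait with get(long, TimeUnit) and falls back to a default when the task is too slow. The class name FutureTimeoutSketch, the helper fetchWithTimeout and the fallback string are assumptions made for illustration; findCustomerDetails() is a stand-in that simply sleeps for 500 ms.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {

    // Hypothetical stand-in for the article's findCustomerDetails().
    static String findCustomerDetails() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "John";
    }

    // Waits at most timeoutMillis for the task and returns the fallback on timeout.
    static String fetchWithTimeout(long timeoutMillis, String fallback) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            Callable<String> customer = FutureTimeoutSketch::findCustomerDetails;
            Future<String> customerFuture = executorService.submit(customer);
            return customerFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Stop the pool so its worker thread does not keep the JVM alive.
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithTimeout(2000, "timed out")); // enough time for the task
        System.out.println(fetchWithTimeout(50, "timed out"));   // task is slower than the wait
    }
}
```

Even with a timeout, the caller still has to block on get(); nothing in Future lets us say "call me back when the result is ready".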

Let’s assume we have to design a booking flow for parking. The flow looks as follows:

Booking Flow

Future does not provide any callback that fires after findCustomerDetails and findParkingDetails complete, so we cannot start the booking operation right after them without blocking.

Ideally we would chain these three operations together, but the Future API does not offer such chaining.

Similarly, exception handling has to be done outside the Future API.

Completable Future At Work

We would like to approach completable future through examples.

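private static void demoCompletableFuture() {
    CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
        System.out.println("Task Running inside completable Future");
    });
    // Wait for the task: common-pool threads are daemons and do not keep the JVM alive.
    completableFuture.join();
}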

Output :: Task Running inside completable Future

In the example above, the runAsync API of CompletableFuture takes a Runnable, provided here as a lambda. The example does not yet show any difference between CompletableFuture and Future, but we will get to it.

With this introduction and motivation, it’s time to formally define Completable Future.

Definition

CompletableFuture implements Future. In a way, a CompletableFuture is a Future that can be explicitly completed. It also implements CompletionStage, so it supports dependent functions and actions (callbacks) that are triggered on its completion.

Salient Features

  • When multiple threads attempt to complete, completeExceptionally or cancel the same CompletableFuture, only one of them succeeds
  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method
  • Async methods without an explicit Executor argument are performed using ForkJoinPool.commonPool(), unless it does not support a parallelism level of at least two, in which case a new thread is created for each task
  • In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as the CompletionException
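The first point in the list above can be checked without any extra threads, because complete, completeExceptionally and cancel each return a boolean saying whether that call was the one that completed the future. The following is a single-threaded sketch of that rule; the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class SingleCompletionSketch {

    // Tries to finish the same future three different ways and reports what happened.
    static String completeThreeWays() {
        CompletableFuture<String> future = new CompletableFuture<>();

        boolean completed = future.complete("first");
        boolean failed = future.completeExceptionally(new IllegalStateException("too late"));
        boolean cancelled = future.cancel(true);

        // Only the first call wins; the value it set is what join() returns.
        return completed + " " + failed + " " + cancelled + " -> " + future.join();
    }

    public static void main(String[] args) {
        System.out.println(completeThreeWays()); // true false false -> first
    }
}
```

With several threads racing, the same rule applies: exactly one of the competing calls returns true.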

Usage

No Argument

CompletableFuture<String> completableFuture = new CompletableFuture<>();
String value = completableFuture.get();

The get() call above blocks forever, because nothing ever completes this CompletableFuture: no task is attached to it and no one calls complete on it.
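If you want to observe this without hanging your program, a bounded wait or a non-blocking read does the job. The sketch below uses get(long, TimeUnit) and getNow, both part of CompletableFuture; the class name, helper names and the "not ready" placeholder are assumptions for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NeverCompletedSketch {

    // Returns true when a bounded wait on a fresh, never-completed future times out.
    static boolean emptyFutureTimesOut(long millis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // getNow never blocks: it returns the given placeholder while the future is incomplete.
    static String peek() {
        CompletableFuture<String> future = new CompletableFuture<>();
        return future.getNow("not ready");
    }

    public static void main(String[] args) {
        System.out.println(emptyFutureTimesOut(100)); // true
        System.out.println(peek());                   // not ready
    }
}
```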

RunAsync

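private static void demoCompletableFuture() {
    CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
        System.out.println("Task Running inside completable Future");
    });
    // Wait for the task: common-pool threads are daemons and do not keep the JVM alive.
    completableFuture.join();
}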

Output :: Task Running inside completable Future

This has already been explained above: the runAsync API of CompletableFuture takes a Runnable, provided here as a lambda.
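runAsync also has an overload that takes an Executor, for cases where you do not want the task on the common pool. Here is a small sketch, assuming a single-thread pool whose thread name we choose ourselves so that we can see where the task ran. The class name and the thread name "booking-worker" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class RunAsyncExecutorSketch {

    // Runs a task on a caller-supplied pool and returns the name of the thread that ran it.
    static String runOnNamedPool(String threadName) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            AtomicReference<String> seenThread = new AtomicReference<>();
            CompletableFuture<Void> future = CompletableFuture.runAsync(
                    () -> seenThread.set(Thread.currentThread().getName()), pool);
            future.join(); // wait until the Runnable has finished
            return seenThread.get();
        } finally {
            pool.shutdown(); // our own pool uses non-daemon threads, so release it
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnNamedPool("booking-worker")); // booking-worker
    }
}
```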

SupplyAsync

private static void demoSupplyAsync() throws InterruptedException, ExecutionException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Task Running inside completable Future");
return "I am the returned result";
});
String result = completableFuture.get();
System.out.println(result);
}

supplyAsync takes a Supplier. Reminder: Supplier is a functional interface that takes no input but provides an output.

Here, we give CompletableFuture a supplier lambda that returns a result, which we then fetch with get() just as we would with Future.

The output of this program is as follows:

Task Running inside completable Future
I am the returned result
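To see the Supplier on its own, the sketch below builds it as a separate variable and reads the result with join(), which behaves like get() but throws only unchecked exceptions. The class name and the greeting text are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class SupplyAsyncSketch {

    // Builds a Supplier, runs it asynchronously and waits for its value.
    static String fetchGreeting(String name) {
        Supplier<String> supplier = () -> "Hello, " + name; // no input, one output
        CompletableFuture<String> future = CompletableFuture.supplyAsync(supplier);
        return future.join(); // like get(), but without checked exceptions
    }

    public static void main(String[] args) {
        System.out.println(fetchGreeting("John")); // Hello, John
    }
}
```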

ThenAccept

private static void demoThenAccept() {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Task Running inside completable Future");
return "returned result";
});
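CompletableFuture<Void> secondCompletableFuture = completableFuture.thenAccept(result -> {
System.out.println("I am the "+result);
});
// Wait for the callback; otherwise main may return before anything is printed.
secondCompletableFuture.join();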
}

Let’s say you want to perform some action after a CompletableFuture completes, and that action returns nothing. In that case you would use thenAccept, as shown in the example above. thenAccept takes the result of the previous CompletableFuture as its input and works on it.

The output of this program is as follows:

Task Running inside completable Future
I am the returned result
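The same idea in a form that is easy to check: instead of printing, the callback stores what it received in a list. This is only a sketch; the class name, the collect helper and the use of CopyOnWriteArrayList as a thread-safe sink are choices made for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenAcceptSketch {

    // Feeds the async result into a consumer and returns what the consumer recorded.
    static List<String> collect(String value) {
        List<String> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> value)
                .thenAccept(result -> received.add("I am the " + result));
        done.join(); // the returned stage completes once the consumer has run
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect("returned result")); // [I am the returned result]
    }
}
```

Note that thenAccept returns a CompletableFuture<Void>: there is a stage to wait on, but no value to read from it.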

ThenRun

private static void demoThenRun() {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Task Running inside completable Future");
return "returned result";
});
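CompletableFuture<Void> secondCompletableFuture = completableFuture.thenRun(() -> {
System.out.println("I don't get any input from the previous completableFuture. I am just triggered after it.");
});
// Wait for the follow-up action; otherwise main may return before anything is printed.
secondCompletableFuture.join();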
}

Suppose you want to trigger some action after a CompletableFuture completes, and the action neither needs the result of that future nor returns anything. In that case you would use thenRun, which takes a Runnable.

Output would be:

Task Running inside completable Future
I don't get any input from the previous completableFuture. I am just triggered after it.
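A compact way to check the ordering is to record events rather than print them. The sketch below assumes a thread-safe list as the event log; the class name and event labels are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenRunSketch {

    // Records the order in which the task and its follow-up action run.
    static List<String> runInOrder() {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> {
                    events.add("task");
                    return "ignored"; // thenRun never sees this value
                })
                .thenRun(() -> events.add("follow-up"));
        done.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder()); // [task, follow-up]
    }
}
```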

ThenCombine

private static void demoThenCombine() throws InterruptedException, ExecutionException {
long timeStampStart = System.currentTimeMillis();
CompletableFuture<String> completableFutureFirst = CompletableFuture.supplyAsync(() -> {
System.out.println("Task Running inside completable Future");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "one plus ";
});
CompletableFuture<String> completableFutureSecond = CompletableFuture.supplyAsync(() -> {
System.out.println("Task Running inside completable Future");
return "two";
});
CompletableFuture<String> completableFutureThird = completableFutureFirst.thenCombine(completableFutureSecond, (resultOne, resultTwo) -> {
return (resultOne + resultTwo + " is three");
});
String result = completableFutureThird.get();
long timeStampEnd = System.currentTimeMillis();
System.out.println("Time taken in the Completable Future operation is " + (timeStampEnd - timeStampStart));
System.out.println("Result is :: "+result);
}

thenCombine triggers a task once both CompletableFutures have completed. Here, for example, completableFutureSecond completes almost immediately while completableFutureFirst takes 500 ms. As soon as both are done, the combining function runs and completes completableFutureThird. The total time is therefore close to that of the slower task, not the sum of the two.

Output is:

Task Running inside completable Future
Task Running inside completable Future
Time taken in the Completable Future operation is 563
Result is :: one plus two is three
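Stripped of the timing code, the pattern is just two independent futures and a BiFunction that merges their results. A minimal sketch, using integers and Integer::sum as the combiner (the class and method names are assumptions for the example):

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {

    // Computes the two operands independently, then adds them once both are available.
    static int addAsync(int a, int b) {
        CompletableFuture<Integer> left = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> b);
        CompletableFuture<Integer> sum = left.thenCombine(right, Integer::sum);
        return sum.join();
    }

    public static void main(String[] args) {
        System.out.println(addAsync(1, 2)); // 3
    }
}
```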

Complete

private static void demoCompletableFutureComplete() throws InterruptedException, ExecutionException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
//String value = completableFuture.get();
//The above statement will cause program to run forever.
// Because there is nothing for the completable future to do.
completableFuture.complete("Completing the completable future with this default text");
String value = completableFuture.get();
System.out.println("Value After Complete: "+value);
}

If, for whatever reason, we have to complete the future ourselves, we use the complete API. complete() completes the future with the value passed as its argument, provided the future has not already completed.

Output in this case would be:

Value After Complete: Completing the completable future with this default text
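A more typical use of complete is to hand a result over from another thread: one side waits on the future, the other side completes it when the value is ready. The sketch below assumes a plain Thread as the producer and a 100 ms delay; both are arbitrary choices for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompleteSketch {

    // A producer thread completes the future; the caller blocks until that happens.
    static String completeFromAnotherThread(String value) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // pretend to do some work first
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.complete(value);
        });
        producer.start();
        return future.join(); // returns as soon as the producer calls complete
    }

    public static void main(String[] args) {
        System.out.println(completeFromAnotherThread("value from producer"));
    }
}
```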

CompleteExceptionally

private static void demoCompleteExceptionally() throws InterruptedException, ExecutionException {
CompletableFuture<String> completableFutureFirst = CompletableFuture.supplyAsync(() -> {
System.out.println("Task Running inside completable Future");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "oops";
});
completableFutureFirst.completeExceptionally(new RuntimeException("Task in CompletableFuture Failed"));
String result = completableFutureFirst.get();
System.out.println("Result :: "+result);
}

Assume something went wrong with the task in a CompletableFuture and you want to complete it by throwing an exception, so that the client of your API can handle the exception appropriately. In such cases, you would use completeExceptionally.

The output for this is as follows:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Task in CompletableFuture Failed
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at com.company.CompletableFutureAPIUsage.demoCompleteExceptionally(CompletableFutureAPIUsage.java:38)
at com.company.CompletableFutureAPIUsage.main(CompletableFutureAPIUsage.java:20)
Caused by: java.lang.RuntimeException: Task in CompletableFuture Failed
at com.company.CompletableFutureAPIUsage.demoCompleteExceptionally(CompletableFutureAPIUsage.java:36)
… 1 more
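On the client side, handling this means catching the ExecutionException thrown by get() and looking at its cause, which is the exception passed to completeExceptionally. A minimal sketch of that handling follows; the class name and the "caught: " prefix are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CompleteExceptionallySketch {

    // Fails a future on purpose and shows how a caller can recover the original exception.
    static String failureMessage() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("Task in CompletableFuture Failed"));
        try {
            return future.get();
        } catch (ExecutionException e) {
            // get() wraps the original exception; getCause() gives it back.
            return "caught: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage()); // caught: Task in CompletableFuture Failed
    }
}
```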

Cancel

private static void demoCancel() throws InterruptedException, ExecutionException {
CompletableFuture<String> completableFutureFirst = CompletableFuture.supplyAsync(() -> {
System.out.println("Task Running inside completable Future");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result";
});
completableFutureFirst.cancel(true);
boolean isCancelled = completableFutureFirst.isCancelled();
System.out.println("Completable Future is cancelled :: "+isCancelled);
String result = completableFutureFirst.get();
System.out.println("Result :: "+result);
}

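You can cancel the future if you don’t need it anymore. Note that the mayInterruptIfRunning argument has no effect in CompletableFuture, because interrupts are not used to control processing: cancel completes the future with a CancellationException, but a task that is already running is not stopped. CompletableFuture.get() throws a CancellationException when called on a cancelled future. The isCancelled() API lets you check whether it has been cancelled.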

Output is:

Task Running inside completable Future
Completable Future is cancelled :: true
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
at com.company.CompletableFutureAPIUsage.demoCancel(CompletableFutureAPIUsage.java:37)
at com.company.CompletableFutureAPIUsage.main(CompletableFutureAPIUsage.java:22)


Github link for the programs shown above

https://github.com/rohitsingh20122992/completableFuture


Comparison of Iterative, Future and CompletableFuture Approaches

package com.company;
import java.util.concurrent.*;
public class Main {
/**
* To book a parking spot, we need customer details and parking details.
* And we don't need any booking return object for the client; in other words,
* we can make the booking process async.
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws Exception {
//Simple Iterative Approach
showIterativeProcess();
//Using Future
showFutureProcess();
//Using Completable Future
showCompletableFuture();
}
private static void showCompletableFuture() throws InterruptedException, ExecutionException {
System.out.println("===============Completable Future process starts================");
long timeStampStart = System.currentTimeMillis();
CompletableFuture<String> booking = CompletableFuture.supplyAsync(() -> findCustomerDetails())
.thenCombineAsync(CompletableFuture.supplyAsync(() -> findParkingDetails()), (customer, parking) -> book(customer, parking));
System.out.println(booking.get());
long timeStampEnd = System.currentTimeMillis();
System.out.println("Time taken in the Completable Future operation is " + (timeStampEnd - timeStampStart));
System.out.println("==========================================");
System.out.println();
System.out.println();
}
private static void showFutureProcess() throws InterruptedException, java.util.concurrent.ExecutionException {
System.out.println("===============Future process starts================");
long timeStampStart = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(5);
Callable<String> customer = () -> {
return findCustomerDetails();
};
Callable<String> parking = () -> {
return findParkingDetails();
};
Future<String> customerFuture = executorService.submit(customer);
Future<String> parkingFuture = executorService.submit(parking);
String customerStr = customerFuture.get();
String parkingStr = parkingFuture.get();
Callable<String> booking = () -> {
return book(customerStr, parkingStr);
};
Future<String> bookingFuture = executorService.submit(booking);
System.out.println(bookingFuture.get());
// Shut the pool down so its non-daemon threads do not keep the JVM alive.
executorService.shutdown();
long timeStampEnd = System.currentTimeMillis();
System.out.println("Time taken in the Future operation is " + (timeStampEnd - timeStampStart));
System.out.println("==========================================");
System.out.println();
System.out.println();
}
private static void showIterativeProcess() {
System.out.println("===============Iterative process starts================");
long timeStampStart = System.currentTimeMillis();
String customerName = findCustomerDetails();
String parkingName = findParkingDetails();
String booking = book(customerName, parkingName);
long timeStampEnd = System.currentTimeMillis();
System.out.println(booking);
System.out.println("Time taken in the iterative operation is " + (timeStampEnd - timeStampStart));
System.out.println("==========================================");
System.out.println();
System.out.println();
}
private static String findCustomerDetails() {
System.out.println("****** Finding customer Details ******* Sleeping for 500 ms");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("****** Found customer Details *******");
return "John";
}
private static String findParkingDetails() {
System.out.println("****** Finding parking Details ******* Sleeping for 400 ms");
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("****** Found parking Details *******");
return "Mall Road Parking";
}
private static String book(String customer, String parking) {
System.out.println("****** booking parking ******* Sleeping for 700 ms");
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("****** Parking booked *******");
return (customer + " has booked a parking at " + parking);
}
}
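The motivation section mentioned that with Future, exception handling has to live outside the API. For contrast, here is a hedged sketch of the same booking chain with a fallback attached through exceptionally, a CompletableFuture method this article has not covered. The failCustomerLookup flag, the error text and the simplified helpers without sleeps are all assumptions made for the example, not part of the program above.

```java
import java.util.concurrent.CompletableFuture;

class BookingFallbackSketch {

    // Simplified stand-ins for the article's helpers; the failure switch is hypothetical.
    static String findCustomerDetails(boolean fail) {
        if (fail) {
            throw new IllegalStateException("customer service unavailable");
        }
        return "John";
    }

    static String findParkingDetails() {
        return "Mall Road Parking";
    }

    static String book(String customer, String parking) {
        return customer + " has booked a parking at " + parking;
    }

    // Runs both lookups, books when both succeed, and maps any failure to a message.
    static String bookingWithFallback(boolean failCustomerLookup) {
        return CompletableFuture
                .supplyAsync(() -> findCustomerDetails(failCustomerLookup))
                .thenCombine(CompletableFuture.supplyAsync(BookingFallbackSketch::findParkingDetails),
                        BookingFallbackSketch::book)
                .exceptionally(error -> {
                    // Dependent stages see the failure wrapped in a CompletionException.
                    Throwable cause = error.getCause() != null ? error.getCause() : error;
                    return "Booking failed: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(bookingWithFallback(false));
        System.out.println(bookingWithFallback(true));
    }
}
```

The failure handling is part of the chain itself, so the caller of join() receives either the booking text or the fallback message and needs no try/catch around it.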

We have covered the major concepts. We will try to cover the remaining ones in a later post.

