Skip to content

Commit 99124a6

Browse files
author
Milad Mofidi
committed
-Adding exception handling for CompletableFuture services with handle() and exceptionally() and whenComplete() methods.
-Adding new custom thread pool for our completable future methods
1 parent a5f3c44 commit 99124a6

File tree

7 files changed

+452
-8
lines changed

7 files changed

+452
-8
lines changed

Diff for: pom.xml

+13
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@
4040
<version>5.8.1</version>
4141
<scope>test</scope>
4242
</dependency>
43+
<!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
44+
<dependency>
45+
<groupId>org.mockito</groupId>
46+
<artifactId>mockito-core</artifactId>
47+
<version>5.1.1</version>
48+
<scope>test</scope>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.mockito</groupId>
52+
<artifactId>mockito-junit-jupiter</artifactId>
53+
<version>4.0.0</version>
54+
<scope>test</scope>
55+
</dependency>
4356

4457
</dependencies>
4558

Diff for: src/main/java/org/example/service/completablefuture_approach/CompletableFutureHelloWorld.java

+57-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import org.example.service.HelloWorldService;
44

55
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
68

79
import static org.example.util.CommonUtil.delay;
810
import static org.example.util.CommonUtil.startTimer;
@@ -43,7 +45,7 @@ public String helloWorldWithMultipleAsyncCall(){
4345
timeTaken();
4446
return result;
4547
}
46-
public String helloWorldWith3AsyncCall(){
48+
public String helloWorld_with_3_async_calls(){
4749
startTimer();
4850
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> helloWorldService.hello());
4951
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> helloWorldService.world());
@@ -60,6 +62,60 @@ public String helloWorldWith3AsyncCall(){
6062
return result;
6163
}
6264

65+
public String helloWorld_with_3_async_calls_with_log(){
66+
startTimer();
67+
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> helloWorldService.hello());
68+
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> helloWorldService.world());
69+
CompletableFuture<String> hiCompletableFuture = CompletableFuture.supplyAsync(() -> {
70+
delay(1000);
71+
return " Hi CompletableFuture!";
72+
});
73+
String result = hello
74+
.thenCombine(world, (h,w) ->{
75+
log("thenCombine h/w");
76+
return h+w;
77+
})
78+
.thenCombine(hiCompletableFuture, (previous,current) -> {
79+
log("thenCombine previous/current");
80+
return previous+current;
81+
})
82+
.thenApply(s -> {
83+
log("thenApply h/w");
84+
return s.toUpperCase();
85+
})
86+
.join();
87+
timeTaken();
88+
return result;
89+
}
90+
91+
//In this approach we are not going to use the CommonForkJoin ThreadPool, we are going to custom thread pool for this method, you can see thr thread pool name in the console log when you run the corresponding test method
92+
public String helloWorld_with_3_async_calls_with_custom_threadPool(){
93+
startTimer();
94+
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
95+
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> helloWorldService.hello(),executorService);
96+
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> helloWorldService.world(),executorService);
97+
CompletableFuture<String> hiCompletableFuture = CompletableFuture.supplyAsync(() -> {
98+
delay(1000);
99+
return " Hi CompletableFuture!";
100+
},executorService);
101+
String result = hello
102+
.thenCombine(world, (h,w) ->{
103+
log("thenCombine h/w");
104+
return h+w;
105+
})
106+
.thenCombine(hiCompletableFuture, (previous,current) -> {
107+
log("thenCombine previous/current");
108+
return previous+current;
109+
})
110+
.thenApply(s -> {
111+
log("thenApply h/w");
112+
return s.toUpperCase();
113+
})
114+
.join();
115+
timeTaken();
116+
return result;
117+
}
118+
63119
public String helloWorld_4_async_calls() {
64120
startTimer();
65121
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> this.helloWorldService.hello());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package org.example.service.completablefuture_approach;
2+
3+
import org.example.service.HelloWorldService;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
7+
import static org.example.util.CommonUtil.delay;
8+
import static org.example.util.CommonUtil.startTimer;
9+
import static org.example.util.CommonUtil.timeTaken;
10+
import static org.example.util.LoggerUtil.log;
11+
12+
/**
13+
* @author milad mofidi
14+
15+
* user: miladm on 2/17/2023
16+
*/
17+
public class CompletableFutureHelloWorldException
18+
{
19+
HelloWorldService helloWorldService;
20+
21+
public CompletableFutureHelloWorldException(HelloWorldService helloWorldService)
22+
{
23+
this.helloWorldService = helloWorldService;
24+
}
25+
26+
public String helloWorld_3_async_calls_handle()
27+
{
28+
startTimer();
29+
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> helloWorldService.hello());
30+
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> helloWorldService.world());
31+
CompletableFuture<String> hiCompletableFuture = CompletableFuture.supplyAsync(() -> {
32+
delay(1000);
33+
return " Hi CompletableFuture!";
34+
});
35+
String result = hello
36+
.handle((res, exception) -> { //handle exception for hello completable future here if there is a exception from hello() method since we have returned the empty string for hello cf so the result would be=""
37+
log("res is: " + res);
38+
if (exception != null)
39+
{
40+
log("Exception is: " + exception.getMessage());
41+
return "";
42+
}
43+
else
44+
{
45+
return res;
46+
}
47+
})
48+
.thenCombine(world, (h, w) -> h + w)
49+
.handle((res, exception) -> { //handle exception for world completable future
50+
log("res is: " + res);
51+
if (exception != null)
52+
{
53+
log("Exception after world is: " + exception.getMessage());
54+
return "";
55+
}
56+
else
57+
{
58+
return res;
59+
}
60+
})
61+
.thenCombine(hiCompletableFuture, (previous, current) -> previous +
62+
current) //if there is exception in world() method the result of this code will be " Hi CompletableFuture!"
63+
.thenApply(String::toUpperCase) //the result of this code will be "HELLO WORLD! HI COMPLETABLEFUTURE!"
64+
.join();
65+
timeTaken();
66+
return result;
67+
}
68+
69+
public String helloWorld_3_async_calls_exceptionally()
70+
{
71+
startTimer();
72+
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> helloWorldService.hello());
73+
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> helloWorldService.world());
74+
CompletableFuture<String> hiCompletableFuture = CompletableFuture.supplyAsync(() -> {
75+
delay(1000);
76+
return " Hi CompletableFuture!";
77+
});
78+
String result = hello
79+
.exceptionally(
80+
exception -> { //handle exception for hello completable future here if there is a exception from hello() method since we have returned the empty string for hello cf so the result would be=""
81+
log("Exception is: " + exception.getMessage());
82+
return "";
83+
})
84+
.thenCombine(world, (h, w) -> h + w)
85+
.exceptionally(exception -> { //handle exception for world completable future
86+
log("Exception after world is: " + exception.getMessage());
87+
return "";
88+
})
89+
.thenCombine(hiCompletableFuture, (previous, current) -> previous +
90+
current) //if there is exception in world() method the result of this code will be " Hi CompletableFuture!"
91+
.thenApply(String::toUpperCase) //the result of this code will be "HELLO WORLD! HI COMPLETABLEFUTURE!"
92+
.join();
93+
timeTaken();
94+
return result;
95+
}
96+
97+
public String helloWorld_3_async_calls_whenComplete()
98+
{
99+
startTimer();
100+
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> helloWorldService.hello());
101+
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> helloWorldService.world());
102+
CompletableFuture<String> hiCompletableFuture = CompletableFuture.supplyAsync(() -> {
103+
delay(1000);
104+
return " Hi CompletableFuture!";
105+
});
106+
String result = hello
107+
.whenComplete(
108+
(res, exception) -> { //handle exception for hello completable future here if there is a exception from hello() method since we have returned the empty string for hello cf so the result would be=""
109+
log("res is: " + res);
110+
if (exception != null)
111+
{
112+
log("Exception is: " + exception.getMessage());
113+
}
114+
})
115+
.thenCombine(world, (h, w) -> h + w)
116+
.whenComplete((res, exception) -> { //handle exception for world completable future
117+
log("res is: " + res);
118+
if (exception != null)
119+
{
120+
log("Exception after world is: " + exception.getMessage());
121+
}
122+
})
123+
.exceptionally(
124+
exception -> {
125+
log("Exception after thenCombine is: " + exception.getMessage());
126+
return "";
127+
})
128+
.thenCombine(hiCompletableFuture, (previous, current) -> previous +
129+
current) //if there is exception in world() method the result of this code will be " Hi CompletableFuture!"
130+
.thenApply(String::toUpperCase) //the result of this code will be "HELLO WORLD! HI COMPLETABLEFUTURE!"
131+
.join();
132+
timeTaken();
133+
return result;
134+
}
135+
}

Diff for: src/main/java/org/example/service/completablefuture_approach/ProductServiceUsingCompletableFuture.java

+59-5
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,45 @@ public Product retrieveProductDetailsWithInventory_approach2(String productId)
115115
return productInfo;
116116
});
117117
CompletableFuture<Review> reviewCf =
118-
CompletableFuture.supplyAsync(() -> reviewService.retrieveReviews(productId));
119-
Product product = productInfoCf.thenCombine(reviewCf,
120-
(productInfo, review) -> new Product(
121-
productId, productInfo,
122-
review))
118+
CompletableFuture.supplyAsync(() -> reviewService.retrieveReviews(productId))
119+
.exceptionally( exception -> {
120+
log("Handled the exception in reviewService: "+ exception.getMessage());
121+
return Review.builder().noOfReviews(0).overallRating(0.0).build();
122+
});
123+
124+
Product product = productInfoCf
125+
.thenCombine(reviewCf,(productInfo, review) -> new Product(productId, productInfo,review))
126+
.whenComplete((product1, exception)-> {
127+
log("Inside whenComplete: " +product1+ "and the exception is: "+ exception);
128+
})
129+
.join(); //join() will blocking the main thread until their response is ready
130+
stopWatch.stop();
131+
log("Total Time Taken : " + stopWatch.getTime());
132+
return product;
133+
}
134+
135+
public Product retrieveProductDetailsWithInventory_approach2_withBetterPerformance_exceptionally(String productId)
136+
{
137+
stopWatch.start();
138+
139+
CompletableFuture<ProductInfo> productInfoCf =
140+
CompletableFuture.supplyAsync(() -> productInfoService.retrieveProductInfo(productId)).
141+
thenApply(productInfo -> {
142+
productInfo.setProductOptions(updateInventory_approach2_withBetterPerformance_exceptionally(productInfo));
143+
return productInfo;
144+
});
145+
CompletableFuture<Review> reviewCf =
146+
CompletableFuture.supplyAsync(() -> reviewService.retrieveReviews(productId))
147+
.exceptionally( exception -> {
148+
log("Handled the exception in reviewService: "+ exception.getMessage());
149+
return Review.builder().noOfReviews(0).overallRating(0.0).build();
150+
});
151+
152+
Product product = productInfoCf
153+
.thenCombine(reviewCf,(productInfo, review) -> new Product(productId, productInfo,review))
154+
.whenComplete((product1, exception)-> {
155+
log("Inside whenComplete: " +product1+ " and the exception is: "+ exception);
156+
})
123157
.join(); //join() will blocking the main thread until their response is ready
124158
stopWatch.stop();
125159
log("Total Time Taken : " + stopWatch.getTime());
@@ -155,6 +189,26 @@ private List<ProductOption> updateInventory_approach2_withBetterPerformance(Prod
155189
Collectors.toList());
156190
}
157191

192+
private List<ProductOption> updateInventory_approach2_withBetterPerformance_exceptionally(ProductInfo productInfo)
193+
{
194+
List<CompletableFuture<ProductOption> > productOptions = productInfo.getProductOptions().stream()
195+
.map(productOption -> {
196+
return CompletableFuture.supplyAsync( () -> inventoryService.retrieveInventory(productOption))
197+
.exceptionally( exception -> {
198+
log("Handled the exception in updateInventory call: "+ exception.getMessage());
199+
return Inventory.builder().count(1).build();
200+
})
201+
.thenApply(inventory -> {
202+
productOption.setInventory(inventory);
203+
return productOption;
204+
});
205+
})
206+
.collect(Collectors.toList());
207+
208+
return productOptions.stream().map(CompletableFuture::join).collect(
209+
Collectors.toList());
210+
}
211+
158212
public static void main(String[] args)
159213
{
160214

0 commit comments

Comments
 (0)