Skip to content

Commit b1462dd

Browse files
committed
simplify and document the coalescing bulkloader example (fixes #7)
1 parent f3a9345 commit b1462dd

File tree

27 files changed

+809
-1120
lines changed

27 files changed

+809
-1120
lines changed

.github/dependabot.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ updates:
1212
directory: /
1313
schedule:
1414
interval: daily
15-
- package-ecosystem: maven
16-
directory: examples/coalescing-bulkloader
15+
- package-ecosystem: gradle
16+
directory: examples/coalescing-bulkloader-reactor
1717
schedule:
1818
interval: daily
1919
- package-ecosystem: gradle

.github/workflows/examples.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ jobs:
9898
- name: Write-behind (rxjava)
9999
working-directory: examples/write-behind-rxjava
100100
run: ./gradlew build
101-
- name: Coalescing Bulkloader
102-
working-directory: examples/coalescing-bulkloader
103-
run: ./mvnw test
101+
- name: Coalescing Bulkloader (reactor)
102+
working-directory: examples/coalescing-bulkloader-reactor
103+
run: ./gradlew build
104104
- name: Hibernate (jcache)
105105
working-directory: examples/hibernate
106106
run: ./gradlew build
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
[Reactor][reactor] data streams facilitate the consolidation of independent asynchronous loads into
2+
batches at the cost of a small buffering delay. The [bufferTimeout][] operator accumulates requests
3+
until reaching a maximum size or time limit. Since each request consists of a key and its pending
4+
result, when the subscriber is notified it performs the batch load and completes the key's future
5+
with its corresponding value.
6+
7+
It some scenarios it may be desirable to only aggregate cache refreshes rather than imposing delays
8+
on callers awaiting explicit loads. An automated reload initiated by `refreshAfterWrite` will occur
9+
on the first stale request for an entry. While the key is being refreshed the previous value
10+
continues to be returned, in contrast to eviction which forces retrievals to wait until the value
11+
is loaded anew. In such cases, batching these optimistic reloads can minimize the impact on the
12+
source system without adversely affecting the responsiveness of the explicit requests.
13+
14+
### Refresh coalescing
15+
A [Sink][sink] collects requests, buffering them up to the configured threshold, and subsequently
16+
delivers the batch to the subscriber. The `parallelism` setting determines the number of concurrent
17+
bulk loads that can be executed if the size constraint results in multiple batches.
18+
19+
```java
20+
public final class CoalescingBulkLoader<K, V> implements CacheLoader<K, V> {
21+
private final Function<Set<K>, Map<K, V>> mappingFunction;
22+
private final Sinks.Many<Request<K, V>> sink;
23+
24+
/**
25+
* @param maxSize the maximum entries to collect before performing a bulk request
26+
* @param maxTime the maximum duration to wait before performing a bulk request
27+
* @param parallelism the number of parallel bulk loads that can be performed
28+
* @param mappingFunction the function to compute the values
29+
*/
30+
public CoalescingBulkLoader(int maxSize, Duration maxTime, int parallelism,
31+
Function<Set<K>, Map<K, V>> mappingFunction) {
32+
this.sink = Sinks.many().unicast().onBackpressureBuffer();
33+
this.mappingFunction = requireNonNull(mappingFunction);
34+
sink.asFlux()
35+
.bufferTimeout(maxSize, maxTime)
36+
.map(requests -> requests.stream().collect(
37+
toMap(Entry::getKey, Entry::getValue)))
38+
.parallel(parallelism)
39+
.runOn(Schedulers.boundedElastic())
40+
.subscribe(this::handle);
41+
}
42+
```
43+
44+
To ensure immediate responses for explicit loads these calls directly invoke the mapping function,
45+
while the optimistic reloads are instead submitted to the sink. It's worth noting that this call is
46+
`synchronized`, as a sink does not support concurrent submissions.
47+
48+
```java
49+
@Override public V load(K key) {
50+
return loadAll(Set.of(key)).get(key);
51+
}
52+
53+
@Override public abstract Map<K, V> loadAll(Set<? extends K> key) {
54+
return mappingFunction.apply(keys);
55+
}
56+
57+
@Override public synchronized CompletableFuture<V> asyncReload(K key, V oldValue, Executor e) {
58+
var entry = Map.entry(key, new CompletableFuture<V>());
59+
sink.tryEmitNext(entry).orThrow();
60+
return entry.getValue();
61+
}
62+
```
63+
64+
The subscriber receives a batch of requests, each comprising of a key and a pending future result.
65+
It performs the synchronous load and then either completes the key's future with the corresponding
66+
value or an exception if a failure occurs.
67+
68+
```java
69+
private void handle(Map<K, CompletableFuture<V>> requests) {
70+
try {
71+
var results = mappingFunction.apply(requests.keySet());
72+
requests.forEach((key, result) -> result.complete(results.get(key)));
73+
} catch (Throwable t) {
74+
requests.forEach((key, result) -> result.completeExceptionally(t));
75+
}
76+
}
77+
```
78+
79+
### Async coalescing
80+
The previous logic can be streamlined if all loads should be collected into batches. This approach
81+
is most suitable for an `AsyncLoadingCache` since it does not block any other map operations while
82+
an entry is being loaded.
83+
84+
```java
85+
public final class CoalescingBulkLoader<K, V> implements AsyncCacheLoader<K, V> {
86+
private final Function<Set<K>, Map<K, V>> mappingFunction;
87+
private final Sinks.Many<Request<K, V>> sink;
88+
89+
public CoalescingBulkLoader(int maxSize, Duration maxTime, int parallelism,
90+
Function<Set<K>, Map<K, V>> mappingFunction) {
91+
this.sink = Sinks.many().unicast().onBackpressureBuffer();
92+
this.mappingFunction = requireNonNull(mappingFunction);
93+
sink.asFlux()
94+
.bufferTimeout(maxSize, maxTime)
95+
.map(requests -> requests.stream().collect(
96+
toMap(Entry::getKey, Entry::getValue)))
97+
.parallel(parallelism)
98+
.runOn(Schedulers.boundedElastic())
99+
.subscribe(this::handle);
100+
}
101+
102+
@Override public synchronized CompletableFuture<V> asyncLoad(K key, Executor e) {
103+
var entry = Map.entry(key, new CompletableFuture<V>());
104+
sink.tryEmitNext(entry).orThrow();
105+
return entry.getValue();
106+
}
107+
108+
private void handle(Map<K, CompletableFuture<V>> requests) {
109+
try {
110+
var results = mappingFunction.apply(requests.keySet());
111+
requests.forEach((key, result) -> result.complete(results.get(key)));
112+
} catch (Throwable t) {
113+
requests.forEach((key, result) -> result.completeExceptionally(t));
114+
}
115+
}
116+
}
117+
```
118+
119+
[reactor]: https://projectreactor.io
120+
[bufferTimeout]: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#bufferTimeout-int-java.time.Duration-
121+
[sink]: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Sinks.html
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
plugins {
2+
`java-library`
3+
alias(libs.plugins.versions)
4+
}
5+
6+
dependencies {
7+
implementation(libs.caffeine)
8+
implementation(libs.reactor)
9+
10+
testImplementation(libs.junit)
11+
testImplementation(libs.truth)
12+
}
13+
14+
testing.suites {
15+
val test by getting(JvmTestSuite::class) {
16+
useJUnitJupiter()
17+
}
18+
}
19+
20+
java.toolchain.languageVersion = JavaLanguageVersion.of(
21+
System.getenv("JAVA_VERSION")?.toIntOrNull() ?: 11)
22+
23+
tasks.withType<JavaCompile>().configureEach {
24+
javaCompiler = javaToolchains.compilerFor {
25+
languageVersion = java.toolchain.languageVersion
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[versions]
2+
caffeine = "3.1.7"
3+
junit = "5.10.0"
4+
reactor = "3.5.8"
5+
truth = "1.1.5"
6+
versions = "0.47.0"
7+
8+
[libraries]
9+
caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" }
10+
junit = { module = "org.junit.jupiter:junit-jupiter", version.ref = "junit" }
11+
reactor = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
12+
truth = { module = "com.google.truth:truth", version.ref = "truth" }
13+
14+
[plugins]
15+
versions = { id = "com.github.ben-manes.versions", version.ref = "versions" }
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
distributionBase=GRADLE_USER_HOME
2+
distributionPath=wrapper/dists
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-rc-3-bin.zip
4+
networkTimeout=10000
5+
validateDistributionUrl=true
6+
zipStoreBase=GRADLE_USER_HOME
7+
zipStorePath=wrapper/dists

0 commit comments

Comments
 (0)