Skip to content

Commit 095e323

Browse files
committed
#198: revamp
1 parent fabf880 commit 095e323

File tree

7 files changed

+94
-120
lines changed

7 files changed

+94
-120
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.simplejavamail.internal.batchsupport;
2+
3+
class BatchException extends RuntimeException {
4+
5+
static final String ERROR_ACQUIRING_KEYED_POOLABLE = "Was unable to obtain a poolable object for key:\t\n%s";
6+
7+
BatchException(final String msg, final Throwable cause) {
8+
super(msg, cause);
9+
}
10+
}

modules/batch-module/src/main/java/org/simplejavamail/internal/batchsupport/BatchSupport.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import org.simplejavamail.internal.batchsupport.concurrent.NonJvmBlockingThreadPoolExecutor;
66
import org.simplejavamail.internal.batchsupport.transportpool.LifecycleDelegatingTransportImpl;
77
import org.simplejavamail.internal.batchsupport.transportpool.PoolableTransportAllocatorFactory;
8-
import org.simplejavamail.internal.batchsupport.transportpool.keyedcloseablepools.KeyedObjectPools;
8+
import org.simplejavamail.internal.batchsupport.transportpool.keyedcloseablepools.KeyedCyclingObjectPools;
99
import org.simplejavamail.internal.batchsupport.transportpool.keyedcloseablepools.SimpleDelegatingPoolable;
1010
import org.simplejavamail.internal.modules.BatchModule;
1111
import org.slf4j.Logger;
@@ -19,7 +19,9 @@
1919
import java.util.concurrent.ExecutorService;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import static java.lang.String.format;
2223
import static java.util.concurrent.TimeUnit.SECONDS;
24+
import static org.simplejavamail.internal.batchsupport.BatchException.ERROR_ACQUIRING_KEYED_POOLABLE;
2325

2426
/**
2527
* This class only serves to hide the Batch implementation behind an easy-to-load-with-reflection class.
@@ -31,8 +33,8 @@ public class BatchSupport implements BatchModule {
3133
private static final Timeout WAIT_FOREVER = new Timeout(Long.MAX_VALUE, TimeUnit.DAYS);
3234
private static final TimeExpiration<SimpleDelegatingPoolable<Transport>> TIME_TO_LIVE_5_SECONDS = new TimeExpiration<>(5, SECONDS);
3335

34-
private final KeyedObjectPools<Session, SimpleDelegatingPoolable<Transport>> transportPools =
35-
new KeyedObjectPools<>(new PoolableTransportAllocatorFactory(), TIME_TO_LIVE_5_SECONDS, WAIT_FOREVER);
36+
private final KeyedCyclingObjectPools<Session, SimpleDelegatingPoolable<Transport>> transportPools =
37+
new KeyedCyclingObjectPools<>(new PoolableTransportAllocatorFactory(), TIME_TO_LIVE_5_SECONDS, WAIT_FOREVER);
3638

3739
/**
3840
* @see BatchModule#executeAsync(String, Runnable)
@@ -68,7 +70,11 @@ public ExecutorService createDefaultExecutorService(final int threadPoolSize, fi
6870
@Override
6971
@SuppressWarnings("deprecation")
7072
public LifecycleDelegatingTransport acquireTransport(@Nonnull final Session session) {
71-
return new LifecycleDelegatingTransportImpl(transportPools.acquire(session));
73+
try {
74+
return new LifecycleDelegatingTransportImpl(transportPools.acquire(session));
75+
} catch (InterruptedException e) {
76+
throw new BatchException(format(ERROR_ACQUIRING_KEYED_POOLABLE, session), e);
77+
}
7278
}
7379

7480
/**

modules/batch-module/src/main/java/org/simplejavamail/internal/batchsupport/transportpool/keyedcloseablepools/AcquireKeyedPoolableException.java

-14
This file was deleted.

modules/batch-module/src/main/java/org/simplejavamail/internal/batchsupport/transportpool/keyedcloseablepools/CloseablePool.java

-31
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.simplejavamail.internal.batchsupport.transportpool.keyedcloseablepools;
2+
3+
import stormpot.BlazePool;
4+
import stormpot.Config;
5+
import stormpot.Expiration;
6+
import stormpot.LifecycledPool;
7+
import stormpot.Poolable;
8+
import stormpot.Timeout;
9+
10+
import javax.annotation.Nonnull;
11+
import java.util.LinkedList;
12+
import java.util.Queue;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
15+
import static java.util.Collections.singletonList;
16+
17+
/**
18+
* <ol>
19+
* <li>Keyed: because you govern object pools based on keys, like a cache on steroids</li>
20+
* <li>Cycling: because you can cycle multiple pools per key round robin</li>
21+
* </ol>
22+
* Example usage:
23+
* <p>
24+
* Say you have two different mail clusters, each with several servers. The keys map to the clusters, where each server
25+
* is accessed round robin, and the objects in the respective cluster-pool are multiple connections to the same mail server.
26+
* <p>
27+
* FIXME implement key properly and then document
28+
*/
29+
public class KeyedCyclingObjectPools<Key, T extends Poolable> {
30+
31+
private final ConcurrentHashMap<Key, Queue<LifecycledPool<T>>> keyedPool = new ConcurrentHashMap<>();
32+
33+
@Nonnull private final AllocatorFactory<Key, T> allocatorFactory;
34+
@Nonnull private final Expiration<T> expirationPolicy;
35+
@Nonnull private final Timeout claimTimeout;
36+
37+
public KeyedCyclingObjectPools(
38+
@Nonnull final AllocatorFactory<Key, T> allocatorFactory,
39+
@Nonnull final Expiration<T> expirationPolicy,
40+
@Nonnull final Timeout claimTimeout) {
41+
this.allocatorFactory = allocatorFactory;
42+
this.expirationPolicy = expirationPolicy;
43+
this.claimTimeout = claimTimeout;
44+
}
45+
46+
public T acquire(final Key key) throws InterruptedException {
47+
return findPool(key).claim(claimTimeout);
48+
}
49+
50+
/**
51+
* Clearing a pool is like shutting down a pool, but semantically speaking, a cleared pool can be restarted again.
52+
*/
53+
public synchronized void clearPool(final Key key) {
54+
if (keyedPool.containsKey(key)) {
55+
for (final LifecycledPool<T> poolInCluster : keyedPool.remove(key)) {
56+
poolInCluster.shutdown();
57+
}
58+
}
59+
}
60+
61+
private synchronized LifecycledPool<T> findPool(final Key key) {
62+
if (!keyedPool.containsKey(key)) {
63+
LifecycledPool<T> pool = new BlazePool<>(new Config<T>()
64+
.setExpiration(expirationPolicy)
65+
// FIXME how to create a cluster?
66+
.setAllocator(allocatorFactory.create(key)));
67+
keyedPool.put(key, new LinkedList<>(singletonList(pool)));
68+
}
69+
Queue<LifecycledPool<T>> lifecycledPools = keyedPool.get(key);
70+
LifecycledPool<T> nextRoundrobinInCluster = lifecycledPools.remove();
71+
lifecycledPools.add(nextRoundrobinInCluster);
72+
return nextRoundrobinInCluster;
73+
}
74+
}

modules/batch-module/src/main/java/org/simplejavamail/internal/batchsupport/transportpool/keyedcloseablepools/KeyedObjectPools.java

-51
This file was deleted.

modules/batch-module/src/main/java/org/simplejavamail/internal/batchsupport/transportpool/keyedcloseablepools/KillSwitchExpiration.java

-20
This file was deleted.

0 commit comments

Comments
 (0)