Skip to content

Added statistics support #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 20, 2017
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions src/main/java/org/dataloader/DataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package org.dataloader;

import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.stats.Statistics;
import org.dataloader.stats.StatisticsCollector;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -64,6 +67,7 @@ public class DataLoader<K, V> {
private final DataLoaderOptions loaderOptions;
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final Map<K, CompletableFuture<V>> loaderQueue;
private final StatisticsCollector stats;

/**
* Creates new DataLoader with the specified batch loader function and default options
Expand Down Expand Up @@ -153,6 +157,7 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
this.futureCache = determineCacheMap(loaderOptions);
// order of keys matter in data loader
this.loaderQueue = new LinkedHashMap<>();
this.stats = nonNull(this.loaderOptions.getStatisticsCollector());
}

@SuppressWarnings("unchecked")
Expand All @@ -173,8 +178,11 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
*/
public CompletableFuture<V> load(K key) {
Object cacheKey = getCacheKey(nonNull(key));
stats.incrementLoadCount();

synchronized (futureCache) {
if (loaderOptions.cachingEnabled() && futureCache.containsKey(cacheKey)) {
stats.incrementCacheHitCount();
return futureCache.get(cacheKey);
}
}
Expand All @@ -185,6 +193,7 @@ public CompletableFuture<V> load(K key) {
loaderQueue.put(key, future);
}
} else {
stats.incrementBatchLoadCountBy(1);
// immediate execution of batch function
CompletableFuture<List<V>> batchedLoad = batchLoadFunction
.load(singletonList(key))
Expand Down Expand Up @@ -291,7 +300,14 @@ private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<

@SuppressWarnings("unchecked")
private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<CompletableFuture<V>> queuedFutures) {
return batchLoadFunction.load(keys)
stats.incrementBatchLoadCountBy(keys.size());
CompletionStage<List<V>> batchLoad;
try {
batchLoad = nonNull(batchLoadFunction.load(keys), "Your batch loader function MUST return a non null CompletionStage promise");
} catch (Exception e) {
batchLoad = CompletableFutureKit.failedFuture(e);
}
return batchLoad
.toCompletableFuture()
.thenApply(values -> {
assertState(keys.size() == values.size(), "The size of the promised values MUST be the same size as the key list");
Expand All @@ -300,20 +316,28 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Complet
Object value = values.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
if (value instanceof Throwable) {
stats.incrementLoadErrorCount();
future.completeExceptionally((Throwable) value);
// we don't clear the cached view of this entry to avoid
// frequently loading the same error
} else if (value instanceof Try) {
// we allow the batch loader to return a Try so we can better represent a computation
// that might have worked or not.
handleTry((Try<V>) value, future);
Try<V> tryValue = (Try<V>) value;
if (tryValue.isSuccess()) {
future.complete(tryValue.get());
} else {
stats.incrementLoadErrorCount();
future.completeExceptionally(tryValue.getThrowable());
}
} else {
V val = (V) value;
future.complete(val);
}
}
return values;
}).exceptionally(ex -> {
stats.incrementBatchLoadExceptionCount();
for (int idx = 0; idx < queuedFutures.size(); idx++) {
K key = keys.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
Expand All @@ -325,14 +349,6 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Complet
});
}

private void handleTry(Try<V> vTry, CompletableFuture<V> future) {
if (vTry.isSuccess()) {
future.complete(vTry.get());
} else {
future.completeExceptionally(vTry.getThrowable());
}
}

/**
* Normally {@link #dispatch()} is an asynchronous operation but this version will 'join' on the
* results if dispatch and wait for them to complete. If the {@link CompletableFuture} callbacks make more
Expand Down Expand Up @@ -441,4 +457,15 @@ public Object getCacheKey(K key) {
return loaderOptions.cacheKeyFunction().isPresent() ?
loaderOptions.cacheKeyFunction().get().getKey(key) : key;
}

/**
* Gets the statistics associated with this data loader. These will have been gather via
* the {@link org.dataloader.stats.StatisticsCollector} passed in via {@link DataLoaderOptions#getStatisticsCollector()}
*
* @return statistics for this data loader
*/
public Statistics getStatistics() {
return stats.getStatistics();
}

}
34 changes: 32 additions & 2 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.dataloader;

import org.dataloader.impl.Assertions;
import org.dataloader.stats.SimpleStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;

import java.util.Optional;
import java.util.function.Supplier;

import static org.dataloader.impl.Assertions.nonNull;

/**
* Configuration options for {@link DataLoader} instances.
Expand All @@ -32,6 +36,7 @@ public class DataLoaderOptions {
private CacheKey cacheKeyFunction;
private CacheMap cacheMap;
private int maxBatchSize;
private Supplier<StatisticsCollector> statisticsCollector;

/**
* Creates a new data loader options with default settings.
Expand All @@ -40,6 +45,7 @@ public DataLoaderOptions() {
batchingEnabled = true;
cachingEnabled = true;
maxBatchSize = -1;
statisticsCollector = SimpleStatisticsCollector::new;
}

/**
Expand All @@ -48,12 +54,13 @@ public DataLoaderOptions() {
* @param other the other options instance
*/
public DataLoaderOptions(DataLoaderOptions other) {
Assertions.nonNull(other);
nonNull(other);
this.batchingEnabled = other.batchingEnabled;
this.cachingEnabled = other.cachingEnabled;
this.cacheKeyFunction = other.cacheKeyFunction;
this.cacheMap = other.cacheMap;
this.maxBatchSize = other.maxBatchSize;
this.statisticsCollector = other.statisticsCollector;
}

/**
Expand Down Expand Up @@ -173,4 +180,27 @@ public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}

/**
* @return the statistics collector to use with these options
*/
public StatisticsCollector getStatisticsCollector() {
return nonNull(this.statisticsCollector.get());
}

/**
* Sets the statistics collector supplier that will be used with these data loader options. Since it uses
* the supplier pattern, you can create a new statistics collector on each call or you can reuse
* a common value
*
* @param statisticsCollector the statistics collector to use
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setStatisticsCollector(Supplier<StatisticsCollector> statisticsCollector) {
this.statisticsCollector = nonNull(statisticsCollector);
return this;
}


}
15 changes: 15 additions & 0 deletions src/main/java/org/dataloader/DataLoaderRegistry.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.dataloader;

import org.dataloader.stats.Statistics;
import org.dataloader.stats.StatisticsImpl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -91,4 +94,16 @@ public Set<String> getKeys() {
public void dispatchAll() {
getDataLoaders().forEach(DataLoader::dispatch);
}

/**
* @return a combined set of statistics for all data loaders in this registry presented
* as the sum of all their statistics
*/
public Statistics getStatistics() {
Statistics stats = new StatisticsImpl();
for (DataLoader<?, ?> dataLoader : dataLoaders.values()) {
stats = stats.combine(dataLoader.getStatistics());
}
return stats;
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/dataloader/impl/Assertions.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public static <T> T nonNull(T t) {
return Objects.requireNonNull(t, "nonNull object required");
}

public static <T> T nonNull(T t, String message) {
return Objects.requireNonNull(t, message);
}

private static class AssertionException extends IllegalStateException {
public AssertionException(String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.dataloader.stats;

import static org.dataloader.impl.Assertions.nonNull;

/**
* This statistics collector keeps dataloader statistics AND also calls the delegate
* collector at the same time. This allows you to keep a specific set of statistics
* and also delegate the calls onto another collector.
*/
public class DelegatingStatisticsCollector implements StatisticsCollector {

private final StatisticsCollector collector = new SimpleStatisticsCollector();
private final StatisticsCollector delegateCollector;

/**
* @param delegateCollector a non null delegate collector
*/
public DelegatingStatisticsCollector(StatisticsCollector delegateCollector) {
this.delegateCollector = nonNull(delegateCollector);
}

@Override
public long incrementLoadCount() {
delegateCollector.incrementLoadCount();
return collector.incrementLoadCount();
}

@Override
public long incrementBatchLoadCountBy(long delta) {
delegateCollector.incrementBatchLoadCountBy(delta);
return collector.incrementBatchLoadCountBy(delta);
}

@Override
public long incrementCacheHitCount() {
delegateCollector.incrementCacheHitCount();
return collector.incrementCacheHitCount();
}

@Override
public long incrementLoadErrorCount() {
delegateCollector.incrementLoadErrorCount();
return collector.incrementLoadErrorCount();
}

@Override
public long incrementBatchLoadExceptionCount() {
delegateCollector.incrementBatchLoadExceptionCount();
return collector.incrementBatchLoadExceptionCount();
}

/**
* @return the statistics of the this collector (and not its delegate)
*/
@Override
public Statistics getStatistics() {
return collector.getStatistics();
}

/**
* @return the statistics of the delegate
*/
public Statistics getDelegateStatistics() {
return delegateCollector.getStatistics();
}

}
39 changes: 39 additions & 0 deletions src/main/java/org/dataloader/stats/NoOpStatisticsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.dataloader.stats;

/**
* A statistics collector that does nothing
*/
public class NoOpStatisticsCollector implements StatisticsCollector {

private static final StatisticsImpl ZERO_STATS = new StatisticsImpl();

@Override
public long incrementLoadCount() {
return 0;
}

@Override
public long incrementLoadErrorCount() {
return 0;
}

@Override
public long incrementBatchLoadCountBy(long delta) {
return 0;
}

@Override
public long incrementBatchLoadExceptionCount() {
return 0;
}

@Override
public long incrementCacheHitCount() {
return 0;
}

@Override
public Statistics getStatistics() {
return ZERO_STATS;
}
}
Loading