Skip to content

Commit 01eb404

Browse files
committed
Reworked serialization api
1 parent 03edf19 commit 01eb404

27 files changed

+2282
-3447
lines changed

.classpath

-3
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
<classpathentry kind="lib" path="libs/slf4j-api-1.7.10.jar"/>
1919
<classpathentry kind="lib" path="libs/trove-3.1a1.jar" sourcepath="/Users/cogmission/Resources/trove/3.1a1/trove-3.1a1-src.jar"/>
2020
<classpathentry kind="lib" path="libs/java-util-1.19.3.jar" sourcepath="/Users/cogmission/.m2/repository/com/cedarsoftware/java-util/1.19.3/java-util-1.19.3-sources.jar"/>
21-
<classpathentry kind="lib" path="libs/kryo-shaded-3.0.3.jar" sourcepath="/Users/cogmission/.m2/repository/com/esotericsoftware/kryo-shaded/3.0.3/kryo-shaded-3.0.3-sources.jar"/>
22-
<classpathentry kind="lib" path="libs/kryo-serializers-0.37.jar" sourcepath="/Users/cogmission/.m2/repository/de/javakaffee/kryo-serializers/0.37/kryo-serializers-0.37-sources.jar"/>
2321
<classpathentry kind="lib" path="libs/objenesis-2.1.jar"/>
24-
<classpathentry kind="lib" path="libs/minlog-1.3-SNAPHOT.jar"/>
2522
<classpathentry combineaccessrules="false" kind="src" path="/fast-serialization"/>
2623
<classpathentry kind="output" path="bin"/>
2724
</classpath>

Changelog.txt

-2
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,3 @@ Changes for Serialization branch (notes for CHANGELOG.md)
1919
11. Added NetworkSerializer interface (and NetworkSerializerImpl) which executes FST serialization
2020
and simultaneously implements the Kryo interface also.
2121
12. Added the SerialConfig class which contains config information for the NetworkSerializer.
22-
13. Added Network.Scheme enum which has two values { FST, KRYO }, to designate underlying
23-
serialization scheme (Kryo still delegates to FST but implements Kryo Serializer interface)

build.gradle

-3
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ dependencies {
4949
compile group: 'org.slf4j', name: 'slf4j-api', version:'1.7.10'
5050
compile group: 'io.reactivex', name: 'rxjava', version: '1.0.10'
5151
compile group: 'de.ruedigermoeller', name: 'fst', version: '2.45'
52-
compile group: 'com.esotericsoftware', name: 'kryo-shaded', version: '3.0.3'
53-
compile group: 'com.esotericsoftware', name: 'minlog', version: '1.3-SNAPSHOT'
54-
compile group: 'de.javakaffee', name: 'kryo-serializers', version: '0.37'
5552
compile group: 'com.cedarsoftware', name: 'java-util', version: '1.19.3'
5653
testCompile group: 'junit', name: 'junit', version:'4.11'
5754
testCompile group: 'ch.qos.logback', name: 'logback-classic', version:'1.1.2'

libs/kryo-serializers-0.37.jar

-81.5 KB
Binary file not shown.

libs/kryo-shaded-3.0.3.jar

-350 KB
Binary file not shown.

libs/minlog-1.3-SNAPHOT.jar

-5.52 KB
Binary file not shown.

pom.xml

+2-17
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
<groupId>com.chaschev</groupId>
5050
<artifactId>chutils</artifactId>
5151
<version>1.4</version>
52-
<!--We're only interested in ASCIITable from this library, so exclude all
53-
transitive dependencies -->
52+
<!--We're only interested in ASCIITable from this library, so exclude
53+
all transitive dependencies -->
5454
<exclusions>
5555
<exclusion>
5656
<groupId>org.mockito</groupId>
@@ -128,21 +128,6 @@
128128
<version>2.45</version>
129129
</dependency>
130130
<dependency>
131-
<groupId>com.esotericsoftware</groupId>
132-
<artifactId>kryo-shaded</artifactId>
133-
<version>3.0.3</version>
134-
</dependency>
135-
<dependency>
136-
<groupId>com.esotericsoftware</groupId>
137-
<artifactId>minlog</artifactId>
138-
<version>1.3-SNAPHOT</version>
139-
</dependency>
140-
<dependency>
141-
<groupId>de.javakaffee</groupId>
142-
<artifactId>kryo-serializers</artifactId>
143-
<version>0.37</version>
144-
</dependency>
145-
<dependency>
146131
<groupId>org.openjdk.jmh</groupId>
147132
<artifactId>jmh-core</artifactId>
148133
<version>1.11.3</version>

src/main/java/org/numenta/nupic/algorithms/TemporalMemory.java

+3
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ public void burstColumns(Connections c, ComputeCycle cycle,
204204
activeColumns.removeAll(predictedColumns);
205205

206206
for(Column column : activeColumns) {
207+
if(column == null || column.getCells() == null) {
208+
System.out.println("here");
209+
}
207210
List<Cell> cells = column.getCells();
208211
cycle.activeCells.addAll(cells);
209212

src/main/java/org/numenta/nupic/network/CheckPointer.java renamed to src/main/java/org/numenta/nupic/network/CheckPointOp.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -28,32 +28,33 @@
2828
* <p>
2929
* Executes check point behavior through the {@link #checkPoint(Observer)} method. The
3030
* checkPoint() method adds the specified {@link rx.Observer} to the list of those
31-
* observers notified following a check point operation. The notification consists of
31+
* observers notified following a check point operation. This "subscribe" action invokes
32+
* the underlying check point operation and returns a notification. The notification consists of
3233
* a byte[] containing the serialized {@link Network}.
3334
* </p><p>
3435
* <b>Typical usage is as follows:</b>
3536
* <pre>
36-
* {@link Network} network = ...
37+
* {@link Persistence} p = Persistence.get();
3738
*
38-
* network.checkPointer().checkPoint(new Observer<byte[]>() {
39+
* p.checkPointOp().checkPoint(new Observer<byte[]>() {
3940
* public void onCompleted() {}
4041
* public void onError(Throwable e) { e.printStackTrace(); }
4142
* public void onNext(byte[] bytes) {
4243
* // Do work here, use serialized Network byte[] here if desired...
4344
* }
4445
* });
4546
*
46-
* Merely by adding this Observer, the Network knows to check point after completion of
47+
* Again, by subscribing to this CheckPointOp, the Network knows to check point after completion of
4748
* the current compute cycle (it checks the List of Observers to see if it's non-empty).
4849
* Then after it notifies all current observers, it clears the list prior to the next
49-
* following compute cycle. see {@link PersistenceAPI} for a more detailed discussion...
50+
* following compute cycle. see {@link PAPI} for a more detailed discussion...
5051
*
5152
* @author cogmission
5253
*
53-
* @param <T> a {@link Network}
54+
* @param <T> the notification return type
5455
*/
5556
@FunctionalInterface
56-
public interface CheckPointer<T> {
57+
public interface CheckPointOp<T> {
5758
/**
5859
* Registers the Observer for a single notification following the checkPoint
5960
* operation. The user will be notified with the byte[] of the {@link Network}

src/main/java/org/numenta/nupic/network/KryoSerializer.java

-96
This file was deleted.

src/main/java/org/numenta/nupic/network/Layer.java

+44-29
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,8 @@ public class Layer<T> implements Persistable {
226226
private Layer<Inference> previous;
227227

228228
private transient List<Observer<Inference>> observers = new ArrayList<Observer<Inference>>();
229-
private transient CheckPointer<byte[]> checkPointer;
230-
private transient List<Observer<byte[]>> checkPointObservers = new ArrayList<>();
229+
private transient CheckPointOperator<?> checkPointOp;
230+
private transient List<Observer<byte[]>> checkPointOpObservers = new ArrayList<>();
231231

232232

233233

@@ -325,7 +325,7 @@ public Layer<T> postDeSerialize() {
325325
factory = new FunctionFactory();
326326
factory.inference = old.inference.postSerialize(old.inference);
327327

328-
checkPointObservers = new ArrayList<>();
328+
checkPointOpObservers = new ArrayList<>();
329329

330330
if(sensor != null) {
331331
sensor.setLocalParameters(params);
@@ -404,7 +404,18 @@ public Layer(Parameters params, MultiEncoder e, SpatialPooler sp, TemporalMemory
404404
(anomalyComputer == null ? "" : "Anomaly"));
405405
}
406406
}
407-
407+
408+
/**
409+
* USED INTERNALLY, DO NOT CALL.
410+
* @return
411+
*/
412+
public CheckPointOp<byte[]> delegateCheckPointCall() {
413+
if(parentNetwork != null) {
414+
return parentNetwork.getCheckPointOperator();
415+
}
416+
return null;
417+
}
418+
408419
/**
409420
* Sets the parent region which contains this {@code Layer}
410421
*
@@ -1426,9 +1437,9 @@ public int getMostProbableBucketIndex(String field, int step) {
14261437
return c.getMostProbableBucketIndex(step);
14271438
}
14281439

1429-
// ////////////////////////////////////////////////////////////
1430-
// PRIVATE METHODS AND CLASSES BELOW HERE //
1431-
// ////////////////////////////////////////////////////////////
1440+
//////////////////////////////////////////////////////////////
1441+
// PRIVATE METHODS AND CLASSES BELOW HERE //
1442+
//////////////////////////////////////////////////////////////
14321443
/**
14331444
* Notify all subscribers through the delegate that stream processing has
14341445
* been completed or halted.
@@ -1639,7 +1650,7 @@ private Observable<ManualInput> resolveObservableSequence(T t) {
16391650
}
16401651

16411652
sequenceStart = sequenceStart.filter(m -> {
1642-
if(!checkPointObservers.isEmpty() && parentNetwork != null) {
1653+
if(!checkPointOpObservers.isEmpty() && parentNetwork != null) {
16431654
// Execute check point logic
16441655
doCheckPoint();
16451656
}
@@ -1656,21 +1667,21 @@ private Observable<ManualInput> resolveObservableSequence(T t) {
16561667
* Observers; then clears the list of Observers.
16571668
*/
16581669
private void doCheckPoint() {
1659-
byte[] bytes = parentNetwork.internalCheckPoint();
1670+
byte[] bytes = parentNetwork.internalCheckPointOp();
16601671

16611672
if(bytes != null) {
16621673
LOGGER.debug("Layer [" + getName() + "] checkPointed file: " +
1663-
Network.serializer(null, false).getLastCheckPointFileName() + ", hc = " + Network.serializer(null, false).hashCode());
1674+
Persistence.get().getLastCheckPointFileName());
16641675
}else{
16651676
LOGGER.debug("Layer [" + getName() + "] checkPoint F A I L E D at: " + (new DateTime()));
16661677
}
16671678

1668-
for(Observer<byte[]> o : checkPointObservers) {
1679+
for(Observer<byte[]> o : checkPointOpObservers) {
16691680
o.onNext(bytes);
16701681
o.onCompleted();
16711682
}
16721683

1673-
checkPointObservers.clear();
1684+
checkPointOpObservers.clear();
16741685
}
16751686

16761687
/**
@@ -2005,25 +2016,29 @@ public void run() {
20052016
}
20062017

20072018
/**
2008-
* Returns the pre-built subscribe function used to add subscribers (callers of
2009-
* {@link #checkPointer()}) to the check point observer notifications.
2010-
*
2011-
* @return the internal Observable used for post check point notifications.
2019+
* Returns an {@link rx.Observable} operator that when subscribed to, invokes an operation
2020+
* that stores the state of this {@code Network} while keeping the Network up and running.
2021+
* The Network will be stored at the pre-configured location (in binary form only, not JSON).
2022+
*
2023+
* @param network the {@link Network} to check point.
2024+
* @return the {@link CheckPointOp} operator
20122025
*/
2013-
CheckPointer<byte[]> checkPointer() {
2014-
if(checkPointer == null) {
2015-
checkPointer = new CheckPointerImpl<byte[]>(Layer.this);
2026+
@SuppressWarnings("unchecked")
2027+
CheckPointOp<byte[]> getCheckPointOperator() {
2028+
if(checkPointOp == null) {
2029+
checkPointOp = new CheckPointOperator<byte[]>(Layer.this);
20162030
}
2017-
return checkPointer;
2031+
return (CheckPointOp<byte[]>)checkPointOp;
20182032
}
20192033

20202034

20212035
//////////////////////////////////////////////////////////////
20222036
// Inner Class Definition for CheckPointer (Observable) //
20232037
//////////////////////////////////////////////////////////////
2038+
20242039
/**
20252040
* <p>
2026-
* Implementation of the CheckPointer interface which serves to checkpoint
2041+
* Implementation of the CheckPointOp interface which serves to checkpoint
20272042
* and register a listener at the same time. The {@link rx.Observer} will be
20282043
* notified with the byte array of the {@link Network} being serialized.
20292044
* </p><p>
@@ -2032,18 +2047,18 @@ CheckPointer<byte[]> checkPointer() {
20322047
* be executed.
20332048
* </p>
20342049
*
2035-
* @param <T> {@link rx.Observer}
2050+
* @param <T> {@link rx.Observer}'s return type
20362051
*/
2037-
static class CheckPointerImpl<T> extends Observable<T> implements CheckPointer<T> {
2038-
private CheckPointerImpl(Layer<?> l) {
2052+
static class CheckPointOperator<T> extends Observable<T> implements CheckPointOp<T> {
2053+
private CheckPointOperator(Layer<?> l) {
20392054
this(new Observable.OnSubscribe<T>() {
2040-
@SuppressWarnings("unchecked")
2041-
@Override public void call(Subscriber<? super T> t) {
2055+
@SuppressWarnings({ "unchecked" })
2056+
@Override public void call(Subscriber<? super T> r) {
20422057
if(l.LAYER_THREAD != null) {
20432058
// The layer thread automatically tests for the list of observers to
20442059
// contain > 0 elements, which indicates a check point operation should
20452060
// be executed.
2046-
l.checkPointObservers.add((Observer<byte[]>)t);
2061+
l.checkPointOpObservers.add((Observer<byte[]>)r);
20472062
}else{
20482063
l.doCheckPoint();
20492064
}
@@ -2052,10 +2067,10 @@ private CheckPointerImpl(Layer<?> l) {
20522067
}
20532068

20542069
/**
2055-
* Constructs this {@code CheckPointerImpl}
2070+
* Constructs this {@code CheckPointOperator}
20562071
* @param f a subscriber function
20572072
*/
2058-
protected CheckPointerImpl(rx.Observable.OnSubscribe<T> f) {
2073+
protected CheckPointOperator(rx.Observable.OnSubscribe<T> f) {
20592074
super(f);
20602075
}
20612076

0 commit comments

Comments
 (0)