-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Stateful Prediction engine for time series. #1727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -19,6 +19,9 @@ public sealed class CompositeRowToRowMapper : IRowToRowMapper | |||
public Schema InputSchema { get; } | |||
public Schema Schema { get; } | |||
|
|||
[BestFriend] | |||
internal IRowToRowMapper[] InnerMappers => _innerMappers; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InnerMappers [](start = 35, length = 12)
Just to re-iterate, get rid of _innerMappers
and replace with autoimplemented property, e.g., internal IRowToRowMapper[] InnerMappers { get; }
. #Closed
@@ -36,20 +36,33 @@ public enum TransformerScope | |||
Everything = Training | Testing | Scoring | |||
} | |||
|
|||
[BestFriend] | |||
internal interface ITransformerAccessor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ITransformerAccessor [](start = 23, length = 20)
As we discussed earlier this is the sort of thing that really bears some explanation. Also maybe ITransformerChainAccessor
so it's clear that the limited scope of this becomes clear... #Closed
private readonly ITransformer[] _transformers; | ||
private readonly TransformerScope[] _scopes; | ||
[BestFriend] | ||
internal readonly ITransformer[] Transformers; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
internal readonly ITransformer[] Transformers; [](start = 8, length = 46)
Didn't you introduce the internal interface so you wouldn't have to expose these things? Please go back to using _transformers
, _scopes
... #Closed
@@ -13,19 +13,30 @@ namespace Microsoft.ML.Runtime.Internal.Utilities | |||
/// an item will result in discarding the least recently added item. | |||
/// </summary> | |||
[BestFriend] | |||
internal sealed class FixedSizeQueue<T> | |||
internal sealed class FixedSizeQueue<T> : ICloneable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ICloneable [](start = 46, length = 10)
Please never implement ICloneable
. Having a clone method is fine. Having it return an object you'll have to immediatley cast is not. I see no particular reason to use this interface. #Closed
{ | ||
private readonly T[] _array; | ||
private int _startIndex; | ||
private int _count; | ||
|
||
public int StartIndex => _startIndex; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public int StartIndex => _startIndex; [](start = 8, length = 37)
Even though it's an internal class this doesn't mean that we should design it badly by exposing its guts. This makes it quite difficult to reason about.
As far as I can tell you just need this for serialization. What had happened if you'd just written the results of iterating from i
over range of 0
to q.Count
and writing q[i]
when serializing, and in the reverse case just read and inserted the result of that writing back into the queue? Still would have worked as far as I can tell, would it not? #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also need this to clone the state, which means I will need to know the start index, count and the content.
In reply to: 236475666 [](ancestors = 236475666)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The state had to be cloned in any case before your change came along. Somehow that managed to work without doing this. Even if you somehow found something that requires it, I think the procedure I outlined above will work. If it won't work, please explain why.
Let's just be clear, I don't consider breaking something as basic as encapsulation for this structure to be acceptable. Please find another way.
In reply to: 236478429 [](ancestors = 236478429,236475666)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where TState : SequentialTransformerBase<TInput, TOutput, TState>.StateBase, new() | ||
{ | ||
/// <summary> | ||
/// The base class for encapsulating the State object for sequential processing. This class implements a windowed buffer. | ||
/// </summary> | ||
public abstract class StateBase | ||
public abstract class StateBase : ICanSaveModel, ICloneable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ICloneable [](start = 57, length = 10)
Please no, there's no particular reason for this ICloneable
. Even if you wanted this thing to have a public clone method, why would you insist that it be object
instead of something like StateBase
? #Closed
ctx.Writer.Write(WindowedBuffer.Capacity); | ||
ctx.Writer.Write(WindowedBuffer.StartIndex); | ||
ctx.Writer.WriteSingleArray(WindowedBuffer.Buffer); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, just to be clear what you'd do here once you get rid of those public accessors to the guts of the buffer, you'd write WindowedBuffer.Count
, then write each element of that array.
If I were you I'd write a little utility serializing/deserializing these windowed buffers. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about start index? This is a circular buffer right?
In reply to: 236476800 [](ancestors = 236476800)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, of course. And what do you think the difference between a circular buffer is with buffer {1,2,3}
and start index 0
, and one with a buffer {2, 3, 1}
and start index 2
? :)
In reply to: 236477016 [](ancestors = 236477016,236476800)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feeling a little stupid now...may be lack of sleep, thanks!
In reply to: 236481618 [](ancestors = 236481618,236477016,236476800)
public State(ModelLoadContext ctx) : base(ctx) | ||
{ | ||
WindowedBuffer = new FixedSizeQueue<float>( | ||
ctx.Reader.ReadInt32(), ctx.Reader.ReadInt32(), ctx.Reader.ReadSingleArray()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ctx.Reader.ReadInt32(), [](start = 20, length = 23)
When you write the utility, please make sure to do proper validation of the data you're reading from disk. #Closed
private bool _isIniatilized; | ||
protected long PreviousPosition; | ||
|
||
public StateBase(ModelLoadContext ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public [](start = 12, length = 6)
public
, why? #Closed
where TState : SequentialTransformerBase<TInput, TOutput, TState>.StateBase, new() | ||
{ | ||
/// <summary> | ||
/// The base class for encapsulating the State object for sequential processing. This class implements a windowed buffer. | ||
/// </summary> | ||
public abstract class StateBase | ||
public abstract class StateBase : ICanSaveModel, ICloneable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ICanSaveModel [](start = 42, length = 13)
Please don't use ICanSaveModel
. At the risk of speaking tautologically, ICanSaveModel
is for objects that save themselves as models. You're not doing that; these things that contain these state objects are writing themselves as models, but this object is not -- it is just writing and saving itself to a binary loader. You're just using the BinaryWriter
and on the flipside using the BinaryReader
. You could just as easily have done this by having a private protected
or internal
virtual or abstract method on this class, with an internal
. And on the flipside you would have had a nice, hidden, internal or private protected constructor. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, unless there's some sort of "loadable class" at the other end that knows how to load this as a model, it shouldn't implement ICanSaveModel
. Go to every implementation of ICanSaveModel.Save
. You will see something like this at the head of it:
machinelearning/src/Microsoft.ML.Data/Prediction/Calibrator.cs
Lines 371 to 372 in 08947ef
ctx.CheckAtModel(); | |
ctx.SetVersionInfo(GetVersionInfo()); |
You will also see a corresponding entry, at the head of the file, something like this:
machinelearning/src/Microsoft.ML.Data/Prediction/Calibrator.cs
Lines 46 to 48 in 08947ef
[assembly: LoadableClass(typeof(PlattCalibrator), null, typeof(SignatureLoadModel), | |
"Platt Calibration Executor", | |
PlattCalibrator.LoaderSignature)] |
This is how we save and load models. Not just raw bitstreams, it's not for that, but real honest-to-God models.
In reply to: 236479112 [](ancestors = 236479112)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually something I observe in both these implementations of an interface... you picked two interfaces seemingly at random that "kinda sorta" did what you wanted, even though nothing at all relies (or should rely) on them implementing either interface, and just sort of wrote methods. When thinking "should this implement some random interface," it usually pays to have some sort of scenario in mind that actually requires that interface, which neither of these did.
In reply to: 236481050 [](ancestors = 236481050,236479112)
// AdaptiveSingularSpectrumSequenceModeler: _model | ||
|
||
base.Save(ctx); | ||
ctx.Writer.Write(SeasonalWindowSize); | ||
ctx.Writer.Write(DiscountFactor); | ||
ctx.Writer.Write((byte)ErrorFunction); | ||
ctx.Writer.Write(IsAdaptive); | ||
StateRef.Save(ctx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StateRef.Save(ctx); [](start = 12, length = 19)
Actually this doesn't need to even be an abstract method... every state instance can just implement a method (internally, of course), since we're never saving state in any sort of "general" fashion. #Closed
{ | ||
} | ||
|
||
public interface IStatefulTransformer : ITransformer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IStatefulTransformer [](start = 21, length = 20)
So, as far as I know all of these interfaces are used only in this one assembly. We don't want them used anywhere else. Why are they or their implementations public? #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your observation is correct, they should be made internal.
In reply to: 236482471 [](ancestors = 236482471)
@@ -454,6 +454,7 @@ public AdaptiveSingularSpectrumSequenceModeler(IHostEnvironment env, ModelLoadCo | |||
_wTrans = new CpuAlignedMatrixRow(_rank, _windowSize, SseUtils.CbAlign); | |||
int i = 0; | |||
_wTrans.CopyFrom(tempArray, ref i); | |||
_y = new CpuAlignedVector(_rank, SseUtils.CbAlign); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this line coming from? Why is this change? #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized we also need to save this vector to disk if the matrix is saved because they go hand in hand. I have made this change.
In reply to: 236520664 [](ancestors = 236520664)
var prediction = engine.Predict(new Data(1)); | ||
Assert.Equal(-1, prediction.Random); | ||
prediction = engine.Predict(new Data(2)); | ||
Assert.Equal(-1, prediction.Random); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didnt get how you check that observations are passing through timeseries transform #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debugger but we can update this test to check-point before and after, I have made this change in the below test.
In reply to: 236521589 [](ancestors = 236521589)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debugger? its supposed to be a unit test :) with no debugger attached, would it be possible to do this check in code?
In reply to: 236804680 [](ancestors = 236804680,236521589)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assert.Equal(0, prediction.Change[0], precision: 7); // Alert | ||
Assert.Equal(-0.12883400917053223, prediction.Change[1], precision: 7); // Raw score | ||
Assert.Equal(0.5, prediction.Change[2], precision: 7); // P-Value score | ||
Assert.Equal(2.6214400000000113E-15, prediction.Change[3], precision: 7); // Martingale score |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you compare if these are the same results when no checkpoint is done? #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea! This verifies check-pointing is working as it should! Thanks!!!
In reply to: 236522068 [](ancestors = 236522068)
{ | ||
var clone = (SsaSpikeDetector)MemberwiseClone(); | ||
clone.Model = clone.Model.Clone(); | ||
clone.StateRef = (State)clone.StateRef.Clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be atomic operation, so Model and StateRef are consistent with each other? Could it happen that after Model is cloned but StateRef is not yet, the original StateRef changes, so then cloning StateRef would break consistency of Model & StateRef couple? #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, since cloning happens when a prediction engine is created and it clones from a transformer that will not mutate it's state. The prediction engine keeps a copy and it mutates it and nobody clones the transformer reference maintained by the prediction engine because I don't allow access to it.
In reply to: 236522741 [](ancestors = 236522741)
{ | ||
var clone = (SsaSpikeDetector)MemberwiseClone(); | ||
clone.Model = clone.Model.Clone(); | ||
clone.StateRef = (State)clone.StateRef.Clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have 1 to 1 correspondence between Model and State now? if so maybe you could make a State a part of a Model now? So State object is tracked inside the Model #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be we can merge State and Model in one but I will prefer doing this after part of bug-bash because its more of a cleanup and I'm short on time for check-in.
In reply to: 236525220 [](ancestors = 236525220)
…to tspredictionfunction
public double[] Change; | ||
#pragma warning restore CS0649 | ||
#pragma warning restore CS0649 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CS0649 [](start = 24, length = 18)
You decided to add a whole bunch of random spaces here, and one on line 24 for some reason. #Resolved
@@ -36,11 +36,22 @@ public enum TransformerScope | |||
Everything = Training | Testing | Scoring | |||
} | |||
|
|||
/// <summary> | |||
/// Used to determine if <see cref="ITransformer"/> object is of type <see cref="TransformerChain"/> | |||
/// so that it's inter fields can be accessed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's [](start = 16, length = 4)
Its "it's" ... no it's "its"! #Closed
@@ -36,11 +36,22 @@ public enum TransformerScope | |||
Everything = Training | Testing | Scoring | |||
} | |||
|
|||
/// <summary> | |||
/// Used to determine if <see cref="ITransformer"/> object is of type <see cref="TransformerChain"/> | |||
/// so that it's inter fields can be accessed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inter [](start = 21, length = 5)
internal #Closed
var q = new FixedSizeQueue<double>(reader.ReadInt32()); | ||
int count = reader.ReadInt32(); | ||
|
||
Contracts.Assert(0 <= count & count <= q.Capacity); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assert [](start = 22, length = 6)
You don't assert data you read from disk. You CheckDecode
it. #Closed
|
||
internal static FixedSizeQueue<double> DeserializeFixedSizeQueueDouble(BinaryReader reader) | ||
{ | ||
var q = new FixedSizeQueue<double>(reader.ReadInt32()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reader.ReadInt32() [](start = 47, length = 18)
Remember, check the data you get from disk to make sure it's sensible. In this case, I suppose, a positive value. #Closed
{ | ||
writer.Write(queue.Capacity); | ||
writer.Write(queue.Count); | ||
for (int index = 0; index < queue.Count; index++) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure you have asserts that mirror your checks when serializing the data. #Closed
Hi @codemzs, looking OK. The only real blocking problem I see is you I guess forgot your deserialization checks and serialization asserts, but that should be easy. #Resolved |
…to tspredictionfunction
Thanks for reviewing, Tom! I have addressed all your comments. In reply to: 442163549 [](ancestors = 442163549) |
Action<long> GetPinger(); | ||
} | ||
|
||
public interface IStatefulRowMapper : IRowMapper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IStatefulRowMapper [](start = 21, length = 18)
You forgot to make this one internal... #Resolved
|
||
} | ||
|
||
public sealed class Row : IStatefulRow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public [](start = 8, length = 6)
Public, why? Here and everywhere... #Resolved
|
||
IStatefulTransformer IStatefulTransformer.Clone() => Clone(); | ||
|
||
public sealed class SequentialDataTransform : TransformBase, ITransformTemplate, IRowToRowMapper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public [](start = 8, length = 6)
public, why? #Resolved
@@ -288,25 +357,41 @@ public IRowToRowMapper GetRowToRowMapper(Schema inputSchema) | |||
throw new InvalidOperationException("Not a RowToRowMapper."); | |||
} | |||
|
|||
public sealed class SequentialDataTransform : TransformBase, ITransformTemplate | |||
public IRowToRowMapper GetStatefulRowToRowMapper(Schema inputSchema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetStatefulRowToRowMapper [](start = 31, length = 25)
You made the interfaces internal, but then forgot I think to make the interface implementations explicit. #Resolved
@@ -201,6 +210,38 @@ public sealed class State : AnomalyDetectionStateBase | |||
private SequenceModelerBase<Single, Single> _model; | |||
private SsaAnomalyDetectionBase _parentAnomalyDetector; | |||
|
|||
public State() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public State() [](start = 12, length = 14)
More public stuff...
So, do you see how I changed these things to be private protected
? And then we introduce not one, not two, but four additional needlessly public?
I have to admit this is a little frustrating. I spend a considerable amount of effort trying to keep the codebase as clean as I can, but I can't do it myself. #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be public since State object is instantiated in SsaAnomalyDetectionBase.cs line 127
In reply to: 236828808 [](ancestors = 236828808)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which is code in this assembly, so it can be internal. Not public.
In reply to: 236854833 [](ancestors = 236854833,236828808)
private bool _isIniatilized; | ||
protected long PreviousPosition; | ||
|
||
protected StateBase(ModelLoadContext ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected [](start = 12, length = 9)
protected
is better than publi
c. Private protected is better than protected.
Remember, we're trying to reduce the surface area of our API, and a protected member is still parse of the API surface area... #Resolved
Hi @codemzs. Only real major problem I see remaining is that you have made the public surface area of this assembly needlessly large, by introducing public methods everywhere, not having explicit implementations of interfaces but just having them be public methods, and so on, and so on. I do not quite have time to enumerate the perhaps dozens of places you've needlessly made things public, so please just review your code and try to catch all of them, thanks. #Resolved |
} | ||
|
||
_buffer = new FixedSizeQueue<Single>(_seriesLength); | ||
|
||
//_buffer = new FixedSizeQueue<Single>(_seriesLength); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//_buffer = new FixedSizeQueue(_seriesLength); [](start = 11, length = 55)
Remove? #Resolved
@@ -338,7 +339,7 @@ private AdaptiveSingularSpectrumSequenceModeler(AdaptiveSingularSpectrumSequence | |||
_shouldStablize = model._shouldStablize; | |||
_shouldMaintainInfo = model._shouldMaintainInfo; | |||
_info = model._info; | |||
_buffer = new FixedSizeQueue<Single>(_seriesLength); | |||
_buffer = model._buffer.Clone();//new FixedSizeQueue<Single>(_seriesLength); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//new FixedSizeQueue(_seriesLength); [](start = 44, length = 44)
Remove? #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried my best to clean up but as we discussed a lot cannot be clean up because of the way this code is structured but I will want us to clean this soon by restructuring the code. In reply to: 442201431 [](ancestors = 442201431) |
…to tspredictionfunction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @codemzs looks more or less good, if we could just lock down the public surface where we can a bit more and not use the model contexts for things not saving/loading models, that would be great!
Stateful prediction engine for time series that updates the time series model with new observations at the prediction (anomaly detection) phase and allows for checkpointing the updated model to file stream.
fixes #1219
Replaces #1618.