diff --git a/src/Microsoft.ML.Core/Data/IDataView.cs b/src/Microsoft.ML.Core/Data/IDataView.cs
index d1f57a6f55..51f6400dc0 100644
--- a/src/Microsoft.ML.Core/Data/IDataView.cs
+++ b/src/Microsoft.ML.Core/Data/IDataView.cs
@@ -63,7 +63,7 @@ internal interface ISchema
///
/// The input and output of Query Operators (Transforms). This is the fundamental data pipeline
- /// type, comparable to IEnumerable for LINQ.
+ /// type, comparable to for LINQ.
///
public interface IDataView
{
@@ -92,28 +92,28 @@ public interface IDataView
RowCursor GetRowCursor(Func needCol, Random rand = null);
///
- /// This constructs a set of parallel batch cursors. The value n is a recommended limit
- /// on cardinality. If is non-positive, this indicates that the caller
- /// has no recommendation, and the implementation should have some default behavior to cover
- /// this case. Note that this is strictly a recommendation: it is entirely possible that
- /// an implementation can return a different number of cursors.
+ /// This constructs a set of parallel batch cursors. The value is a recommended limit on
+ /// cardinality. If is non-positive, this indicates that the caller has no recommendation,
+ /// and the implementation should have some default behavior to cover this case. Note that this is strictly a
+ /// recommendation: it is entirely possible that an implementation can return a different number of cursors.
///
/// The cursors should return the same data as returned through
- /// , except partitioned: no two cursors
- /// should return the "same" row as would have been returned through the regular serial cursor,
- /// but all rows should be returned by exactly one of the cursors returned from this cursor.
- /// The cursors can have their values reconciled downstream through the use of the
- /// property.
+ /// , except partitioned: no two cursors should return the
+ /// "same" row as would have been returned through the regular serial cursor, but all rows should be returned by
+ /// exactly one of the cursors returned from this cursor. The cursors can have their values reconciled
+ /// downstream through the use of the property.
+ ///
+ /// The typical usage pattern is that a set of cursors is requested, each of them is then given to a set of
+ /// working threads that consume from them independently while, ultimately, the results are finally collated in
+ /// the end by exploiting the ordering of the property described above. More typical
+ /// scenarios will be content with pulling from the single serial cursor of
+ /// .
///
- /// This is an object that can be used to reconcile the
- /// returned array of cursors. When the array of cursors is of length 1, it is legal,
- /// indeed expected, that this parameter should be null.
/// The predicate, where a column is active if this returns true.
/// The suggested degree of parallelism.
/// An instance
///
- RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator,
- Func needCol, int n, Random rand = null);
+ RowCursor[] GetRowCursorSet(Func needCol, int n, Random rand = null);
///
/// Gets an instance of Schema.
@@ -122,20 +122,8 @@ RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator,
}
///
- /// This is used to consolidate parallel cursors into a single cursor. The object that determines
- /// the number of cursors and splits the row "stream" provides the consolidator object.
- ///
- public interface IRowCursorConsolidator
- {
- ///
- /// Create a consolidated cursor from the given parallel cursor set.
- ///
- RowCursor CreateCursor(IChannelProvider provider, RowCursor[] inputs);
- }
-
- ///
- /// Delegate type to get a value. This can used for efficient access to data in an IRow
- /// or IRowCursor.
+ /// Delegate type to get a value. This can be used for efficient access to data in a
+ /// or .
///
public delegate void ValueGetter(ref TValue value);
@@ -146,43 +134,54 @@ public interface IRowCursorConsolidator
public abstract class Row : IDisposable
{
///
- /// This is incremented when the underlying contents changes, giving clients a way to detect change.
- /// Generally it's -1 when the object is in an invalid state. In particular, for an , this is -1
- /// when the is or .
+ /// This is incremented when the underlying contents changes, giving clients a way to detect change. Generally
+ /// it's -1 when the object is in an invalid state. In particular, for an , this is -1
+ /// when the is or .
///
- /// Note that this position is not position within the underlying data, but position of this cursor only.
- /// If one, for example, opened a set of parallel streaming cursors, or a shuffled cursor, each such cursor's
- /// first valid entry would always have position 0.
+ /// Note that this position is not position within the underlying data, but position of this cursor only. If
+ /// one, for example, opened a set of parallel streaming cursors, or a shuffled cursor, each such cursor's first
+ /// valid entry would always have position 0.
///
public abstract long Position { get; }
///
- /// This provides a means for reconciling multiple streams of counted things. Generally, in each stream,
- /// batch numbers should be non-decreasing. Furthermore, any given batch number should only appear in one
- /// of the streams. Order is determined by batch number. The reconciler ensures that each stream (that is
- /// still active) has at least one item available, then takes the item with the smallest batch number.
+ /// This provides a means for reconciling multiple rows that have been produced generally from
+ /// . When getting a set, there is a need
+ /// to, while allowing parallel processing to proceed, always have an aim that the original order should be
+ /// reconverable. Note, whether or not a user cares about that original order in ones specific application is
+ /// another story altogether (most callers of this as a practical matter do not, otherwise they would not call
+ /// it), but at least in principle it should be possible to reconstruct the original order one would get from an
+ /// identically configured . So: for any cursor
+ /// implementation, batch numbers should be non-decreasing. Furthermore, any given batch number should only
+ /// appear in one of the cursors as returned by
+ /// . In this way, order is determined by
+ /// batch number. An operation that reconciles these cursors to produce a consistent single cursoring, could do
+ /// so by drawing from the single cursor, among all cursors in the set, that has the smallest batch number
+ /// available.
///
- /// Note that there is no suggestion that the batches for a particular entry will be consistent from
- /// cursoring to cursoring, except for the consistency in resulting in the same overall ordering. The same
- /// entry could have different batch numbers from one cursoring to another. There is also no requirement
- /// that any given batch number must appear, at all.
+ /// Note that there is no suggestion that the batches for a particular entry will be consistent from cursoring
+ /// to cursoring, except for the consistency in resulting in the same overall ordering. The same entry could
+ /// have different batch numbers from one cursoring to another. There is also no requirement that any given
+ /// batch number must appear, at all. It is merely a mechanism for recovering ordering from a possibly arbitrary
+ /// partitioning of the data. It also follows from this, of course, that considering the batch to be a property
+ /// of the data is completely invalid.
///
public abstract long Batch { get; }
///
/// A getter for a 128-bit ID value. It is common for objects to serve multiple
/// instances to iterate over what is supposed to be the same data, for example, in a
- /// a cursor set will produce the same data as a serial cursor, just partitioned, and a shuffled cursor
- /// will produce the same data as a serial cursor or any other shuffled cursor, only shuffled. The ID
- /// exists for applications that need to reconcile which entry is actually which. Ideally this ID should
- /// be unique, but for practical reasons, it suffices if collisions are simply extremely improbable.
+ /// a cursor set will produce the same data as a serial cursor, just partitioned, and a shuffled cursor will
+ /// produce the same data as a serial cursor or any other shuffled cursor, only shuffled. The ID exists for
+ /// applications that need to reconcile which entry is actually which. Ideally this ID should be unique, but for
+ /// practical reasons, it suffices if collisions are simply extremely improbable.
///
- /// Note that this ID, while it must be consistent for multiple streams according to the semantics
- /// above, is not considered part of the data per se. So, to take the example of a data view specifically,
- /// a single data view must render consistent IDs across all cursorings, but there is no suggestion at
- /// all that if the "same" data were presented in a different data view (as by, say, being transformed,
- /// cached, saved, or whatever), that the IDs between the two different data views would have any
- /// discernable relationship.
+ /// Note that this ID, while it must be consistent for multiple streams according to the semantics above, is not
+ /// considered part of the data per se. So, to take the example of a data view specifically, a single data view
+ /// must render consistent IDs across all cursorings, but there is no suggestion at all that if the "same" data
+ /// were presented in a different data view (as by, say, being transformed, cached, saved, or whatever), that
+ /// the IDs between the two different data views would have any discernable relationship.
public abstract ValueGetter GetIdGetter();
///
diff --git a/src/Microsoft.ML.Data/Data/DataViewUtils.cs b/src/Microsoft.ML.Data/Data/DataViewUtils.cs
index 9f60b3f418..01f6960cce 100644
--- a/src/Microsoft.ML.Data/Data/DataViewUtils.cs
+++ b/src/Microsoft.ML.Data/Data/DataViewUtils.cs
@@ -129,15 +129,20 @@ public static bool TryCreateConsolidatingCursor(out RowCursor curs,
return false;
}
- IRowCursorConsolidator consolidator;
- var inputs = view.GetRowCursorSet(out consolidator, predicate, cthd, rand);
+ var inputs = view.GetRowCursorSet(predicate, cthd, rand);
host.Check(Utils.Size(inputs) > 0);
- host.Check(inputs.Length == 1 || consolidator != null);
if (inputs.Length == 1)
curs = inputs[0];
else
- curs = consolidator.CreateCursor(host, inputs);
+ {
+ // We have a somewhat arbitrary batch size of about 64 for buffering results from the
+ // intermediate cursors, since that at least empirically for most datasets seems to
+ // strike a nice balance between a size large enough to benefit from parallelism but
+ // small enough so as to not be too onerous to keep in memory.
+ const int batchSize = 64;
+ curs = DataViewUtils.ConsolidateGeneric(host, inputs, batchSize);
+ }
return true;
}
@@ -146,13 +151,11 @@ public static bool TryCreateConsolidatingCursor(out RowCursor curs,
/// cardinality. If not all the active columns are cachable, this will only
/// produce the given input cursor.
///
- public static RowCursor[] CreateSplitCursors(out IRowCursorConsolidator consolidator,
- IChannelProvider provider, RowCursor input, int num)
+ public static RowCursor[] CreateSplitCursors(IChannelProvider provider, RowCursor input, int num)
{
Contracts.CheckValue(provider, nameof(provider));
provider.CheckValue(input, nameof(input));
- consolidator = null;
if (num <= 1)
return new RowCursor[1] { input };
@@ -168,7 +171,7 @@ public static RowCursor[] CreateSplitCursors(out IRowCursorConsolidator consolid
// REVIEW: Keep the utility method here, move this splitter stuff
// to some other file.
- return Splitter.Split(out consolidator, provider, input.Schema, input, num);
+ return Splitter.Split(provider, input.Schema, input, num);
}
///
@@ -249,7 +252,7 @@ public static RowCursor ConsolidateGeneric(IChannelProvider provider, RowCursor[
return inputs[0];
object[] pools = null;
- return Splitter.Consolidator.Consolidate(provider, inputs, batchSize, ref pools);
+ return Splitter.Consolidate(provider, inputs, batchSize, ref pools);
}
///
@@ -277,7 +280,6 @@ private sealed class Splitter
{
private readonly Schema _schema;
private readonly object[] _cachePools;
- private object[] _consolidateCachePools;
///
/// Pipes, in addition to column values, will also communicate extra information
@@ -299,214 +301,198 @@ private Splitter(Schema schema)
_cachePools = new object[_schema.Count + (int)ExtraIndex._Lim];
}
- public sealed class Consolidator : IRowCursorConsolidator
+ public static RowCursor Consolidate(IChannelProvider provider, RowCursor[] inputs, int batchSize, ref object[] ourPools)
{
- private readonly Splitter _splitter;
-
- public Consolidator(Splitter splitter)
+ Contracts.AssertValue(provider);
+ using (var ch = provider.Start("Consolidate"))
{
- Contracts.AssertValue(splitter);
- _splitter = splitter;
+ return ConsolidateCore(provider, inputs, ref ourPools, ch);
}
+ }
- public RowCursor CreateCursor(IChannelProvider provider, RowCursor[] inputs)
- {
- return Consolidate(provider, inputs, 128, ref _splitter._consolidateCachePools);
- }
+ private static RowCursor ConsolidateCore(IChannelProvider provider, RowCursor[] inputs, ref object[] ourPools, IChannel ch)
+ {
+ ch.CheckNonEmpty(inputs, nameof(inputs));
+ if (inputs.Length == 1)
+ return inputs[0];
+ ch.CheckParam(SameSchemaAndActivity(inputs), nameof(inputs), "Inputs not compatible for consolidation");
+
+ RowCursor cursor = inputs[0];
+ var schema = cursor.Schema;
+ ch.CheckParam(AllCachable(schema, cursor.IsColumnActive), nameof(inputs), "Inputs had some uncachable input columns");
- public static RowCursor Consolidate(IChannelProvider provider, RowCursor[] inputs, int batchSize, ref object[] ourPools)
+ int[] activeToCol;
+ int[] colToActive;
+ Utils.BuildSubsetMaps(schema.Count, cursor.IsColumnActive, out activeToCol, out colToActive);
+
+ // Because the schema of the consolidator is not necessary fixed, we are merely
+ // opportunistic about buffer sharing, from cursoring to cursoring. If we can do
+ // it easily, great, if not, no big deal.
+ if (Utils.Size(ourPools) != schema.Count)
+ ourPools = new object[schema.Count + (int)ExtraIndex._Lim];
+ // Create the out pipes.
+ OutPipe[] outPipes = new OutPipe[activeToCol.Length + (int)ExtraIndex._Lim];
+ for (int i = 0; i < activeToCol.Length; ++i)
{
- Contracts.AssertValue(provider);
- using (var ch = provider.Start("Consolidate"))
- {
- return ConsolidateCore(provider, inputs, ref ourPools, ch);
- }
+ int c = activeToCol[i];
+ ColumnType type = schema[c].Type;
+ var pool = GetPool(type, ourPools, c);
+ outPipes[i] = OutPipe.Create(type, pool);
}
+ int idIdx = activeToCol.Length + (int)ExtraIndex.Id;
+ outPipes[idIdx] = OutPipe.Create(NumberType.UG, GetPool(NumberType.UG, ourPools, idIdx));
+
+ // Create the structures to synchronize between the workers and the consumer.
+ const int toConsumeBound = 4;
+ var toConsume = new BlockingCollection(toConsumeBound);
+ var batchColumnPool = new MadeObjectPool(() => new BatchColumn[outPipes.Length]);
+ Thread[] workers = new Thread[inputs.Length];
+ MinWaiter waiter = new MinWaiter(workers.Length);
+ bool done = false;
- private static RowCursor ConsolidateCore(IChannelProvider provider, RowCursor[] inputs, ref object[] ourPools, IChannel ch)
+ for (int t = 0; t < workers.Length; ++t)
{
- ch.CheckNonEmpty(inputs, nameof(inputs));
- if (inputs.Length == 1)
- return inputs[0];
- ch.CheckParam(SameSchemaAndActivity(inputs), nameof(inputs), "Inputs not compatible for consolidation");
-
- RowCursor cursor = inputs[0];
- var schema = cursor.Schema;
- ch.CheckParam(AllCachable(schema, cursor.IsColumnActive), nameof(inputs), "Inputs had some uncachable input columns");
-
- int[] activeToCol;
- int[] colToActive;
- Utils.BuildSubsetMaps(schema.Count, cursor.IsColumnActive, out activeToCol, out colToActive);
-
- // Because the schema of the consolidator is not necessary fixed, we are merely
- // opportunistic about buffer sharing, from cursoring to cursoring. If we can do
- // it easily, great, if not, no big deal.
- if (Utils.Size(ourPools) != schema.Count)
- ourPools = new object[schema.Count + (int)ExtraIndex._Lim];
- // Create the out pipes.
- OutPipe[] outPipes = new OutPipe[activeToCol.Length + (int)ExtraIndex._Lim];
- for (int i = 0; i < activeToCol.Length; ++i)
+ var localCursor = inputs[t];
+ ch.Assert(localCursor.State == CursorState.NotStarted);
+ // Note that these all take ownership of their respective cursors,
+ // so they all handle their disposal internal to the thread.
+ workers[t] = Utils.CreateBackgroundThread(() =>
{
- int c = activeToCol[i];
- ColumnType type = schema[c].Type;
- var pool = GetPool(type, ourPools, c);
- outPipes[i] = OutPipe.Create(type, pool);
- }
- int idIdx = activeToCol.Length + (int)ExtraIndex.Id;
- outPipes[idIdx] = OutPipe.Create(NumberType.UG, GetPool(NumberType.UG, ourPools, idIdx));
-
- // Create the structures to synchronize between the workers and the consumer.
- const int toConsumeBound = 4;
- var toConsume = new BlockingCollection(toConsumeBound);
- var batchColumnPool = new MadeObjectPool(() => new BatchColumn[outPipes.Length]);
- Thread[] workers = new Thread[inputs.Length];
- MinWaiter waiter = new MinWaiter(workers.Length);
- bool done = false;
-
- for (int t = 0; t < workers.Length; ++t)
- {
- var localCursor = inputs[t];
- ch.Assert(localCursor.State == CursorState.NotStarted);
- // Note that these all take ownership of their respective cursors,
- // so they all handle their disposal internal to the thread.
- workers[t] = Utils.CreateBackgroundThread(() =>
- {
// This will be the last batch sent in the finally. If iteration procedes without
// error, it will remain null, and be sent as a sentinel. If iteration results in
// an exception that we catch, the exception catching block will set this to an
// exception bearing block, and that will be passed along as the last block instead.
Batch lastBatch = null;
- try
+ try
+ {
+ using (localCursor)
{
- using (localCursor)
- {
- InPipe[] inPipes = new InPipe[outPipes.Length];
- for (int i = 0; i < activeToCol.Length; ++i)
- inPipes[i] = outPipes[i].CreateInPipe(RowCursorUtils.GetGetterAsDelegate(localCursor, activeToCol[i]));
- inPipes[idIdx] = outPipes[idIdx].CreateInPipe(localCursor.GetIdGetter());
+ InPipe[] inPipes = new InPipe[outPipes.Length];
+ for (int i = 0; i < activeToCol.Length; ++i)
+ inPipes[i] = outPipes[i].CreateInPipe(RowCursorUtils.GetGetterAsDelegate(localCursor, activeToCol[i]));
+ inPipes[idIdx] = outPipes[idIdx].CreateInPipe(localCursor.GetIdGetter());
- long oldBatch = 0;
- int count = 0;
+ long oldBatch = 0;
+ int count = 0;
// This event is used to synchronize ourselves using a MinWaiter
// so that we add batches to the consumer queue at the appropriate time.
ManualResetEventSlim waiterEvent = null;
- Action pushBatch = () =>
+ Action pushBatch = () =>
+ {
+ if (count > 0)
{
- if (count > 0)
- {
- var batchColumns = batchColumnPool.Get();
- for (int i = 0; i < inPipes.Length; ++i)
- batchColumns[i] = inPipes[i].GetBatchColumnAndReset();
+ var batchColumns = batchColumnPool.Get();
+ for (int i = 0; i < inPipes.Length; ++i)
+ batchColumns[i] = inPipes[i].GetBatchColumnAndReset();
// REVIEW: Is it worth not allocating new Batch object for each batch?
var batch = new Batch(batchColumnPool, batchColumns, count, oldBatch);
- count = 0;
+ count = 0;
// The waiter event should never be null since this is only
// called after a point where waiter.Register has been called.
ch.AssertValue(waiterEvent);
- waiterEvent.Wait();
- waiterEvent = null;
- toConsume.Add(batch);
- }
- };
+ waiterEvent.Wait();
+ waiterEvent = null;
+ toConsume.Add(batch);
+ }
+ };
// Handle the first one separately, then go into the main loop.
if (localCursor.MoveNext() && !done)
- {
- oldBatch = localCursor.Batch;
- foreach (var pipe in inPipes)
- pipe.Fill();
- count++;
+ {
+ oldBatch = localCursor.Batch;
+ foreach (var pipe in inPipes)
+ pipe.Fill();
+ count++;
// Register with the min waiter that we want to wait on this batch number.
waiterEvent = waiter.Register(oldBatch);
- while (localCursor.MoveNext() && !done)
+ while (localCursor.MoveNext() && !done)
+ {
+ if (oldBatch != localCursor.Batch)
{
- if (oldBatch != localCursor.Batch)
- {
- ch.Assert(count == 0 || localCursor.Batch > oldBatch);
- pushBatch();
- oldBatch = localCursor.Batch;
- waiterEvent = waiter.Register(oldBatch);
- }
- foreach (var pipe in inPipes)
- pipe.Fill();
- count++;
+ ch.Assert(count == 0 || localCursor.Batch > oldBatch);
+ pushBatch();
+ oldBatch = localCursor.Batch;
+ waiterEvent = waiter.Register(oldBatch);
}
- pushBatch();
+ foreach (var pipe in inPipes)
+ pipe.Fill();
+ count++;
}
+ pushBatch();
}
}
- catch (Exception ex)
- {
+ }
+ catch (Exception ex)
+ {
// Whoops, we won't be sending null as the sentinel now.
lastBatch = new Batch(ex);
- toConsume.Add(new Batch(ex));
- }
- finally
+ toConsume.Add(new Batch(ex));
+ }
+ finally
+ {
+ if (waiter.Retire() == 0)
{
- if (waiter.Retire() == 0)
+ if (lastBatch == null)
{
- if (lastBatch == null)
- {
// If it wasn't null, this already sent along an exception bearing batch, in which
// case sending the sentinel is unnecessary and unhelpful.
toConsume.Add(null);
- }
- toConsume.CompleteAdding();
}
+ toConsume.CompleteAdding();
}
- });
- workers[t].Start();
- }
-
- Action quitAction = () =>
- {
- done = true;
- var myOutPipes = outPipes;
- foreach (var batch in toConsume.GetConsumingEnumerable())
- {
- if (batch == null)
- continue;
- batch.SetAll(myOutPipes);
- foreach (var outPipe in myOutPipes)
- outPipe.Unset();
}
- foreach (Thread thread in workers)
- thread.Join();
- };
-
- return new Cursor(provider, schema, activeToCol, colToActive, outPipes, toConsume, quitAction);
+ });
+ workers[t].Start();
}
- private static object GetPool(ColumnType type, object[] pools, int poolIdx)
+ Action quitAction = () =>
{
- Func
protected abstract RowCursor GetRowCursorCore(Func predicate, Random rand = null);
- public abstract RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator,
- Func predicate, int n, Random rand = null);
+ public abstract RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null);
}
///
@@ -733,19 +732,18 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return new Cursor(Host, this, input, active);
}
- public sealed override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator,
- Func predicate, int n, Random rand = null)
+ public sealed override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
Host.CheckValue(predicate, nameof(predicate));
Host.CheckValueOrNull(rand);
var inputPred = _bindings.GetDependencies(predicate);
var active = _bindings.GetActive(predicate);
- var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n, rand);
+ var inputs = Source.GetRowCursorSet(inputPred, n, rand);
Host.AssertNonEmpty(inputs);
if (inputs.Length == 1 && n > 1 && WantParallelCursors(predicate))
- inputs = DataViewUtils.CreateSplitCursors(out consolidator, Host, inputs[0], n);
+ inputs = DataViewUtils.CreateSplitCursors(Host, inputs[0], n);
Host.AssertNonEmpty(inputs);
var cursors = new RowCursor[inputs.Length];
diff --git a/src/Microsoft.ML.Parquet/ParquetLoader.cs b/src/Microsoft.ML.Parquet/ParquetLoader.cs
index 1b3d565594..11d2c2d38e 100644
--- a/src/Microsoft.ML.Parquet/ParquetLoader.cs
+++ b/src/Microsoft.ML.Parquet/ParquetLoader.cs
@@ -398,11 +398,10 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null)
return new Cursor(this, predicate, rand);
}
- public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
_host.CheckValue(predicate, nameof(predicate));
_host.CheckValueOrNull(rand);
- consolidator = null;
return new RowCursor[] { GetRowCursor(predicate, rand) };
}
diff --git a/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs b/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs
index ac999b0141..d8565e486e 100644
--- a/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs
+++ b/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs
@@ -365,9 +365,8 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return _transform.GetRowCount();
}
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
- consolidator = null;
return new RowCursor[] { GetRowCursorCore(predicate, rand) };
}
diff --git a/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs b/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs
index b53ef9ed4c..f5424397e3 100644
--- a/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs
+++ b/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs
@@ -448,9 +448,8 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return _transform.GetRowCount();
}
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
- consolidator = null;
return new RowCursor[] { GetRowCursorCore(predicate, rand) };
}
@@ -708,7 +707,7 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return new Cursor(Host, Source.GetRowCursor(predicateInput, rand), this, active);
}
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
Host.CheckValue(predicate, nameof(predicate));
Host.CheckValueOrNull(rand);
@@ -716,11 +715,11 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida
Func predicateInput;
var active = GetActive(predicate, out predicateInput);
- var inputs = Source.GetRowCursorSet(out consolidator, predicateInput, n, rand);
+ var inputs = Source.GetRowCursorSet(predicateInput, n, rand);
Host.AssertNonEmpty(inputs);
if (inputs.Length == 1 && n > 1 && _bindings.AddedColumnIndices.Any(predicate))
- inputs = DataViewUtils.CreateSplitCursors(out consolidator, Host, inputs[0], n);
+ inputs = DataViewUtils.CreateSplitCursors(Host, inputs[0], n);
Host.AssertNonEmpty(inputs);
var cursors = new RowCursor[inputs.Length];
diff --git a/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs b/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs
index ecb3bed6fe..d1783e873d 100644
--- a/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs
+++ b/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs
@@ -175,10 +175,9 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return cursor;
}
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
var cursor = GetRowCursorCore(predicate, rand);
- consolidator = null;
return new RowCursor[] { cursor };
}
diff --git a/src/Microsoft.ML.Transforms/GroupTransform.cs b/src/Microsoft.ML.Transforms/GroupTransform.cs
index e48f8e08a8..3511959015 100644
--- a/src/Microsoft.ML.Transforms/GroupTransform.cs
+++ b/src/Microsoft.ML.Transforms/GroupTransform.cs
@@ -173,11 +173,10 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
public override bool CanShuffle { get { return false; } }
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
Host.CheckValue(predicate, nameof(predicate));
Host.CheckValueOrNull(rand);
- consolidator = null;
return new RowCursor[] { GetRowCursorCore(predicate) };
}
diff --git a/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs b/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs
index 1f0e77a7d8..c24f7af467 100644
--- a/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs
+++ b/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs
@@ -308,8 +308,7 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return new Cursor(Host, _bindings, input, active);
}
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator,
- Func predicate, int n, Random rand = null)
+ public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
Host.CheckValue(predicate, nameof(predicate));
Host.CheckValueOrNull(rand);
@@ -320,7 +319,7 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida
if (n > 1 && ShouldUseParallelCursors(predicate) != false)
{
- var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n);
+ var inputs = Source.GetRowCursorSet(inputPred, n);
Host.AssertNonEmpty(inputs);
if (inputs.Length != 1)
@@ -335,7 +334,6 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida
else
input = Source.GetRowCursor(inputPred);
- consolidator = null;
return new RowCursor[] { new Cursor(Host, _bindings, input, active) };
}
diff --git a/src/Microsoft.ML.Transforms/ProduceIdTransform.cs b/src/Microsoft.ML.Transforms/ProduceIdTransform.cs
index 1260570037..7bcea11c9c 100644
--- a/src/Microsoft.ML.Transforms/ProduceIdTransform.cs
+++ b/src/Microsoft.ML.Transforms/ProduceIdTransform.cs
@@ -148,13 +148,13 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return new Cursor(Host, _bindings, input, active);
}
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
Host.CheckValue(predicate, nameof(predicate));
Host.CheckValueOrNull(rand);
var inputPred = _bindings.GetDependencies(predicate);
- RowCursor[] cursors = Source.GetRowCursorSet(out consolidator, inputPred, n, rand);
+ RowCursor[] cursors = Source.GetRowCursorSet(inputPred, n, rand);
bool active = predicate(_bindings.MapIinfoToCol(0));
for (int c = 0; c < cursors.Length; ++c)
cursors[c] = new Cursor(Host, _bindings, cursors[c], active);
diff --git a/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs b/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs
index 7085e803b8..949f02a564 100644
--- a/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs
+++ b/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs
@@ -120,7 +120,7 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null)
return new Cursor(this, input, predicate);
}
- public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null)
+ public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null)
{
Contracts.CheckValue(predicate, nameof(predicate));
Contracts.CheckParam(n >= 0, nameof(n));
@@ -128,7 +128,6 @@ public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func
// This transform is stateful, its contract is to allocate exactly one state object per cursor and call the filter function
// on every row in sequence. Therefore, parallel cursoring is not possible.
- consolidator = null;
return new[] { GetRowCursor(predicate, rand) };
}
diff --git a/src/Microsoft.ML.Transforms/UngroupTransform.cs b/src/Microsoft.ML.Transforms/UngroupTransform.cs
index dadf4e24c6..11008670d5 100644
--- a/src/Microsoft.ML.Transforms/UngroupTransform.cs
+++ b/src/Microsoft.ML.Transforms/UngroupTransform.cs
@@ -186,11 +186,11 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random
return new Cursor(Host, inputCursor, _schemaImpl, predicate);
}
- public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate,
+ public override RowCursor[] GetRowCursorSet(Func predicate,
int n, Random rand = null)
{
var activeInput = _schemaImpl.GetActiveInput(predicate);
- var inputCursors = Source.GetRowCursorSet(out consolidator, col => activeInput[col], n, null);
+ var inputCursors = Source.GetRowCursorSet(col => activeInput[col], n, null);
return Utils.BuildArray(inputCursors.Length,
x => new Cursor(Host, inputCursors[x], _schemaImpl, predicate));
}