From 245cf1596306333b555ea1efba74daa047954458 Mon Sep 17 00:00:00 2001 From: Tom Finley Date: Wed, 19 Dec 2018 20:04:46 -0800 Subject: [PATCH 1/3] Remove IRowCursorConsolidator. --- src/Microsoft.ML.Core/Data/IDataView.cs | 87 +++--- src/Microsoft.ML.Data/Data/DataViewUtils.cs | 290 ++++++++---------- src/Microsoft.ML.Data/Data/RowCursorUtils.cs | 3 +- .../DataLoadSave/Binary/BinaryLoader.cs | 4 +- .../DataLoadSave/CompositeDataLoader.cs | 5 +- .../DataLoadSave/PartitionedFileLoader.cs | 3 +- .../DataLoadSave/Text/TextLoader.cs | 5 +- .../DataLoadSave/Text/TextLoaderCursor.cs | 38 +-- .../DataLoadSave/Transpose/TransposeLoader.cs | 5 +- .../DataView/AppendRowsDataView.cs | 3 +- .../DataView/ArrayDataViewBuilder.cs | 4 +- .../DataView/CacheDataView.cs | 18 +- .../DataView/DataViewConstructionUtils.cs | 4 +- .../DataView/EmptyDataView.cs | 3 +- .../DataView/LambdaFilter.cs | 5 +- .../DataView/OpaqueDataView.cs | 5 +- .../DataView/RowToRowMapperTransform.cs | 6 +- src/Microsoft.ML.Data/DataView/Transposer.cs | 11 +- src/Microsoft.ML.Data/DataView/TypedCursor.cs | 8 +- src/Microsoft.ML.Data/DataView/ZipDataView.cs | 3 +- .../Dirty/ChooseColumnsByIndexTransform.cs | 5 +- .../Evaluators/RankerEvaluator.cs | 4 +- .../Scorers/RowToRowScorerBase.cs | 6 +- .../Training/TrainerUtils.cs | 14 +- .../Transforms/ColumnSelecting.cs | 6 +- .../Transforms/GenerateNumberTransform.cs | 6 +- src/Microsoft.ML.Data/Transforms/NAFilter.cs | 5 +- .../Transforms/NopTransform.cs | 4 +- .../Transforms/PerGroupTransformBase.cs | 3 +- .../Transforms/RangeFilter.cs | 5 +- .../Transforms/RowShufflingTransformer.cs | 4 +- .../Transforms/SkipTakeFilter.cs | 4 +- .../Transforms/TransformBase.cs | 10 +- src/Microsoft.ML.Parquet/ParquetLoader.cs | 3 +- .../SequentialTransformBase.cs | 3 +- .../SequentialTransformerBase.cs | 9 +- .../BootstrapSamplingTransformer.cs | 3 +- src/Microsoft.ML.Transforms/GroupTransform.cs | 3 +- .../OptionalColumnTransform.cs | 6 +- .../ProduceIdTransform.cs | 4 +- .../StatefulFilterTransform.cs | 3 +- .../UngroupTransform.cs | 4 +- 42 files changed, 252 insertions(+), 372 deletions(-) diff --git a/src/Microsoft.ML.Core/Data/IDataView.cs b/src/Microsoft.ML.Core/Data/IDataView.cs index d1f57a6f55..df0ff29e95 100644 --- a/src/Microsoft.ML.Core/Data/IDataView.cs +++ b/src/Microsoft.ML.Core/Data/IDataView.cs @@ -63,7 +63,7 @@ internal interface ISchema /// /// The input and output of Query Operators (Transforms). This is the fundamental data pipeline - /// type, comparable to IEnumerable for LINQ. + /// type, comparable to for LINQ. /// public interface IDataView { @@ -92,7 +92,7 @@ public interface IDataView RowCursor GetRowCursor(Func needCol, Random rand = null); /// - /// This constructs a set of parallel batch cursors. The value n is a recommended limit + /// This constructs a set of parallel batch cursors. The value is a recommended limit /// on cardinality. If is non-positive, this indicates that the caller /// has no recommendation, and the implementation should have some default behavior to cover /// this case. Note that this is strictly a recommendation: it is entirely possible that @@ -104,16 +104,18 @@ public interface IDataView /// but all rows should be returned by exactly one of the cursors returned from this cursor. /// The cursors can have their values reconciled downstream through the use of the /// property. + /// + /// The typical usage pattern is that a set of cursors is requested, each of them is then + /// given to a set of working threads that consume from them independently while, ultimately, + /// the results are finally collated in the end by exploiting the ordering of the + /// property described above. More typical scenarios will be content with pulling from the single + /// serial cursor of . /// - /// This is an object that can be used to reconcile the - /// returned array of cursors. When the array of cursors is of length 1, it is legal, - /// indeed expected, that this parameter should be null. /// The predicate, where a column is active if this returns true. /// The suggested degree of parallelism. /// An instance /// - RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func needCol, int n, Random rand = null); + RowCursor[] GetRowCursorSet(Func needCol, int n, Random rand = null); /// /// Gets an instance of Schema. @@ -122,20 +124,8 @@ RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, } /// - /// This is used to consolidate parallel cursors into a single cursor. The object that determines - /// the number of cursors and splits the row "stream" provides the consolidator object. - /// - public interface IRowCursorConsolidator - { - /// - /// Create a consolidated cursor from the given parallel cursor set. - /// - RowCursor CreateCursor(IChannelProvider provider, RowCursor[] inputs); - } - - /// - /// Delegate type to get a value. This can used for efficient access to data in an IRow - /// or IRowCursor. + /// Delegate type to get a value. This can used for efficient access to data in a + /// or . /// public delegate void ValueGetter(ref TValue value); @@ -146,43 +136,50 @@ public interface IRowCursorConsolidator public abstract class Row : IDisposable { /// - /// This is incremented when the underlying contents changes, giving clients a way to detect change. - /// Generally it's -1 when the object is in an invalid state. In particular, for an , this is -1 - /// when the is or . + /// This is incremented when the underlying contents changes, giving clients a way to detect change. Generally + /// it's -1 when the object is in an invalid state. In particular, for an , this is -1 + /// when the is or . /// - /// Note that this position is not position within the underlying data, but position of this cursor only. - /// If one, for example, opened a set of parallel streaming cursors, or a shuffled cursor, each such cursor's - /// first valid entry would always have position 0. + /// Note that this position is not position within the underlying data, but position of this cursor only. If + /// one, for example, opened a set of parallel streaming cursors, or a shuffled cursor, each such cursor's first + /// valid entry would always have position 0. /// public abstract long Position { get; } /// - /// This provides a means for reconciling multiple streams of counted things. Generally, in each stream, - /// batch numbers should be non-decreasing. Furthermore, any given batch number should only appear in one - /// of the streams. Order is determined by batch number. The reconciler ensures that each stream (that is - /// still active) has at least one item available, then takes the item with the smallest batch number. + /// This provides a means for reconciling multiple rows that have been produced generally from + /// . When getting a set, there is a need + /// to, while allowing parallel processing to proceed, always have an aim thatthe original order should be + /// reconverable. So: for any cursor implementation, batch numbers should be non-decreasing. Furthermore, any + /// given batch number should only appear in one of the cursors as returned by + /// . In this way, order is determined by + /// batch number. An operation that reconciles these cursors to produce a consistent single cursoring, could do + /// so by drawing from the single cursor, among all cursors in the set, that has the smallest batch number + /// available. /// - /// Note that there is no suggestion that the batches for a particular entry will be consistent from - /// cursoring to cursoring, except for the consistency in resulting in the same overall ordering. The same - /// entry could have different batch numbers from one cursoring to another. There is also no requirement - /// that any given batch number must appear, at all. + /// Note that there is no suggestion that the batches for a particular entry will be consistent from cursoring + /// to cursoring, except for the consistency in resulting in the same overall ordering. The same entry could + /// have different batch numbers from one cursoring to another. There is also no requirement that any given + /// batch number must appear, at all. It is merely a mechanism for recovering ordering from a possibly arbitrary + /// partitioning of the data. It also follows from this, of course, that considering the batch to be a property + /// of the data is completely invalid. /// public abstract long Batch { get; } /// /// A getter for a 128-bit ID value. It is common for objects to serve multiple /// instances to iterate over what is supposed to be the same data, for example, in a - /// a cursor set will produce the same data as a serial cursor, just partitioned, and a shuffled cursor - /// will produce the same data as a serial cursor or any other shuffled cursor, only shuffled. The ID - /// exists for applications that need to reconcile which entry is actually which. Ideally this ID should - /// be unique, but for practical reasons, it suffices if collisions are simply extremely improbable. + /// a cursor set will produce the same data as a serial cursor, just partitioned, and a shuffled cursor will + /// produce the same data as a serial cursor or any other shuffled cursor, only shuffled. The ID exists for + /// applications that need to reconcile which entry is actually which. Ideally this ID should be unique, but for + /// practical reasons, it suffices if collisions are simply extremely improbable. /// - /// Note that this ID, while it must be consistent for multiple streams according to the semantics - /// above, is not considered part of the data per se. So, to take the example of a data view specifically, - /// a single data view must render consistent IDs across all cursorings, but there is no suggestion at - /// all that if the "same" data were presented in a different data view (as by, say, being transformed, - /// cached, saved, or whatever), that the IDs between the two different data views would have any - /// discernable relationship. + /// Note that this ID, while it must be consistent for multiple streams according to the semantics above, is not + /// considered part of the data per se. So, to take the example of a data view specifically, a single data view + /// must render consistent IDs across all cursorings, but there is no suggestion at all that if the "same" data + /// were presented in a different data view (as by, say, being transformed, cached, saved, or whatever), that + /// the IDs between the two different data views would have any discernable relationship. public abstract ValueGetter GetIdGetter(); /// diff --git a/src/Microsoft.ML.Data/Data/DataViewUtils.cs b/src/Microsoft.ML.Data/Data/DataViewUtils.cs index 9f60b3f418..60f7c95340 100644 --- a/src/Microsoft.ML.Data/Data/DataViewUtils.cs +++ b/src/Microsoft.ML.Data/Data/DataViewUtils.cs @@ -129,15 +129,13 @@ public static bool TryCreateConsolidatingCursor(out RowCursor curs, return false; } - IRowCursorConsolidator consolidator; - var inputs = view.GetRowCursorSet(out consolidator, predicate, cthd, rand); + var inputs = view.GetRowCursorSet(predicate, cthd, rand); host.Check(Utils.Size(inputs) > 0); - host.Check(inputs.Length == 1 || consolidator != null); if (inputs.Length == 1) curs = inputs[0]; else - curs = consolidator.CreateCursor(host, inputs); + curs = DataViewUtils.ConsolidateGeneric(host, inputs, 64); return true; } @@ -146,13 +144,11 @@ public static bool TryCreateConsolidatingCursor(out RowCursor curs, /// cardinality. If not all the active columns are cachable, this will only /// produce the given input cursor. /// - public static RowCursor[] CreateSplitCursors(out IRowCursorConsolidator consolidator, - IChannelProvider provider, RowCursor input, int num) + public static RowCursor[] CreateSplitCursors(IChannelProvider provider, RowCursor input, int num) { Contracts.CheckValue(provider, nameof(provider)); provider.CheckValue(input, nameof(input)); - consolidator = null; if (num <= 1) return new RowCursor[1] { input }; @@ -168,7 +164,7 @@ public static RowCursor[] CreateSplitCursors(out IRowCursorConsolidator consolid // REVIEW: Keep the utility method here, move this splitter stuff // to some other file. - return Splitter.Split(out consolidator, provider, input.Schema, input, num); + return Splitter.Split(provider, input.Schema, input, num); } /// @@ -249,7 +245,7 @@ public static RowCursor ConsolidateGeneric(IChannelProvider provider, RowCursor[ return inputs[0]; object[] pools = null; - return Splitter.Consolidator.Consolidate(provider, inputs, batchSize, ref pools); + return Splitter.Consolidate(provider, inputs, batchSize, ref pools); } /// @@ -277,7 +273,6 @@ private sealed class Splitter { private readonly Schema _schema; private readonly object[] _cachePools; - private object[] _consolidateCachePools; /// /// Pipes, in addition to column values, will also communicate extra information @@ -299,214 +294,198 @@ private Splitter(Schema schema) _cachePools = new object[_schema.Count + (int)ExtraIndex._Lim]; } - public sealed class Consolidator : IRowCursorConsolidator + public static RowCursor Consolidate(IChannelProvider provider, RowCursor[] inputs, int batchSize, ref object[] ourPools) { - private readonly Splitter _splitter; - - public Consolidator(Splitter splitter) + Contracts.AssertValue(provider); + using (var ch = provider.Start("Consolidate")) { - Contracts.AssertValue(splitter); - _splitter = splitter; + return ConsolidateCore(provider, inputs, ref ourPools, ch); } + } - public RowCursor CreateCursor(IChannelProvider provider, RowCursor[] inputs) - { - return Consolidate(provider, inputs, 128, ref _splitter._consolidateCachePools); - } + private static RowCursor ConsolidateCore(IChannelProvider provider, RowCursor[] inputs, ref object[] ourPools, IChannel ch) + { + ch.CheckNonEmpty(inputs, nameof(inputs)); + if (inputs.Length == 1) + return inputs[0]; + ch.CheckParam(SameSchemaAndActivity(inputs), nameof(inputs), "Inputs not compatible for consolidation"); + + RowCursor cursor = inputs[0]; + var schema = cursor.Schema; + ch.CheckParam(AllCachable(schema, cursor.IsColumnActive), nameof(inputs), "Inputs had some uncachable input columns"); - public static RowCursor Consolidate(IChannelProvider provider, RowCursor[] inputs, int batchSize, ref object[] ourPools) + int[] activeToCol; + int[] colToActive; + Utils.BuildSubsetMaps(schema.Count, cursor.IsColumnActive, out activeToCol, out colToActive); + + // Because the schema of the consolidator is not necessary fixed, we are merely + // opportunistic about buffer sharing, from cursoring to cursoring. If we can do + // it easily, great, if not, no big deal. + if (Utils.Size(ourPools) != schema.Count) + ourPools = new object[schema.Count + (int)ExtraIndex._Lim]; + // Create the out pipes. + OutPipe[] outPipes = new OutPipe[activeToCol.Length + (int)ExtraIndex._Lim]; + for (int i = 0; i < activeToCol.Length; ++i) { - Contracts.AssertValue(provider); - using (var ch = provider.Start("Consolidate")) - { - return ConsolidateCore(provider, inputs, ref ourPools, ch); - } + int c = activeToCol[i]; + ColumnType type = schema[c].Type; + var pool = GetPool(type, ourPools, c); + outPipes[i] = OutPipe.Create(type, pool); } + int idIdx = activeToCol.Length + (int)ExtraIndex.Id; + outPipes[idIdx] = OutPipe.Create(NumberType.UG, GetPool(NumberType.UG, ourPools, idIdx)); - private static RowCursor ConsolidateCore(IChannelProvider provider, RowCursor[] inputs, ref object[] ourPools, IChannel ch) + // Create the structures to synchronize between the workers and the consumer. + const int toConsumeBound = 4; + var toConsume = new BlockingCollection(toConsumeBound); + var batchColumnPool = new MadeObjectPool(() => new BatchColumn[outPipes.Length]); + Thread[] workers = new Thread[inputs.Length]; + MinWaiter waiter = new MinWaiter(workers.Length); + bool done = false; + + for (int t = 0; t < workers.Length; ++t) { - ch.CheckNonEmpty(inputs, nameof(inputs)); - if (inputs.Length == 1) - return inputs[0]; - ch.CheckParam(SameSchemaAndActivity(inputs), nameof(inputs), "Inputs not compatible for consolidation"); - - RowCursor cursor = inputs[0]; - var schema = cursor.Schema; - ch.CheckParam(AllCachable(schema, cursor.IsColumnActive), nameof(inputs), "Inputs had some uncachable input columns"); - - int[] activeToCol; - int[] colToActive; - Utils.BuildSubsetMaps(schema.Count, cursor.IsColumnActive, out activeToCol, out colToActive); - - // Because the schema of the consolidator is not necessary fixed, we are merely - // opportunistic about buffer sharing, from cursoring to cursoring. If we can do - // it easily, great, if not, no big deal. - if (Utils.Size(ourPools) != schema.Count) - ourPools = new object[schema.Count + (int)ExtraIndex._Lim]; - // Create the out pipes. - OutPipe[] outPipes = new OutPipe[activeToCol.Length + (int)ExtraIndex._Lim]; - for (int i = 0; i < activeToCol.Length; ++i) + var localCursor = inputs[t]; + ch.Assert(localCursor.State == CursorState.NotStarted); + // Note that these all take ownership of their respective cursors, + // so they all handle their disposal internal to the thread. + workers[t] = Utils.CreateBackgroundThread(() => { - int c = activeToCol[i]; - ColumnType type = schema[c].Type; - var pool = GetPool(type, ourPools, c); - outPipes[i] = OutPipe.Create(type, pool); - } - int idIdx = activeToCol.Length + (int)ExtraIndex.Id; - outPipes[idIdx] = OutPipe.Create(NumberType.UG, GetPool(NumberType.UG, ourPools, idIdx)); - - // Create the structures to synchronize between the workers and the consumer. - const int toConsumeBound = 4; - var toConsume = new BlockingCollection(toConsumeBound); - var batchColumnPool = new MadeObjectPool(() => new BatchColumn[outPipes.Length]); - Thread[] workers = new Thread[inputs.Length]; - MinWaiter waiter = new MinWaiter(workers.Length); - bool done = false; - - for (int t = 0; t < workers.Length; ++t) - { - var localCursor = inputs[t]; - ch.Assert(localCursor.State == CursorState.NotStarted); - // Note that these all take ownership of their respective cursors, - // so they all handle their disposal internal to the thread. - workers[t] = Utils.CreateBackgroundThread(() => - { // This will be the last batch sent in the finally. If iteration procedes without // error, it will remain null, and be sent as a sentinel. If iteration results in // an exception that we catch, the exception catching block will set this to an // exception bearing block, and that will be passed along as the last block instead. Batch lastBatch = null; - try + try + { + using (localCursor) { - using (localCursor) - { - InPipe[] inPipes = new InPipe[outPipes.Length]; - for (int i = 0; i < activeToCol.Length; ++i) - inPipes[i] = outPipes[i].CreateInPipe(RowCursorUtils.GetGetterAsDelegate(localCursor, activeToCol[i])); - inPipes[idIdx] = outPipes[idIdx].CreateInPipe(localCursor.GetIdGetter()); + InPipe[] inPipes = new InPipe[outPipes.Length]; + for (int i = 0; i < activeToCol.Length; ++i) + inPipes[i] = outPipes[i].CreateInPipe(RowCursorUtils.GetGetterAsDelegate(localCursor, activeToCol[i])); + inPipes[idIdx] = outPipes[idIdx].CreateInPipe(localCursor.GetIdGetter()); - long oldBatch = 0; - int count = 0; + long oldBatch = 0; + int count = 0; // This event is used to synchronize ourselves using a MinWaiter // so that we add batches to the consumer queue at the appropriate time. ManualResetEventSlim waiterEvent = null; - Action pushBatch = () => + Action pushBatch = () => + { + if (count > 0) { - if (count > 0) - { - var batchColumns = batchColumnPool.Get(); - for (int i = 0; i < inPipes.Length; ++i) - batchColumns[i] = inPipes[i].GetBatchColumnAndReset(); + var batchColumns = batchColumnPool.Get(); + for (int i = 0; i < inPipes.Length; ++i) + batchColumns[i] = inPipes[i].GetBatchColumnAndReset(); // REVIEW: Is it worth not allocating new Batch object for each batch? var batch = new Batch(batchColumnPool, batchColumns, count, oldBatch); - count = 0; + count = 0; // The waiter event should never be null since this is only // called after a point where waiter.Register has been called. ch.AssertValue(waiterEvent); - waiterEvent.Wait(); - waiterEvent = null; - toConsume.Add(batch); - } - }; + waiterEvent.Wait(); + waiterEvent = null; + toConsume.Add(batch); + } + }; // Handle the first one separately, then go into the main loop. if (localCursor.MoveNext() && !done) - { - oldBatch = localCursor.Batch; - foreach (var pipe in inPipes) - pipe.Fill(); - count++; + { + oldBatch = localCursor.Batch; + foreach (var pipe in inPipes) + pipe.Fill(); + count++; // Register with the min waiter that we want to wait on this batch number. waiterEvent = waiter.Register(oldBatch); - while (localCursor.MoveNext() && !done) + while (localCursor.MoveNext() && !done) + { + if (oldBatch != localCursor.Batch) { - if (oldBatch != localCursor.Batch) - { - ch.Assert(count == 0 || localCursor.Batch > oldBatch); - pushBatch(); - oldBatch = localCursor.Batch; - waiterEvent = waiter.Register(oldBatch); - } - foreach (var pipe in inPipes) - pipe.Fill(); - count++; + ch.Assert(count == 0 || localCursor.Batch > oldBatch); + pushBatch(); + oldBatch = localCursor.Batch; + waiterEvent = waiter.Register(oldBatch); } - pushBatch(); + foreach (var pipe in inPipes) + pipe.Fill(); + count++; } + pushBatch(); } } - catch (Exception ex) - { + } + catch (Exception ex) + { // Whoops, we won't be sending null as the sentinel now. lastBatch = new Batch(ex); - toConsume.Add(new Batch(ex)); - } - finally + toConsume.Add(new Batch(ex)); + } + finally + { + if (waiter.Retire() == 0) { - if (waiter.Retire() == 0) + if (lastBatch == null) { - if (lastBatch == null) - { // If it wasn't null, this already sent along an exception bearing batch, in which // case sending the sentinel is unnecessary and unhelpful. toConsume.Add(null); - } - toConsume.CompleteAdding(); } + toConsume.CompleteAdding(); } - }); - workers[t].Start(); - } - - Action quitAction = () => - { - done = true; - var myOutPipes = outPipes; - foreach (var batch in toConsume.GetConsumingEnumerable()) - { - if (batch == null) - continue; - batch.SetAll(myOutPipes); - foreach (var outPipe in myOutPipes) - outPipe.Unset(); } - foreach (Thread thread in workers) - thread.Join(); - }; - - return new Cursor(provider, schema, activeToCol, colToActive, outPipes, toConsume, quitAction); + }); + workers[t].Start(); } - private static object GetPool(ColumnType type, object[] pools, int poolIdx) + Action quitAction = () => { - Func func = GetPoolCore; - var method = func.GetMethodInfo().GetGenericMethodDefinition().MakeGenericMethod(type.RawType); - return method.Invoke(null, new object[] { pools, poolIdx }); - } + done = true; + var myOutPipes = outPipes; + foreach (var batch in toConsume.GetConsumingEnumerable()) + { + if (batch == null) + continue; + batch.SetAll(myOutPipes); + foreach (var outPipe in myOutPipes) + outPipe.Unset(); + } + foreach (Thread thread in workers) + thread.Join(); + }; - private static MadeObjectPool GetPoolCore(object[] pools, int poolIdx) - { - var pool = pools[poolIdx] as MadeObjectPool; - if (pool == null) - pools[poolIdx] = pool = new MadeObjectPool(() => null); - return pool; - } + return new Cursor(provider, schema, activeToCol, colToActive, outPipes, toConsume, quitAction); + } + + private static object GetPool(ColumnType type, object[] pools, int poolIdx) + { + Func func = GetPoolCore; + var method = func.GetMethodInfo().GetGenericMethodDefinition().MakeGenericMethod(type.RawType); + return method.Invoke(null, new object[] { pools, poolIdx }); + } + + private static MadeObjectPool GetPoolCore(object[] pools, int poolIdx) + { + var pool = pools[poolIdx] as MadeObjectPool; + if (pool == null) + pools[poolIdx] = pool = new MadeObjectPool(() => null); + return pool; } - public static RowCursor[] Split(out IRowCursorConsolidator consolidator, IChannelProvider provider, Schema schema, RowCursor input, int cthd) + public static RowCursor[] Split(IChannelProvider provider, Schema schema, RowCursor input, int cthd) { Contracts.AssertValue(provider, "provider"); var splitter = new Splitter(schema); using (var ch = provider.Start("CursorSplitter")) { - var result = splitter.SplitCore(out consolidator, provider, input, cthd); + var result = splitter.SplitCore(provider, input, cthd); return result; } } - private RowCursor[] SplitCore(out IRowCursorConsolidator consolidator, IChannelProvider ch, RowCursor input, int cthd) + private RowCursor[] SplitCore(IChannelProvider ch, RowCursor input, int cthd) { Contracts.AssertValue(ch); ch.AssertValue(input); @@ -637,7 +616,6 @@ private RowCursor[] SplitCore(out IRowCursorConsolidator consolidator, IChannelP var cursors = new Cursor[cthd]; for (int i = 0; i < cthd; ++i) cursors[i] = new Cursor(ch, _schema, activeToCol, colToActive, outPipes[i], toConsume, quitAction); - consolidator = new Consolidator(this); return cursors; } diff --git a/src/Microsoft.ML.Data/Data/RowCursorUtils.cs b/src/Microsoft.ML.Data/Data/RowCursorUtils.cs index 71f288e170..0e5e226b39 100644 --- a/src/Microsoft.ML.Data/Data/RowCursorUtils.cs +++ b/src/Microsoft.ML.Data/Data/RowCursorUtils.cs @@ -515,11 +515,10 @@ public RowCursor GetRowCursor(Func needCol, Random rand = null) return new Cursor(_host, this, active); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func needCol, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func needCol, int n, Random rand = null) { _host.CheckValue(needCol, nameof(needCol)); _host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { GetRowCursor(needCol, rand) }; } diff --git a/src/Microsoft.ML.Data/DataLoadSave/Binary/BinaryLoader.cs b/src/Microsoft.ML.Data/DataLoadSave/Binary/BinaryLoader.cs index 03b6ae8f34..ddaf747b3a 100644 --- a/src/Microsoft.ML.Data/DataLoadSave/Binary/BinaryLoader.cs +++ b/src/Microsoft.ML.Data/DataLoadSave/Binary/BinaryLoader.cs @@ -1254,12 +1254,10 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return GetRowCursorCore(predicate, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); _host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { GetRowCursorCore(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/DataLoadSave/CompositeDataLoader.cs b/src/Microsoft.ML.Data/DataLoadSave/CompositeDataLoader.cs index 67dbb3cb9a..1395ec833d 100644 --- a/src/Microsoft.ML.Data/DataLoadSave/CompositeDataLoader.cs +++ b/src/Microsoft.ML.Data/DataLoadSave/CompositeDataLoader.cs @@ -576,12 +576,11 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return View.GetRowCursor(predicate, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); _host.CheckValueOrNull(rand); - return View.GetRowCursorSet(out consolidator, predicate, n, rand); + return View.GetRowCursorSet(predicate, n, rand); } public SlotCursor GetSlotCursor(int col) diff --git a/src/Microsoft.ML.Data/DataLoadSave/PartitionedFileLoader.cs b/src/Microsoft.ML.Data/DataLoadSave/PartitionedFileLoader.cs index 796d19c4e1..7b97cce6d6 100644 --- a/src/Microsoft.ML.Data/DataLoadSave/PartitionedFileLoader.cs +++ b/src/Microsoft.ML.Data/DataLoadSave/PartitionedFileLoader.cs @@ -298,9 +298,8 @@ public RowCursor GetRowCursor(Func needCol, Random rand = null) return new Cursor(_host, this, _files, needCol, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func needCol, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func needCol, int n, Random rand = null) { - consolidator = null; var cursor = new Cursor(_host, this, _files, needCol, rand); return new RowCursor[] { cursor }; } diff --git a/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoader.cs b/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoader.cs index 7e8a90b75e..27339ba1e0 100644 --- a/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoader.cs +++ b/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoader.cs @@ -1385,13 +1385,12 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return Cursor.Create(_reader, _files, active); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); _host.CheckValueOrNull(rand); var active = Utils.BuildArray(_reader._bindings.ColumnCount, predicate); - return Cursor.CreateSet(out consolidator, _reader, _files, active, n); + return Cursor.CreateSet(_reader, _files, active, n); } public void Save(ModelSaveContext ctx) => _reader.Save(ctx); diff --git a/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs b/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs index 95adf7fee3..ac879cdac8 100644 --- a/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs +++ b/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs @@ -150,8 +150,7 @@ public static RowCursor Create(TextLoader parent, IMultiStreamSource files, bool return new Cursor(parent, stats, active, reader, srcNeeded, cthd); } - public static RowCursor[] CreateSet(out IRowCursorConsolidator consolidator, - TextLoader parent, IMultiStreamSource files, bool[] active, int n) + public static RowCursor[] CreateSet(TextLoader parent, IMultiStreamSource files, bool[] active, int n) { // Note that files is allowed to be empty. Contracts.AssertValue(parent); @@ -166,12 +165,8 @@ public static RowCursor[] CreateSet(out IRowCursorConsolidator consolidator, var reader = new LineReader(files, BatchSize, 100, parent.HasHeader, parent._maxRows, cthd); var stats = new ParseStats(parent._host, cthd); if (cthd <= 1) - { - consolidator = null; return new RowCursor[1] { new Cursor(parent, stats, active, reader, srcNeeded, 1) }; - } - consolidator = new Consolidator(cthd); var cursors = new RowCursor[cthd]; try { @@ -821,37 +816,6 @@ private void Parse(int tid) } } } - - /// - /// The consolidator object. This simply records the number of threads and checks - /// that they match at the end. - /// - private sealed class Consolidator : IRowCursorConsolidator - { - private int _cthd; - - public Consolidator(int cthd) - { - Contracts.Assert(cthd > 1); - _cthd = cthd; - } - - public RowCursor CreateCursor(IChannelProvider provider, RowCursor[] inputs) - { - Contracts.AssertValue(provider); - int cthd = Interlocked.Exchange(ref _cthd, 0); - provider.Check(cthd > 1, "Consolidator can only be used once"); - provider.Check(Utils.Size(inputs) == cthd, "Unexpected number of cursors"); - - // ConsolidateGeneric does all the standard validity checks: all cursors non-null, - // all have the same schema, all have the same active columns, and all active - // column types are cachable. - using (var ch = provider.Start("Consolidator")) - { - return DataViewUtils.ConsolidateGeneric(provider, inputs, BatchSize); - } - } - } } } } diff --git a/src/Microsoft.ML.Data/DataLoadSave/Transpose/TransposeLoader.cs b/src/Microsoft.ML.Data/DataLoadSave/Transpose/TransposeLoader.cs index 76fab9628c..7566d156a5 100644 --- a/src/Microsoft.ML.Data/DataLoadSave/Transpose/TransposeLoader.cs +++ b/src/Microsoft.ML.Data/DataLoadSave/Transpose/TransposeLoader.cs @@ -677,12 +677,11 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return new Cursor(this, predicate); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); if (HasRowData) - return _schemaEntry.GetView().GetRowCursorSet(out consolidator, predicate, n, rand); - consolidator = null; + return _schemaEntry.GetView().GetRowCursorSet(predicate, n, rand); return new RowCursor[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/DataView/AppendRowsDataView.cs b/src/Microsoft.ML.Data/DataView/AppendRowsDataView.cs index d6a7c02c1b..a499ce2e61 100644 --- a/src/Microsoft.ML.Data/DataView/AppendRowsDataView.cs +++ b/src/Microsoft.ML.Data/DataView/AppendRowsDataView.cs @@ -154,9 +154,8 @@ public RowCursor GetRowCursor(Func needCol, Random rand = null) return new RandCursor(this, needCol, rand, _counts); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - consolidator = null; return new RowCursor[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/DataView/ArrayDataViewBuilder.cs b/src/Microsoft.ML.Data/DataView/ArrayDataViewBuilder.cs index a418b311b9..9c106c2996 100644 --- a/src/Microsoft.ML.Data/DataView/ArrayDataViewBuilder.cs +++ b/src/Microsoft.ML.Data/DataView/ArrayDataViewBuilder.cs @@ -233,12 +233,10 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return new Cursor(_host, this, predicate, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); _host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { new Cursor(_host, this, predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/DataView/CacheDataView.cs b/src/Microsoft.ML.Data/DataView/CacheDataView.cs index a8cba62ca6..04368994a8 100644 --- a/src/Microsoft.ML.Data/DataView/CacheDataView.cs +++ b/src/Microsoft.ML.Data/DataView/CacheDataView.cs @@ -247,8 +247,7 @@ private RowCursor GetRowCursorWaiterCore(TWaiter waiter, Func.Create(waiter, perm)); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); _host.CheckValueOrNull(rand); @@ -256,29 +255,14 @@ public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, n = DataViewUtils.GetThreadCount(_host, n); if (n <= 1) - { - consolidator = null; return new RowCursor[] { GetRowCursor(predicate, rand) }; - } - consolidator = new Consolidator(); var waiter = WaiterWaiter.Create(this, predicate); if (waiter.IsTrivial) return GetRowCursorSetWaiterCore(TrivialWaiter.Create(this), predicate, n, rand); return GetRowCursorSetWaiterCore(waiter, predicate, n, rand); } - /// - /// Minimal consolidator. - /// - private sealed class Consolidator : IRowCursorConsolidator - { - public RowCursor CreateCursor(IChannelProvider provider, RowCursor[] inputs) - { - return DataViewUtils.ConsolidateGeneric(provider, inputs, _batchSize); - } - } - private RowCursor[] GetRowCursorSetWaiterCore(TWaiter waiter, Func predicate, int n, Random rand) where TWaiter : struct, IWaiter { diff --git a/src/Microsoft.ML.Data/DataView/DataViewConstructionUtils.cs b/src/Microsoft.ML.Data/DataView/DataViewConstructionUtils.cs index 491919420b..46304599fe 100644 --- a/src/Microsoft.ML.Data/DataView/DataViewConstructionUtils.cs +++ b/src/Microsoft.ML.Data/DataView/DataViewConstructionUtils.cs @@ -399,10 +399,8 @@ protected DataViewBase(IHostEnvironment env, string name, InternalSchemaDefiniti public abstract RowCursor GetRowCursor(Func predicate, Random rand = null); - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, - int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - consolidator = null; return new[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/DataView/EmptyDataView.cs b/src/Microsoft.ML.Data/DataView/EmptyDataView.cs index cd516850bd..45560794ba 100644 --- a/src/Microsoft.ML.Data/DataView/EmptyDataView.cs +++ b/src/Microsoft.ML.Data/DataView/EmptyDataView.cs @@ -35,11 +35,10 @@ public RowCursor GetRowCursor(Func needCol, Random rand = null) return new Cursor(_host, Schema, needCol); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func needCol, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func needCol, int n, Random rand = null) { _host.CheckValue(needCol, nameof(needCol)); _host.CheckValueOrNull(rand); - consolidator = null; return new[] { new Cursor(_host, Schema, needCol) }; } diff --git a/src/Microsoft.ML.Data/DataView/LambdaFilter.cs b/src/Microsoft.ML.Data/DataView/LambdaFilter.cs index 97758f9d79..bb1753d7e0 100644 --- a/src/Microsoft.ML.Data/DataView/LambdaFilter.cs +++ b/src/Microsoft.ML.Data/DataView/LambdaFilter.cs @@ -117,15 +117,14 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(this, input, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); bool[] active; Func inputPred = GetActive(predicate, out active); - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n, rand); + var inputs = Source.GetRowCursorSet(inputPred, n, rand); Host.AssertNonEmpty(inputs); // No need to split if this is given 1 input cursor. diff --git a/src/Microsoft.ML.Data/DataView/OpaqueDataView.cs b/src/Microsoft.ML.Data/DataView/OpaqueDataView.cs index 25613791d0..835ca035fe 100644 --- a/src/Microsoft.ML.Data/DataView/OpaqueDataView.cs +++ b/src/Microsoft.ML.Data/DataView/OpaqueDataView.cs @@ -32,10 +32,9 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return _source.GetRowCursor(predicate, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - return _source.GetRowCursorSet(out consolidator, predicate, n, rand); + return _source.GetRowCursorSet(predicate, n, rand); } } } diff --git a/src/Microsoft.ML.Data/DataView/RowToRowMapperTransform.cs b/src/Microsoft.ML.Data/DataView/RowToRowMapperTransform.cs index b32ae42464..2e4a8d6446 100644 --- a/src/Microsoft.ML.Data/DataView/RowToRowMapperTransform.cs +++ b/src/Microsoft.ML.Data/DataView/RowToRowMapperTransform.cs @@ -190,7 +190,7 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, Source.GetRowCursor(predicateInput, rand), this, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); @@ -198,11 +198,11 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida Func predicateInput; var active = GetActive(predicate, out predicateInput); - var inputs = Source.GetRowCursorSet(out consolidator, predicateInput, n, rand); + var inputs = Source.GetRowCursorSet(predicateInput, n, rand); Host.AssertNonEmpty(inputs); if (inputs.Length == 1 && n > 1 && _bindings.AddedColumnIndices.Any(predicate)) - inputs = DataViewUtils.CreateSplitCursors(out consolidator, Host, inputs[0], n); + inputs = DataViewUtils.CreateSplitCursors(Host, inputs[0], n); Host.AssertNonEmpty(inputs); var cursors = new RowCursor[inputs.Length]; diff --git a/src/Microsoft.ML.Data/DataView/Transposer.cs b/src/Microsoft.ML.Data/DataView/Transposer.cs index c6f54069e0..95be4d8f69 100644 --- a/src/Microsoft.ML.Data/DataView/Transposer.cs +++ b/src/Microsoft.ML.Data/DataView/Transposer.cs @@ -271,9 +271,9 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return _view.GetRowCursor(predicate, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - return _view.GetRowCursorSet(out consolidator, predicate, n, rand); + return _view.GetRowCursorSet(predicate, n, rand); } public long? GetRowCount() @@ -866,13 +866,13 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return new Cursor(_host, this, _input.GetRowCursor(srcPred, rand), predicate, activeSplitters); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); _host.CheckValueOrNull(rand); bool[] activeSplitters; var srcPred = CreateInputPredicate(predicate, out activeSplitters); - var result = _input.GetRowCursorSet(out consolidator, srcPred, n, rand); + var result = _input.GetRowCursorSet(srcPred, n, rand); for (int i = 0; i < result.Length; ++i) result[i] = new Cursor(_host, this, result[i], predicate, activeSplitters); return result; @@ -1528,10 +1528,9 @@ private RowCursor GetRowCursor(bool active) return new Cursor(this, active); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); - consolidator = null; return new RowCursor[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/DataView/TypedCursor.cs b/src/Microsoft.ML.Data/DataView/TypedCursor.cs index 4ca0b8ff33..034ba371a4 100644 --- a/src/Microsoft.ML.Data/DataView/TypedCursor.cs +++ b/src/Microsoft.ML.Data/DataView/TypedCursor.cs @@ -192,23 +192,21 @@ public Func GetDependencies(Func additionalColumnsPredicat /// /// Create a set of cursors with additional active columns. /// - /// The consolidator for the original row cursors /// Predicate that denotes which additional columns to include in the cursor, /// in addition to the columns that are needed for populating the object. /// Number of cursors to create /// Random generator to use - public RowCursor[] GetCursorSet(out IRowCursorConsolidator consolidator, - Func additionalColumnsPredicate, int n, Random rand) + public RowCursor[] GetCursorSet(Func additionalColumnsPredicate, int n, Random rand) { _host.CheckValue(additionalColumnsPredicate, nameof(additionalColumnsPredicate)); _host.CheckValueOrNull(rand); Func inputPredicate = col => _columnIndices.Contains(col) || additionalColumnsPredicate(col); - var inputs = _data.GetRowCursorSet(out consolidator, inputPredicate, n, rand); + var inputs = _data.GetRowCursorSet(inputPredicate, n, rand); _host.AssertNonEmpty(inputs); if (inputs.Length == 1 && n > 1) - inputs = DataViewUtils.CreateSplitCursors(out consolidator, _host, inputs[0], n); + inputs = DataViewUtils.CreateSplitCursors(_host, inputs[0], n); _host.AssertNonEmpty(inputs); return inputs diff --git a/src/Microsoft.ML.Data/DataView/ZipDataView.cs b/src/Microsoft.ML.Data/DataView/ZipDataView.cs index a09a4fa7f4..714412595e 100644 --- a/src/Microsoft.ML.Data/DataView/ZipDataView.cs +++ b/src/Microsoft.ML.Data/DataView/ZipDataView.cs @@ -99,9 +99,8 @@ private RowCursor GetMinimumCursor(IDataView dv) return dv.GetRowCursor(x => false); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - consolidator = null; return new RowCursor[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/Dirty/ChooseColumnsByIndexTransform.cs b/src/Microsoft.ML.Data/Dirty/ChooseColumnsByIndexTransform.cs index 5e63af9864..769a3100a1 100644 --- a/src/Microsoft.ML.Data/Dirty/ChooseColumnsByIndexTransform.cs +++ b/src/Microsoft.ML.Data/Dirty/ChooseColumnsByIndexTransform.cs @@ -252,15 +252,14 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, _bindings, input, active); } - public sealed override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public sealed override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); var inputPred = _bindings.GetDependencies(predicate); var active = _bindings.GetActive(predicate); - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n, rand); + var inputs = Source.GetRowCursorSet(inputPred, n, rand); Host.AssertNonEmpty(inputs); // No need to split if this is given 1 input cursor. diff --git a/src/Microsoft.ML.Data/Evaluators/RankerEvaluator.cs b/src/Microsoft.ML.Data/Evaluators/RankerEvaluator.cs index 2fbcafc85d..5dc4a6dff4 100644 --- a/src/Microsoft.ML.Data/Evaluators/RankerEvaluator.cs +++ b/src/Microsoft.ML.Data/Evaluators/RankerEvaluator.cs @@ -615,9 +615,9 @@ public RowCursor GetRowCursor(Func needCol, Random rand = null) return _transform.GetRowCursor(needCol, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func needCol, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func needCol, int n, Random rand = null) { - return _transform.GetRowCursorSet(out consolidator, needCol, n, rand); + return _transform.GetRowCursorSet(needCol, n, rand); } private sealed class Transform : PerGroupTransformBase diff --git a/src/Microsoft.ML.Data/Scorers/RowToRowScorerBase.cs b/src/Microsoft.ML.Data/Scorers/RowToRowScorerBase.cs index 7ea996ca2c..4b0afe59c1 100644 --- a/src/Microsoft.ML.Data/Scorers/RowToRowScorerBase.cs +++ b/src/Microsoft.ML.Data/Scorers/RowToRowScorerBase.cs @@ -133,7 +133,7 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, this, input, active, predicateMapper); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); @@ -142,11 +142,11 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida Func predicateInput; Func predicateMapper; var active = GetActive(bindings, predicate, out predicateInput, out predicateMapper); - var inputs = Source.GetRowCursorSet(out consolidator, predicateInput, n, rand); + var inputs = Source.GetRowCursorSet(predicateInput, n, rand); Contracts.AssertNonEmpty(inputs); if (inputs.Length == 1 && n > 1 && WantParallelCursors(predicate) && (Source.GetRowCount() ?? int.MaxValue) > n) - inputs = DataViewUtils.CreateSplitCursors(out consolidator, Host, inputs[0], n); + inputs = DataViewUtils.CreateSplitCursors(Host, inputs[0], n); Contracts.AssertNonEmpty(inputs); var cursors = new RowCursor[inputs.Length]; diff --git a/src/Microsoft.ML.Data/Training/TrainerUtils.cs b/src/Microsoft.ML.Data/Training/TrainerUtils.cs index 27b47a6c48..77268dd4c9 100644 --- a/src/Microsoft.ML.Data/Training/TrainerUtils.cs +++ b/src/Microsoft.ML.Data/Training/TrainerUtils.cs @@ -242,12 +242,12 @@ public static RowCursor CreateRowCursor(this RoleMappedData data, CursOpt opt, R => data.Data.GetRowCursor(CreatePredicate(data, opt, extraCols), rand); /// - /// Create a row cursor set for the RoleMappedData with the indicated standard columns active. + /// Create a row cursor set for the with the indicated standard columns active. /// This does not verify that the columns exist, but merely activates the ones that do exist. /// - public static RowCursor[] CreateRowCursorSet(this RoleMappedData data, out IRowCursorConsolidator consolidator, + public static RowCursor[] CreateRowCursorSet(this RoleMappedData data, CursOpt opt, int n, Random rand, IEnumerable extraCols = null) - => data.Data.GetRowCursorSet(out consolidator, CreatePredicate(data, opt, extraCols), n, rand); + => data.Data.GetRowCursorSet(CreatePredicate(data, opt, extraCols), n, rand); private static void AddOpt(HashSet cols, ColumnInfo info) { @@ -563,11 +563,9 @@ public TCurs[] CreateSet(int n, Random rand = null, params int[] extraCols) lock (_lock) opt = _opts; - // The intended use of this sort of thing is for cases where we have no interest in - // doing consolidation at all, that is, the consuming endpoint using these typed - // cursors wants to consume them as a set. - IRowCursorConsolidator consolidator; - var inputs = _data.CreateRowCursorSet(out consolidator, opt, n, rand, extraCols); + // Users of this method will tend to consume the cursors in the set in separate + // threads, and so gain benefit from the parallel transformation of the data. + var inputs = _data.CreateRowCursorSet(opt, n, rand, extraCols); Contracts.Assert(Utils.Size(inputs) > 0); Action signal; diff --git a/src/Microsoft.ML.Data/Transforms/ColumnSelecting.cs b/src/Microsoft.ML.Data/Transforms/ColumnSelecting.cs index dc21cc0543..8fd08b3a35 100644 --- a/src/Microsoft.ML.Data/Transforms/ColumnSelecting.cs +++ b/src/Microsoft.ML.Data/Transforms/ColumnSelecting.cs @@ -639,14 +639,14 @@ public RowCursor GetRowCursor(Func needCol, Random rand = null) return new Cursor(_host, _mapper, inputRowCursor, active); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func needCol, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func needCol, int n, Random rand = null) { _host.CheckValue(needCol, nameof(needCol)); _host.CheckValueOrNull(rand); // Build out the active state for the input var inputPred = GetDependencies(needCol); - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n, rand); + var inputs = Source.GetRowCursorSet(inputPred, n, rand); // Build out the acitve state for the output var active = Utils.BuildArray(_mapper.OutputSchema.Count, needCol); @@ -655,9 +655,7 @@ public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func // No need to split if this is given 1 input cursor. var cursors = new RowCursor[inputs.Length]; for (int i = 0; i < inputs.Length; i++) - { cursors[i] = new Cursor(_host, _mapper, inputs[i], active); - } return cursors; } diff --git a/src/Microsoft.ML.Data/Transforms/GenerateNumberTransform.cs b/src/Microsoft.ML.Data/Transforms/GenerateNumberTransform.cs index b77c4eb02d..86446b13cc 100644 --- a/src/Microsoft.ML.Data/Transforms/GenerateNumberTransform.cs +++ b/src/Microsoft.ML.Data/Transforms/GenerateNumberTransform.cs @@ -344,8 +344,7 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, _bindings, input, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); @@ -356,7 +355,7 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida if (n > 1 && ShouldUseParallelCursors(predicate) != false) { - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n); + var inputs = Source.GetRowCursorSet(inputPred, n); Host.AssertNonEmpty(inputs); if (inputs.Length != 1) @@ -371,7 +370,6 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida else input = Source.GetRowCursor(inputPred); - consolidator = null; return new RowCursor[] { new Cursor(Host, _bindings, input, active) }; } diff --git a/src/Microsoft.ML.Data/Transforms/NAFilter.cs b/src/Microsoft.ML.Data/Transforms/NAFilter.cs index 4f0af2c570..8a5d324c36 100644 --- a/src/Microsoft.ML.Data/Transforms/NAFilter.cs +++ b/src/Microsoft.ML.Data/Transforms/NAFilter.cs @@ -215,15 +215,14 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(this, input, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); bool[] active; Func inputPred = GetActive(predicate, out active); - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n, rand); + var inputs = Source.GetRowCursorSet(inputPred, n, rand); Host.AssertNonEmpty(inputs); // No need to split if this is given 1 input cursor. diff --git a/src/Microsoft.ML.Data/Transforms/NopTransform.cs b/src/Microsoft.ML.Data/Transforms/NopTransform.cs index 278af962a9..ad41595b68 100644 --- a/src/Microsoft.ML.Data/Transforms/NopTransform.cs +++ b/src/Microsoft.ML.Data/Transforms/NopTransform.cs @@ -123,9 +123,9 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return Source.GetRowCursor(predicate, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - return Source.GetRowCursorSet(out consolidator, predicate, n, rand); + return Source.GetRowCursorSet(predicate, n, rand); } public Func GetDependencies(Func predicate) diff --git a/src/Microsoft.ML.Data/Transforms/PerGroupTransformBase.cs b/src/Microsoft.ML.Data/Transforms/PerGroupTransformBase.cs index e32ed03cea..a67dfa3652 100644 --- a/src/Microsoft.ML.Data/Transforms/PerGroupTransformBase.cs +++ b/src/Microsoft.ML.Data/Transforms/PerGroupTransformBase.cs @@ -152,11 +152,10 @@ public virtual void Save(ModelSaveContext ctx) return Source.GetRowCount(); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/Transforms/RangeFilter.cs b/src/Microsoft.ML.Data/Transforms/RangeFilter.cs index 3fd53afd5d..0b8cdcce07 100644 --- a/src/Microsoft.ML.Data/Transforms/RangeFilter.cs +++ b/src/Microsoft.ML.Data/Transforms/RangeFilter.cs @@ -215,15 +215,14 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return CreateCursorCore(input, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); bool[] active; Func inputPred = GetActive(predicate, out active); - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n, rand); + var inputs = Source.GetRowCursorSet(inputPred, n, rand); Host.AssertNonEmpty(inputs); // No need to split if this is given 1 input cursor. diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs index 0c0f4a696d..f62aedc12d 100644 --- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs +++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs @@ -289,12 +289,10 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, _poolRows, input, rand); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { GetRowCursorCore(predicate, rand) }; } diff --git a/src/Microsoft.ML.Data/Transforms/SkipTakeFilter.cs b/src/Microsoft.ML.Data/Transforms/SkipTakeFilter.cs index 7582c53108..eae681db0b 100644 --- a/src/Microsoft.ML.Data/Transforms/SkipTakeFilter.cs +++ b/src/Microsoft.ML.Data/Transforms/SkipTakeFilter.cs @@ -198,12 +198,10 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, input, OutputSchema, activeColumns, _skip, _take); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { GetRowCursorCore(predicate) }; } diff --git a/src/Microsoft.ML.Data/Transforms/TransformBase.cs b/src/Microsoft.ML.Data/Transforms/TransformBase.cs index 6fdf7ce7d3..fa6f887852 100644 --- a/src/Microsoft.ML.Data/Transforms/TransformBase.cs +++ b/src/Microsoft.ML.Data/Transforms/TransformBase.cs @@ -95,8 +95,7 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) /// protected abstract RowCursor GetRowCursorCore(Func predicate, Random rand = null); - public abstract RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null); + public abstract RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null); } /// @@ -733,19 +732,18 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, this, input, active); } - public sealed override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public sealed override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); var inputPred = _bindings.GetDependencies(predicate); var active = _bindings.GetActive(predicate); - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n, rand); + var inputs = Source.GetRowCursorSet(inputPred, n, rand); Host.AssertNonEmpty(inputs); if (inputs.Length == 1 && n > 1 && WantParallelCursors(predicate)) - inputs = DataViewUtils.CreateSplitCursors(out consolidator, Host, inputs[0], n); + inputs = DataViewUtils.CreateSplitCursors(Host, inputs[0], n); Host.AssertNonEmpty(inputs); var cursors = new RowCursor[inputs.Length]; diff --git a/src/Microsoft.ML.Parquet/ParquetLoader.cs b/src/Microsoft.ML.Parquet/ParquetLoader.cs index 1b3d565594..11d2c2d38e 100644 --- a/src/Microsoft.ML.Parquet/ParquetLoader.cs +++ b/src/Microsoft.ML.Parquet/ParquetLoader.cs @@ -398,11 +398,10 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return new Cursor(this, predicate, rand); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { _host.CheckValue(predicate, nameof(predicate)); _host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs b/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs index ac999b0141..d8565e486e 100644 --- a/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs +++ b/src/Microsoft.ML.TimeSeries/SequentialTransformBase.cs @@ -365,9 +365,8 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return _transform.GetRowCount(); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - consolidator = null; return new RowCursor[] { GetRowCursorCore(predicate, rand) }; } diff --git a/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs b/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs index b53ef9ed4c..f5424397e3 100644 --- a/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs +++ b/src/Microsoft.ML.TimeSeries/SequentialTransformerBase.cs @@ -448,9 +448,8 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return _transform.GetRowCount(); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { - consolidator = null; return new RowCursor[] { GetRowCursorCore(predicate, rand) }; } @@ -708,7 +707,7 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, Source.GetRowCursor(predicateInput, rand), this, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); @@ -716,11 +715,11 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida Func predicateInput; var active = GetActive(predicate, out predicateInput); - var inputs = Source.GetRowCursorSet(out consolidator, predicateInput, n, rand); + var inputs = Source.GetRowCursorSet(predicateInput, n, rand); Host.AssertNonEmpty(inputs); if (inputs.Length == 1 && n > 1 && _bindings.AddedColumnIndices.Any(predicate)) - inputs = DataViewUtils.CreateSplitCursors(out consolidator, Host, inputs[0], n); + inputs = DataViewUtils.CreateSplitCursors(Host, inputs[0], n); Host.AssertNonEmpty(inputs); var cursors = new RowCursor[inputs.Length]; diff --git a/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs b/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs index ecb3bed6fe..d1783e873d 100644 --- a/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs +++ b/src/Microsoft.ML.Transforms/BootstrapSamplingTransformer.cs @@ -175,10 +175,9 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return cursor; } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { var cursor = GetRowCursorCore(predicate, rand); - consolidator = null; return new RowCursor[] { cursor }; } diff --git a/src/Microsoft.ML.Transforms/GroupTransform.cs b/src/Microsoft.ML.Transforms/GroupTransform.cs index e48f8e08a8..3511959015 100644 --- a/src/Microsoft.ML.Transforms/GroupTransform.cs +++ b/src/Microsoft.ML.Transforms/GroupTransform.cs @@ -173,11 +173,10 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random public override bool CanShuffle { get { return false; } } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); - consolidator = null; return new RowCursor[] { GetRowCursorCore(predicate) }; } diff --git a/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs b/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs index 1f0e77a7d8..c24f7af467 100644 --- a/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs +++ b/src/Microsoft.ML.Transforms/OptionalColumnTransform.cs @@ -308,8 +308,7 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, _bindings, input, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, - Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); @@ -320,7 +319,7 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida if (n > 1 && ShouldUseParallelCursors(predicate) != false) { - var inputs = Source.GetRowCursorSet(out consolidator, inputPred, n); + var inputs = Source.GetRowCursorSet(inputPred, n); Host.AssertNonEmpty(inputs); if (inputs.Length != 1) @@ -335,7 +334,6 @@ public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolida else input = Source.GetRowCursor(inputPred); - consolidator = null; return new RowCursor[] { new Cursor(Host, _bindings, input, active) }; } diff --git a/src/Microsoft.ML.Transforms/ProduceIdTransform.cs b/src/Microsoft.ML.Transforms/ProduceIdTransform.cs index 1260570037..7bcea11c9c 100644 --- a/src/Microsoft.ML.Transforms/ProduceIdTransform.cs +++ b/src/Microsoft.ML.Transforms/ProduceIdTransform.cs @@ -148,13 +148,13 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, _bindings, input, active); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Host.CheckValue(predicate, nameof(predicate)); Host.CheckValueOrNull(rand); var inputPred = _bindings.GetDependencies(predicate); - RowCursor[] cursors = Source.GetRowCursorSet(out consolidator, inputPred, n, rand); + RowCursor[] cursors = Source.GetRowCursorSet(inputPred, n, rand); bool active = predicate(_bindings.MapIinfoToCol(0)); for (int c = 0; c < cursors.Length; ++c) cursors[c] = new Cursor(Host, _bindings, cursors[c], active); diff --git a/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs b/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs index 7085e803b8..949f02a564 100644 --- a/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs +++ b/src/Microsoft.ML.Transforms/StatefulFilterTransform.cs @@ -120,7 +120,7 @@ public RowCursor GetRowCursor(Func predicate, Random rand = null) return new Cursor(this, input, predicate); } - public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, int n, Random rand = null) + public RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { Contracts.CheckValue(predicate, nameof(predicate)); Contracts.CheckParam(n >= 0, nameof(n)); @@ -128,7 +128,6 @@ public RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func // This transform is stateful, its contract is to allocate exactly one state object per cursor and call the filter function // on every row in sequence. Therefore, parallel cursoring is not possible. - consolidator = null; return new[] { GetRowCursor(predicate, rand) }; } diff --git a/src/Microsoft.ML.Transforms/UngroupTransform.cs b/src/Microsoft.ML.Transforms/UngroupTransform.cs index dadf4e24c6..11008670d5 100644 --- a/src/Microsoft.ML.Transforms/UngroupTransform.cs +++ b/src/Microsoft.ML.Transforms/UngroupTransform.cs @@ -186,11 +186,11 @@ protected override RowCursor GetRowCursorCore(Func predicate, Random return new Cursor(Host, inputCursor, _schemaImpl, predicate); } - public override RowCursor[] GetRowCursorSet(out IRowCursorConsolidator consolidator, Func predicate, + public override RowCursor[] GetRowCursorSet(Func predicate, int n, Random rand = null) { var activeInput = _schemaImpl.GetActiveInput(predicate); - var inputCursors = Source.GetRowCursorSet(out consolidator, col => activeInput[col], n, null); + var inputCursors = Source.GetRowCursorSet(col => activeInput[col], n, null); return Utils.BuildArray(inputCursors.Length, x => new Cursor(Host, inputCursors[x], _schemaImpl, predicate)); } From f8f89c0a037e019d6c908052f924e039ffae43c0 Mon Sep 17 00:00:00 2001 From: Tom Finley Date: Thu, 20 Dec 2018 08:50:27 -0800 Subject: [PATCH 2/3] Scott documentation review. --- src/Microsoft.ML.Core/Data/IDataView.cs | 40 +++++++++++++------------ 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/Microsoft.ML.Core/Data/IDataView.cs b/src/Microsoft.ML.Core/Data/IDataView.cs index df0ff29e95..51f6400dc0 100644 --- a/src/Microsoft.ML.Core/Data/IDataView.cs +++ b/src/Microsoft.ML.Core/Data/IDataView.cs @@ -92,24 +92,22 @@ public interface IDataView RowCursor GetRowCursor(Func needCol, Random rand = null); /// - /// This constructs a set of parallel batch cursors. The value is a recommended limit - /// on cardinality. If is non-positive, this indicates that the caller - /// has no recommendation, and the implementation should have some default behavior to cover - /// this case. Note that this is strictly a recommendation: it is entirely possible that - /// an implementation can return a different number of cursors. + /// This constructs a set of parallel batch cursors. The value is a recommended limit on + /// cardinality. If is non-positive, this indicates that the caller has no recommendation, + /// and the implementation should have some default behavior to cover this case. Note that this is strictly a + /// recommendation: it is entirely possible that an implementation can return a different number of cursors. /// /// The cursors should return the same data as returned through - /// , except partitioned: no two cursors - /// should return the "same" row as would have been returned through the regular serial cursor, - /// but all rows should be returned by exactly one of the cursors returned from this cursor. - /// The cursors can have their values reconciled downstream through the use of the - /// property. + /// , except partitioned: no two cursors should return the + /// "same" row as would have been returned through the regular serial cursor, but all rows should be returned by + /// exactly one of the cursors returned from this cursor. The cursors can have their values reconciled + /// downstream through the use of the property. /// - /// The typical usage pattern is that a set of cursors is requested, each of them is then - /// given to a set of working threads that consume from them independently while, ultimately, - /// the results are finally collated in the end by exploiting the ordering of the - /// property described above. More typical scenarios will be content with pulling from the single - /// serial cursor of . + /// The typical usage pattern is that a set of cursors is requested, each of them is then given to a set of + /// working threads that consume from them independently while, ultimately, the results are finally collated in + /// the end by exploiting the ordering of the property described above. More typical + /// scenarios will be content with pulling from the single serial cursor of + /// . /// /// The predicate, where a column is active if this returns true. /// The suggested degree of parallelism. @@ -124,7 +122,7 @@ public interface IDataView } /// - /// Delegate type to get a value. This can used for efficient access to data in a + /// Delegate type to get a value. This can be used for efficient access to data in a /// or . /// public delegate void ValueGetter(ref TValue value); @@ -150,9 +148,13 @@ public abstract class Row : IDisposable /// /// This provides a means for reconciling multiple rows that have been produced generally from /// . When getting a set, there is a need - /// to, while allowing parallel processing to proceed, always have an aim thatthe original order should be - /// reconverable. So: for any cursor implementation, batch numbers should be non-decreasing. Furthermore, any - /// given batch number should only appear in one of the cursors as returned by + /// to, while allowing parallel processing to proceed, always have an aim that the original order should be + /// reconverable. Note, whether or not a user cares about that original order in ones specific application is + /// another story altogether (most callers of this as a practical matter do not, otherwise they would not call + /// it), but at least in principle it should be possible to reconstruct the original order one would get from an + /// identically configured . So: for any cursor + /// implementation, batch numbers should be non-decreasing. Furthermore, any given batch number should only + /// appear in one of the cursors as returned by /// . In this way, order is determined by /// batch number. An operation that reconciles these cursors to produce a consistent single cursoring, could do /// so by drawing from the single cursor, among all cursors in the set, that has the smallest batch number From 36c5b98ddfaa583dfabd833a61b340d31a027ceb Mon Sep 17 00:00:00 2001 From: Tom Finley Date: Thu, 20 Dec 2018 11:01:45 -0800 Subject: [PATCH 3/3] Senja about why 64. --- src/Microsoft.ML.Data/Data/DataViewUtils.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.ML.Data/Data/DataViewUtils.cs b/src/Microsoft.ML.Data/Data/DataViewUtils.cs index 60f7c95340..01f6960cce 100644 --- a/src/Microsoft.ML.Data/Data/DataViewUtils.cs +++ b/src/Microsoft.ML.Data/Data/DataViewUtils.cs @@ -135,7 +135,14 @@ public static bool TryCreateConsolidatingCursor(out RowCursor curs, if (inputs.Length == 1) curs = inputs[0]; else - curs = DataViewUtils.ConsolidateGeneric(host, inputs, 64); + { + // We have a somewhat arbitrary batch size of about 64 for buffering results from the + // intermediate cursors, since that at least empirically for most datasets seems to + // strike a nice balance between a size large enough to benefit from parallelism but + // small enough so as to not be too onerous to keep in memory. + const int batchSize = 64; + curs = DataViewUtils.ConsolidateGeneric(host, inputs, batchSize); + } return true; }