Skip to content

Commit 7a5b303

Browse files
mandyshiehshauheen
authored andcommitted
Added Block Size Checks for ParquetLoader (#120)
* added ceiling to mathutils and have parquetloader default to sequential reading instead of throwing * factor out sequence creation and add check for overflow in MathUtils
1 parent 3012428 commit 7a5b303

File tree

2 files changed

+49
-11
lines changed

2 files changed

+49
-11
lines changed

src/Microsoft.ML.Core/Utilities/MathUtils.cs

+11
Original file line numberDiff line numberDiff line change
@@ -871,5 +871,16 @@ public static double Cos(double a)
871871
var res = Math.Cos(a);
872872
return Math.Abs(res) > 1 ? double.NaN : res;
873873
}
874+
875+
/// <summary>
876+
/// Returns the smallest integral value that is greater than or equal to the result of the division.
877+
/// </summary>
878+
/// <param name="numerator">Number to be divided.</param>
879+
/// <param name="denomenator">Number with which to divide the numerator.</param>
880+
/// <returns></returns>
881+
public static long DivisionCeiling(long numerator, long denomenator)
882+
{
883+
return (checked(numerator + denomenator) - 1) / denomenator;
884+
}
874885
}
875886
}

src/Microsoft.ML.Parquet/ParquetLoader.cs

+38-11
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public sealed class Arguments
9494
private readonly int _columnChunkReadSize;
9595
private readonly Column[] _columnsLoaded;
9696
private readonly DataSet _schemaDataSet;
97-
private const int _defaultColumnChunkReadSize = 100; // Should ideally be close to Rowgroup size
97+
private const int _defaultColumnChunkReadSize = 1000000;
9898

9999
private bool _disposed;
100100

@@ -368,8 +368,8 @@ private sealed class Cursor : RootCursorBase, IRowCursor
368368
private readonly Delegate[] _getters;
369369
private readonly ReaderOptions _readerOptions;
370370
private int _curDataSetRow;
371-
private IEnumerator _dataSetEnumerator;
372-
private IEnumerator _blockEnumerator;
371+
private IEnumerator<int> _dataSetEnumerator;
372+
private IEnumerator<int> _blockEnumerator;
373373
private IList[] _columnValues;
374374
private IRandom _rand;
375375

@@ -390,11 +390,18 @@ public Cursor(ParquetLoader parent, Func<int, bool> predicate, IRandom rand)
390390
Columns = _loader._columnsLoaded.Select(i => i.Name).ToArray()
391391
};
392392

393-
int numBlocks = (int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count));
394-
int[] blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks);
393+
// The number of blocks is calculated based on the specified rows in a block (defaults to 1M).
394+
// Since we want to shuffle the blocks in addition to shuffling the rows in each block, checks
395+
// are put in place to ensure we can produce a shuffle order for the blocks.
396+
var numBlocks = MathUtils.DivisionCeiling((long)parent.GetRowCount(), _readerOptions.Count);
397+
if (numBlocks > int.MaxValue)
398+
{
399+
throw _loader._host.ExceptParam(nameof(Arguments.ColumnChunkReadSize), "Error due to too many blocks. Try increasing block size.");
400+
}
401+
var blockOrder = CreateOrderSequence((int)numBlocks);
395402
_blockEnumerator = blockOrder.GetEnumerator();
396403

397-
_dataSetEnumerator = new int[0].GetEnumerator(); // Initialize an empty enumerator to get started
404+
_dataSetEnumerator = Enumerable.Empty<int>().GetEnumerator();
398405
_columnValues = new IList[_actives.Length];
399406
_getters = new Delegate[_actives.Length];
400407
for (int i = 0; i < _actives.Length; ++i)
@@ -472,12 +479,12 @@ protected override bool MoveNextCore()
472479
{
473480
if (_dataSetEnumerator.MoveNext())
474481
{
475-
_curDataSetRow = (int)_dataSetEnumerator.Current;
482+
_curDataSetRow = _dataSetEnumerator.Current;
476483
return true;
477484
}
478485
else if (_blockEnumerator.MoveNext())
479486
{
480-
_readerOptions.Offset = (int)_blockEnumerator.Current * _readerOptions.Count;
487+
_readerOptions.Offset = (long)_blockEnumerator.Current * _readerOptions.Count;
481488

482489
// When current dataset runs out, read the next portion of the parquet file.
483490
DataSet ds;
@@ -486,9 +493,9 @@ protected override bool MoveNextCore()
486493
ds = ParquetReader.Read(_loader._parquetStream, _loader._parquetOptions, _readerOptions);
487494
}
488495

489-
int[] dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount);
496+
var dataSetOrder = CreateOrderSequence(ds.RowCount);
490497
_dataSetEnumerator = dataSetOrder.GetEnumerator();
491-
_curDataSetRow = dataSetOrder[0];
498+
_curDataSetRow = dataSetOrder.ElementAt(0);
492499

493500
// Cache list for each active column
494501
for (int i = 0; i < _actives.Length; i++)
@@ -533,6 +540,26 @@ public bool IsColumnActive(int col)
533540
Ch.CheckParam(0 <= col && col < _colToActivesIndex.Length, nameof(col));
534541
return _colToActivesIndex[col] >= 0;
535542
}
543+
544+
/// <summary>
545+
/// Creates a in-order or shuffled sequence, based on whether _rand is specified.
546+
/// If unable to create a shuffle sequence, will default to sequential.
547+
/// </summary>
548+
/// <param name="size">Number of elements in the sequence.</param>
549+
/// <returns></returns>
550+
private IEnumerable<int> CreateOrderSequence(int size)
551+
{
552+
IEnumerable<int> order;
553+
try
554+
{
555+
order = _rand == null ? Enumerable.Range(0, size) : Utils.GetRandomPermutation(_rand, size);
556+
}
557+
catch (OutOfMemoryException)
558+
{
559+
order = Enumerable.Range(0, size);
560+
}
561+
return order;
562+
}
536563
}
537564

538565
#region Dispose
@@ -671,4 +698,4 @@ private string ConvertListToString(IList list)
671698
}
672699
}
673700
}
674-
}
701+
}

0 commit comments

Comments
 (0)