Skip to content

Commit 443ceb9

Browse files
authored
Add missing implementation for datetime-related Arrow types in DataFrame (#6675)
* Add missing implementation for datetime relevant arrow type * Return required usage
1 parent 4c799ab commit 443ceb9

File tree

7 files changed

+60
-31
lines changed

7 files changed

+60
-31
lines changed

eng/Versions.props

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<SystemTextJsonVersion>6.0.1</SystemTextJsonVersion>
3131
<SystemThreadingChannelsVersion>4.7.1</SystemThreadingChannelsVersion>
3232
<!-- Other product dependencies -->
33-
<ApacheArrowVersion>2.0.0</ApacheArrowVersion>
33+
<ApacheArrowVersion>11.0.0</ApacheArrowVersion>
3434
<GoogleProtobufVersion>3.19.6</GoogleProtobufVersion>
3535
<LightGBMVersion>2.3.1</LightGBMVersion>
3636
<MicrosoftCodeAnalysisAnalyzersVersion>3.3.0</MicrosoftCodeAnalysisAnalyzersVersion>

src/Microsoft.Data.Analysis/DataFrame.Arrow.cs

+12-3
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,18 @@ private static void AppendDataFrameColumnFromArrowArray(Field field, IArrowArray
101101
AppendDataFrameColumnFromArrowArray(fieldsEnumerator.Current, structArrayEnumerator.Current, ret, field.Name + "_");
102102
}
103103
break;
104-
case ArrowTypeId.Decimal:
104+
case ArrowTypeId.Date64:
105+
Date64Array arrowDate64Array = (Date64Array)arrowArray;
106+
dataFrameColumn = new DateTimeDataFrameColumn(fieldName, arrowDate64Array.Data.Length);
107+
for (int i = 0; i < arrowDate64Array.Data.Length; i++)
108+
{
109+
dataFrameColumn[i] = arrowDate64Array.GetDateTime(i);
110+
}
111+
break;
112+
case ArrowTypeId.Decimal128:
113+
case ArrowTypeId.Decimal256:
105114
case ArrowTypeId.Binary:
106115
case ArrowTypeId.Date32:
107-
case ArrowTypeId.Date64:
108116
case ArrowTypeId.Dictionary:
109117
case ArrowTypeId.FixedSizedBinary:
110118
case ArrowTypeId.HalfFloat:
@@ -114,6 +122,7 @@ private static void AppendDataFrameColumnFromArrowArray(Field field, IArrowArray
114122
case ArrowTypeId.Null:
115123
case ArrowTypeId.Time32:
116124
case ArrowTypeId.Time64:
125+
case ArrowTypeId.Timestamp:
117126
default:
118127
throw new NotImplementedException($"{fieldType.Name}");
119128
}
@@ -145,7 +154,7 @@ public static DataFrame FromArrowRecordBatch(RecordBatch recordBatch)
145154
}
146155

147156
/// <summary>
148-
/// Returns an <see cref="IEnumerable{RecordBatch}"/> without copying data
157+
/// Returns an <see cref="IEnumerable{RecordBatch}"/> mostly without copying data
149158
/// </summary>
150159
public IEnumerable<RecordBatch> ToArrowRecordBatches()
151160
{

src/Microsoft.Data.Analysis/DataFrame.IO.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ private static DataFrameColumn CreateColumn(Type kind, string columnName)
336336
}
337337
else if (kind == typeof(DateTime))
338338
{
339-
ret = new PrimitiveDataFrameColumn<DateTime>(columnName);
339+
ret = new DateTimeDataFrameColumn(columnName);
340340
}
341341
else
342342
{

src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs

-12
Original file line numberDiff line numberDiff line change
@@ -374,18 +374,6 @@ internal int MaxRecordBatchLength(long startIndex)
374374
return Buffers[arrayIndex].Length - (int)startIndex;
375375
}
376376

377-
internal ReadOnlyMemory<byte> GetValueBuffer(long startIndex)
378-
{
379-
int arrayIndex = GetArrayContainingRowIndex(startIndex);
380-
return Buffers[arrayIndex].ReadOnlyBuffer;
381-
}
382-
383-
internal ReadOnlyMemory<byte> GetNullBuffer(long startIndex)
384-
{
385-
int arrayIndex = GetArrayContainingRowIndex(startIndex);
386-
return NullBitMapBuffers[arrayIndex].ReadOnlyBuffer;
387-
}
388-
389377
public IReadOnlyList<T?> this[long startIndex, int length]
390378
{
391379
get

src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs

+44-13
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Collections;
77
using System.Collections.Generic;
88
using System.Diagnostics;
9+
using System.Runtime.InteropServices;
910
using Apache.Arrow;
1011
using Apache.Arrow.Types;
1112
using Microsoft.ML;
@@ -103,6 +104,8 @@ private IArrowType GetArrowType()
103104
return UInt64Type.Default;
104105
else if (typeof(T) == typeof(ushort))
105106
return UInt16Type.Default;
107+
else if (typeof(T) == typeof(DateTime))
108+
return Date64Type.Default;
106109
else
107110
throw new NotImplementedException(nameof(T));
108111
}
@@ -126,36 +129,64 @@ protected internal override Apache.Arrow.Array ToArrowArray(long startIndex, int
126129
{
127130
int arrayIndex = numberOfRows == 0 ? 0 : _columnContainer.GetArrayContainingRowIndex(startIndex);
128131
int offset = (int)(startIndex - arrayIndex * ReadOnlyDataFrameBuffer<T>.MaxCapacity);
132+
129133
if (numberOfRows != 0 && numberOfRows > _columnContainer.Buffers[arrayIndex].Length - offset)
130134
{
131135
throw new ArgumentException(Strings.SpansMultipleBuffers, nameof(numberOfRows));
132136
}
133-
ArrowBuffer valueBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.GetValueBuffer(startIndex));
134-
ArrowBuffer nullBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.GetNullBuffer(startIndex));
137+
135138
int nullCount = GetNullCount(startIndex, numberOfRows);
139+
140+
//DateTime requires conversion
141+
if (this.DataType == typeof(DateTime))
142+
{
143+
if (numberOfRows == 0)
144+
return new Date64Array(ArrowBuffer.Empty, ArrowBuffer.Empty, numberOfRows, nullCount, offset);
145+
146+
ReadOnlyDataFrameBuffer<T> valueBuffer = (numberOfRows == 0) ? null : _columnContainer.Buffers[arrayIndex];
147+
ReadOnlyDataFrameBuffer<byte> nullBuffer = (numberOfRows == 0) ? null : _columnContainer.NullBitMapBuffers[arrayIndex];
148+
149+
ReadOnlySpan<DateTime> valueSpan = MemoryMarshal.Cast<T, DateTime>(valueBuffer.ReadOnlySpan);
150+
Date64Array.Builder builder = new Date64Array.Builder().Reserve(valueBuffer.Length);
151+
152+
for (int i = 0; i < valueBuffer.Length; i++)
153+
{
154+
if (BitUtility.GetBit(nullBuffer.ReadOnlySpan, i))
155+
builder.Append(valueSpan[i]);
156+
else
157+
builder.AppendNull();
158+
}
159+
160+
return builder.Build();
161+
}
162+
163+
//No conversion
164+
ArrowBuffer arrowValueBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.Buffers[arrayIndex].ReadOnlyBuffer);
165+
ArrowBuffer arrowNullBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.NullBitMapBuffers[arrayIndex].ReadOnlyBuffer);
166+
136167
Type type = this.DataType;
137168
if (type == typeof(bool))
138-
return new BooleanArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
169+
return new BooleanArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
139170
else if (type == typeof(double))
140-
return new DoubleArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
171+
return new DoubleArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
141172
else if (type == typeof(float))
142-
return new FloatArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
173+
return new FloatArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
143174
else if (type == typeof(int))
144-
return new Int32Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
175+
return new Int32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
145176
else if (type == typeof(long))
146-
return new Int64Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
177+
return new Int64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
147178
else if (type == typeof(sbyte))
148-
return new Int8Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
179+
return new Int8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
149180
else if (type == typeof(short))
150-
return new Int16Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
181+
return new Int16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
151182
else if (type == typeof(uint))
152-
return new UInt32Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
183+
return new UInt32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
153184
else if (type == typeof(ulong))
154-
return new UInt64Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
185+
return new UInt64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
155186
else if (type == typeof(ushort))
156-
return new UInt16Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
187+
return new UInt16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
157188
else if (type == typeof(byte))
158-
return new UInt8Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
189+
return new UInt8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
159190
else
160191
throw new NotImplementedException(type.ToString());
161192
}

test/Microsoft.Data.Analysis.Tests/ArrowIntegrationTests.cs

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void TestArrowIntegration()
4848
.Append("ULongColumn", false, new UInt64Array.Builder().AppendRange(Enumerable.Repeat((ulong)1, 10)).Build())
4949
.Append("ByteColumn", false, new Int8Array.Builder().AppendRange(Enumerable.Repeat((sbyte)1, 10)).Build())
5050
.Append("UByteColumn", false, new UInt8Array.Builder().AppendRange(Enumerable.Repeat((byte)1, 10)).Build())
51+
.Append("Date64Column", false, new Date64Array.Builder().AppendRange(Enumerable.Repeat(DateTime.Now, 10)).Build())
5152
.Build();
5253

5354
DataFrame df = DataFrame.FromArrowRecordBatch(originalBatch);

test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ internal static void VerifyColumnTypes(DataFrame df, bool testArrowStringColumn
102102
}
103103
else if (dataType == typeof(DateTime))
104104
{
105-
Assert.IsType<PrimitiveDataFrameColumn<DateTime>>(column);
105+
Assert.IsType<DateTimeDataFrameColumn>(column);
106106
}
107107
else
108108
{

0 commit comments

Comments
 (0)