Add support for orc format #790


Draft · wants to merge 2 commits into main

Conversation

@MehulBatra (Contributor) commented Jun 3, 2024

  • Ability to read ORC file format based Iceberg tables
  • Ability to create ORC file format based Iceberg tables
  • Unit tests
  • Integration tests for reads
  • Integration tests for writes

@MehulBatra marked this pull request as draft June 3, 2024 17:55
@MehulBatra (Contributor, Author) commented Jun 5, 2024

Hi @Fokko and @HonahX
✅ I have modified the read logic to read ORC file-based Iceberg tables and wrote an integration test for it; it is working great.

Would love some guidance on the following:

I found a way to create an ORC file-based Iceberg table via the Glue client (by passing the table properties with format=orc).

But it is still producing Parquet data files when I append data. (Is this because the DataFile and DeleteFile logic defaults to the Parquet file format?)

from pyiceberg.catalog import load_catalog
from decimal import Decimal
import pyarrow as pa

catalog = load_catalog("default")  # my default catalog is Glue
namespace = 'demo_ns'
table_name = 'test_table_dummy_orc_demo'
pylist = [{'decimal_col': Decimal('32768.1'), 'int_col': 1, 'string_col': "demo_one"},
          {'decimal_col': Decimal('44456.1'), 'int_col': 2, 'string_col': "demo_two"}]
arrow_schema = pa.schema(
    [
        pa.field('decimal_col', pa.decimal128(33, 1)),
        pa.field('int_col', pa.int32()),
        pa.field('string_col', pa.string()),
    ],
)
arrow_table = pa.Table.from_pylist(pylist, schema=arrow_schema)
new_table = catalog.create_table(
    identifier=f'{namespace}.{table_name}',
    schema=arrow_schema,
    properties={
        'format': 'orc'
    },
)

new_table.append(arrow_table)
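
For what it's worth, one way to double-check which format was actually written (just a quick sketch using the table scan API):

# Sketch: print the format of the data files the scan plans to read
for task in new_table.scan().plan_files():
    print(task.file.file_path, task.file.file_format)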

@MehulBatra (Contributor, Author) commented Jun 5, 2024

I believe we need to make a change in this write_file method to support ORC writes; the call chain goes

write_file -> dataframe_to_datafile -> append || overwrite

which is ultimately called by the user. Please correct me if I am heading in the wrong direction.

def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
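
Something along these lines is what I have in mind; this is only a rough sketch, not the actual implementation. write.format.default is the Iceberg table property name, while the helper name, its arguments, and the ORC writer call are assumptions:

from pyiceberg.io import FileIO
from pyiceberg.table.metadata import TableMetadata
import pyarrow as pa
import pyarrow.orc as orc

# Rough sketch only: branch on the table's default write format.
# `arrow_table` and `file_path` stand in for whatever the WriteTask provides.
def _write_one_file(io: FileIO, table_metadata: TableMetadata, arrow_table: pa.Table, file_path: str) -> None:
    file_format = table_metadata.properties.get("write.format.default", "parquet")
    if file_format == "orc":
        with io.new_output(file_path).create(overwrite=True) as fos:
            orc.write_table(arrow_table, fos)
    else:
        ...  # existing Parquet path (pq.ParquetWriter, etc.)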

@HonahX linked an issue Jun 6, 2024 that may be closed by this pull request
@HonahX (Contributor) commented Jun 10, 2024

Hi @MehulBatra. Thanks for taking this! It looks like a great start.

I believe we need to make a change in this write_file method to support ORC writes, as the link goes

Yes, I think this is the right place to add the ORC write logic. As you have noticed, the data file format is controlled by the table property write.format.default. Currently we do not support this property in PyIceberg, since we assume the format is Parquet.

We can add the property in

class TableProperties:
    PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
    PARQUET_ROW_GROUP_LIMIT = "write.parquet.row-group-limit"
    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 128 * 1024 * 1024  # 128 MB

and doc it here:
## Write options
| Key                               | Options                           | Default | Description                                                                                  |
| --------------------------------- | --------------------------------- | ------- | -------------------------------------------------------------------------------------------- |
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression codec.                                                           |
| `write.parquet.compression-level` | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                    |
| `write.parquet.page-size-bytes`   | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
| `write.parquet.page-row-limit`    | Number of rows                    | 20000   | Set a target threshold for the maximum number of rows within a column chunk                  |
| `write.parquet.dict-size-bytes`   | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                              |
| `write.parquet.row-group-limit`   | Number of rows                    | 122880  | The Parquet row group limit                                                                   |
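
For illustration, the new constants could look roughly like this (the names and default here are my assumption, not the final implementation; write.format.default is the Iceberg table property name):

class TableProperties:
    # ... existing Parquet properties ...

    # Assumed additions: make the data file format configurable
    DEFAULT_WRITE_FORMAT = "write.format.default"
    DEFAULT_WRITE_FORMAT_DEFAULT = "parquet"

and the doc table could gain a row such as:

| `write.format.default` | `{parquet,orc}` | parquet | Default file format for new data files |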

In write_file, we check the write.format.default property and write in the corresponding format. For statistics, we may need a data_file_statistics_from_orc similar to

def data_file_statistics_from_parquet_metadata(
    parquet_metadata: pq.FileMetaData,
    stats_columns: Dict[int, StatisticsCollector],
    parquet_column_mapping: Dict[str, int],
) -> DataFileStatistics:
    """
    Compute and return DataFileStatistics that includes the following.

    - record_count
    - column_sizes
    - value_counts
    - null_value_counts
    - nan_value_counts
    - column_aggregates
    - split_offsets

    Args:
        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
        stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
            set the mode for column metrics collection
        parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
    """
    if parquet_metadata.num_columns != len(stats_columns):
        raise ValueError(
            f"Number of columns in statistics configuration ({len(stats_columns)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
        )
(we can make statistics collection a follow-up feature since most statistics fields are optional)
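
To make that concrete, a minimal data_file_statistics_from_orc could start out as small as this (a sketch only: record_count comes from pyarrow.orc.ORCFile.nrows, everything else is left empty for the follow-up; the import location of DataFileStatistics is assumed):

import pyarrow.orc as orc

from pyiceberg.io.pyarrow import DataFileStatistics  # assumed location of the dataclass

def data_file_statistics_from_orc(orc_file: orc.ORCFile) -> DataFileStatistics:
    # Sketch: only record_count for now; the column-level metrics are optional
    # and can be filled in as a follow-up.
    return DataFileStatistics(
        record_count=orc_file.nrows,
        column_sizes={},
        value_counts={},
        null_value_counts={},
        nan_value_counts={},
        column_aggregates={},
        split_offsets=[],
    )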

@MehulBatra (Contributor, Author) replied:

Thanks @HonahX for the feedback; I will keep all of this in mind moving forward!

@HonahX (Contributor) left a comment

I've added some comments for the read side.

We may try to merge the read support first and make write support a separate PR. WDYT?

if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
else None
)
if field.metadata and (field_id_str := field.metadata.get(PYARROW_ORC_FIELD_ID_KEY)):
@HonahX (Contributor) commented:

We may want to add a doc somewhere mentioning that ORC read support requires pyarrow >= 13.0.0, since the ORC metadata is exposed in the pyarrow schema starting with 13.0.0 (apache/arrow#35304).

Also, shall we check PYARROW_PARQUET_FIELD_ID_KEY first, since Parquet is the default file format?

@MehulBatra (Contributor, Author) replied:

Seems like the right approach. I will promote Parquet to the top and then ORC, and will make the necessary doc changes when releasing ORC read support; it will definitely help users get started. Thanks for the feedback.
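
Roughly what I have in mind for the ordering (a sketch only; PYARROW_ORC_FIELD_ID_KEY is the constant introduced in this PR, and its value below is just a placeholder):

from typing import Optional

import pyarrow as pa

PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"  # existing constant in pyiceberg.io.pyarrow
PYARROW_ORC_FIELD_ID_KEY = b"iceberg.id"            # placeholder value; the real constant comes from this PR

def _get_field_id(field: pa.Field) -> Optional[int]:
    # Check the Parquet field-id key first (the default format), then the ORC one.
    if field.metadata:
        for key in (PYARROW_PARQUET_FIELD_ID_KEY, PYARROW_ORC_FIELD_ID_KEY):
            if field_id_str := field.metadata.get(key):
                return int(field_id_str.decode())
    return None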

Comment on lines +919 to +921
if primitive.unit == "ns":
    if primitive.tz == "UTC":
        return TimestamptzType()
@HonahX (Contributor) commented Jun 10, 2024

Nanosecond timestamps are added in format version 3, which is still under development and not formally adopted. PyIceberg does not support nanoseconds yet, so I think we should not add the conversion here (and it should be converted to a separate TimestampNanoType in the future: https://iceberg.apache.org/spec/#primitive-types).

I think you added this because Arrow reads ORC timestamps as nanoseconds, since ORC's timestamp type always contains nanosecond information, but we want to read this as microseconds.

It seems the Java side currently just treats ORC's timestamp type as a microsecond (us) one.

We could probably fix this at the arrow schema level. For example, we can add an additional conversion for physical_schema here to change the unit of the arrow timestamps from ns to us:

fragment = arrow_format.make_fragment(fin)
physical_schema = fragment.physical_schema
file_schema = pyarrow_to_schema(physical_schema, name_mapping)
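
e.g. something along these lines (an untested sketch; the helper name is made up):

import pyarrow as pa

def _ns_to_us_schema(schema: pa.Schema) -> pa.Schema:
    # Hypothetical helper: downcast nanosecond timestamp fields to microseconds
    # so the data is read with the us unit that TimestampType expects.
    fields = []
    for field in schema:
        if pa.types.is_timestamp(field.type) and field.type.unit == "ns":
            fields.append(field.with_type(pa.timestamp("us", tz=field.type.tz)))
        else:
            fields.append(field)
    return pa.schema(fields)

# physical_schema = _ns_to_us_schema(fragment.physical_schema)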

Changing the physical schema also ensures that the actual timestamp data is read with the us unit, as required by TimestampType:
fragment_scanner = ds.Scanner.from_fragment(
    fragment=fragment,
    schema=physical_schema,
    # This will push down the query to Arrow.
    # But in case there are positional deletes, we have to apply them first
    filter=pyarrow_filter if not positional_deletes else None,
    columns=[col.name for col in file_project_schema.columns],
)
However, I also feel it is not the ultimate solution, because we assume the unit is microseconds. When TimestampNanoType is in, we may need some additional steps to ensure we read the data using the correct unit.

@MehulBatra @Fokko Would love to hear your thoughts on this. Please correct me if I have made any mistakes about ORC's behavior.

@MehulBatra (Contributor, Author) replied:

Yes @HonahX, you got it right: Arrow reads the ORC timestamp unit as ns, which is why I added a schema conversion at the primitive-type level, even though I was aware that we will support nanoseconds in the coming version 3. I feel your suggestion makes sense until we start supporting version 3 altogether.
I will try to incorporate the changes to read ns as us.
@Fokko what do you think, are we good with this for the time being?

@MehulBatra (Contributor, Author) replied:

I've added some comments for the read side.

We may try to merge the read support first and make write support a separate PR. WDYT?

@HonahX Works for me, and I believe it will also benefit the community to get unblocked at least on the read side; meanwhile we can grind on the write support. I have already started on the write-support changes, will raise a separate PR for them, and will attach it to #20

Development

Successfully merging this pull request may close these issues.

ORC file format support
2 participants