Smcbeth/series write parquet #54675

Closed
119 changes: 119 additions & 0 deletions pandas/core/series.py
@@ -1874,6 +1874,125 @@ def to_markdown(
            buf, mode=mode, index=index, storage_options=storage_options, **kwargs
        )

    @overload
    def to_parquet(
        self,
        path: None = ...,
        engine: Literal["auto", "pyarrow", "fastparquet"] = ...,
        compression: str | None = ...,
        index: bool | None = ...,
        partition_cols: list[str] | None = ...,
        storage_options: StorageOptions = ...,
        **kwargs,
    ) -> bytes:
        ...

    @overload
    def to_parquet(
        self,
        path: FilePath | WriteBuffer[bytes],
        engine: Literal["auto", "pyarrow", "fastparquet"] = ...,
        compression: str | None = ...,
        index: bool | None = ...,
        partition_cols: list[str] | None = ...,
        storage_options: StorageOptions = ...,
        **kwargs,
    ) -> None:
        ...

    @doc(storage_options=_shared_docs["storage_options"])
    def to_parquet(
        self,
        path: FilePath | WriteBuffer[bytes] | None = None,
        engine: Literal["auto", "pyarrow", "fastparquet"] = "auto",
        compression: str | None = "snappy",
        index: bool | None = None,
        partition_cols: list[str] | None = None,
        storage_options: StorageOptions | None = None,
        **kwargs,
    ) -> bytes | None:
        """
        Write a Series to the binary parquet format.

        This function writes the Series as a `parquet file
        <https://parquet.apache.org/>`_. You can choose different parquet
        backends, and have the option of compression. See
        :ref:`the user guide <io.parquet>` for more details.

        Parameters
        ----------
        path : str, path object, file-like object, or None, default None
            String, path object (implementing ``os.PathLike[str]``), or file-like
            object implementing a binary ``write()`` function. If None, the result is
            returned as bytes. If a string or path, it will be used as Root Directory
            path when writing a partitioned dataset.
        engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
            Parquet library to use. If 'auto', then the option
            ``io.parquet.engine`` is used. The default ``io.parquet.engine``
            behavior is to try 'pyarrow', falling back to 'fastparquet' if
            'pyarrow' is unavailable.
        compression : str or None, default 'snappy'
            Name of the compression to use. Use ``None`` for no compression.
            Supported options: 'snappy', 'gzip', 'brotli', 'lz4', 'zstd'.
        index : bool, default None
            If ``True``, include the Series' index in the file output.
            If ``False``, it will not be written to the file.
            If ``None``, similar to ``True`` the index will be saved.
            However, instead of being saved as values, a RangeIndex will be
            stored as a range in the metadata so it doesn't require much space
            and is faster. Other indexes will be included as columns in the
            file output.
        partition_cols : list, optional, default None
            Column names by which to partition the dataset.
            Columns are partitioned in the order they are given.
            Must be None if path is not a string.
        {storage_options}
        **kwargs
            Additional arguments passed to the parquet library. See
            :ref:`pandas io <io.parquet>` for more details.

        Returns
        -------
        bytes if no path argument is provided else None

        See Also
        --------
        DataFrame.to_parquet : Write a DataFrame to the binary parquet format.
        read_parquet : Read a parquet file into a DataFrame.

        Notes
        -----
        This function requires either the `fastparquet
        <https://pypi.org/project/fastparquet>`_ or `pyarrow
        <https://arrow.apache.org/docs/python/>`_ library.

        Examples
        --------
        >>> ser = pd.Series(data=[1, 2], name='col1')
        >>> ser.to_parquet('ser.parquet.gzip',
        ...                compression='gzip')  # doctest: +SKIP
        >>> pd.read_parquet('ser.parquet.gzip')  # doctest: +SKIP
           col1
        0     1
        1     2

        If you want to get a buffer to the parquet content you can use an
        io.BytesIO object, as long as you don't use partition_cols, which
        creates multiple files.

        >>> import io
        >>> f = io.BytesIO()
        >>> ser.to_parquet(f)
        >>> f.seek(0)
        0
        >>> content = f.read()
        """
        from pandas.io.parquet import to_parquet

        return to_parquet(
            self,
            path,
            engine,
            compression=compression,
            index=index,
            partition_cols=partition_cols,
            storage_options=storage_options,
            **kwargs,
        )

    # ----------------------------------------------------------------------

    def items(self) -> Iterable[tuple[Hashable, Any]]:
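
For reference, here is a short usage sketch of the `Series.to_parquet` method added above. It assumes a pandas build that includes this branch; on released pandas versions the equivalent spelling is `ser.to_frame().to_parquet(...)`.

```python
# Usage sketch for the Series.to_parquet method added above. Assumes a pandas
# build that includes this branch; on released pandas the equivalent is
# ser.to_frame().to_parquet(...).
import io

import pandas as pd

ser = pd.Series([1, 2, 3], name="col1")

# Writing to an in-memory buffer: when a path or buffer is given, the method
# returns None, mirroring DataFrame.to_parquet.
buf = io.BytesIO()
ser.to_parquet(buf)
buf.seek(0)
print(pd.read_parquet(buf))  # one-column DataFrame named "col1"

# Omitting the path returns the parquet payload as raw bytes, matching the
# first overload (path: None -> bytes).
payload = ser.to_parquet()
assert isinstance(payload, bytes)
```
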
37 changes: 20 additions & 17 deletions pandas/io/parquet.py
@@ -24,6 +24,7 @@
import pandas as pd
from pandas import (
DataFrame,
Series,
get_option,
)
from pandas.core.shared_docs import _shared_docs
@@ -146,9 +147,9 @@ def _get_path_or_handle(

class BaseImpl:
@staticmethod
def validate_dataframe(df: DataFrame) -> None:
if not isinstance(df, DataFrame):
raise ValueError("to_parquet only supports IO with DataFrames")
def validate_data(data: DataFrame | Series) -> None:
if not isinstance(data, DataFrame) and not isinstance(data, Series):
raise ValueError("to_parquet only supports IO with DataFrames and Series")

def write(self, df: DataFrame, path, compression, **kwargs):
raise AbstractMethodError(self)
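
As a side note on the relaxed check above, here is a minimal, hypothetical sketch of the same validation; `isinstance` with a tuple is an equivalent spelling of the two separate checks in the diff.

```python
# Hedged sketch of the validate_data behaviour introduced here: both
# DataFrame and Series pass, anything else still raises ValueError.
import pandas as pd


def validate_data(data) -> None:
    if not isinstance(data, (pd.DataFrame, pd.Series)):
        raise ValueError("to_parquet only supports IO with DataFrames and Series")


validate_data(pd.Series([1, 2]))         # accepted with this patch
validate_data(pd.DataFrame({"a": [1]}))  # accepted as before
# validate_data([1, 2])                  # would raise ValueError
```
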
@@ -171,7 +172,7 @@ def __init__(self) -> None:

def write(
self,
df: DataFrame,
data: DataFrame | Series,
path: FilePath | WriteBuffer[bytes],
compression: str | None = "snappy",
index: bool | None = None,
@@ -180,18 +181,20 @@ def write(
filesystem=None,
**kwargs,
) -> None:
self.validate_dataframe(df)
self.validate_data(data)

from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
if index is not None:
from_pandas_kwargs["preserve_index"] = index
if isinstance(data, Series):
table = self.api.Table.from_pandas(data.to_frame(), **from_pandas_kwargs)
else:
table = self.api.Table.from_pandas(data, **from_pandas_kwargs)

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

if df.attrs:
df_metadata = {"PANDAS_ATTRS": json.dumps(df.attrs)}
if data.attrs:
data_metadata = {"PANDAS_ATTRS": json.dumps(data.attrs)}
existing_metadata = table.schema.metadata
merged_metadata = {**existing_metadata, **df_metadata}
merged_metadata = {**existing_metadata, **data_metadata}
table = table.replace_schema_metadata(merged_metadata)

path_or_handle, handles, filesystem = _get_path_or_handle(
@@ -302,7 +305,7 @@ def __init__(self) -> None:

def write(
self,
df: DataFrame,
data: DataFrame | Series,
path,
compression: Literal["snappy", "gzip", "brotli"] | None = "snappy",
index=None,
@@ -311,7 +314,7 @@ def write(
filesystem=None,
**kwargs,
) -> None:
self.validate_dataframe(df)
self.validate_data(data)

if "partition_on" in kwargs and partition_cols is not None:
raise ValueError(
@@ -346,7 +349,7 @@ def write(
with catch_warnings(record=True):
self.api.write(
path,
df,
data,
compression=compression,
write_index=index,
partition_on=partition_cols,
@@ -406,7 +409,7 @@ def read(

@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
df: DataFrame,
data: DataFrame | Series,
path: FilePath | WriteBuffer[bytes] | None = None,
engine: str = "auto",
compression: str | None = "snappy",
@@ -417,11 +420,11 @@ def to_parquet(
**kwargs,
) -> bytes | None:
"""
Write a DataFrame to the parquet format.
Write a DataFrame or a Series to the parquet format.

Parameters
----------
df : DataFrame
data : DataFrame or Series
path : str, path object, file-like object, or None, default None
String, path object (implementing ``os.PathLike[str]``), or file-like
object implementing a binary ``write()`` function. If None, the result is
@@ -481,7 +484,7 @@ def to_parquet(
path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path

impl.write(
df,
data,
path_or_buf,
compression=compression,
index=index,
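
To make the engine-level changes concrete, below is a minimal, self-contained sketch of what the modified pyarrow write path does for a Series once validation passes: promote it to a one-column DataFrame, build an Arrow table, and write it, with an `io.BytesIO` buffer standing in for `path=None` so the bytes can be returned. The helper name and structure are illustrative simplifications, not the exact pandas internals.

```python
import io

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def write_with_pyarrow(data, path=None, index=None, **kwargs):
    # Sketch of BaseImpl.validate_data + PyArrowImpl.write from this diff;
    # simplified and hypothetical, not the exact pandas implementation.
    if not isinstance(data, (pd.DataFrame, pd.Series)):
        raise ValueError("to_parquet only supports IO with DataFrames and Series")

    from_pandas_kwargs = {}
    if index is not None:
        from_pandas_kwargs["preserve_index"] = index

    # The Series branch added in this diff: promote to a one-column frame
    # before handing it to pyarrow.
    frame = data.to_frame() if isinstance(data, pd.Series) else data
    table = pa.Table.from_pandas(frame, **from_pandas_kwargs)

    # path=None mirrors pandas.io.parquet.to_parquet: write into a BytesIO
    # buffer and hand the raw bytes back to the caller.
    buffer = io.BytesIO() if path is None else path
    pq.write_table(table, buffer, **kwargs)
    return buffer.getvalue() if path is None else None


raw = write_with_pyarrow(pd.Series([1, 2], name="col1"))
print(pd.read_parquet(io.BytesIO(raw)))
```
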