Commit ed47ef1

feat: add `write_engine` parameter to `read_FORMATNAME` methods to control how data is written to BigQuery (#371)

* feat: add `write_engine` parameter to `read_FORMATNAME` methods to control how data is written to BigQuery
* set constraints to allow unit tests to pass
* allow binary columns to be inlined
* bump minimum pandas-gbq to 0.25.0
* use pandas-gbq schema detection in load jobs too
* Update bigframes/session/__init__.py
* fix mypy
* fix `tests/system/small/test_session.py::test_read_pandas_w_unsupported_mixed_dtype` test
* add advice for which write_engine to use
* add unit tests
* use prerelease version of pandas-gbq
* update to pandas-gbq 0.26.0

1 parent 840aaff commit ed47ef1
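A minimal usage sketch of the new parameter (not part of this commit's diff; the values beyond `"default"`, such as `"bigquery_load"`, are assumptions about `bigframes_vendored.constants.WriteEngineType`, and running this requires a configured BigQuery session):

```python
import pandas as pd

import bigframes.pandas as bpd

pdf = pd.DataFrame({"id": [1, 2, 3], "payload": [b"a", b"b", b"c"]})

# "default" lets BigQuery DataFrames choose a strategy; small frames whose
# columns can be inlined (now including binary columns) may skip an upload.
df = bpd.read_pandas(pdf, write_engine="default")

# Assumed value: force a BigQuery load job for data too large to inline.
df_big = bpd.read_pandas(pdf, write_engine="bigquery_load")
```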

File tree: 19 files changed (+461, -98 lines)


bigframes/dtypes.py (+11, -4)
```diff
@@ -406,12 +406,19 @@ def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
         return pd.ArrowDtype(arrow_dtype)
     if pa.types.is_struct(arrow_dtype):
         return pd.ArrowDtype(arrow_dtype)
+
+    # BigFrames doesn't distinguish between string and large_string because the
+    # largest string (2 GB) is already larger than the largest BigQuery row.
+    if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype):
+        return STRING_DTYPE
+
     if arrow_dtype == pa.null():
         return DEFAULT_DTYPE
-    else:
-        raise ValueError(
-            f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
-        )
+
+    # No other types matched.
+    raise ValueError(
+        f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
+    )
 
 
 _BIGFRAMES_TO_ARROW = {
```
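A quick check of the new mapping, assuming `pyarrow` and BigFrames are installed: both Arrow string types now resolve to the single BigFrames string dtype.

```python
import pyarrow as pa

import bigframes.dtypes

# string (32-bit offsets) and large_string (64-bit offsets) are treated
# alike: a single 2 GB string already exceeds BigQuery's row size limit.
assert (
    bigframes.dtypes.arrow_dtype_to_bigframes_dtype(pa.string())
    == bigframes.dtypes.arrow_dtype_to_bigframes_dtype(pa.large_string())
)
```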

bigframes/pandas/io/api.py (+34, -5)
```diff
@@ -30,6 +30,7 @@
     Union,
 )
 
+import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq
 from google.cloud import bigquery
 import numpy
```
```diff
@@ -103,6 +104,7 @@ def read_csv(
         Literal["c", "python", "pyarrow", "python-fwf", "bigquery"]
     ] = None,
     encoding: Optional[str] = None,
+    write_engine: constants.WriteEngineType = "default",
     **kwargs,
 ) -> bigframes.dataframe.DataFrame:
     return global_session.with_default_session(
@@ -116,6 +118,7 @@
         dtype=dtype,
         engine=engine,
         encoding=encoding,
+        write_engine=write_engine,
         **kwargs,
     )
 
```
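Sketch of the wrapper in use, with a hypothetical local path: the CSV is parsed client-side and the new keyword picks the upload path (the `"bigquery_load"` value is an assumption, as above).

```python
import bigframes.pandas as bpd

df = bpd.read_csv(
    "data/events.csv",  # hypothetical path
    engine=None,  # parse locally with pandas (the default)
    write_engine="bigquery_load",  # assumed value: upload via a load job
)
```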
```diff
@@ -133,6 +136,7 @@ def read_json(
     encoding: Optional[str] = None,
     lines: bool = False,
     engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson",
+    write_engine: constants.WriteEngineType = "default",
     **kwargs,
 ) -> bigframes.dataframe.DataFrame:
     return global_session.with_default_session(
@@ -143,6 +147,7 @@
         encoding=encoding,
         lines=lines,
         engine=engine,
+        write_engine=write_engine,
         **kwargs,
     )
 
```
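The same pattern for JSON, sketched with a hypothetical newline-delimited file; `engine` still controls parsing while `write_engine` controls the upload.

```python
import bigframes.pandas as bpd

df = bpd.read_json(
    "data/events.jsonl",  # hypothetical path, one JSON record per line
    lines=True,
    engine="ujson",  # local parse (the default)
    write_engine="default",
)
```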
```diff
@@ -243,24 +248,41 @@ def read_gbq_table(
 
 
 @typing.overload
-def read_pandas(pandas_dataframe: pandas.DataFrame) -> bigframes.dataframe.DataFrame:
+def read_pandas(
+    pandas_dataframe: pandas.DataFrame,
+    *,
+    write_engine: constants.WriteEngineType = "default",
+) -> bigframes.dataframe.DataFrame:
     ...
 
 
 @typing.overload
-def read_pandas(pandas_dataframe: pandas.Series) -> bigframes.series.Series:
+def read_pandas(
+    pandas_dataframe: pandas.Series,
+    *,
+    write_engine: constants.WriteEngineType = "default",
+) -> bigframes.series.Series:
     ...
 
 
 @typing.overload
-def read_pandas(pandas_dataframe: pandas.Index) -> bigframes.core.indexes.Index:
+def read_pandas(
+    pandas_dataframe: pandas.Index,
+    *,
+    write_engine: constants.WriteEngineType = "default",
+) -> bigframes.core.indexes.Index:
     ...
 
 
-def read_pandas(pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index]):
+def read_pandas(
+    pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index],
+    *,
+    write_engine: constants.WriteEngineType = "default",
+):
     return global_session.with_default_session(
         bigframes.session.Session.read_pandas,
         pandas_dataframe,
+        write_engine=write_engine,
     )
 
 
```
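A self-contained sketch of the overload pattern above (illustrative names, not BigFrames' own): adding the keyword-only parameter to every `@typing.overload` keeps the return type precise per input type, while a single implementation forwards the keyword.

```python
import typing
from typing import Union


class FrameResult:
    ...


class SeriesResult:
    ...


@typing.overload
def read_obj(obj: list, *, write_engine: str = "default") -> FrameResult:
    ...


@typing.overload
def read_obj(obj: tuple, *, write_engine: str = "default") -> SeriesResult:
    ...


def read_obj(obj: Union[list, tuple], *, write_engine: str = "default"):
    # Single runtime implementation; the overloads exist only for type checkers.
    return FrameResult() if isinstance(obj, list) else SeriesResult()
```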
```diff
@@ -271,25 +293,32 @@ def read_pickle(
     filepath_or_buffer: FilePath | ReadPickleBuffer,
     compression: CompressionOptions = "infer",
     storage_options: StorageOptions = None,
+    *,
+    write_engine: constants.WriteEngineType = "default",
 ):
     return global_session.with_default_session(
         bigframes.session.Session.read_pickle,
         filepath_or_buffer=filepath_or_buffer,
         compression=compression,
         storage_options=storage_options,
+        write_engine=write_engine,
     )
 
 
 read_pickle.__doc__ = inspect.getdoc(bigframes.session.Session.read_pickle)
 
 
 def read_parquet(
-    path: str | IO["bytes"], *, engine: str = "auto"
+    path: str | IO["bytes"],
+    *,
+    engine: str = "auto",
+    write_engine: constants.WriteEngineType = "default",
 ) -> bigframes.dataframe.DataFrame:
     return global_session.with_default_session(
         bigframes.session.Session.read_parquet,
         path,
         engine=engine,
+        write_engine=write_engine,
     )
 
 
```
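Closing sketch for `read_parquet`, with a hypothetical GCS path; after this change both `engine` and `write_engine` are keyword-only.

```python
import bigframes.pandas as bpd

df = bpd.read_parquet(
    "gs://my-bucket/table.parquet",  # hypothetical path
    engine="auto",
    write_engine="default",
)
```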