
Commit 3c2ac5b

refactor(rust): Remove FileType in favor of ReaderCapabilities for new-streaming multiscan (#21881)
1 parent 4fd8d57 commit 3c2ac5b

10 files changed: +182 -84 lines changed

Cargo.lock (+1)

Some generated files are not rendered by default.

crates/polars-stream/Cargo.toml (+1)

```diff
@@ -12,6 +12,7 @@ description = "Private crate for the streaming execution engine for the Polars D
 arrow = { workspace = true }
 async-trait = { workspace = true }
 atomic-waker = { workspace = true }
+bitflags = { workspace = true }
 crossbeam-channel = { workspace = true }
 crossbeam-deque = { workspace = true }
 crossbeam-queue = { workspace = true }
```

crates/polars-stream/src/nodes/io_sources/multi_file_reader/extra_ops/apply.rs (+12 -18)

```diff
@@ -66,8 +66,8 @@ impl ApplyExtraOps {
         ExtraOperations {
             row_index,
             pre_slice,
-            cast_columns,
-            missing_columns,
+            cast_columns_policy,
+            missing_columns_policy,
             include_file_paths,
             predicate,
         },
@@ -78,15 +78,11 @@ impl ApplyExtraOps {
         // This should always be pushed to the reader, or otherwise handled separately.
         assert!(pre_slice.is_none());

-        let cast_columns = if let Some(policy) = cast_columns {
-            CastColumns::try_init_from_policy(
-                policy,
-                &final_output_schema,
-                incoming_schema,
-            )?
-        } else {
-            None
-        };
+        let cast_columns = CastColumns::try_init_from_policy(
+            cast_columns_policy,
+            &final_output_schema,
+            incoming_schema,
+        )?;

         let n_expected_extra_columns = final_output_schema.len()
             - incoming_schema.len()
@@ -95,13 +91,11 @@ impl ApplyExtraOps {
         let mut extra_columns: Vec<ScalarColumn> =
             Vec::with_capacity(n_expected_extra_columns);

-        if let Some(policy) = missing_columns {
-            policy.initialize_policy(
-                &projected_file_schema,
-                incoming_schema,
-                &mut extra_columns,
-            )?;
-        }
+        missing_columns_policy.initialize_policy(
+            &projected_file_schema,
+            incoming_schema,
+            &mut extra_columns,
+        )?;

         if let Some(hive_parts) = hive_parts {
             extra_columns.extend(hive_parts.df().get_columns().iter().map(|c| {
```
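
For context, here is a hedged sketch of what `MissingColumnsPolicy::initialize_policy` plausibly does at this call site: walk the projected file schema and, for each column absent from the incoming (file) schema, either raise or record a placeholder column. The types are simplified stand-ins; the real code operates on polars `Schema` and `ScalarColumn`.

```rust
// Hedged sketch, not the actual polars-stream implementation: for every
// projected column the file does not contain, either raise an error or
// materialize a placeholder (the real code inserts a NULL ScalarColumn).
#[derive(Debug, Clone, Default)]
enum MissingColumnsPolicy {
    #[default]
    Raise,
    Insert,
}

fn initialize_policy(
    policy: &MissingColumnsPolicy,
    projected_file_schema: &[&str],
    incoming_schema: &[&str],
    extra_columns: &mut Vec<String>,
) -> Result<(), String> {
    for name in projected_file_schema {
        if !incoming_schema.contains(name) {
            match policy {
                MissingColumnsPolicy::Raise => {
                    return Err(format!("column not found in file: {name}"));
                },
                MissingColumnsPolicy::Insert => extra_columns.push(name.to_string()),
            }
        }
    }
    Ok(())
}

fn main() {
    let mut extra = Vec::new();
    initialize_policy(
        &MissingColumnsPolicy::Insert,
        &["a", "b", "c"],
        &["a", "c"],
        &mut extra,
    )
    .unwrap();
    assert_eq!(extra, ["b"]); // "b" is projected but missing from the file
}
```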

crates/polars-stream/src/nodes/io_sources/multi_file_reader/extra_ops/cast_columns.rs (+2 -1)

```diff
@@ -3,9 +3,10 @@ use polars_core::schema::SchemaRef;
 use polars_error::{PolarsResult, polars_bail};

 /// TODO: Eventually move this enum to polars-plan
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub enum CastColumnsPolicy {
     /// Raise an error if the datatypes do not match
+    #[default]
     ErrorOnMismatch,
 }

```

crates/polars-stream/src/nodes/io_sources/multi_file_reader/extra_ops/missing_columns.rs (+2 -1)

```diff
@@ -5,8 +5,9 @@ use polars_core::schema::Schema;
 use polars_error::{PolarsResult, polars_bail};

 /// TODO: Eventually move this enum to polars-plan
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub enum MissingColumnsPolicy {
+    #[default]
     Raise,
     Insert,
 }
```
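
Both policy enums rely on the same mechanism: `#[derive(Default)]` on an enum (stable since Rust 1.62) requires marking exactly one variant `#[default]`. This is what lets `ExtraOperations` below store the policies directly instead of wrapping them in `Option`. A standalone check of the behavior:

```rust
// `#[default]` selects the variant that `Default::default()` yields.
#[derive(Debug, Clone, Default, PartialEq)]
enum MissingColumnsPolicy {
    #[default]
    Raise,
    Insert,
}

fn main() {
    assert_eq!(MissingColumnsPolicy::default(), MissingColumnsPolicy::Raise);
    assert_ne!(MissingColumnsPolicy::default(), MissingColumnsPolicy::Insert);
}
```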

crates/polars-stream/src/nodes/io_sources/multi_file_reader/extra_ops/mod.rs (+2 -2)

```diff
@@ -23,8 +23,8 @@ pub struct ExtraOperations {
     // Note: These fields are ordered according to when they (should be) applied.
     pub row_index: Option<RowIndex>,
     pub pre_slice: Option<Slice>,
-    pub cast_columns: Option<CastColumnsPolicy>,
-    pub missing_columns: Option<MissingColumnsPolicy>,
+    pub cast_columns_policy: CastColumnsPolicy,
+    pub missing_columns_policy: MissingColumnsPolicy,
     pub include_file_paths: Option<PlSmallStr>,
     pub predicate: Option<ScanIOPredicate>,
 }
```

crates/polars-stream/src/nodes/io_sources/multi_file_reader/reader_interface/builder.rs (+4 -19)

```diff
@@ -7,27 +7,12 @@ use polars_io::cloud::CloudOptions;
 use polars_plan::dsl::ScanSource;

 use super::FileReader;
-
-/// `FileReaderType` to avoid confusion with a `FileType` enum from polars-plan.
-#[derive(Debug, Clone, PartialEq)]
-pub enum FileReaderType {
-    #[cfg(feature = "parquet")]
-    Parquet,
-    #[cfg(feature = "ipc")]
-    #[expect(unused)]
-    Ipc,
-    #[cfg(feature = "csv")]
-    #[expect(unused)]
-    Csv,
-    #[cfg(feature = "json")]
-    NDJson,
-    /// So that we can compile when all feature flags disabled.
-    #[expect(unused)]
-    Unknown,
-}
+use super::capabilities::ReaderCapabilities;

 pub trait FileReaderBuilder: Debug + Send + Sync + 'static {
-    fn file_type(&self) -> FileReaderType;
+    fn reader_name(&self) -> &str;
+
+    fn reader_capabilities(&self) -> ReaderCapabilities;

     fn build_file_reader(
         &self,
```
crates/polars-stream/src/nodes/io_sources/multi_file_reader/reader_interface/capabilities.rs (new file, +11)

```diff
@@ -0,0 +1,11 @@
+use bitflags::bitflags;
+
+bitflags! {
+    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
+    pub struct ReaderCapabilities: u8 {
+        const ROW_INDEX = 1 << 0;
+        const PRE_SLICE = 1 << 1;
+        const NEGATIVE_PRE_SLICE = 1 << 2;
+        const FILTER = 1 << 3;
+    }
+}
```
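
Taken together with the builder change above, these flags replace enum-based dispatch: multiscan can now ask whether a given operation can be pushed into the reader instead of matching on a file type. Below is a self-contained sketch; the flag definitions mirror the new file, while `CsvBuilder`, its capability set, and the pared-down trait are hypothetical illustrations (the real `FileReaderBuilder` also requires `build_file_reader`).

```rust
use std::fmt::Debug;

use bitflags::bitflags;

// Mirrors the new capabilities.rs so this sketch compiles on its own.
bitflags! {
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct ReaderCapabilities: u8 {
        const ROW_INDEX = 1 << 0;
        const PRE_SLICE = 1 << 1;
        const NEGATIVE_PRE_SLICE = 1 << 2;
        const FILTER = 1 << 3;
    }
}

// Pared-down stand-in for the real trait (which also has `build_file_reader`).
trait FileReaderBuilder: Debug + Send + Sync + 'static {
    fn reader_name(&self) -> &str;
    fn reader_capabilities(&self) -> ReaderCapabilities;
}

#[derive(Debug)]
struct CsvBuilder;

impl FileReaderBuilder for CsvBuilder {
    fn reader_name(&self) -> &str {
        "csv"
    }

    // Hypothetical capability set: a reader that can slice from the front
    // but cannot apply row indices, negative slices, or filters itself.
    fn reader_capabilities(&self) -> ReaderCapabilities {
        ReaderCapabilities::PRE_SLICE
    }
}

fn main() {
    let builder = CsvBuilder;
    let caps = builder.reader_capabilities();

    // Multiscan-style decision: push an operation into the reader only if it
    // advertises support; otherwise apply it in post.
    for (op, flag) in [
        ("row_index", ReaderCapabilities::ROW_INDEX),
        ("pre_slice", ReaderCapabilities::PRE_SLICE),
        ("filter", ReaderCapabilities::FILTER),
    ] {
        let location = if caps.contains(flag) { "reader" } else { "post" };
        println!("{}: apply {} in {}", builder.reader_name(), op, location);
    }
}
```

Note that `NEGATIVE_PRE_SLICE` is a separate flag from `PRE_SLICE`: slicing from the end requires knowing the row count up front, so a reader can plausibly support one without the other.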

crates/polars-stream/src/nodes/io_sources/multi_file_reader/reader_interface/mod.rs (+69 -42)

```diff
@@ -1,16 +1,20 @@
 //! Interface for single-file readers

 pub mod builder;
+pub mod capabilities;
 pub mod output;

 use async_trait::async_trait;
 use output::FileReaderOutputRecv;
 use polars_core::schema::SchemaRef;
 use polars_error::PolarsResult;
+use polars_io::RowIndex;
+use polars_io::predicates::ScanIOPredicate;
 use polars_utils::IdxSize;
 use polars_utils::slice_enum::Slice;

-use super::extra_ops::apply::ApplyExtraOps;
+use super::extra_ops::cast_columns::CastColumnsPolicy;
+use super::extra_ops::missing_columns::MissingColumnsPolicy;
 use crate::async_executor::JoinHandle;

 /// Interface to read a single file
@@ -19,24 +23,12 @@ pub trait FileReader: Send + Sync {
     /// Initialize this FileReader. Intended to allow the reader to pre-fetch metadata.
     ///
     /// This must be called before calling any other functions of the FileReader.
-    ///
-    /// Returns the schema of the morsels that this FileReader will return.
     async fn initialize(&mut self) -> PolarsResult<()>;

     /// Begin reading the file into morsels.
     fn begin_read(
         &self,
-        // Note: This may contain more columns that what exist in the file. The reader should project
-        // the ones that it finds. The remaining ones will be handled in post.
-        projected_schema: &SchemaRef,
-        extra_ops: ApplyExtraOps,
-
-        num_pipelines: usize,
-        callbacks: FileReaderCallbacks,
-        // TODO
-        // We could introduce dynamic `Option<Box<dyn Any>>` for the reader to use. That would help
-        // with e.g. synchronizing row group prefetches across multiple files in Parquet. Currently
-        // every reader started concurrently will prefetch up to the row group prefetch limit.
+        args: BeginReadArgs,
     ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)>;

     /// This FileReader must be initialized before calling this.
@@ -46,15 +38,16 @@ pub trait FileReader: Send + Sync {
     async fn n_rows_in_file(&self) -> PolarsResult<IdxSize> {
         let (tx, rx) = tokio::sync::oneshot::channel();

-        let (morsel_receivers, handle) = self.begin_read(
-            &Default::default(), // pass empty schema
-            ApplyExtraOps::Noop,
-            1,
-            FileReaderCallbacks {
+        let (morsel_receivers, handle) = self.begin_read(BeginReadArgs {
+            // Passing 0-0 slice indicates to the reader that we want the full row count, but it can
+            // skip actually reading the data if it is able to.
+            pre_slice: Some(Slice::Positive { offset: 0, len: 0 }),
+            callbacks: FileReaderCallbacks {
                 n_rows_in_file_tx: Some(tx),
                 ..Default::default()
             },
-        )?;
+            ..Default::default()
+        })?;

         drop(morsel_receivers);
```
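The 0-0 slice above doubles as a row-count protocol: a reader that can answer from file metadata may skip decoding entirely. Here is a hedged sketch of how a reader might honor this (the `MetadataReader` type and its metadata field are hypothetical, not part of this commit); the diff for this file continues below.

```rust
// Hypothetical reader-side handling of the 0-0 pre-slice protocol: when the
// requested slice is empty, send the row count from metadata and emit no data.
#[derive(Clone, Copy)]
enum Slice {
    Positive { offset: usize, len: usize },
}

struct MetadataReader {
    n_rows_in_metadata: usize, // e.g. read from a Parquet footer
}

impl MetadataReader {
    // Stand-in for begin_read; returns the morsels it would produce.
    fn begin_read(&self, pre_slice: Option<Slice>) -> Vec<Vec<u64>> {
        if let Some(Slice::Positive { len: 0, .. }) = pre_slice {
            // Caller only wants the callbacks (here, the row count): skip decoding.
            println!("row count from metadata: {}", self.n_rows_in_metadata);
            return Vec::new();
        }
        // ... otherwise decode the file and return morsels ...
        unimplemented!("full read path")
    }
}

fn main() {
    let reader = MetadataReader { n_rows_in_metadata: 1_000 };
    let morsels = reader.begin_read(Some(Slice::Positive { offset: 0, len: 0 }));
    assert!(morsels.is_empty());
}
```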
crates/polars-stream/src/nodes/io_sources/multi_file_reader/reader_interface/mod.rs (continued)

```diff
@@ -75,15 +68,14 @@ pub trait FileReader: Send + Sync {

         let (tx, rx) = tokio::sync::oneshot::channel();

-        let (mut morsel_receivers, handle) = self.begin_read(
-            &Default::default(), // pass empty schema
-            ApplyExtraOps::Noop,
-            1,
-            FileReaderCallbacks {
+        let (mut morsel_receivers, handle) = self.begin_read(BeginReadArgs {
+            pre_slice,
+            callbacks: FileReaderCallbacks {
                 row_position_on_end_tx: Some(tx),
                 ..Default::default()
             },
-        )?;
+            ..Default::default()
+        })?;

         // We are using the `row_position_on_end` callback, this means we must fully consume all of
         // the morsels sent by the reader.
@@ -102,7 +94,54 @@ pub trait FileReader: Send + Sync {
     }
 }

-#[derive(Default)]
+#[derive(Debug)]
+pub struct BeginReadArgs {
+    /// Columns to project from the file.
+    pub projected_schema: SchemaRef,
+
+    pub row_index: Option<RowIndex>,
+    pub pre_slice: Option<Slice>,
+    pub predicate: Option<ScanIOPredicate>,
+
+    /// User-configured policy for when datatypes do not match.
+    ///
+    /// A reader may wish to use this if it is applying predicates.
+    ///
+    /// This can be ignored by the reader, as the policy is also applied in post.
+    #[expect(unused)]
+    pub cast_columns_policy: CastColumnsPolicy,
+    /// User-configured policy for when columns are not found in the file.
+    ///
+    /// A reader may wish to use this if it is applying predicates.
+    ///
+    /// This can be ignored by the reader, as the policy is also applied in post.
+    pub missing_columns_policy: MissingColumnsPolicy,
+
+    pub num_pipelines: usize,
+    pub callbacks: FileReaderCallbacks,
+    // TODO
+    // We could introduce dynamic `Option<Box<dyn Any>>` for the reader to use. That would help
+    // with e.g. synchronizing row group prefetches across multiple files in Parquet. Currently
+    // every reader started concurrently will prefetch up to the row group prefetch limit.
+}
+
+impl Default for BeginReadArgs {
+    fn default() -> Self {
+        BeginReadArgs {
+            projected_schema: SchemaRef::default(),
+            row_index: None,
+            pre_slice: None,
+            predicate: None,
+            // TODO: Use less restrictive default
+            cast_columns_policy: CastColumnsPolicy::ErrorOnMismatch,
+            missing_columns_policy: MissingColumnsPolicy::Insert,
+            num_pipelines: 1,
+            callbacks: FileReaderCallbacks::default(),
+        }
+    }
+}
+
+#[derive(Debug, Default)]
 /// We have this to avoid a footgun of accidentally swapping the arguments.
 pub struct FileReaderCallbacks {
     /// Full file schema
```
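
Since `BeginReadArgs` now carries a hand-written `Default`, call sites such as the two `begin_read` invocations above override only the fields they need via struct update syntax. A standalone illustration of the pattern, with simplified stand-in fields:

```rust
// Simplified stand-in for BeginReadArgs demonstrating `..Default::default()`:
// override only `pre_slice`, inherit every other field from the Default impl.
#[derive(Debug)]
struct BeginReadArgs {
    pre_slice: Option<(usize, usize)>,
    num_pipelines: usize,
}

impl Default for BeginReadArgs {
    fn default() -> Self {
        BeginReadArgs { pre_slice: None, num_pipelines: 1 }
    }
}

fn main() {
    let args = BeginReadArgs {
        pre_slice: Some((0, 0)),
        ..Default::default()
    };
    assert_eq!(args.num_pipelines, 1); // inherited from Default
    println!("{args:?}");
}
```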
crates/polars-stream/src/nodes/io_sources/multi_file_reader/reader_interface/mod.rs (continued)

```diff
@@ -112,10 +151,10 @@ pub struct FileReaderCallbacks {
     /// on the source. Prefer instead to use `row_position_on_end`, which can be much faster.
     ///
     /// Notes:
+    /// * All readers must ensure that this count is sent if requested, even if the output port
+    ///   closes prematurely, or a slice is sent.
     /// * Some readers will only send this after their output morsels to be fully consumed (or if
     ///   their output port is dropped), so you should not block morsel consumption on waiting for this.
-    /// * All readers must ensure that this count is sent if requested, even if the output port
-    ///   closes prematurely.
     pub n_rows_in_file_tx: Option<tokio::sync::oneshot::Sender<IdxSize>>,

     /// Returns the row position reached by this reader.
@@ -139,19 +178,7 @@ pub fn calc_row_position_after_slice(n_rows_in_file: IdxSize, pre_slice: Option<

     let out = match pre_slice {
         None => n_rows_in_file,
-
-        Some(Slice::Positive { offset, len }) => {
-            let slice_end = offset.saturating_add(len);
-            n_rows_in_file.min(slice_end)
-        },
-
-        Some(Slice::Negative {
-            offset_from_end,
-            len,
-        }) => {
-            let n_from_end = offset_from_end.saturating_sub(len);
-            n_rows_in_file.saturating_sub(n_from_end)
-        },
+        Some(v) => v.restrict_to_bounds(n_rows_in_file).end_position(),
     };

     IdxSize::try_from(out).unwrap_or(IdxSize::MAX)
```
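
The final hunk delegates the slice arithmetic to polars-utils' `Slice::restrict_to_bounds` and `end_position`. Their combined behavior can be reconstructed from the removed match arms; the sketch below re-implements it on a simplified `Slice` so the equivalence is checkable (an assumption about the library's semantics, not the verbatim polars-utils code).

```rust
// Simplified Slice mirroring polars_utils::slice_enum::Slice, with
// restrict_to_bounds/end_position semantics reconstructed from the removed
// match arms; the real implementation may differ in detail.
#[derive(Clone, Copy, Debug)]
enum Slice {
    Positive { offset: usize, len: usize },
    Negative { offset_from_end: usize, len: usize },
}

impl Slice {
    /// Clamp the slice to `0..n_rows` and normalize it to a positive slice.
    fn restrict_to_bounds(self, n_rows: usize) -> Slice {
        match self {
            Slice::Positive { offset, len } => {
                let offset = offset.min(n_rows);
                let len = len.min(n_rows - offset);
                Slice::Positive { offset, len }
            },
            Slice::Negative { offset_from_end, len } => {
                let offset = n_rows.saturating_sub(offset_from_end);
                // Matches the removed arm: n_rows - (offset_from_end - len).
                let end = n_rows.saturating_sub(offset_from_end.saturating_sub(len));
                Slice::Positive { offset, len: end - offset }
            },
        }
    }

    /// End row of a bounded, positive slice.
    fn end_position(self) -> usize {
        match self {
            Slice::Positive { offset, len } => offset + len,
            Slice::Negative { .. } => unreachable!("call restrict_to_bounds first"),
        }
    }
}

fn main() {
    // Old positive arm: min(n_rows_in_file, offset + len) = 100.
    let s = Slice::Positive { offset: 95, len: 10 };
    assert_eq!(s.restrict_to_bounds(100).end_position(), 100);

    // Old negative arm: n_rows_in_file - (offset_from_end - len) = 93.
    let s = Slice::Negative { offset_from_end: 10, len: 3 };
    assert_eq!(s.restrict_to_bounds(100).end_position(), 93);
}
```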
