Skip to content

Commit fda3191

Browse files
authored
feat: Add support for io-plugins in new-streaming (#21870)
1 parent d95e343 commit fda3191

File tree

11 files changed

+393
-44
lines changed

11 files changed

+393
-44
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/polars-mem-engine/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,7 @@ mod predicate;
44
mod prelude;
55

66
pub use executors::Executor;
7+
#[cfg(feature = "python")]
8+
pub use planner::python_scan_predicate;
79
pub use planner::{create_multiple_physical_plans, create_physical_plan, create_scan_predicate};
810
pub use predicate::ScanPredicate;

crates/polars-mem-engine/src/planner/lp.rs

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,58 @@ pub fn create_multiple_physical_plans(
134134
})
135135
}
136136

137+
#[cfg(feature = "python")]
138+
#[allow(clippy::type_complexity)]
139+
pub fn python_scan_predicate(
140+
options: &mut PythonOptions,
141+
expr_arena: &Arena<AExpr>,
142+
state: &mut ExpressionConversionState,
143+
) -> PolarsResult<(
144+
Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
145+
Option<Vec<u8>>,
146+
)> {
147+
let mut predicate_serialized = None;
148+
let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
149+
// Convert to a pyarrow eval string.
150+
if matches!(options.python_source, PythonScanSource::Pyarrow) {
151+
if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
152+
e.node(),
153+
expr_arena,
154+
Default::default(),
155+
) {
156+
options.predicate = PythonPredicate::PyArrow(eval_str);
157+
// We don't have to use a physical expression as pyarrow deals with the filter.
158+
None
159+
} else {
160+
Some(create_physical_expr(
161+
e,
162+
Context::Default,
163+
expr_arena,
164+
&options.schema,
165+
state,
166+
)?)
167+
}
168+
}
169+
// Convert to physical expression for the case the reader cannot consume the predicate.
170+
else {
171+
let dsl_expr = e.to_expr(expr_arena);
172+
predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;
173+
174+
Some(create_physical_expr(
175+
e,
176+
Context::Default,
177+
expr_arena,
178+
&options.schema,
179+
state,
180+
)?)
181+
}
182+
} else {
183+
None
184+
};
185+
186+
Ok((predicate, predicate_serialized))
187+
}
188+
137189
#[recursive]
138190
fn create_physical_plan_impl(
139191
root: Node,
@@ -160,45 +212,9 @@ fn create_physical_plan_impl(
160212
match logical_plan {
161213
#[cfg(feature = "python")]
162214
PythonScan { mut options } => {
163-
let mut predicate_serialized = None;
164-
165-
let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
166-
let phys_expr = || {
167-
let mut state = ExpressionConversionState::new(true, state.expr_depth);
168-
create_physical_expr(
169-
e,
170-
Context::Default,
171-
expr_arena,
172-
&options.schema,
173-
&mut state,
174-
)
175-
};
176-
177-
// Convert to a pyarrow eval string.
178-
if matches!(options.python_source, PythonScanSource::Pyarrow) {
179-
if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
180-
e.node(),
181-
expr_arena,
182-
Default::default(),
183-
) {
184-
options.predicate = PythonPredicate::PyArrow(eval_str);
185-
// We don't have to use a physical expression as pyarrow deals with the filter.
186-
None
187-
} else {
188-
Some(phys_expr()?)
189-
}
190-
}
191-
// Convert to physical expression for the case the reader cannot consume the predicate.
192-
else {
193-
let dsl_expr = e.to_expr(expr_arena);
194-
predicate_serialized =
195-
polars_plan::plans::python::predicate::serialize(&dsl_expr)?;
196-
197-
Some(phys_expr()?)
198-
}
199-
} else {
200-
None
201-
};
215+
let mut expr_conv_state = ExpressionConversionState::new(true, state.expr_depth);
216+
let (predicate, predicate_serialized) =
217+
python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
202218
Ok(Box::new(executors::PythonScanExec {
203219
options,
204220
predicate,

crates/polars-stream/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ percent-encoding = { workspace = true }
2424
pin-project-lite = { workspace = true }
2525
polars-io = { workspace = true, features = ["async"] }
2626
polars-utils = { workspace = true }
27+
pyo3 = { workspace = true, optional = true }
2728
rand = { workspace = true }
2829
rayon = { workspace = true }
2930
recursive = { workspace = true }
@@ -54,7 +55,7 @@ json = ["polars-mem-engine/json", "polars-plan/json", "polars-io/json"]
5455
cloud = ["polars-mem-engine/cloud", "polars-plan/cloud", "polars-io/cloud"]
5556
dtype-categorical = ["polars-core/dtype-categorical"]
5657
object = ["polars-ops/object"]
57-
python = ["polars-plan/python"]
58+
python = ["pyo3", "polars-plan/python"]
5859

5960
# We need to specify default features here to match workspace defaults.
6061
# Otherwise we get warnings with cargo check/clippy.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
use polars_core::frame::DataFrame;
2+
use polars_core::schema::SchemaRef;
3+
use polars_error::{PolarsResult, polars_err};
4+
use polars_utils::pl_str::PlSmallStr;
5+
use polars_utils::{IdxSize, format_pl_smallstr};
6+
use tokio::sync::oneshot;
7+
8+
use super::{
9+
JoinHandle, Morsel, MorselSeq, SourceNode, SourceOutput, StreamingExecutionState, TaskPriority,
10+
};
11+
use crate::async_executor::spawn;
12+
use crate::async_primitives::connector::Receiver;
13+
use crate::async_primitives::wait_group::WaitGroup;
14+
use crate::morsel::SourceToken;
15+
16+
type GetBatchFn =
17+
Box<dyn Fn(&StreamingExecutionState) -> PolarsResult<Option<DataFrame>> + Send + Sync>;
18+
19+
pub struct BatchSourceNode {
20+
pub name: PlSmallStr,
21+
pub output_schema: SchemaRef,
22+
pub get_batch_fn: Option<GetBatchFn>,
23+
}
24+
25+
impl BatchSourceNode {
26+
pub fn new(name: &str, output_schema: SchemaRef, get_batch_fn: Option<GetBatchFn>) -> Self {
27+
let name = format_pl_smallstr!("batch_source[{name}]");
28+
Self {
29+
name,
30+
output_schema,
31+
get_batch_fn,
32+
}
33+
}
34+
}
35+
36+
impl SourceNode for BatchSourceNode {
37+
fn name(&self) -> &str {
38+
self.name.as_str()
39+
}
40+
41+
fn is_source_output_parallel(&self, _is_receiver_serial: bool) -> bool {
42+
false
43+
}
44+
45+
fn spawn_source(
46+
&mut self,
47+
mut output_recv: Receiver<SourceOutput>,
48+
state: &StreamingExecutionState,
49+
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
50+
unrestricted_row_count: Option<oneshot::Sender<polars_utils::IdxSize>>,
51+
) {
52+
// We only spawn this once, so this is all fine.
53+
let output_schema = self.output_schema.clone();
54+
let get_batch_fn = self.get_batch_fn.take().unwrap();
55+
let state = state.clone();
56+
join_handles.push(spawn(TaskPriority::Low, async move {
57+
let mut seq = MorselSeq::default();
58+
let mut n_rows_seen = 0;
59+
60+
'phase_loop: while let Ok(phase_output) = output_recv.recv().await {
61+
let mut sender = phase_output.port.serial();
62+
let source_token = SourceToken::new();
63+
let wait_group = WaitGroup::default();
64+
65+
loop {
66+
let df = (get_batch_fn)(&state)?;
67+
let Some(df) = df else {
68+
if let Some(unrestricted_row_count) = unrestricted_row_count {
69+
if unrestricted_row_count.send(n_rows_seen).is_err() {
70+
return Ok(());
71+
}
72+
}
73+
74+
if n_rows_seen == 0 {
75+
let morsel = Morsel::new(
76+
DataFrame::empty_with_schema(output_schema.as_ref()),
77+
seq,
78+
source_token.clone(),
79+
);
80+
if sender.send(morsel).await.is_err() {
81+
return Ok(());
82+
}
83+
}
84+
85+
break 'phase_loop;
86+
};
87+
88+
let num_rows = IdxSize::try_from(df.height()).map_err(|_| {
89+
polars_err!(bigidx, ctx = "batch source", size = df.height())
90+
})?;
91+
n_rows_seen = n_rows_seen.checked_add(num_rows).ok_or_else(|| {
92+
polars_err!(
93+
bigidx,
94+
ctx = "batch source",
95+
size = n_rows_seen as usize + num_rows as usize
96+
)
97+
})?;
98+
99+
let mut morsel = Morsel::new(df, seq, source_token.clone());
100+
morsel.set_consume_token(wait_group.token());
101+
seq = seq.successor();
102+
103+
if sender.send(morsel).await.is_err() {
104+
return Ok(());
105+
}
106+
107+
wait_group.wait().await;
108+
if source_token.stop_requested() {
109+
phase_output.outcome.stop();
110+
continue 'phase_loop;
111+
}
112+
}
113+
}
114+
115+
Ok(())
116+
}));
117+
}
118+
}

crates/polars-stream/src/nodes/io_sources/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::nodes::compute_node_prelude::*;
1717

1818
pub mod multi_file_reader;
1919

20+
pub mod batch;
2021
#[cfg(feature = "csv")]
2122
pub mod csv;
2223
#[cfg(feature = "ipc")]

crates/polars-stream/src/physical_plan/fmt.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ fn visualize_plan_rec(
4646
),
4747
&[][..],
4848
),
49+
#[cfg(feature = "python")]
50+
PhysNodeKind::PythonScan { .. } => ("python-scan".to_string(), &[][..]),
4951
PhysNodeKind::SinkMultiple { sinks } => {
5052
for sink in sinks {
5153
visualize_plan_rec(*sink, phys_sm, expr_arena, visited, out);

crates/polars-stream/src/physical_plan/lower_ir.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,9 @@ pub fn lower_ir(
722722
},
723723

724724
#[cfg(feature = "python")]
725-
IR::PythonScan { .. } => todo!(),
725+
IR::PythonScan { options } => PhysNodeKind::PythonScan {
726+
options: options.clone(),
727+
},
726728

727729
IR::Cache {
728730
input,

crates/polars-stream/src/physical_plan/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ pub enum PhysNodeKind {
227227
file_options: Box<FileScanOptions>,
228228
},
229229

230+
#[cfg(feature = "python")]
231+
PythonScan {
232+
options: polars_plan::plans::python::PythonOptions,
233+
},
234+
230235
GroupBy {
231236
input: PhysStream,
232237
key: Vec<ExprIR>,
@@ -284,6 +289,8 @@ fn visit_node_inputs_mut(
284289
| PhysNodeKind::MultiScan { .. }
285290
| PhysNodeKind::FileScan { .. }
286291
| PhysNodeKind::InputIndependentSelect { .. } => {},
292+
#[cfg(feature = "python")]
293+
PhysNodeKind::PythonScan { .. } => {},
287294
PhysNodeKind::Select { input, .. }
288295
| PhysNodeKind::WithRowIndex { input, .. }
289296
| PhysNodeKind::Reduce { input, .. }

0 commit comments

Comments
 (0)