Skip to content

Commit e6114f9

Browse files
refactor(rust): Remove unused multiscan code (#22183)
1 parent 999812e commit e6114f9

File tree

19 files changed

+464
-2038
lines changed

19 files changed

+464
-2038
lines changed

crates/polars-stream/src/nodes/io_sinks/csv.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use crate::async_executor::spawn;
1414
use crate::async_primitives::connector::Receiver;
1515
use crate::execute::StreamingExecutionState;
1616
use crate::nodes::io_sinks::parallelize_receive_task;
17-
use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};
17+
use crate::nodes::io_sinks::phase::PhaseOutcome;
18+
use crate::nodes::{JoinHandle, TaskPriority};
1819

1920
pub struct CsvSinkNode {
2021
target: SinkTarget,

crates/polars-stream/src/nodes/io_sinks/ipc.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use crate::async_primitives::connector::{Receiver, connector};
2424
use crate::async_primitives::distributor_channel::distributor_channel;
2525
use crate::async_primitives::linearizer::Linearizer;
2626
use crate::execute::StreamingExecutionState;
27-
use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};
27+
use crate::nodes::io_sinks::phase::PhaseOutcome;
28+
use crate::nodes::{JoinHandle, TaskPriority};
2829

2930
pub struct IpcSinkNode {
3031
target: SinkTarget,

crates/polars-stream/src/nodes/io_sinks/json.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use crate::async_executor::spawn;
1111
use crate::async_primitives::connector::Receiver;
1212
use crate::execute::StreamingExecutionState;
1313
use crate::nodes::io_sinks::parallelize_receive_task;
14-
use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};
14+
use crate::nodes::io_sinks::phase::PhaseOutcome;
15+
use crate::nodes::{JoinHandle, TaskPriority};
1516

1617
pub struct NDJsonSinkNode {
1718
target: SinkTarget,

crates/polars-stream/src/nodes/io_sinks/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use polars_core::prelude::Column;
88
use polars_core::schema::SchemaRef;
99
use polars_error::PolarsResult;
1010

11-
use super::{
12-
ComputeNode, JoinHandle, Morsel, PhaseOutcome, PortState, RecvPort, SendPort, TaskScope,
13-
};
11+
use super::{ComputeNode, JoinHandle, Morsel, PortState, RecvPort, SendPort, TaskScope};
1412
use crate::async_executor::{AbortOnDropHandle, spawn};
1513
use crate::async_primitives::connector::{Receiver, Sender, connector};
1614
use crate::async_primitives::distributor_channel;
@@ -19,6 +17,9 @@ use crate::async_primitives::wait_group::WaitGroup;
1917
use crate::execute::StreamingExecutionState;
2018
use crate::nodes::TaskPriority;
2119

20+
mod phase;
21+
use phase::PhaseOutcome;
22+
2223
#[cfg(feature = "csv")]
2324
pub mod csv;
2425
#[cfg(feature = "ipc")]

crates/polars-stream/src/nodes/io_sinks/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ use crate::async_primitives::connector::{Receiver, connector};
2727
use crate::async_primitives::distributor_channel::distributor_channel;
2828
use crate::async_primitives::linearizer::Linearizer;
2929
use crate::execute::StreamingExecutionState;
30-
use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};
30+
use crate::nodes::io_sinks::phase::PhaseOutcome;
31+
use crate::nodes::{JoinHandle, TaskPriority};
3132

3233
pub struct ParquetSinkNode {
3334
target: SinkTarget,

crates/polars-stream/src/nodes/io_sinks/partition/by_key.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ use crate::async_executor::{AbortOnDropHandle, spawn};
1919
use crate::execute::StreamingExecutionState;
2020
use crate::morsel::SourceToken;
2121
use crate::nodes::io_sinks::partition::{SinkSender, open_new_sink};
22+
use crate::nodes::io_sinks::phase::PhaseOutcome;
2223
use crate::nodes::io_sinks::{SinkInputPort, SinkNode, parallelize_receive_task};
23-
use crate::nodes::{JoinHandle, Morsel, MorselSeq, PhaseOutcome, TaskPriority};
24+
use crate::nodes::{JoinHandle, Morsel, MorselSeq, TaskPriority};
2425

2526
type Linearized =
2627
Priority<Reverse<MorselSeq>, (SourceToken, Vec<(Buffer<u8>, Vec<Column>, DataFrame)>)>;

crates/polars-stream/src/nodes/io_sinks/partition/max_size.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ use crate::async_primitives::connector::Receiver;
1818
use crate::async_primitives::distributor_channel::distributor_channel;
1919
use crate::execute::StreamingExecutionState;
2020
use crate::nodes::io_sinks::partition::{SinkSender, open_new_sink};
21+
use crate::nodes::io_sinks::phase::PhaseOutcome;
2122
use crate::nodes::io_sinks::{SinkInputPort, SinkNode};
22-
use crate::nodes::{JoinHandle, Morsel, PhaseOutcome, TaskPriority};
23+
use crate::nodes::{JoinHandle, Morsel, TaskPriority};
2324

2425
pub struct MaxSizePartitionSinkNode {
2526
input_schema: SchemaRef,

crates/polars-stream/src/nodes/io_sinks/partition/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use crate::async_executor::{AbortOnDropHandle, spawn};
1717
use crate::async_primitives::wait_group::WaitGroup;
1818
use crate::async_primitives::{connector, distributor_channel};
1919
use crate::execute::StreamingExecutionState;
20-
use crate::nodes::{Morsel, PhaseOutcome, TaskPriority};
20+
use crate::nodes::io_sinks::phase::PhaseOutcome;
21+
use crate::nodes::{Morsel, TaskPriority};
2122

2223
pub mod by_key;
2324
pub mod max_size;

crates/polars-stream/src/nodes/io_sinks/partition/parted.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ use crate::async_primitives::connector::Receiver;
1818
use crate::async_primitives::distributor_channel::distributor_channel;
1919
use crate::execute::StreamingExecutionState;
2020
use crate::nodes::io_sinks::partition::{SinkSender, open_new_sink};
21+
use crate::nodes::io_sinks::phase::PhaseOutcome;
2122
use crate::nodes::io_sinks::{SinkInputPort, SinkNode};
22-
use crate::nodes::{JoinHandle, Morsel, PhaseOutcome, TaskPriority};
23+
use crate::nodes::{JoinHandle, Morsel, TaskPriority};
2324

2425
pub struct PartedPartitionSinkNode {
2526
// This may not be the same as the input_schema, e.g. when include_key=false then this will not
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use std::sync::Arc;
2+
use std::sync::atomic::{AtomicBool, Ordering};
3+
4+
use crate::async_primitives::wait_group::WaitToken;
5+
6+
/// The outcome of a phase in a task.
7+
///
8+
/// This indicates whether a task finished (and does not need to be started again) or has stopped
9+
/// prematurely. When this is dropped without calling `stop`, it is assumed that the task is
10+
/// finished (most likely because it errored).
11+
pub struct PhaseOutcome {
12+
// This is used to see when phase is finished.
13+
#[expect(unused)]
14+
consume_token: WaitToken,
15+
16+
outcome_token: PhaseOutcomeToken,
17+
}
18+
19+
impl PhaseOutcome {
20+
pub fn new_shared_wait(consume_token: WaitToken) -> (PhaseOutcomeToken, Self) {
21+
let outcome_token = PhaseOutcomeToken::new();
22+
(
23+
outcome_token.clone(),
24+
Self {
25+
consume_token,
26+
outcome_token,
27+
},
28+
)
29+
}
30+
31+
/// Phase ended before the task finished and needs to be called again.
32+
pub fn stopped(self) {
33+
self.outcome_token.stop();
34+
}
35+
}
36+
37+
/// Token that contains the outcome of a phase.
///
/// Namely, this indicates whether a phase finished completely or whether it was stopped before
/// that. Clones share the same underlying flag, so a `stop` issued through one clone is visible
/// through every other clone.
#[derive(Clone, Debug)]
pub struct PhaseOutcomeToken {
    /// - `false` -> finished / panicked
    /// - `true` -> stopped before finishing
    stop: Arc<AtomicBool>,
}

impl Default for PhaseOutcomeToken {
    /// Equivalent to [`PhaseOutcomeToken::new`]: a token in the "finished" state.
    fn default() -> Self {
        Self::new()
    }
}

impl PhaseOutcomeToken {
    /// Creates a token in the "finished" state, i.e. with the stop flag unset.
    pub fn new() -> Self {
        Self {
            stop: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Indicate that the phase was stopped before finishing.
    pub fn stop(&self) {
        // NOTE(review): `Relaxed` gives no ordering guarantees by itself; this presumably relies
        // on readers synchronizing through some other mechanism before checking the flag --
        // confirm against the callers.
        self.stop.store(true, Ordering::Relaxed);
    }

    /// Returns whether the phase was stopped before finishing.
    pub fn was_stopped(&self) -> bool {
        self.stop.load(Ordering::Relaxed)
    }

    /// Returns whether the phase was finished completely.
    ///
    /// This is the exact negation of [`PhaseOutcomeToken::was_stopped`].
    pub fn did_finish(&self) -> bool {
        !self.was_stopped()
    }
}

0 commit comments

Comments
 (0)