Skip to content

Commit 70c1740

Browse files
authored
Expr: Remove demand field from join and flatmap. (#8279)
Remove demand field from join and flatmap. Record that demand has become obsolete for join and flatmap.
1 parent 52458bd commit 70c1740

File tree

21 files changed

+57
-186
lines changed

21 files changed

+57
-186
lines changed

doc/developer/arrangements.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,6 @@ It is not uncommon to re-use this arrangement, as we find that such groupings ar
255255
The `mz_arrangement_sharing` logging source reports the number of times each arrangement is shared.
256256
An arrangement is identified by the worker and operator that created it.
257257

258-
## Caveats: Demand Analysis
259-
260-
When users present queries and views to us, we can determine that some fields are not required.
261-
We blank out any field that is not required.
262-
This can reduce the number of distinct `data` in an arrangement, which will reduce the size of the arrangement.
263-
264258
## Caveats: Delta Joins
265259

266260
In certain circumstances, we plan `Join` operators using a different pattern which avoids the intermediate arrangements.
@@ -278,3 +272,15 @@ But, we would also need to create similar dataflow graphs for each of `In2`, `In
278272
In each case they only require arrangements of the input, but they may be by different keys.
279273

280274
If a `Join` is implemented by a Delta Join pattern, it will create zero additional arrangements.
275+
276+
## Caveats: Demand Analysis
277+
278+
> Obsolete for `Join` and `FlatMap` as of v0.9.4. We now delete fields that are
279+
> not required instead of blanking them out. Plans prior to v0.9.4 will show
280+
> something like `| | demand = (#6, #8, #12, #15, #22, #23, #27)` for the
281+
> `Join` and `FlatMap` operators, listing which field will be blanked out.
282+
283+
When users present queries and views to us, we can determine that some fields
284+
are not required and blank out them out. This can reduce the number of distinct
285+
`data` in an arrangement, which will reduce the size of the arrangement.
286+
Currently, we blank out fields not required when importing sources.

doc/user/content/sql/explain.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ example is the choice of implementation in the `Join` operator.
165165
| | delta %0 %1.(#1) %2.(#0)
166166
| | delta %1 %0.(#0) %2.(#0)
167167
| | delta %2 %1.(#0) %0.(#0)
168-
| | demand = (#6, #8, #12, #15, #22, #23, #27)
169168
| Filter (#6 = "BUILDING"), (#12 < 1995-03-15), (#27 > 1995-03-15)
170169
| Reduce group=(#8, #12, #15) sum((#22 * (100dec - #23)))
171170
| Project (#0, #3, #1, #2)

src/dataflow/src/render/join/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ struct JoinBuildState {
233233
/// The linear operator logic (maps, filters, and projection) that remains to be applied
234234
/// to the output of the join.
235235
///
236-
/// We we advance through the construction of the join dataflow, we may be able to peel
236+
/// When we advance through the construction of the join dataflow, we may be able to peel
237237
/// off some of this work, ideally reducing `mfp` to something nearly the identity.
238238
mfp: MapFilterProject,
239239
}

src/dataflow/src/render/mod.rs

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -954,17 +954,7 @@ pub mod plan {
954954
b_keys,
955955
)
956956
}
957-
MirRelationExpr::FlatMap {
958-
input,
959-
func,
960-
exprs,
961-
demand,
962-
} => {
963-
// Map the demand into the MapFilterProject.
964-
// TODO: Remove this once demand is removed.
965-
if let Some(demand) = demand {
966-
prepend_mfp_demand(&mut mfp, expr, demand);
967-
}
957+
MirRelationExpr::FlatMap { input, func, exprs } => {
968958
let (input, _keys) = Plan::from_mir(input, arrangements)?;
969959
// This stage can absorb arbitrary MFP instances.
970960
let mfp = mfp.take();
@@ -982,15 +972,8 @@ pub mod plan {
982972
MirRelationExpr::Join {
983973
inputs,
984974
equivalences,
985-
demand,
986975
implementation,
987976
} => {
988-
// Map the demand into the MapFilterProject.
989-
// TODO: Remove this once demand is removed.
990-
if let Some(demand) = demand {
991-
prepend_mfp_demand(&mut mfp, expr, demand);
992-
}
993-
994977
let input_mapper = JoinInputMapper::new(inputs);
995978

996979
// Plan each of the join inputs independently.
@@ -1324,37 +1307,6 @@ pub mod plan {
13241307
}
13251308
}
13261309

1327-
/// Pre-prends a MapFilterProject instance with a transform that blanks out all but the columns in `demand`.
1328-
fn prepend_mfp_demand(
1329-
mfp: &mut MapFilterProject,
1330-
relation_expr: &MirRelationExpr,
1331-
demand: &[usize],
1332-
) {
1333-
let output_arity = relation_expr.arity();
1334-
// Determine dummy columns for un-demanded outputs, and a projection.
1335-
let mut dummies = Vec::new();
1336-
let mut demand_projection = Vec::new();
1337-
for (column, typ) in relation_expr.typ().column_types.into_iter().enumerate() {
1338-
if demand.contains(&column) {
1339-
demand_projection.push(column);
1340-
} else {
1341-
demand_projection.push(output_arity + dummies.len());
1342-
dummies.push(MirScalarExpr::literal_ok(Datum::Dummy, typ.scalar_type));
1343-
}
1344-
}
1345-
1346-
let (map, filter, project) = mfp.as_map_filter_project();
1347-
1348-
let map_filter_project = MapFilterProject::new(output_arity)
1349-
.map(dummies)
1350-
.project(demand_projection)
1351-
.map(map)
1352-
.filter(filter)
1353-
.project(project);
1354-
1355-
*mfp = map_filter_project;
1356-
}
1357-
13581310
/// Helper method to convert linear operators to MapFilterProject instances.
13591311
///
13601312
/// This method produces a `MapFilterProject` instance that first applies any predicates,

src/expr-test-util/tests/testdata/rel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ build
8787
(join
8888
[(get y) (get y)]
8989
[[#0 #3]]
90-
[0 1 5]
9190
(delta_query [[[0 [#0]] [1 [#0]]]
9291
[[1 [#0]] [0 [#0]]]]))
9392
----
@@ -103,7 +102,6 @@ build
103102
| | implementation = DeltaQuery
104103
| | delta %0 %0.(#0) %1.(#0)
105104
| | delta %1 %1.(#0) %0.(#0)
106-
| | demand = (#0, #1, #5)
107105
----
108106
----
109107

src/expr-test-util/tests/testdata/tospec

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ cat
2727
----
2828
ok
2929

30-
(Reduce (Filter (Join [(get u6) (get u16) (get u19) (get u26) (get u34) (get u31) (get u37)] [] null Unimplemented) [(CallBinary Eq #0 #25) (CallBinary Eq #1 #23) (CallBinary Eq #2 #24) (CallBinary Eq #21 (CallUnary CastInt16ToInt32 #61)) (CallBinary Eq #22 #30) (CallBinary Eq #23 #31) (CallBinary Eq #24 #32) (CallBinary Eq #32 #41) (CallBinary Eq #34 #40) (CallBinary Eq #57 (CallUnary CastInt16ToInt32 #58)) (CallBinary Eq #61 #65) (CallBinary Eq #67 #69) (CallBinary Eq #70 ("EUROPE" String)) (CallBinary Gte (CallUnary CastDateToTimestamp #26) ("2007-01-02 00:00:00" Timestamp))]) [#66] [(SumNumeric #38 false)] false null)
30+
(Reduce (Filter (Join [(get u6) (get u16) (get u19) (get u26) (get u34) (get u31) (get u37)] [] Unimplemented) [(CallBinary Eq #0 #25) (CallBinary Eq #1 #23) (CallBinary Eq #2 #24) (CallBinary Eq #21 (CallUnary CastInt16ToInt32 #61)) (CallBinary Eq #22 #30) (CallBinary Eq #23 #31) (CallBinary Eq #24 #32) (CallBinary Eq #32 #41) (CallBinary Eq #34 #40) (CallBinary Eq #57 (CallUnary CastInt16ToInt32 #58)) (CallBinary Eq #61 #65) (CallBinary Eq #67 #69) (CallBinary Eq #70 ("EUROPE" String)) (CallBinary Gte (CallUnary CastDateToTimestamp #26) ("2007-01-02 00:00:00" Timestamp))]) [#66] [(SumNumeric #38 false)] false null)
3131
----
3232
----
3333

@@ -45,26 +45,30 @@ cat
4545
ok
4646

4747
build
48-
(Filter
49-
(Join [(get u6) (get u16) (get u19) (get u26) (get u34) (get
50-
u31) (get u37)] [] null Unimplemented)
51-
[(CallBinary Eq #0 #25)
52-
(CallBinary Eq #1 #23)
53-
(CallBinary Eq #2 #24)
54-
(CallBinary Eq #21 (CallUnary CastInt16ToInt32 #61))
55-
(CallBinary Eq #22 #30)
56-
(CallBinary Eq #23 #31)
57-
(CallBinary Eq #24 #32)
58-
(CallBinary Eq #32 #41)
59-
(CallBinary Eq #34 #40)
60-
(CallBinary Eq #57 (CallUnary CastInt16ToInt32 #58))
61-
(CallBinary Eq #61 #65)
62-
(CallBinary Eq #67 #69)
63-
(CallBinary Eq #70 ("EUROPE" String))
64-
(CallBinary Gte (CallUnary CastDateToTimestamp #26)
65-
("2007-01-02 00:00:00" Timestamp))
66-
]
67-
)
48+
(Reduce
49+
(Filter
50+
(Join [(get u6) (get u16) (get u19) (get u26) (get u34) (get u31) (get
51+
u37)] [] Unimplemented)
52+
[(CallBinary Eq #0 #25)
53+
(CallBinary Eq #1 #23)
54+
(CallBinary Eq #2 #24)
55+
(CallBinary Eq #21 (CallUnary CastInt16ToInt32 #61))
56+
(CallBinary Eq #22 #30)
57+
(CallBinary Eq #23 #31)
58+
(CallBinary Eq #24 #32)
59+
(CallBinary Eq #32 #41)
60+
(CallBinary Eq #34 #40)
61+
(CallBinary Eq #57 (CallUnary CastInt16ToInt32 #58))
62+
(CallBinary Eq #61 #65)
63+
(CallBinary Eq #67 #69)
64+
(CallBinary Eq #70 ("EUROPE" String))
65+
(CallBinary Gte (CallUnary CastDateToTimestamp #26) ("2007-01-02
66+
00:00:00" Timestamp))
67+
])
68+
[#66]
69+
[(SumNumeric #38 false)]
70+
false
71+
null)
6872
----
6973
----
7074
%0 =
@@ -92,5 +96,7 @@ build
9296
| Join %0 %1 %2 %3 %4 %5 %6
9397
| | implementation = Unimplemented
9498
| Filter (#0 = #25), (#1 = #23), (#2 = #24), (#21 = i16toi32(#61)), (#22 = #30), (#23 = #31), (#24 = #32), (#32 = #41), (#34 = #40), (#57 = i16toi32(#58)), (#61 = #65), (#67 = #69), (#70 = "EUROPE"), (datetots(#26) >= 2007-01-02 00:00:00)
99+
| Reduce group=(#66)
100+
| | agg sum(#38)
95101
----
96102
----

src/expr/src/explain.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -233,22 +233,13 @@ impl<'a> ViewExplanation<'a> {
233233
writeln!(f, "| Project {}", bracketed("(", ")", Indices(outputs)))?
234234
}
235235
Map { scalars, .. } => writeln!(f, "| Map {}", separated(", ", scalars))?,
236-
FlatMap {
237-
func,
238-
exprs,
239-
demand,
240-
..
241-
} => {
236+
FlatMap { func, exprs, .. } => {
242237
writeln!(f, "| FlatMap {}({})", func, separated(", ", exprs))?;
243-
if let Some(demand) = demand {
244-
writeln!(f, "| | demand = {}", bracketed("(", ")", Indices(demand)))?;
245-
}
246238
}
247239
Filter { predicates, .. } => writeln!(f, "| Filter {}", separated(", ", predicates))?,
248240
Join {
249241
inputs,
250242
equivalences,
251-
demand,
252243
implementation,
253244
} => {
254245
write!(
@@ -278,9 +269,6 @@ impl<'a> ViewExplanation<'a> {
278269
writeln!(f)?;
279270
write!(f, "| | implementation = ")?;
280271
self.fmt_join_implementation(f, inputs, implementation)?;
281-
if let Some(demand) = demand {
282-
writeln!(f, "| | demand = {}", bracketed("(", ")", Indices(demand)))?;
283-
}
284272
}
285273
Reduce {
286274
group_key,

src/expr/src/relation/mod.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,6 @@ pub enum MirRelationExpr {
9797
func: TableFunc,
9898
/// The argument to the table func
9999
exprs: Vec<MirScalarExpr>,
100-
/// Output columns demanded by the surrounding expression.
101-
///
102-
/// The input columns are often discarded and can be very
103-
/// expensive to reproduce, so restricting what we produce
104-
/// as output can be a substantial win.
105-
///
106-
/// See `transform::Demand` for more details.
107-
#[serde(default)]
108-
demand: Option<Vec<usize>>,
109100
},
110101
/// Keep rows from a dataflow where all the predicates are true
111102
///
@@ -136,15 +127,6 @@ pub enum MirRelationExpr {
136127
/// inputs, but more general cases exist (e.g. complex functions of multiple columns
137128
/// from multiple inputs, or just constant literals).
138129
equivalences: Vec<Vec<MirScalarExpr>>,
139-
/// This optional field is a hint for which columns are
140-
/// actually used by operators that use this collection. Although the
141-
/// join does not have permission to change the schema, it can introduce
142-
/// dummy values at the end of its computation, avoiding the maintenance of values
143-
/// not present in this list (when it is non-None).
144-
///
145-
/// See `transform::Demand` for more details.
146-
#[serde(default)]
147-
demand: Option<Vec<usize>>,
148130
/// Join implementation information.
149131
#[serde(default)]
150132
implementation: JoinImplementation,
@@ -755,7 +737,6 @@ impl MirRelationExpr {
755737
input: Box::new(self),
756738
func,
757739
exprs,
758-
demand: None,
759740
}
760741
}
761742

@@ -843,7 +824,6 @@ impl MirRelationExpr {
843824
MirRelationExpr::Join {
844825
inputs,
845826
equivalences,
846-
demand: None,
847827
implementation: JoinImplementation::Unimplemented,
848828
}
849829
}
@@ -1227,7 +1207,6 @@ impl MirRelationExpr {
12271207
exprs,
12281208
input: _,
12291209
func: _,
1230-
demand: _,
12311210
} => {
12321211
for expr in exprs {
12331212
f(expr)?;
@@ -1237,7 +1216,6 @@ impl MirRelationExpr {
12371216
MirRelationExpr::Join {
12381217
equivalences: keys,
12391218
inputs: _,
1240-
demand: _,
12411219
implementation: _,
12421220
}
12431221
| MirRelationExpr::ArrangeBy { input: _, keys } => {

src/transform/src/column_knowledge.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,7 @@ impl ColumnKnowledge {
9898
}
9999
input_knowledge
100100
}
101-
MirRelationExpr::FlatMap {
102-
input,
103-
func,
104-
exprs,
105-
demand: _,
106-
} => {
101+
MirRelationExpr::FlatMap { input, func, exprs } => {
107102
let mut input_knowledge =
108103
ColumnKnowledge::harvest(input, knowledge, knowledge_stack)?;
109104
let input_typ = input.typ();

src/transform/src/demand.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,7 @@ impl Demand {
126126
input,
127127
func: _,
128128
exprs,
129-
demand,
130129
} => {
131-
let mut sorted = columns.iter().cloned().collect::<Vec<_>>();
132-
sorted.sort_unstable();
133-
*demand = Some(sorted);
134130
// A FlatMap which returns zero rows acts like a filter
135131
// so we always need to execute it
136132
for expr in exprs {
@@ -150,7 +146,6 @@ impl Demand {
150146
MirRelationExpr::Join {
151147
inputs,
152148
equivalences,
153-
demand: _,
154149
implementation: _,
155150
} => {
156151
let input_mapper = JoinInputMapper::new(inputs);

src/transform/src/fusion/join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ impl JoinBuilder {
190190
_ => MirRelationExpr::Join {
191191
inputs: self.inputs,
192192
equivalences: self.equivalences,
193-
demand: None,
194193
implementation: expr::JoinImplementation::Unimplemented,
195194
},
196195
};

src/transform/src/join_implementation.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ mod delta_queries {
208208
if let MirRelationExpr::Join {
209209
inputs,
210210
equivalences,
211-
demand: _,
212211
implementation,
213212
} = &mut new_join
214213
{
@@ -277,7 +276,6 @@ mod differential {
277276
if let MirRelationExpr::Join {
278277
inputs,
279278
equivalences,
280-
demand: _,
281279
implementation,
282280
} = &mut new_join
283281
{

src/transform/src/map_lifting.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,7 @@ impl LiteralLifting {
340340

341341
result
342342
}
343-
MirRelationExpr::FlatMap {
344-
input,
345-
func,
346-
exprs,
347-
demand: _,
348-
} => {
343+
MirRelationExpr::FlatMap { input, func, exprs } => {
349344
let literals = self.action(input, gets);
350345
if !literals.is_empty() {
351346
let input_arity = input.arity();
@@ -389,7 +384,6 @@ impl LiteralLifting {
389384
MirRelationExpr::Join {
390385
inputs,
391386
equivalences,
392-
demand,
393387
implementation,
394388
} => {
395389
// before lifting, save the original shape of the inputs
@@ -418,7 +412,6 @@ impl LiteralLifting {
418412
}
419413

420414
if input_literals.iter().any(|l| !l.is_empty()) {
421-
*demand = None;
422415
*implementation = expr::JoinImplementation::Unimplemented;
423416

424417
// We should be able to install any literals in the

0 commit comments

Comments
 (0)