Skip to content

Commit 2fa719e

Browse files
committed
broken with ICE
1 parent bead9b5 commit 2fa719e

File tree

9 files changed

+402
-320
lines changed

9 files changed

+402
-320
lines changed

examples/cc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ where G::Timestamp: LeastUpperBound+Hash {
6868

6969
inner.join_map_u(&edges, |_k,l,d| (*d,*l))
7070
.concat(&nodes)
71-
.group_u(|_, s, t| { t.push((*s.peek().unwrap().0, 1)); } )
71+
.group_u(|_, mut s, t| { t.push((*s.peek().unwrap().0, 1)); } )
7272
})
7373
}

examples/scc.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use timely::dataflow::*;
1010
use timely::dataflow::operators::*;
1111

1212
use differential_dataflow::operators::*;
13-
use differential_dataflow::operators::group::{GroupUnsigned, GroupBy};
13+
use differential_dataflow::operators::group::{GroupUnsigned};
1414
use differential_dataflow::operators::join::{JoinUnsigned};
1515
use differential_dataflow::collection::LeastUpperBound;
1616

@@ -90,9 +90,9 @@ fn _trim_and_flip<G: Scope>(graph: &Stream<G, (Edge, i32)>) -> Stream<G, (Edge,
9090
where G::Timestamp: LeastUpperBound {
9191
graph.iterate(|edges| {
9292
let inner = edges.scope().enter(&graph).map_in_place(|x| x.0 = ((x.0).1, (x.0).0));
93-
edges.map(|((x,_),w)| (x,w))
93+
edges.map(|((x,_),w)| ((x,()),w))
9494
// .threshold(|&x| x, |i| (Vec::new(), i), |_, w| if w > 0 { 1 } else { 0 })
95-
.group_by_u(|x|(x,()), |&x,_| (x, ()), |_,_,target| target.push(((),1)))
95+
.group_u(|_,_,target| target.push(((),1)))
9696
.join_map_u(&inner, |&d,_,&s| (s,d))
9797
})
9898
.map_in_place(|x| x.0 = ((x.0).1, (x.0).0))

src/collection/count.rs

Lines changed: 76 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,17 @@
11
//! Like `Trace` but with the value type specialized to `()`.
22
3-
use std::fmt::Debug;
4-
5-
use collection::{close_under_lub, LeastUpperBound, Lookup};
3+
use ::Data;
4+
use collection::{LeastUpperBound, Lookup};
65
use collection::compact::Compact;
6+
use collection::trace::{Traceable, TraceRef};
77

8-
#[derive(Copy, Clone, Debug)]
9-
pub struct Offset {
10-
dataz: u32,
11-
}
12-
13-
impl Offset {
14-
#[inline(always)]
15-
fn new(offset: usize) -> Offset {
16-
assert!(offset < ((!0u32) as usize)); // note strict inequality
17-
Offset { dataz: (!0u32) - offset as u32 }
18-
}
19-
#[inline(always)]
20-
fn val(&self) -> usize { ((!0u32) - self.dataz) as usize }
21-
}
22-
23-
struct ListEntry {
24-
time: u32,
25-
wgts: i32,
26-
next: Option<Offset>,
27-
}
28-
29-
pub struct Count<K, T, L> {
30-
phantom: ::std::marker::PhantomData<K>,
31-
links: Vec<ListEntry>,
32-
times: Vec<T>,
33-
pub keys: L,
34-
temp: Vec<T>,
35-
}
36-
37-
impl<K, L, T> Count<K, T, L> where K: Ord, L: Lookup<K, Offset>, T: LeastUpperBound+Debug {
38-
8+
impl<K, L, T> Traceable for Count<K, T, L> where K: Data+Ord+'static, L: Lookup<K, Offset>+'static, T: LeastUpperBound+'static {
9+
type Key = K;
10+
type Index = T;
11+
type Value = ();
12+
3913
/// Installs a supplied set of keys and values as the differences for `time`.
40-
pub fn set_difference(&mut self, time: T, accumulation: Compact<K, ()>) {
14+
fn set_difference(&mut self, time: T, accumulation: Compact<K, ()>) {
4115

4216
// extract the relevant fields
4317
let keys = accumulation.keys;
@@ -78,52 +52,59 @@ impl<K, L, T> Count<K, T, L> where K: Ord, L: Lookup<K, Offset>, T: LeastUpperBo
7852

7953
self.times.push(time);
8054
}
55+
}
56+
57+
impl<'a,K,L,T> TraceRef<'a,K,T,()> for &'a Count<K,T,L> where K: Ord+'a, L: Lookup<K, Offset>+'a, T: LeastUpperBound+'a {
58+
type VIterator = WeightIterator<'a>;
59+
type TIterator = CountIterator<'a,K,T,L>;
60+
fn trace(self, key: &K) -> Self::TIterator {
61+
CountIterator {
62+
trace: self,
63+
next0: self.keys.get_ref(key).map(|&x|x),
64+
// silly: (),
65+
}
66+
}
67+
}
68+
69+
#[derive(Copy, Clone, Debug)]
70+
pub struct Offset {
71+
dataz: u32,
72+
}
8173

82-
/// Enumerates the differences for `key` at `time`.
83-
pub fn get_diff(&self, key: &K, time: &T) -> i32 {
84-
self.trace(key)
85-
.filter(|x| x.0 == time)
86-
.map(|x| x.1)
87-
.next()
88-
.unwrap_or(0)
74+
impl Offset {
75+
#[inline(always)]
76+
fn new(offset: usize) -> Offset {
77+
assert!(offset < ((!0u32) as usize)); // note strict inequality
78+
Offset { dataz: (!0u32) - offset as u32 }
8979
}
80+
#[inline(always)]
81+
fn val(&self) -> usize { ((!0u32) - self.dataz) as usize }
82+
}
83+
84+
struct ListEntry {
85+
time: u32,
86+
wgts: i32,
87+
next: Option<Offset>,
88+
}
89+
90+
pub struct Count<K, T, L> {
91+
phantom: ::std::marker::PhantomData<K>,
92+
links: Vec<ListEntry>,
93+
times: Vec<T>,
94+
pub keys: L,
95+
// temp: Vec<T>,
96+
silly: (),
97+
}
98+
99+
impl<K, L, T> Count<K, T, L> where K: Data+Ord+'static, L: Lookup<K, Offset>+'static, T: LeastUpperBound+'static {
90100

91101
pub fn get_count(&self, key: &K, time: &T) -> i32 {
92102
let mut sum = 0;
93-
for wgt in self.trace(key).filter(|x| x.0 <= time).map(|x| x.1) {
103+
for wgt in Traceable::trace(self, key).filter(|x| x.0 <= time).map(|mut x| x.1.next().unwrap().1) {
94104
sum += wgt;
95105
}
96106
sum
97107
}
98-
99-
// TODO : this could do a better job of returning newly interesting times: those times that are
100-
// TODO : now in the least upper bound, but were not previously so. The main risk is that the
101-
// TODO : easy way to do this computes the LUB before and after, but this can be expensive:
102-
// TODO : the LUB with `index` is often likely to be smaller than the LUB without it.
103-
/// Lists times that are the least upper bound of `time` and any subset of existing times.
104-
pub fn interesting_times<'a>(&'a mut self, key: &K, index: T) -> &'a [T] {
105-
// panic!();
106-
let mut temp = ::std::mem::replace(&mut self.temp, Vec::new());
107-
temp.clear();
108-
temp.push(index);
109-
for (time, _) in self.trace(key) {
110-
let lub = time.least_upper_bound(&temp[0]);
111-
if !temp.contains(&lub) {
112-
temp.push(lub);
113-
}
114-
}
115-
close_under_lub(&mut temp);
116-
::std::mem::replace(&mut self.temp, temp);
117-
&self.temp[..]
118-
}
119-
120-
/// Enumerates pairs of time `&T` and differences `DifferenceIterator<V>` for `key`.
121-
pub fn trace<'a>(&'a self, key: &K) -> CountIterator<'a, K, T, L> {
122-
CountIterator {
123-
trace: self,
124-
next0: self.keys.get_ref(key).map(|&x|x),
125-
}
126-
}
127108
}
128109

129110
impl<K: Eq, L: Lookup<K, Offset>, T> Count<K, T, L> {
@@ -133,7 +114,8 @@ impl<K: Eq, L: Lookup<K, Offset>, T> Count<K, T, L> {
133114
links: Vec::new(),
134115
times: Vec::new(),
135116
keys: l,
136-
temp: Vec::new(),
117+
// temp: Vec::new(),
118+
silly: (),
137119
}
138120
}
139121
}
@@ -146,18 +128,35 @@ pub struct CountIterator<'a, K: Eq+'a, T: 'a, L: Lookup<K, Offset>+'a> {
146128
}
147129

148130
impl<'a, K: Eq, T, L> Iterator for CountIterator<'a, K, T, L>
149-
where K: Ord+'a,
150-
T: LeastUpperBound+Debug+'a,
131+
where K: Ord+'a,
132+
T: LeastUpperBound+'a,
151133
L: Lookup<K, Offset>+'a {
152-
type Item = (&'a T, i32);
134+
type Item = (&'a T, WeightIterator<'a>);
153135

154136
#[inline]
155137
fn next(&mut self) -> Option<Self::Item> {
156138
self.next0.map(|position| {
157139
let time_index = self.trace.links[position.val()].time as usize;
158-
let result = (&self.trace.times[time_index], self.trace.links[position.val()].wgts);
140+
let result = (&self.trace.times[time_index], WeightIterator { weight: self.trace.links[position.val()].wgts, silly: &self.trace.silly });
159141
self.next0 = self.trace.links[position.val()].next;
160142
result
161143
})
162144
}
163145
}
146+
147+
pub struct WeightIterator<'a> {
148+
weight: i32,
149+
silly: &'a (),
150+
}
151+
152+
impl<'a> Iterator for WeightIterator<'a> {
153+
type Item = (&'a (), i32);
154+
fn next(&mut self) -> Option<(&'a (), i32)> {
155+
if self.weight == 0 { None }
156+
else {
157+
let result = self.weight;
158+
self.weight = 0;
159+
Some((self.silly, result))
160+
}
161+
}
162+
}

src/collection/trace.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
use std::iter::Peekable;
22

3+
use ::Data;
34
use collection::{close_under_lub, LeastUpperBound, Lookup};
5+
use collection::compact::Compact;
46

57
use iterators::merge::{Merge, MergeIterator};
68
use iterators::coalesce::{Coalesce, CoalesceIterator};
7-
use collection::compact::Compact;
89

910
// Test implementation which uses references rather than clones
1011

1112
pub trait Traceable where for<'a> &'a Self: TraceRef<'a, Self::Key, Self::Index, Self::Value> {
1213

13-
type Key: Ord+'static;
14+
type Key: Data+Ord+'static;
1415
type Index: LeastUpperBound+'static;
15-
type Value: Ord+'static;
16+
type Value: Data+Ord+'static;
17+
18+
// type PartKey: Unsigned; // the keys are partitioned and likely ordered by this unsigned integer
19+
20+
// // indicates the part for a key
21+
// fn part(&self, key: &Self::Key) -> Self::PartKey;
1622

23+
// TODO : Should probably allow the trace to determine how it receives data.
24+
// TODO : Radix sorting and such might live in the trace, rather than in `Arrange`.
1725
/// Introduces differences in `accumulation` at `time`.
1826
fn set_difference(&mut self, time: Self::Index, accumulation: Compact<Self::Key, Self::Value>);
1927

@@ -46,7 +54,7 @@ pub trait Traceable where for<'a> &'a Self: TraceRef<'a, Self::Key, Self::Index,
4654
}
4755

4856
// TODO : Make sure the right assumptions are made about contents of stash.
49-
fn interesting_times<'a>(&'a mut self, key: &Self::Key, time: &Self::Index, stash: &mut Vec<Self::Index>) {
57+
fn interesting_times<'a>(&'a self, key: &Self::Key, time: &Self::Index, stash: &mut Vec<Self::Index>) {
5058
// add all times, but filter a bit if possible
5159
for iter in self.trace(key) {
5260
let lub = iter.0.least_upper_bound(time);
@@ -59,14 +67,14 @@ pub trait Traceable where for<'a> &'a Self: TraceRef<'a, Self::Key, Self::Index,
5967
}
6068

6169
pub trait TraceRef<'a,K,T:'a,V:'a> {
62-
type VIterator: Iterator<Item=(&'a V, i32)>;
63-
type TIterator: Iterator<Item=(&'a T, Self::VIterator)>;
70+
type VIterator: Iterator<Item=(&'a V, i32)>+'a;
71+
type TIterator: Iterator<Item=(&'a T, Self::VIterator)>+'a;
6472
fn trace(self, key: &K) -> Self::TIterator;
6573
}
6674

6775
pub type CollectionIterator<VIterator> = Peekable<CoalesceIterator<MergeIterator<VIterator>>>;
6876

69-
impl<K,V,L,T> Traceable for Trace<K, T, V, L> where K: Ord+'static, V: Ord+'static, L: Lookup<K, Offset>+'static, T: LeastUpperBound+'static {
77+
impl<K,V,L,T> Traceable for Trace<K, T, V, L> where K: Data+Ord+'static, V: Data+Ord+'static, L: Lookup<K, Offset>+'static, T: LeastUpperBound+'static {
7078
type Key = K;
7179
type Index = T;
7280
type Value = V;
@@ -122,7 +130,7 @@ impl<K,V,L,T> Traceable for Trace<K, T, V, L> where K: Ord+'static, V: Ord+'stat
122130
}
123131
}
124132

125-
impl<'a,K,V,L,T> TraceRef<'a,K,T,V> for &'a Trace<K,T,V,L> where K: Ord+'a, V: Ord+'a, L: Lookup<K, Offset>+'a, T: LeastUpperBound+'a {
133+
impl<'a,K,V,L,T> TraceRef<'a,K,T,V> for &'a Trace<K,T,V,L> where K: Data+Ord+'a, V: Data+Ord+'a, L: Lookup<K, Offset>+'a, T: LeastUpperBound+'a {
126134
type VIterator = DifferenceIterator<'a, V>;
127135
type TIterator = TraceIterator<'a,K,T,V,L>;
128136
fn trace(self, key: &K) -> Self::TIterator {

src/operators/arrange.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ use collection::count::Count;
2121
use collection::compact::Compact;
2222
use radix_sort::{RadixSorter, Unsigned};
2323

24+
/// A collection of `(K,V)` values as a timely stream and shared trace.
25+
///
26+
/// An `Arranged` performs the task of arranging a keyed collection once,
27+
/// allowing multiple differential operators to use the same trace. This
28+
/// saves on computation and memory, in exchange for some cognitive overhead
29+
/// in writing differential operators: each must pay close attention to signals
30+
/// from the `stream` field to know the subset of `trace` it has logically
31+
/// received.
2432
pub struct Arranged<G: Scope, T: Traceable<Index=G::Timestamp>>
2533
where
2634
T::Key: Data,
@@ -32,6 +40,7 @@ pub struct Arranged<G: Scope, T: Traceable<Index=G::Timestamp>>
3240
pub trace: Rc<RefCell<T>>,
3341
}
3442

43+
/// Arranges a collection of `(Key,Val)` pairs by `Key`.
3544
pub trait ArrangeByKey<G: Scope, K: Data, V: Data> where G::Timestamp: LeastUpperBound {
3645
fn arrange_by_key<
3746
U: Unsigned+Default,
@@ -42,7 +51,6 @@ pub trait ArrangeByKey<G: Scope, K: Data, V: Data> where G::Timestamp: LeastUppe
4251
}
4352

4453
impl<G: Scope, K: Data, V: Data> ArrangeByKey<G, K, V> for Collection<G, (K, V)> where G::Timestamp: LeastUpperBound {
45-
4654
fn arrange_by_key<
4755
U: Unsigned+Default,
4856
KH: Fn(&K)->U+'static,
@@ -86,9 +94,6 @@ impl<G: Scope, K: Data, V: Data> ArrangeByKey<G, K, V> for Collection<G, (K, V)>
8694

8795
// 2a. fetch any data associated with this time.
8896
if let Some(mut queue) = inputs.remove_key(&index) {
89-
90-
// println!("got some data for {:?}; updating", index);
91-
9297
// sort things; radix if many, .sort_by if few.
9398
let compact = if queue.len() > 1 {
9499
for element in queue.into_iter() {
@@ -122,18 +127,19 @@ impl<G: Scope, K: Data, V: Data> ArrangeByKey<G, K, V> for Collection<G, (K, V)>
122127
}
123128

124129

125-
pub struct ArrangedBySelf<G: Scope, K: Data, L: Lookup<K, ::collection::count::Offset>+'static> {
126-
pub stream: Stream<G, (Vec<K>, Vec<u32>, Vec<((), i32)>)>,
127-
pub trace: Rc<RefCell<Count<K, G::Timestamp, L>>>,
128-
}
129-
130+
/// Arranges a collection of keys as `(Key,())` pairs, logically by `Key`.
131+
///
132+
/// This trait provides an optimized implementation of `ArrangeByKey` in which
133+
/// the underlying trace does not support dynamic numbers of values for each key,
134+
/// which saves on computation and memory.
130135
pub trait ArrangeBySelf<G: Scope, K: Data> {
131136
fn arrange_by_self<
132137
U: Unsigned+Default,
133138
KH: Fn(&K)->U+'static,
134139
Look: Lookup<K, ::collection::count::Offset>+'static,
135140
LookG: Fn(u64)->Look,
136-
>(&self, key_h: KH, look: LookG) -> ArrangedBySelf<G, K, Look>;
141+
>(&self, key_h: KH, look: LookG) -> Arranged<G, Count<K, G::Timestamp, Look>>
142+
where G::Timestamp: LeastUpperBound;
137143
}
138144

139145
impl<G: Scope, K: Data> ArrangeBySelf<G, K> for Collection<G, K> where G::Timestamp: LeastUpperBound {
@@ -144,7 +150,8 @@ impl<G: Scope, K: Data> ArrangeBySelf<G, K> for Collection<G, K> where G::Timest
144150
Look: Lookup<K, ::collection::count::Offset>+'static,
145151
LookG: Fn(u64)->Look,
146152
>
147-
(&self, key_h: KH, look: LookG) -> ArrangedBySelf<G, K, Look> {
153+
(&self, key_h: KH, look: LookG) -> Arranged<G, Count<K, G::Timestamp, Look>>
154+
where G::Timestamp: LeastUpperBound {
148155

149156
let peers = self.scope().peers();
150157
let mut log_peers = 0;
@@ -208,6 +215,6 @@ impl<G: Scope, K: Data> ArrangeBySelf<G, K> for Collection<G, K> where G::Timest
208215
}
209216
});
210217

211-
ArrangedBySelf { stream: stream, trace: trace }
218+
Arranged { stream: stream, trace: trace }
212219
}
213220
}

0 commit comments

Comments
 (0)