Skip to content

Commit f015c15

Browse files
author
dking
committed
Multiplexers can take function pointers directly
This makes multiplexers slightly more ergonomic. Due to rust-lang/rust#28229 (functions implement Copy but not Clone) it's not currently possible to do with closures yet but it's better than nothing
1 parent 09b7922 commit f015c15

File tree

2 files changed

+93
-16
lines changed

2 files changed

+93
-16
lines changed

src/bin/bench.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,43 @@
11
extern crate time;
22

33
extern crate rs_ducts;
4+
45
use rs_ducts::multiplex;
56
use rs_ducts::map;
67

78
use rs_ducts::Pipeline;
89

10+
// tuneables for different workloads
11+
const THREAD_COUNT: usize = 1000;
912
const WORK_COUNT: u64 = 1000;
1013
const WORK_FACTOR: u64 = 34;
1114
const BUFFSIZE: usize = 5;
1215

1316
fn bench_single() {
1417
let source: Vec<u64> = (1..WORK_COUNT).collect();
1518

16-
Pipeline::new(source, 5)
17-
.map(|x| fib(WORK_FACTOR) + x, BUFFSIZE)
18-
.drain();
19+
Pipeline::new(source, 5).map(fib_work, BUFFSIZE).drain();
1920
}
2021

22+
2123
fn bench_multi() {
2224
let source: Vec<u64> = (1..WORK_COUNT).collect();
2325

24-
let mappers = (0..4)
25-
.map(|_| map::Mapper::new(|x| fib(WORK_FACTOR) + x))
26-
.collect();
2726
Pipeline::new(source, BUFFSIZE)
28-
.then(multiplex::Multiplex::new(mappers, BUFFSIZE), BUFFSIZE)
27+
.then(multiplex::Multiplex::from(map::Mapper::new(fib_work),
28+
THREAD_COUNT,
29+
BUFFSIZE),
30+
BUFFSIZE)
2931
.drain();
3032
}
3133

34+
3235
// just something expensive
36+
fn fib_work(n: u64) -> u64 {
37+
fib(WORK_FACTOR) + n
38+
}
39+
40+
3341
fn fib(n: u64) -> u64 {
3442
if n == 0 || n == 1 {
3543
1
@@ -38,6 +46,7 @@ fn fib(n: u64) -> u64 {
3846
}
3947
}
4048

49+
4150
pub fn timeit<F>(name: &str, func: F)
4251
where F: FnOnce() -> () + Copy
4352
{

src/lib.rs

Lines changed: 77 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use std::sync::mpsc;
22
use std::thread;
33

4+
45
#[derive(Debug)]
56
pub struct Pipeline<Output>
67
where Output: Send + 'static
78
{
89
rx: mpsc::Receiver<Output>,
910
}
1011

12+
1113
impl<Output> Pipeline<Output>
1214
where Output: Send
1315
{
@@ -73,6 +75,7 @@ impl<Output> Pipeline<Output>
7375
}
7476
}
7577

78+
7679
impl<Output> IntoIterator for Pipeline<Output>
7780
where Output: Send
7881
{
@@ -85,6 +88,7 @@ impl<Output> IntoIterator for Pipeline<Output>
8588
}
8689
}
8790

91+
8892
pub trait PipelineEntry<In, Out> {
8993
fn process<I: IntoIterator<Item = In>>(self,
9094
rx: I,
@@ -134,8 +138,22 @@ pub mod map {
134138
}
135139
}
136140
}
141+
142+
impl<In, Out, Func> Clone for Mapper<In, Out, Func>
143+
where Func: Fn(In) -> Out + Copy
144+
{
145+
fn clone(&self) -> Self {
146+
Mapper::new(self.func)
147+
}
148+
}
149+
150+
impl<In, Out, Func> Copy for Mapper<In, Out, Func>
151+
where Func: Fn(In) -> Out + Copy
152+
{
153+
}
137154
}
138155

156+
139157
pub mod filter {
140158
use std::marker::PhantomData;
141159
use std::sync::mpsc;
@@ -180,6 +198,10 @@ pub mod filter {
180198

181199

182200
pub mod multiplex {
201+
// work around https://github.com/rust-lang/rust/issues/28229
202+
// (functions implement Copy but not Clone)
203+
#![cfg_attr(feature="cargo-clippy", allow(expl_impl_clone_on_copy))]
204+
183205
use std::marker::PhantomData;
184206
use std::sync::mpsc;
185207
use std::sync::{Arc, Mutex};
@@ -189,7 +211,7 @@ pub mod multiplex {
189211

190212
#[derive(Debug)]
191213
pub struct Multiplex<In, Out, Entry>
192-
where Entry: PipelineEntry<In, Out>
214+
where Entry: PipelineEntry<In, Out> + Send
193215
{
194216
entries: Vec<Entry>,
195217
buffsize: usize,
@@ -199,6 +221,14 @@ pub mod multiplex {
199221
out_: PhantomData<Out>,
200222
}
201223

224+
impl<In, Out, Entry> Multiplex<In, Out, Entry>
225+
where Entry: PipelineEntry<In, Out> + Send + Copy
226+
{
227+
pub fn from(entry: Entry, workers: usize, buffsize: usize) -> Self {
228+
Self::new((0..workers).map(|_| entry).collect(), buffsize)
229+
}
230+
}
231+
202232
impl<In, Out, Entry> Multiplex<In, Out, Entry>
203233
where Entry: PipelineEntry<In, Out> + Send
204234
{
@@ -324,21 +354,59 @@ mod tests {
324354
assert_eq!(produced, expect);
325355
}
326356

357+
// just something expensive
358+
fn fib_work(n: u64) -> u64 {
359+
const WORK_FACTOR: u64 = 10;
360+
fib(WORK_FACTOR) + n
361+
}
362+
363+
fn fib(n: u64) -> u64 {
364+
if n == 0 || n == 1 {
365+
1
366+
} else {
367+
fib(n - 1) + fib(n - 2)
368+
}
369+
}
370+
371+
#[test]
372+
fn multiplex_map_function() {
373+
// we have two signatures for Multiplex, one that takes a function
374+
// pointer and one that can take a closure. THis is the function pointer
375+
// side
376+
377+
let buffsize: usize = 10;
378+
let workers: usize = 10;
379+
380+
let source: Vec<u64> = (1..1000).collect();
381+
let expect: Vec<u64> =
382+
source.clone().into_iter().map(fib_work).collect();
383+
384+
let pbb: Pipeline<u64> = Pipeline::new(source, buffsize)
385+
.then(multiplex::Multiplex::from(map::Mapper::new(fib_work),
386+
workers,
387+
buffsize),
388+
buffsize);
389+
let mut produced: Vec<u64> = pbb.into_iter().collect();
390+
391+
produced.sort(); // these may arrive out of order
392+
assert_eq!(produced, expect);
393+
}
394+
327395
#[test]
328-
fn multiplex_map() {
396+
fn multiplex_map_closure() {
329397
let buffsize: usize = 10;
398+
let workers: usize = 10;
330399

331400
let source: Vec<i32> = (1..1000).collect();
332401
let expect: Vec<i32> = source.iter().map(|x| x * 2).collect();
333402

334403
let pbb: Pipeline<i32> = Pipeline::new(source, buffsize)
335-
// TOOD multiplex takes a list of PipelineEntry but it would be
336-
// nicer if it just took one and was able to clone it
337-
.then(
338-
multiplex::Multiplex::new(
339-
(0..10).map(|_| map::Mapper::new(|i| i*2)).collect(),
340-
buffsize),
341-
buffsize);
404+
.then(multiplex::Multiplex::new((0..workers)
405+
.map(|_| {
406+
map::Mapper::new(|i| i * 2)
407+
}).collect(),
408+
buffsize),
409+
buffsize);
342410
let mut produced: Vec<i32> = pbb.into_iter().collect();
343411

344412
produced.sort(); // these may arrive out of order

0 commit comments

Comments
 (0)