Skip to content

Commit 0af5077

Browse files
committed
A mad attempt to use thread-local everywhere and avoid Sync… (#263)
…which seems to work except for one caveat: Progress is now required to be Sync even though there is no need for it, and it shouldn't have to care about that. I have to go back and try to fix this, as the requirement ultimately bubbles up to every method that uses such a method generically, including trait bounds which really shouldn't have to be that strict.
1 parent 82ea1b8 commit 0af5077

File tree

23 files changed

+155
-85
lines changed

23 files changed

+155
-85
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ gitoxide-core-tools = ["gitoxide-core/organize", "gitoxide-core/estimate-hours"]
4141
gitoxide-core-blocking-client = ["gitoxide-core/blocking-client"]
4242
gitoxide-core-async-client = ["gitoxide-core/async-client", "futures-lite"]
4343
http-client-curl = ["git-transport-for-configuration-only/http-client-curl"]
44-
fast = ["git-features/threading", "git-features/parallel", "git-features/fast-sha1", "git-features/zlib-ng-compat"]
44+
fast = ["git-features/parallel", "git-features/fast-sha1", "git-features/zlib-ng-compat"]
4545

4646
pretty-cli = ["clap",
4747
"gitoxide-core/serde1",

cargo-features.md

+2-3
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,8 @@ All feature toggles are additive.
106106
* Use scoped threads and channels to parallelize common workloads on multiple objects. If enabled, it is used everywhere
107107
where it makes sense.
108108
* As caches are likely to be used and instantiated per thread, more memory will be used on top of the costs for threads.
109-
* **threading**
110-
* If set, the `threading` module will contain thread-safe primitives for shared ownership and mutation, otherwise these will be their single threaded counterparts.
111-
* This way, single-threaded applications don't have to pay for threaded primitives.
109+
* If enabled, the `threading` module will contain thread-safe primitives for shared ownership and mutation, otherwise these will be their single-threaded counterparts.
110+
* This way, single-threaded applications don't have to pay for threaded primitives.
112111
* **crc32**
113112
* provide a proven and fast `crc32` implementation.
114113
* **io-pipe**

git-features/Cargo.toml

+2-4
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@ test = false
1313

1414
[features]
1515
default = []
16-
threading = ["parking_lot"]
1716
progress = ["prodash"]
18-
parallel = ["crossbeam-utils", "crossbeam-channel", "num_cpus", "jwalk"]
17+
parallel = ["crossbeam-utils", "crossbeam-channel", "num_cpus", "jwalk", "parking_lot"]
1918
fast-sha1 = ["sha-1"]
2019
io-pipe = ["bytes"]
2120
crc32 = ["crc32fast"]
@@ -53,13 +52,12 @@ required-features = ["io-pipe"]
5352
git-hash = { version ="^0.8.0", path = "../git-hash" }
5453

5554

56-
# 'threading' feature
57-
parking_lot = { version = "0.11.0", default-features = false, optional = true }
5855

5956
# 'parallel' feature
6057
crossbeam-utils = { version = "0.8.5", optional = true }
6158
crossbeam-channel = { version = "0.5.0", optional = true }
6259
num_cpus = { version = "1.13.0", optional = true }
60+
parking_lot = { version = "0.11.0", default-features = false, optional = true }
6361

6462
jwalk = { version = "0.6.0", optional = true }
6563
walkdir = { version = "2.3.1", optional = true } # used when parallel is off

git-features/src/parallel/mod.rs

+23
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ fn num_threads(thread_limit: Option<usize>) -> usize {
129129
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
130130
///
131131
/// For parameters, see the documentation of [`in_parallel()`]
132+
#[cfg(feature = "parallel")]
132133
pub fn in_parallel_if<I, S, O, R>(
133134
condition: impl FnOnce() -> bool,
134135
input: impl Iterator<Item = I> + Send,
@@ -149,6 +150,28 @@ where
149150
}
150151
}
151152

153+
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
154+
///
155+
/// For parameters, see the documentation of [`in_parallel()`]
156+
///
157+
/// Note that the non-parallel version is equivalent to [`in_parallel()`].
158+
#[cfg(not(feature = "parallel"))]
159+
pub fn in_parallel_if<I, S, O, R>(
160+
_condition: impl FnOnce() -> bool,
161+
input: impl Iterator<Item = I>,
162+
thread_limit: Option<usize>,
163+
new_thread_state: impl Fn(usize) -> S,
164+
consume: impl Fn(I, &mut S) -> O,
165+
reducer: R,
166+
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
167+
where
168+
R: Reduce<Input = O>,
169+
I: Send,
170+
O: Send,
171+
{
172+
serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
173+
}
174+
152175
///
153176
pub mod reduce;
154177
pub use reduce::Reduce;

git-features/src/threading.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! That way, single-threaded applications will not have to use thread-safe primitives, simply by not specifying the 'parallel' feature.
44
5-
#[cfg(feature = "threading")]
5+
#[cfg(feature = "parallel")]
66
mod _impl {
77
use std::sync::Arc;
88

@@ -36,7 +36,7 @@ mod _impl {
3636
}
3737
}
3838

39-
#[cfg(not(feature = "threading"))]
39+
#[cfg(not(feature = "parallel"))]
4040
mod _impl {
4141
use std::{
4242
cell::{Ref, RefCell, RefMut},

git-pack/src/bundle/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ mod verify {
4141
&self,
4242
verify_mode: crate::index::verify::Mode,
4343
traversal: crate::index::traverse::Algorithm,
44-
make_pack_lookup_cache: impl Fn() -> C + Send + Sync,
44+
make_pack_lookup_cache: impl Fn() -> C + Send + Clone,
4545
thread_limit: Option<usize>,
4646
progress: Option<P>,
4747
should_interrupt: Arc<AtomicBool>,
@@ -51,6 +51,7 @@ mod verify {
5151
>
5252
where
5353
P: Progress,
54+
<P as Progress>::SubProgress: Sync,
5455
C: crate::cache::DecodeEntry,
5556
{
5657
self.index.verify_integrity(

git-pack/src/bundle/write/mod.rs

+26-11
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,19 @@ impl crate::Bundle {
3737
/// * the resulting pack may be empty, that is, contains zero objects in some situations. This is a valid reply by a server and should
3838
/// be accounted for.
3939
/// - Empty packs always have the same name and not handling this case will result in at most one superfluous pack.
40-
pub fn write_to_directory(
40+
pub fn write_to_directory<P>(
4141
pack: impl io::BufRead,
4242
directory: Option<impl AsRef<Path>>,
43-
mut progress: impl Progress,
43+
mut progress: P,
4444
should_interrupt: &AtomicBool,
4545
thin_pack_base_object_lookup_fn: Option<ThinPackLookupFn>,
4646
options: Options,
47-
) -> Result<Outcome, Error> {
47+
) -> Result<Outcome, Error>
48+
where
49+
P: Progress + Sync,
50+
<P as Progress>::SubProgress: Sync,
51+
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
52+
{
4853
let mut read_progress = progress.add_child("read pack");
4954
read_progress.init(None, progress::bytes());
5055
let pack = progress::Read {
@@ -131,15 +136,20 @@ impl crate::Bundle {
131136
/// As it sends portions of the input to a thread it requires the 'static lifetime for the interrupt flags. This can only
132137
/// be satisfied by a static AtomicBool which is only suitable for programs that only run one of these operations at a time
133138
/// or don't mind that all of them abort when the flag is set.
134-
pub fn write_to_directory_eagerly(
139+
pub fn write_to_directory_eagerly<P>(
135140
pack: impl io::Read + Send + 'static,
136141
pack_size: Option<u64>,
137142
directory: Option<impl AsRef<Path>>,
138-
mut progress: impl Progress,
143+
mut progress: P,
139144
should_interrupt: &'static AtomicBool,
140145
thin_pack_base_object_lookup_fn: Option<ThinPackLookupFnSend>,
141146
options: Options,
142-
) -> Result<Outcome, Error> {
147+
) -> Result<Outcome, Error>
148+
where
149+
P: Progress + Sync,
150+
<P as Progress>::SubProgress: Sync,
151+
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
152+
{
143153
let mut read_progress = progress.add_child("read pack");
144154
read_progress.init(pack_size.map(|s| s as usize), progress::bytes());
145155
let pack = progress::Read {
@@ -212,9 +222,9 @@ impl crate::Bundle {
212222
})
213223
}
214224

215-
fn inner_write(
225+
fn inner_write<P>(
216226
directory: Option<impl AsRef<Path>>,
217-
mut progress: impl Progress,
227+
mut progress: P,
218228
Options {
219229
thread_limit,
220230
iteration_mode: _,
@@ -223,7 +233,12 @@ impl crate::Bundle {
223233
data_file: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
224234
pack_entries_iter: impl Iterator<Item = Result<data::input::Entry, data::input::Error>>,
225235
should_interrupt: &AtomicBool,
226-
) -> Result<(crate::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error> {
236+
) -> Result<(crate::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error>
237+
where
238+
P: Progress + Sync,
239+
<P as Progress>::SubProgress: Sync,
240+
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
241+
{
227242
let indexing_progress = progress.add_child("create index file");
228243
Ok(match directory {
229244
Some(directory) => {
@@ -280,8 +295,8 @@ impl crate::Bundle {
280295

281296
fn new_pack_file_resolver(
282297
data_file: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
283-
) -> io::Result<impl Fn(data::EntryRange, &mut Vec<u8>) -> Option<()> + Send + Sync> {
284-
let mapped_file = FileBuffer::open(data_file.lock().with_mut(|f| f.path().to_owned())?)?;
298+
) -> io::Result<impl Fn(data::EntryRange, &mut Vec<u8>) -> Option<()> + Send + Clone> {
299+
let mapped_file = Arc::new(FileBuffer::open(data_file.lock().with_mut(|f| f.path().to_owned())?)?);
285300
let pack_data_lookup = move |range: std::ops::Range<u64>, out: &mut Vec<u8>| -> Option<()> {
286301
mapped_file
287302
.get(range.start as usize..range.end as usize)

git-pack/src/cache/delta/traverse/mod.rs

+24-18
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
sync::atomic::{AtomicBool, Ordering},
44
};
55

6+
use git_features::threading::{get_mut, MutableOnDemand, OwnShared};
67
use git_features::{
78
parallel,
89
parallel::in_parallel_if,
@@ -82,41 +83,46 @@ where
8283
thread_limit: Option<usize>,
8384
should_interrupt: &AtomicBool,
8485
pack_entries_end: u64,
85-
new_thread_state: impl Fn() -> S + Send + Sync,
86+
new_thread_state: impl Fn() -> S + Send + Clone,
8687
inspect_object: MBFN,
8788
) -> Result<VecDeque<Item<T>>, Error>
8889
where
89-
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Sync,
90-
P: Progress + Send,
91-
MBFN: Fn(&mut T, &mut <P as Progress>::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Sync,
90+
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
91+
P: Progress + Send + Sync,
92+
MBFN: Fn(&mut T, &mut <P as Progress>::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Clone,
9293
E: std::error::Error + Send + Sync + 'static,
9394
{
9495
self.set_pack_entries_end(pack_entries_end);
9596
let (chunk_size, thread_limit, _) = parallel::optimize_chunk_size_and_thread_limit(1, None, thread_limit, None);
96-
let object_progress = parking_lot::Mutex::new(object_progress);
97+
let object_progress = OwnShared::new(MutableOnDemand::new(object_progress));
9798

9899
let num_objects = self.items.len();
99100
in_parallel_if(
100101
should_run_in_parallel,
101102
self.iter_root_chunks(chunk_size),
102103
thread_limit,
103-
|thread_index| {
104-
(
105-
Vec::<u8>::with_capacity(4096),
106-
object_progress.lock().add_child(format!("thread {}", thread_index)),
107-
new_thread_state(),
108-
)
104+
{
105+
let object_progress = object_progress.clone();
106+
move |thread_index| {
107+
(
108+
Vec::<u8>::with_capacity(4096),
109+
get_mut(&object_progress).add_child(format!("thread {}", thread_index)),
110+
new_thread_state(),
111+
resolve.clone(),
112+
inspect_object.clone(),
113+
)
114+
}
109115
},
110-
|root_nodes, state| resolve::deltas(root_nodes, state, &resolve, &inspect_object),
111-
Reducer::new(num_objects, &object_progress, size_progress, should_interrupt),
116+
move |root_nodes, state| resolve::deltas(root_nodes, state),
117+
Reducer::new(num_objects, object_progress, size_progress, should_interrupt),
112118
)?;
113119
Ok(self.into_items())
114120
}
115121
}
116122

117123
struct Reducer<'a, P> {
118124
item_count: usize,
119-
progress: &'a parking_lot::Mutex<P>,
125+
progress: OwnShared<MutableOnDemand<P>>,
120126
start: std::time::Instant,
121127
size_progress: P,
122128
should_interrupt: &'a AtomicBool,
@@ -128,11 +134,11 @@ where
128134
{
129135
pub fn new(
130136
num_objects: usize,
131-
progress: &'a parking_lot::Mutex<P>,
137+
progress: OwnShared<MutableOnDemand<P>>,
132138
mut size_progress: P,
133139
should_interrupt: &'a AtomicBool,
134140
) -> Self {
135-
progress.lock().init(Some(num_objects), progress::count("objects"));
141+
get_mut(&progress).init(Some(num_objects), progress::count("objects"));
136142
size_progress.init(None, progress::bytes());
137143
Reducer {
138144
item_count: 0,
@@ -157,15 +163,15 @@ where
157163
let (num_objects, decompressed_size) = input?;
158164
self.item_count += num_objects;
159165
self.size_progress.inc_by(decompressed_size as usize);
160-
self.progress.lock().set(self.item_count);
166+
get_mut(&self.progress).set(self.item_count);
161167
if self.should_interrupt.load(Ordering::SeqCst) {
162168
return Err(Error::Interrupted);
163169
}
164170
Ok(())
165171
}
166172

167173
fn finalize(mut self) -> Result<Self::Output, Self::Error> {
168-
self.progress.lock().show_throughput(self.start);
174+
get_mut(&self.progress).show_throughput(self.start);
169175
self.size_progress.show_throughput(self.start);
170176
Ok(())
171177
}

git-pack/src/cache/delta/traverse/resolve.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@ use crate::{
1212

1313
pub(crate) fn deltas<T, F, P, MBFN, S, E>(
1414
nodes: crate::cache::delta::Chunk<'_, T>,
15-
(bytes_buf, ref mut progress, state): &mut (Vec<u8>, P, S),
16-
resolve: F,
17-
modify_base: MBFN,
15+
(bytes_buf, ref mut progress, state, resolve, modify_base): &mut (Vec<u8>, P, S, F, MBFN),
1816
) -> Result<(usize, u64), Error>
1917
where
20-
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Sync,
18+
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
2119
P: Progress,
2220
MBFN: Fn(&mut T, &mut P, Context<'_, S>) -> Result<(), E>,
2321
T: Default,

git-pack/src/data/output/entry/iter_from_counts.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::data::{output, output::ChunkId};
4242
pub fn iter_from_counts<Find, Cache>(
4343
mut counts: Vec<output::Count>,
4444
db: Find,
45-
make_cache: impl Fn() -> Cache + Send + Clone + Sync + 'static,
45+
make_cache: impl Fn() -> Cache + Send + Clone + 'static,
4646
mut progress: impl Progress,
4747
Options {
4848
version,

git-pack/src/index/traverse/indexed.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ impl index::File {
2424
&self,
2525
check: SafetyCheck,
2626
thread_limit: Option<usize>,
27-
new_processor: impl Fn() -> Processor + Send + Sync,
27+
new_processor: impl Fn() -> Processor + Send + Clone,
2828
mut progress: P,
2929
pack: &crate::data::File,
3030
should_interrupt: Arc<AtomicBool>,
3131
) -> Result<(git_hash::ObjectId, index::traverse::Outcome, P), Error<E>>
3232
where
3333
P: Progress,
34+
<P as Progress>::SubProgress: Sync,
3435
Processor: FnMut(
3536
git_object::Kind,
3637
&[u8],
@@ -78,7 +79,7 @@ impl index::File {
7879
thread_limit,
7980
&should_interrupt,
8081
pack.pack_end() as u64,
81-
|| new_processor(),
82+
move || new_processor(),
8283
|data,
8384
progress,
8485
Context {

git-pack/src/index/traverse/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ impl index::File {
7979
&self,
8080
pack: &crate::data::File,
8181
progress: Option<P>,
82-
new_processor: impl Fn() -> Processor + Send + Sync,
83-
new_cache: impl Fn() -> C + Send + Sync,
82+
new_processor: impl Fn() -> Processor + Send + Clone,
83+
new_cache: impl Fn() -> C + Send + Clone,
8484
Options {
8585
algorithm,
8686
thread_limit,
@@ -90,6 +90,7 @@ impl index::File {
9090
) -> Result<(git_hash::ObjectId, Outcome, Option<P>), Error<E>>
9191
where
9292
P: Progress,
93+
<P as Progress>::SubProgress: Sync,
9394
C: crate::cache::DecodeEntry,
9495
E: std::error::Error + Send + Sync + 'static,
9596
Processor: FnMut(

0 commit comments

Comments
 (0)