Skip to content

Commit addf446

Browse files
committed
fix: more robustness in the face of a trampling-herd of threads loading a single index.
The motivating example is here: praetorian-inc/noseyparker#179 Previously, it was possible for a trampling herd of threads to consolidate the disk state. Most of them would be 'needs-init' threads which could notice that the initialization already happened, and just use that. But a thread might be late for the party and somehow manages to not get any newly loaded index, and thus tries to consolidate with what's on disk again. Then it would again determine no change, and return nothing, causing the caller to abort and not find objects it should find because it wouldn't see the index that it should have seen. The reason the thread got into this mess is that the 'is-load-ongoing' flagging was racy itself, so it would not wait for ongoing loads and just conclude nothing happened. An extra delay (by yielding) now assures it either sees the loading state and waits for it, or sees the newly loaded indices. Note that this issue can be reproduced with: ``` './target/release/gix -r repo-with-one-pack -t10 --trace odb stats --extra-header-lookup' ```
1 parent 7b3dc92 commit addf446

File tree

5 files changed

+142
-63
lines changed

5 files changed

+142
-63
lines changed

Diff for: gitoxide-core/src/repository/odb.rs

+67-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::io;
2+
use std::sync::atomic::Ordering;
23

34
use anyhow::bail;
45

@@ -50,6 +51,8 @@ pub mod statistics {
5051
pub struct Options {
5152
pub format: OutputFormat,
5253
pub thread_limit: Option<usize>,
54+
/// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
55+
pub extra_header_lookup: bool,
5356
}
5457
}
5558

@@ -59,7 +62,11 @@ pub fn statistics(
5962
mut progress: impl gix::Progress,
6063
out: impl io::Write,
6164
mut err: impl io::Write,
62-
statistics::Options { format, thread_limit }: statistics::Options,
65+
statistics::Options {
66+
format,
67+
thread_limit,
68+
extra_header_lookup,
69+
}: statistics::Options,
6370
) -> anyhow::Result<()> {
6471
use bytesize::ByteSize;
6572
use gix::odb::{find, HeaderExt};
@@ -76,6 +83,10 @@ pub fn statistics(
7683
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
7784
#[derive(Default)]
7885
struct Statistics {
86+
/// All objects that were used to produce these statistics.
87+
/// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
88+
#[cfg_attr(feature = "serde", serde(skip_serializing))]
89+
ids: Option<Vec<gix::ObjectId>>,
7990
total_objects: usize,
8091
loose_objects: usize,
8192
packed_objects: usize,
@@ -135,14 +146,17 @@ pub fn statistics(
135146
}
136147

137148
impl gix::parallel::Reduce for Reduce {
138-
type Input = Result<Vec<gix::odb::find::Header>, anyhow::Error>;
149+
type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
139150
type FeedProduce = ();
140151
type Output = Statistics;
141152
type Error = anyhow::Error;
142153

143154
fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
144-
for item in items? {
155+
for (id, item) in items? {
145156
self.stats.consume(item);
157+
if let Some(ids) = self.stats.ids.as_mut() {
158+
ids.push(id);
159+
}
146160
}
147161
Ok(())
148162
}
@@ -154,9 +168,9 @@ pub fn statistics(
154168
}
155169

156170
let cancelled = || anyhow::anyhow!("Cancelled by user");
157-
let object_ids = repo.objects.store_ref().iter()?.filter_map(Result::ok);
171+
let object_ids = repo.objects.iter()?.filter_map(Result::ok);
158172
let chunk_size = 1_000;
159-
let stats = if gix::parallel::num_threads(thread_limit) > 1 {
173+
let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
160174
gix::parallel::in_parallel(
161175
gix::interrupt::Iter::new(
162176
gix::features::iter::Chunks {
@@ -166,19 +180,30 @@ pub fn statistics(
166180
cancelled,
167181
),
168182
thread_limit,
169-
move |_| (repo.objects.clone().into_inner(), counter),
183+
{
184+
let objects = repo.objects.clone();
185+
move |_| (objects.clone().into_inner(), counter)
186+
},
170187
|ids, (handle, counter)| {
171188
let ids = ids?;
172-
counter.fetch_add(ids.len(), std::sync::atomic::Ordering::Relaxed);
189+
counter.fetch_add(ids.len(), Ordering::Relaxed);
173190
let out = ids
174191
.into_iter()
175-
.map(|id| handle.header(id))
192+
.map(|id| handle.header(id).map(|hdr| (id, hdr)))
176193
.collect::<Result<Vec<_>, _>>()?;
177194
Ok(out)
178195
},
179-
Reduce::default(),
196+
Reduce {
197+
stats: Statistics {
198+
ids: extra_header_lookup.then(Vec::new),
199+
..Default::default()
200+
},
201+
},
180202
)?
181203
} else {
204+
if extra_header_lookup {
205+
bail!("extra-header-lookup is only meaningful in threaded mode");
206+
}
182207
let mut stats = Statistics::default();
183208

184209
for (count, id) in object_ids.enumerate() {
@@ -193,6 +218,39 @@ pub fn statistics(
193218

194219
progress.show_throughput(start);
195220

221+
if let Some(mut ids) = stats.ids.take() {
222+
// Critical to re-open the repo to assure we don't have any ODB state and start fresh.
223+
let start = std::time::Instant::now();
224+
let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
225+
progress.set_name("re-counting".into());
226+
progress.init(Some(ids.len()), gix::progress::count("objects"));
227+
let counter = progress.counter();
228+
counter.store(0, Ordering::Relaxed);
229+
let errors = gix::parallel::in_parallel_with_slice(
230+
&mut ids,
231+
thread_limit,
232+
{
233+
let objects = repo.objects.clone();
234+
move |_| (objects.clone().into_inner(), counter, false)
235+
},
236+
|id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
237+
counter.fetch_add(1, Ordering::Relaxed);
238+
if let Err(_err) = odb.header(id) {
239+
*has_error = true;
240+
gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
241+
}
242+
Ok(())
243+
},
244+
|| Some(std::time::Duration::from_millis(100)),
245+
|(_, _, has_error)| has_error,
246+
)?;
247+
248+
progress.show_throughput(start);
249+
if errors.contains(&true) {
250+
bail!("At least one object couldn't be looked up even though it must exist");
251+
}
252+
}
253+
196254
#[cfg(feature = "serde")]
197255
{
198256
serde_json::to_writer_pretty(out, &stats)?;

Diff for: gix-odb/src/store_impls/dynamic/load_index.rs

+62-49
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
ops::Deref,
55
path::{Path, PathBuf},
66
sync::{
7-
atomic::{AtomicU16, AtomicUsize, Ordering},
7+
atomic::{AtomicU16, Ordering},
88
Arc,
99
},
1010
time::SystemTime,
@@ -86,7 +86,7 @@ impl super::Store {
8686
Ok(Some(self.collect_snapshot()))
8787
} else {
8888
// always compare to the latest state
89-
// Nothing changed in the mean time, try to load another index…
89+
// Nothing changed in the meantime, try to load another index…
9090
if self.load_next_index(index) {
9191
Ok(Some(self.collect_snapshot()))
9292
} else {
@@ -119,7 +119,7 @@ impl super::Store {
119119
let slot = &self.files[index.slot_indices[slot_map_index]];
120120
let _lock = slot.write.lock();
121121
if slot.generation.load(Ordering::SeqCst) > index.generation {
122-
// There is a disk consolidation in progress which just overwrote a slot that cold be disposed with some other
122+
// There is a disk consolidation in progress which just overwrote a slot that could be disposed with some other
123123
// index, one we didn't intend to load.
124124
// Continue with the next slot index in the hope there is something else we can do…
125125
continue 'retry_with_next_slot_index;
@@ -128,14 +128,18 @@ impl super::Store {
128128
let bundle_mut = Arc::make_mut(&mut bundle);
129129
if let Some(files) = bundle_mut.as_mut() {
130130
// these are always expected to be set, unless somebody raced us. We handle this later by retrying.
131-
let _loaded_count = IncOnDrop(&index.loaded_indices);
132-
match files.load_index(self.object_hash) {
131+
let res = {
132+
let res = files.load_index(self.object_hash);
133+
slot.files.store(bundle);
134+
index.loaded_indices.fetch_add(1, Ordering::SeqCst);
135+
res
136+
};
137+
match res {
133138
Ok(_) => {
134-
slot.files.store(bundle);
135139
break 'retry_with_next_slot_index;
136140
}
137-
Err(_) => {
138-
slot.files.store(bundle);
141+
Err(_err) => {
142+
gix_features::trace::error!(err=?_err, "Failed to load index file - some objects may seem to not exist");
139143
continue 'retry_with_next_slot_index;
140144
}
141145
}
@@ -145,9 +149,14 @@ impl super::Store {
145149
// There can be contention as many threads start working at the same time and take all the
146150
// slots to load indices for. Some threads might just be left-over and have to wait for something
147151
// to change.
148-
let num_load_operations = index.num_indices_currently_being_loaded.deref();
149152
// TODO: potentially hot loop - could this be a condition variable?
150-
while num_load_operations.load(Ordering::Relaxed) != 0 {
153+
// This is a timing-based fix for the case that the `num_indices_being_loaded` isn't yet incremented,
154+
// and we might break out here without actually waiting for the loading operation. Then we'd fail to
155+
// observe a change and the underlying handler would not have all the indices it needs at its disposal.
156+
// Yielding means we will definitely lose enough time to observe the ongoing operation,
157+
// or its effects.
158+
std::thread::yield_now();
159+
while index.num_indices_currently_being_loaded.load(Ordering::SeqCst) != 0 {
151160
std::thread::yield_now()
152161
}
153162
break 'retry_with_next_slot_index;
@@ -197,7 +206,7 @@ impl super::Store {
197206

198207
// We might not be able to detect by pointer if the state changed, as this itself is racy. So we keep track of double-initialization
199208
// using a flag, which means that if `needs_init` was true we saw the index uninitialized once, but now that we are here it's
200-
// initialized meaning that somebody was faster and we couldn't detect it by comparisons to the index.
209+
// initialized meaning that somebody was faster, and we couldn't detect it by comparisons to the index.
201210
// If so, make sure we collect the snapshot instead of returning None in case nothing actually changed, which is likely with a
202211
// race like this.
203212
if !was_uninitialized && needs_init {
@@ -397,18 +406,19 @@ impl super::Store {
397406
// generation stays the same, as it's the same value still but scheduled for eventual removal.
398407
}
399408
} else {
409+
// set the generation before we actually change the value, otherwise readers of old generations could observe the new one.
410+
// We rather want them to turn around here and update their index, which, by that time, might actually already be available.
411+
// If not, they would fail unable to load a pack or index they need, but that's preferred over returning wrong objects.
412+
// Safety: can't race as we hold the lock, have to set the generation beforehand to help avoid others to observe the value.
413+
slot.generation.store(generation, Ordering::SeqCst);
400414
*files_mut = None;
401415
};
402416
slot.files.store(files);
403-
if !needs_stable_indices {
404-
// Not racy due to lock, generation must be set after unsetting the slot value AND storing it.
405-
slot.generation.store(generation, Ordering::SeqCst);
406-
}
407417
}
408418

409419
let new_index = self.index.load();
410420
Ok(if index.state_id() == new_index.state_id() {
411-
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops
421+
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops.
412422
None
413423
} else {
414424
if load_new_index {
@@ -619,34 +629,44 @@ impl super::Store {
619629
}
620630

621631
pub(crate) fn collect_snapshot(&self) -> Snapshot {
632+
// We don't observe changes-on-disk in our 'wait-for-load' loop.
633+
// That loop is meant to help assure the marker (which includes the amount of loaded indices) matches
634+
// the actual amount of indices we collect.
622635
let index = self.index.load();
623-
let indices = if index.is_initialized() {
624-
index
625-
.slot_indices
626-
.iter()
627-
.map(|idx| (*idx, &self.files[*idx]))
628-
.filter_map(|(id, file)| {
629-
let lookup = match (**file.files.load()).as_ref()? {
630-
types::IndexAndPacks::Index(bundle) => handle::SingleOrMultiIndex::Single {
631-
index: bundle.index.loaded()?.clone(),
632-
data: bundle.data.loaded().cloned(),
633-
},
634-
types::IndexAndPacks::MultiIndex(multi) => handle::SingleOrMultiIndex::Multi {
635-
index: multi.multi_index.loaded()?.clone(),
636-
data: multi.data.iter().map(|f| f.loaded().cloned()).collect(),
637-
},
638-
};
639-
handle::IndexLookup { file: lookup, id }.into()
640-
})
641-
.collect()
642-
} else {
643-
Vec::new()
644-
};
636+
loop {
637+
if index.num_indices_currently_being_loaded.deref().load(Ordering::SeqCst) != 0 {
638+
std::thread::yield_now();
639+
continue;
640+
}
641+
let marker = index.marker();
642+
let indices = if index.is_initialized() {
643+
index
644+
.slot_indices
645+
.iter()
646+
.map(|idx| (*idx, &self.files[*idx]))
647+
.filter_map(|(id, file)| {
648+
let lookup = match (**file.files.load()).as_ref()? {
649+
types::IndexAndPacks::Index(bundle) => handle::SingleOrMultiIndex::Single {
650+
index: bundle.index.loaded()?.clone(),
651+
data: bundle.data.loaded().cloned(),
652+
},
653+
types::IndexAndPacks::MultiIndex(multi) => handle::SingleOrMultiIndex::Multi {
654+
index: multi.multi_index.loaded()?.clone(),
655+
data: multi.data.iter().map(|f| f.loaded().cloned()).collect(),
656+
},
657+
};
658+
handle::IndexLookup { file: lookup, id }.into()
659+
})
660+
.collect()
661+
} else {
662+
Vec::new()
663+
};
645664

646-
Snapshot {
647-
indices,
648-
loose_dbs: Arc::clone(&index.loose_dbs),
649-
marker: index.marker(),
665+
return Snapshot {
666+
indices,
667+
loose_dbs: Arc::clone(&index.loose_dbs),
668+
marker,
669+
};
650670
}
651671
}
652672
}
@@ -669,13 +689,6 @@ impl<'a> Drop for IncOnNewAndDecOnDrop<'a> {
669689
}
670690
}
671691

672-
struct IncOnDrop<'a>(&'a AtomicUsize);
673-
impl<'a> Drop for IncOnDrop<'a> {
674-
fn drop(&mut self) {
675-
self.0.fetch_add(1, Ordering::SeqCst);
676-
}
677-
}
678-
679692
pub(crate) enum Either {
680693
IndexPath(PathBuf),
681694
MultiIndexFile(Arc<gix_pack::multi_index::File>),

Diff for: gix-odb/src/store_impls/dynamic/types.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub(crate) type AtomicGeneration = AtomicU32;
1818

1919
/// A way to indicate which pack indices we have seen already and which of them are loaded, along with an idea
2020
/// of whether stored `PackId`s are still usable.
21-
#[derive(Default, Copy, Clone)]
21+
#[derive(Default, Copy, Clone, Debug)]
2222
pub struct SlotIndexMarker {
2323
/// The generation the `loaded_until_index` belongs to. Indices of different generations are completely incompatible.
2424
/// This value changes once the internal representation is compacted, something that may happen only if there is no handle
@@ -262,7 +262,7 @@ impl IndexAndPacks {
262262
}
263263
}
264264

265-
/// If we are garbage, put ourselves into the loaded state. Otherwise put ourselves back to unloaded.
265+
/// If we are garbage, put ourselves into the loaded state. Otherwise, put ourselves back to unloaded.
266266
pub(crate) fn put_back(&mut self) {
267267
match self {
268268
IndexAndPacks::Index(bundle) => {

Diff for: src/plumbing/main.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ pub fn main() -> Result<()> {
11531153
),
11541154
},
11551155
Subcommands::Odb(cmd) => match cmd {
1156-
odb::Subcommands::Stats => prepare_and_run(
1156+
odb::Subcommands::Stats { extra_header_lookup } => prepare_and_run(
11571157
"odb-stats",
11581158
trace,
11591159
auto_verbose,
@@ -1166,7 +1166,11 @@ pub fn main() -> Result<()> {
11661166
progress,
11671167
out,
11681168
err,
1169-
core::repository::odb::statistics::Options { format, thread_limit },
1169+
core::repository::odb::statistics::Options {
1170+
format,
1171+
thread_limit,
1172+
extra_header_lookup,
1173+
},
11701174
)
11711175
},
11721176
),

Diff for: src/plumbing/options/mod.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,11 @@ pub mod odb {
586586
Info,
587587
/// Count and obtain information on all, possibly duplicate, objects in the database.
588588
#[clap(visible_alias = "statistics")]
589-
Stats,
589+
Stats {
590+
/// Lookup headers again, but without preloading indices.
591+
#[clap(long)]
592+
extra_header_lookup: bool,
593+
},
590594
}
591595
}
592596

0 commit comments

Comments
 (0)