diff --git a/asyncgit/src/tags.rs b/asyncgit/src/tags.rs index 544e9f3a35..31b58c879c 100644 --- a/asyncgit/src/tags.rs +++ b/asyncgit/src/tags.rs @@ -1,4 +1,5 @@ use crate::{ + asyncjob::{AsyncJob, AsyncSingleJob, RunParams}, error::Result, hash, sync::{self}, @@ -6,10 +7,7 @@ use crate::{ }; use crossbeam_channel::Sender; use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; use sync::Tags; @@ -23,39 +21,36 @@ struct TagsResult { /// pub struct AsyncTags { - last: Arc>>, + last: Option<(Instant, TagsResult)>, sender: Sender, - pending: Arc, + job: AsyncSingleJob, } impl AsyncTags { /// pub fn new(sender: &Sender) -> Self { Self { - last: Arc::new(Mutex::new(None)), + last: None, sender: sender.clone(), - pending: Arc::new(AtomicUsize::new(0)), + job: AsyncSingleJob::new(sender.clone()), } } /// last fetched result - pub fn last(&mut self) -> Result> { - let last = self.last.lock()?; - - Ok(last.clone().map(|last| last.1.tags)) + pub fn last(&self) -> Result> { + Ok(self.last.as_ref().map(|result| result.1.tags.clone())) } /// pub fn is_pending(&self) -> bool { - self.pending.load(Ordering::Relaxed) > 0 + self.job.is_pending() } - fn is_outdated(&self, dur: Duration) -> Result { - let last = self.last.lock()?; - - Ok(last + /// + fn is_outdated(&self, dur: Duration) -> bool { + self.last .as_ref() - .map_or(true, |(last_time, _)| last_time.elapsed() > dur)) + .map_or(true, |(last_time, _)| last_time.elapsed() > dur) } /// @@ -66,70 +61,83 @@ impl AsyncTags { ) -> Result<()> { log::trace!("request"); - if !force && self.is_pending() { + if !force && self.job.is_pending() { return Ok(()); } - let outdated = self.is_outdated(dur)?; + let outdated = self.is_outdated(dur); if !force && !outdated { return Ok(()); } - let arc_last = Arc::clone(&self.last); - let sender = self.sender.clone(); - let arc_pending = Arc::clone(&self.pending); - - self.pending.fetch_add(1, Ordering::Relaxed); - - rayon_core::spawn(move || { - let notify = Self::getter(&arc_last, outdated) - .expect("error getting tags"); - - arc_pending.fetch_sub(1, Ordering::Relaxed); - - sender - .send(if notify { - AsyncGitNotification::Tags - } else { - AsyncGitNotification::FinishUnchanged - }) - .expect("error sending notify"); - }); + if outdated { + self.job.spawn(AsyncTagsJob::new( + self.last + .as_ref() + .map_or(0, |(_, result)| result.hash), + )); + } else { + self.sender + .send(AsyncGitNotification::FinishUnchanged)?; + } Ok(()) } +} - fn getter( - arc_last: &Arc>>, - outdated: bool, - ) -> Result { - let tags = sync::get_tags(CWD)?; +enum JobState { + Request(u64), + Response(Result<(Instant, TagsResult)>), +} - let hash = hash(&tags); +/// +#[derive(Clone, Default)] +pub struct AsyncTagsJob { + state: Arc>>, +} - if !outdated - && Self::last_hash(arc_last) - .map(|last| last == hash) - .unwrap_or_default() - { - return Ok(false); +/// +impl AsyncTagsJob { + /// + pub fn new(last_hash: u64) -> Self { + Self { + state: Arc::new(Mutex::new(Some(JobState::Request( + last_hash, + )))), } + } +} - { - let mut last = arc_last.lock()?; - let now = Instant::now(); - *last = Some((now, TagsResult { hash, tags })); - } +impl AsyncJob for AsyncTagsJob { + type Notification = AsyncGitNotification; + type Progress = (); - Ok(true) - } + fn run( + &mut self, + _params: RunParams, + ) -> Result { + let mut notification = AsyncGitNotification::FinishUnchanged; + if let Ok(mut state) = self.state.lock() { + *state = state.take().map(|state| match state { + JobState::Request(last_hash) => { + let tags = sync::get_tags(CWD); + + JobState::Response(tags.map(|tags| { + let hash = hash(&tags); + if last_hash != hash { + notification = AsyncGitNotification::Tags; + } + + (Instant::now(), TagsResult { hash, tags }) + })) + } + JobState::Response(result) => { + JobState::Response(result) + } + }); + } - fn last_hash( - last: &Arc>>, - ) -> Option { - last.lock() - .ok() - .and_then(|last| last.as_ref().map(|(_, last)| last.hash)) + Ok(notification) } }