Skip to content

File watching using notify #1310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ fuzzy-matcher = "0.3"
gh-emoji = { version = "1.0", optional = true }
itertools = "0.10"
log = "0.4"
notify = "5.0"
notify-debouncer-mini = "0.2"
once_cell = "1"
rayon-core = "1.9"
ron = "0.8"
Expand Down
4 changes: 3 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ allow = [
"MIT",
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause"
"BSD-3-Clause",
"CC0-1.0",
"ISC"
]
copyleft = "warn"
allow-osi-fsf-free = "neither"
Expand Down
2 changes: 1 addition & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl App {
theme: Theme,
key_config: KeyConfig,
) -> Self {
log::trace!("open repo at: {:?}", repo);
log::trace!("open repo at: {:?}", &repo);

let queue = Queue::new();
let theme = Rc::new(theme);
Expand Down
25 changes: 15 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ mod strings;
mod tabs;
mod ui;
mod version;
mod watcher;

use crate::{app::App, args::process_cmdline};
use anyhow::{bail, Result};
use app::QuitState;
use asyncgit::{sync::RepoPath, AsyncGitNotification};
use asyncgit::{
sync::{utils::repo_work_dir, RepoPath},
AsyncGitNotification,
};
use backtrace::Backtrace;
use crossbeam_channel::{tick, unbounded, Receiver, Select};
use crossterm::{
Expand All @@ -67,14 +71,14 @@ use tui::{
Terminal,
};
use ui::style::Theme;
use watcher::RepoWatcher;

static TICK_INTERVAL: Duration = Duration::from_secs(5);
static SPINNER_INTERVAL: Duration = Duration::from_millis(80);

///
#[derive(Clone)]
pub enum QueueEvent {
Tick,
Notify,
SpinnerUpdate,
AsyncEvent(AsyncNotification),
InputEvent(InputEvent),
Expand Down Expand Up @@ -161,7 +165,8 @@ fn run_app(
let (tx_app, rx_app) = unbounded();

let rx_input = input.receiver();
let ticker = tick(TICK_INTERVAL);
let watcher = RepoWatcher::new(repo_work_dir(&repo)?.as_str())?;
let rx_watcher = watcher.receiver();
let spinner_ticker = tick(SPINNER_INTERVAL);

let mut app = App::new(
Expand All @@ -179,13 +184,13 @@ fn run_app(
loop {
let event = if first_update {
first_update = false;
QueueEvent::Tick
QueueEvent::Notify
} else {
select_event(
&rx_input,
&rx_git,
&rx_app,
&ticker,
&rx_watcher,
&spinner_ticker,
)?
};
Expand All @@ -208,7 +213,7 @@ fn run_app(
}
app.event(ev)?;
}
QueueEvent::Tick => app.update()?,
QueueEvent::Notify => app.update()?,
QueueEvent::AsyncEvent(ev) => {
if !matches!(
ev,
Expand Down Expand Up @@ -282,15 +287,15 @@ fn select_event(
rx_input: &Receiver<InputEvent>,
rx_git: &Receiver<AsyncGitNotification>,
rx_app: &Receiver<AsyncAppNotification>,
rx_ticker: &Receiver<Instant>,
rx_notify: &Receiver<()>,
rx_spinner: &Receiver<Instant>,
) -> Result<QueueEvent> {
let mut sel = Select::new();

sel.recv(rx_input);
sel.recv(rx_git);
sel.recv(rx_app);
sel.recv(rx_ticker);
sel.recv(rx_notify);
sel.recv(rx_spinner);

let oper = sel.select();
Expand All @@ -304,7 +309,7 @@ fn select_event(
2 => oper.recv(rx_app).map(|e| {
QueueEvent::AsyncEvent(AsyncNotification::App(e))
}),
3 => oper.recv(rx_ticker).map(|_| QueueEvent::Tick),
3 => oper.recv(rx_notify).map(|_| QueueEvent::Notify),
4 => oper.recv(rx_spinner).map(|_| QueueEvent::SpinnerUpdate),
_ => bail!("unknown select source"),
}?;
Expand Down
70 changes: 70 additions & 0 deletions src/watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use anyhow::Result;
use crossbeam_channel::{unbounded, Sender};
use notify::{Error, RecommendedWatcher, RecursiveMode};
use notify_debouncer_mini::{
new_debouncer, DebouncedEvent, Debouncer,
};
use std::{
path::Path, sync::mpsc::RecvError, thread, time::Duration,
};

pub struct RepoWatcher {
receiver: crossbeam_channel::Receiver<()>,
#[allow(dead_code)]
debouncer: Debouncer<RecommendedWatcher>,
}

impl RepoWatcher {
pub fn new(workdir: &str) -> Result<Self> {
let (tx, rx) = std::sync::mpsc::channel();

let mut debouncer =
new_debouncer(Duration::from_secs(2), None, tx)?;

debouncer
.watcher()
.watch(Path::new(workdir), RecursiveMode::Recursive)?;

let (out_tx, out_rx) = unbounded();

thread::spawn(move || {
if let Err(e) = Self::forwarder(&rx, &out_tx) {
//maybe we need to restart the forwarder now?
log::error!("notify receive error: {}", e);
}
});

Ok(Self {
debouncer,
receiver: out_rx,
})
}

///
pub fn receiver(&self) -> crossbeam_channel::Receiver<()> {
self.receiver.clone()
}

fn forwarder(
receiver: &std::sync::mpsc::Receiver<
Result<Vec<DebouncedEvent>, Vec<Error>>,
>,
sender: &Sender<()>,
) -> Result<(), RecvError> {
loop {
let ev = receiver.recv()?;

if let Ok(ev) = ev {
log::debug!("notify events: {}", ev.len());

for (idx, ev) in ev.iter().enumerate() {
log::debug!("notify [{}]: {:?}", idx, ev);
}

if !ev.is_empty() {
sender.send(()).expect("send error");
}
}
}
}
}