|
| 1 | +import time |
| 2 | +from watchdog.observers import Observer |
| 3 | +from watchdog.events import FileSystemEventHandler |
| 4 | +import subprocess |
| 5 | +from threading import Timer |
| 6 | + |
| 7 | +WATCHER_FILES = [ |
| 8 | + "src/schemas.py", |
| 9 | + "src/main.py", |
| 10 | +] |
| 11 | + |
| 12 | + |
| 13 | +class MyHandler(FileSystemEventHandler): |
| 14 | + def __init__(self): |
| 15 | + super().__init__() |
| 16 | + self.debounce_timer = None # Timer to debounce multiple events |
| 17 | + self.last_modified = 0 |
| 18 | + |
| 19 | + def on_modified(self, event): |
| 20 | + # Check if the event is for a file and not a directory |
| 21 | + if not event.is_directory: |
| 22 | + if event.src_path in WATCHER_FILES: # Specify the file(s) you want to track |
| 23 | + # If there's an existing debounce timer, cancel it |
| 24 | + if self.debounce_timer: |
| 25 | + self.debounce_timer.cancel() |
| 26 | + |
| 27 | + # Start a new timer (e.g., 1 second delay) to wait before executing |
| 28 | + self.debounce_timer = Timer(1.0, self.execute_command, [event.src_path]) |
| 29 | + self.debounce_timer.start() |
| 30 | + |
| 31 | + def execute_command(self, file_path): |
| 32 | + print(f"File {file_path} has been modified and saved.") |
| 33 | + |
| 34 | + try: |
| 35 | + # Execute your first command |
| 36 | + subprocess.run( |
| 37 | + ["poetry", "run", "python", "-m", "commands.generate_openapi_schema"], |
| 38 | + check=True, |
| 39 | + ) |
| 40 | + |
| 41 | + except subprocess.CalledProcessError as e: |
| 42 | + print(f"An error occurred: {e}") |
| 43 | + |
| 44 | + |
| 45 | +if __name__ == "__main__": |
| 46 | + path = "src" # Directory to watch |
| 47 | + event_handler = MyHandler() |
| 48 | + observer = Observer() |
| 49 | + observer.schedule(event_handler, path, recursive=False) |
| 50 | + |
| 51 | + observer.start() |
| 52 | + try: |
| 53 | + while True: |
| 54 | + time.sleep(1) |
| 55 | + except KeyboardInterrupt: |
| 56 | + observer.stop() |
| 57 | + observer.join() |
0 commit comments