""" Path Juggler - Core Logic This module contains the file watching and processing logic, separated from any UI concerns. """ import os import re import shutil import subprocess import time import logging from pathlib import Path from threading import Thread, Event from typing import Callable from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # Configuration WATCH_FOLDERS = { "AM": Path.home() / "Movies" / "AM", "PM": Path.home() / "Movies" / "PM", } DESTINATION_BASE = Path.home() / "Movies" / "Honey Dailies Transfer" VOLUMES_PATH = Path("/Volumes") VOLUME_PATTERN = re.compile(r"^HONEY \d+$") PROJECT_FOLDER = "Honey" FOOTAGE_FOLDER = "02 FOOTAGE" PROXIES_FOLDER = "03 EDITORIAL PROXIES" # Regex to parse filenames like A080C002_251209R2.mxf FILENAME_PATTERN = re.compile( r"^([A-Z])(\d{3})C(\d{3})_(\d{6})([A-Z0-9]*)\.mxf$", re.IGNORECASE ) logger = logging.getLogger(__name__) def is_file_open(file_path: Path) -> bool: """Check if a file is currently open by any process using lsof.""" try: result = subprocess.run( ["lsof", str(file_path)], capture_output=True, text=True, timeout=10 ) return result.returncode == 0 except subprocess.TimeoutExpired: logger.warning(f"lsof timed out for {file_path}") return True except Exception as e: logger.warning(f"lsof failed: {e}") return True def get_file_owner_process(file_path: Path) -> str | None: """Get the name of the process that has the file open.""" try: result = subprocess.run( ["lsof", "-t", str(file_path)], capture_output=True, text=True, timeout=10 ) if result.returncode == 0 and result.stdout.strip(): pid = result.stdout.strip().split('\n')[0] ps_result = subprocess.run( ["ps", "-p", pid, "-o", "comm="], capture_output=True, text=True, timeout=5 ) if ps_result.returncode == 0: return ps_result.stdout.strip() except Exception: pass return None def wait_for_file_ready( file_path: Path, stop_event: Event | None = None, check_interval: float = 2.0, stable_duration: float = 5.0, timeout: float = 3600.0 ) -> bool: """Wait until a file is no longer being written to.""" start_time = time.time() last_size = -1 size_stable_since = None logged_process = False logger.info(f"Waiting for file to be ready: {file_path.name}") while time.time() - start_time < timeout: # Check if we should stop if stop_event and stop_event.is_set(): logger.info(f"Stop requested, abandoning: {file_path.name}") return False if not file_path.exists(): logger.warning(f"File disappeared: {file_path.name}") return False if not is_file_open(file_path): logger.info(f"File closed by all processes: {file_path.name}") return True if not logged_process: owner = get_file_owner_process(file_path) if owner: logger.info(f"File held by: {owner}") logged_process = True current_size = file_path.stat().st_size if current_size == last_size: if size_stable_since is None: size_stable_since = time.time() elif time.time() - size_stable_since >= stable_duration: time.sleep(1) if not is_file_open(file_path): logger.info(f"File ready (closed): {file_path.name}") return True else: size_stable_since = None size_mb = current_size / (1024 * 1024) logger.debug(f"File growing: {file_path.name} ({size_mb:.1f} MB)") last_size = current_size time.sleep(check_interval) logger.error(f"Timeout waiting for file: {file_path.name}") return False def get_honey_volumes() -> list[Path]: """Find all mounted HONEY volumes.""" if not VOLUMES_PATH.exists(): return [] volumes = [] for item in VOLUMES_PATH.iterdir(): if item.is_dir() and VOLUME_PATTERN.match(item.name): volumes.append(item) return sorted(volumes) def parse_filename(filename: str) -> dict | None: """Parse an MXF filename to extract camera and reel info.""" match = FILENAME_PATTERN.match(filename) if not match: return None camera, reel, clip, date, suffix = match.groups() return { "camera": camera.upper(), "reel": reel, "clip": clip, "date": date, "suffix": suffix, "reel_prefix": f"{camera.upper()}{reel}", "original_filename": filename, } def find_bin_folder(reel_prefix: str, volumes: list[Path]) -> dict | None: """Search HONEY volumes for a bin folder matching the reel prefix.""" for volume in volumes: project_path = volume / PROJECT_FOLDER if not project_path.exists(): continue for day_folder in project_path.iterdir(): if not day_folder.is_dir(): continue footage_path = day_folder / FOOTAGE_FOLDER if not footage_path.exists(): continue for cam_folder in footage_path.iterdir(): if not cam_folder.is_dir(): continue for bin_folder in cam_folder.iterdir(): if not bin_folder.is_dir(): continue if bin_folder.name.upper().startswith(reel_prefix.upper()): return { "volume": volume, "day_folder": day_folder.name, "camera_folder": cam_folder.name, "bin_folder": bin_folder.name, "full_path": bin_folder, } return None def build_destination_path(bin_info: dict, am_pm: str, destination_base: Path) -> Path: """Build the destination path for the proxy file.""" day_with_ampm = f"{bin_info['day_folder']} {am_pm}" return ( destination_base / day_with_ampm / PROXIES_FOLDER / bin_info["camera_folder"] / bin_info["bin_folder"] ) def process_file(file_path: Path, am_pm: str, dry_run: bool = False) -> bool: """Process a single MXF file: find matching bin and move to destination.""" filename = file_path.name file_info = parse_filename(filename) if not file_info: logger.warning(f"Could not parse filename: {filename}") return False logger.info(f"Processing: {filename} (Reel: {file_info['reel_prefix']}, {am_pm})") volumes = get_honey_volumes() if not volumes: logger.error("No HONEY volumes found!") return False bin_info = find_bin_folder(file_info["reel_prefix"], volumes) if not bin_info: logger.error(f"Could not find bin folder for {file_info['reel_prefix']}") return False logger.info(f"Found bin: {bin_info['bin_folder']} in {bin_info['day_folder']}") dest_folder = build_destination_path(bin_info, am_pm, DESTINATION_BASE) dest_file = dest_folder / filename logger.info(f"Destination: {dest_file}") if dry_run: logger.info("[DRY RUN] Would create folder and move file") return True dest_folder.mkdir(parents=True, exist_ok=True) try: shutil.move(str(file_path), str(dest_file)) logger.info(f"✓ Moved: {filename}") return True except Exception as e: logger.error(f"Failed to move file: {e}") return False class MXFHandler(FileSystemEventHandler): """Watchdog handler for new MXF files.""" def __init__(self, am_pm: str, stop_event: Event, dry_run: bool = False): self.am_pm = am_pm self.stop_event = stop_event self.dry_run = dry_run self.processing = set() def on_created(self, event): if event.is_directory: return file_path = Path(event.src_path) if file_path.suffix.lower() != ".mxf": return if str(file_path) in self.processing: return self.processing.add(str(file_path)) try: if wait_for_file_ready(file_path, self.stop_event): process_file(file_path, self.am_pm, self.dry_run) finally: self.processing.discard(str(file_path)) def on_moved(self, event): if event.is_directory: return dest_path = Path(event.dest_path) if dest_path.suffix.lower() == ".mxf": if str(dest_path) not in self.processing: self.processing.add(str(dest_path)) try: if wait_for_file_ready(dest_path, self.stop_event): process_file(dest_path, self.am_pm, self.dry_run) finally: self.processing.discard(str(dest_path)) class PathJuggler: """ Main controller class for Path Juggler. Can be used by CLI or GUI. """ def __init__(self, dry_run: bool = False): self.dry_run = dry_run self.observer: Observer | None = None self.stop_event = Event() self.running = False self._thread: Thread | None = None def setup_folders(self): """Ensure watch and destination folders exist.""" for name, folder in WATCH_FOLDERS.items(): folder.mkdir(parents=True, exist_ok=True) logger.info(f"Watch folder ({name}): {folder}") DESTINATION_BASE.mkdir(parents=True, exist_ok=True) logger.info(f"Destination: {DESTINATION_BASE}") def process_existing(self): """Process any MXF files already in watch folders.""" for am_pm, folder in WATCH_FOLDERS.items(): if not folder.exists(): continue for file_path in folder.glob("*.mxf"): if self.stop_event.is_set(): return if is_file_open(file_path): logger.info(f"Skipping (still being written): {file_path.name}") continue process_file(file_path, am_pm, self.dry_run) def start(self): """Start watching for files.""" if self.running: logger.warning("Already running") return self.stop_event.clear() self.running = True self.setup_folders() # Log volumes volumes = get_honey_volumes() if volumes: logger.info(f"Found volumes: {[v.name for v in volumes]}") else: logger.warning("No HONEY volumes currently mounted") # Process existing files logger.info("Checking for existing files...") self.process_existing() # Start observer self.observer = Observer() for am_pm, folder in WATCH_FOLDERS.items(): handler = MXFHandler(am_pm, self.stop_event, self.dry_run) self.observer.schedule(handler, str(folder), recursive=False) logger.info(f"Watching: {folder}") self.observer.start() logger.info("Watcher started - waiting for new files...") def stop(self): """Stop watching for files.""" if not self.running: return logger.info("Stopping watcher...") self.stop_event.set() if self.observer: self.observer.stop() self.observer.join(timeout=5) self.observer = None self.running = False logger.info("Watcher stopped") def start_async(self): """Start watching in a background thread.""" self._thread = Thread(target=self.start, daemon=True) self._thread.start() def is_running(self) -> bool: return self.running