# Source listing: path-juggler/path_juggler_core.py (399 lines, 12 KiB, Python)

"""
Path Juggler - Core Logic
This module contains the file watching and processing logic,
separated from any UI concerns.
"""
import os
import re
import shutil
import subprocess
import time
import logging
from pathlib import Path
from threading import Thread, Event
from typing import Callable, Optional
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Configuration
# Folders watched for incoming MXF files. The key ("AM"/"PM") is passed along
# with each file and ends up appended to the day folder name in the
# destination path.
WATCH_FOLDERS = {
"AM": Path.home() / "Movies" / "AM",
"PM": Path.home() / "Movies" / "PM",
}
# Root folder under which the destination tree for moved files is created.
DESTINATION_BASE = Path.home() / "Movies" / "Honey Dailies Transfer"
# Directory scanned for mounted volumes.
VOLUMES_PATH = Path("/Volumes")
# Volume names that qualify: "HONEY", one space, then one or more digits.
VOLUME_PATTERN = re.compile(r"^HONEY \d+$")
# Folder expected directly under each matching volume; it holds the day folders.
PROJECT_FOLDER = "Honey"
# Sub-folder of each day folder that is searched for camera/bin folders.
FOOTAGE_FOLDER = "02 FOOTAGE"
# Path component inserted between the day folder and the camera folder when
# the destination path is built.
PROXIES_FOLDER = "03 EDITORIAL PROXIES"
# Regex to parse filenames like A080C002_251209R2.mxf
# Groups, in order: camera letter, 3-digit reel, 3-digit clip, 6-digit date,
# optional alphanumeric suffix. Matching is case-insensitive.
FILENAME_PATTERN = re.compile(
r"^([A-Z])(\d{3})C(\d{3})_(\d{6})([A-Z0-9]*)\.mxf$",
re.IGNORECASE
)
# Module-level logger; handlers and levels are left to the importing application.
logger = logging.getLogger(__name__)
def is_file_open(file_path: Path) -> bool:
    """Report whether any process currently holds *file_path* open.

    Runs ``lsof`` on the path and treats a zero exit status as "open".
    If ``lsof`` times out or cannot be run at all, the file is reported
    as open so that callers err on the side of not touching it.
    """
    try:
        completed = subprocess.run(
            ["lsof", str(file_path)],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        logger.warning(f"lsof timed out for {file_path}")
        return True
    except Exception as e:
        logger.warning(f"lsof failed: {e}")
        return True
    else:
        return completed.returncode == 0
def get_file_owner_process(file_path: Path) -> Optional[str]:
    """Return the command name of the first process holding *file_path* open.

    Uses ``lsof -t`` to list PIDs and ``ps`` to resolve the first PID to a
    command name. This is best-effort: any failure along the way yields
    ``None`` rather than an exception.
    """
    try:
        lsof_run = subprocess.run(
            ["lsof", "-t", str(file_path)],
            capture_output=True,
            text=True,
            timeout=10,
        )
        pid_listing = lsof_run.stdout.strip()
        if lsof_run.returncode != 0 or not pid_listing:
            return None

        # lsof -t prints one PID per line; only the first one is looked up.
        first_pid = pid_listing.split('\n')[0]
        ps_run = subprocess.run(
            ["ps", "-p", first_pid, "-o", "comm="],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if ps_run.returncode == 0:
            return ps_run.stdout.strip()
        return None
    except Exception:
        # Deliberately swallowed: the result is only used for a log line.
        return None
def wait_for_file_ready(
    file_path: Path,
    stop_event: Optional[Event] = None,
    check_interval: float = 2.0,
    stable_duration: float = 5.0,
    timeout: float = 3600.0
) -> bool:
    """Wait until a file is no longer being written to.

    The file counts as ready once ``is_file_open`` reports that no process
    holds it open. While it is still open, its size is polled; after the
    size has been unchanged for ``stable_duration`` seconds an extra
    open-check is made one second later.

    Args:
        file_path: File to wait for.
        stop_event: Optional event; when set, waiting is abandoned promptly.
        check_interval: Seconds between polls.
        stable_duration: Seconds the size must stay unchanged before the
            extra open-check is made.
        timeout: Maximum total seconds to wait.

    Returns:
        True if the file is ready to be processed; False if stop was
        requested, the file disappeared, or the timeout elapsed.
    """

    def _stop_requested() -> bool:
        return stop_event is not None and stop_event.is_set()

    def _pause(seconds: float) -> None:
        # Event.wait returns as soon as the event is set, so a stop request
        # does not have to sit out the remainder of the interval.
        if stop_event is not None:
            stop_event.wait(seconds)
        else:
            time.sleep(seconds)

    # Monotonic clock: elapsed-time maths must not be affected by wall-clock
    # adjustments.
    started = time.monotonic()
    last_size = -1
    size_stable_since = None
    logged_process = False
    logger.info(f"Waiting for file to be ready: {file_path.name}")

    while time.monotonic() - started < timeout:
        # Check if we should stop
        if _stop_requested():
            logger.info(f"Stop requested, abandoning: {file_path.name}")
            return False
        if not file_path.exists():
            logger.warning(f"File disappeared: {file_path.name}")
            return False
        if not is_file_open(file_path):
            logger.info(f"File closed by all processes: {file_path.name}")
            return True

        # Look up the owning process once, purely for the log.
        if not logged_process:
            owner = get_file_owner_process(file_path)
            if owner:
                logger.info(f"File held by: {owner}")
            logged_process = True

        # The file can vanish between the exists() check and stat().
        try:
            current_size = file_path.stat().st_size
        except FileNotFoundError:
            logger.warning(f"File disappeared: {file_path.name}")
            return False

        now = time.monotonic()
        if current_size == last_size:
            if size_stable_since is None:
                size_stable_since = now
            elif now - size_stable_since >= stable_duration:
                _pause(1)
                # Never report "ready" once a stop has been requested.
                if not _stop_requested() and not is_file_open(file_path):
                    logger.info(f"File ready (closed): {file_path.name}")
                    return True
        else:
            size_stable_since = None
            size_mb = current_size / (1024 * 1024)
            logger.debug(f"File growing: {file_path.name} ({size_mb:.1f} MB)")
        last_size = current_size
        _pause(check_interval)

    logger.error(f"Timeout waiting for file: {file_path.name}")
    return False
def get_honey_volumes() -> list[Path]:
    """Return the mounted volumes whose names match VOLUME_PATTERN, sorted.

    An empty list is returned when VOLUMES_PATH does not exist.
    """
    if not VOLUMES_PATH.exists():
        return []
    return sorted(
        entry
        for entry in VOLUMES_PATH.iterdir()
        if entry.is_dir() and VOLUME_PATTERN.match(entry.name)
    )
def parse_filename(filename: str) -> Optional[dict]:
    """Split an MXF filename into its camera, reel, clip, date and suffix parts.

    Returns None when the name does not match FILENAME_PATTERN. The camera
    letter is upper-cased; ``reel_prefix`` is the camera letter followed by
    the reel digits.
    """
    parsed = FILENAME_PATTERN.match(filename)
    if parsed is None:
        return None

    camera_letter = parsed.group(1).upper()
    reel_digits = parsed.group(2)
    return {
        "camera": camera_letter,
        "reel": reel_digits,
        "clip": parsed.group(3),
        "date": parsed.group(4),
        "suffix": parsed.group(5),
        "reel_prefix": f"{camera_letter}{reel_digits}",
        "original_filename": filename,
    }
def find_bin_folder(reel_prefix: str, volumes: list[Path]) -> Optional[dict]:
    """Locate the first bin folder whose name starts with *reel_prefix*.

    Walks ``<volume>/PROJECT_FOLDER/<day>/FOOTAGE_FOLDER/<camera>/<bin>`` on
    each volume in the order given and compares names case-insensitively.

    Returns:
        A dict with the volume, the day/camera/bin folder names and the
        bin folder's full path, or None when nothing matches.
    """
    wanted = reel_prefix.upper()

    for volume in volumes:
        project_root = volume / PROJECT_FOLDER
        if not project_root.exists():
            continue

        day_dirs = (d for d in project_root.iterdir() if d.is_dir())
        for day_dir in day_dirs:
            footage_root = day_dir / FOOTAGE_FOLDER
            if not footage_root.exists():
                continue

            camera_dirs = (c for c in footage_root.iterdir() if c.is_dir())
            for camera_dir in camera_dirs:
                for candidate in camera_dir.iterdir():
                    is_match = (
                        candidate.is_dir()
                        and candidate.name.upper().startswith(wanted)
                    )
                    if is_match:
                        return {
                            "volume": volume,
                            "day_folder": day_dir.name,
                            "camera_folder": camera_dir.name,
                            "bin_folder": candidate.name,
                            "full_path": candidate,
                        }
    return None
def build_destination_path(bin_info: dict, am_pm: str, destination_base: Path) -> Path:
    """Compose the folder a proxy file should be moved into.

    The result is
    ``destination_base / "<day_folder> <am_pm>" / PROXIES_FOLDER /
    <camera_folder> / <bin_folder>``, with the folder names taken from
    *bin_info*.
    """
    day_dir_name = f"{bin_info['day_folder']} {am_pm}"
    return destination_base.joinpath(
        day_dir_name,
        PROXIES_FOLDER,
        bin_info["camera_folder"],
        bin_info["bin_folder"],
    )
def process_file(file_path: Path, am_pm: str, dry_run: bool = False) -> bool:
    """Process a single MXF file: find matching bin and move to destination.

    Args:
        file_path: The MXF file to move.
        am_pm: Label of the watch folder the file came from; appended to the
            day folder name in the destination path.
        dry_run: When True, only log what would happen; nothing is created
            or moved.

    Returns:
        True if the file was moved (or would be, in a dry run); False if the
        filename could not be parsed, no matching volume/bin was found, or a
        filesystem error occurred. Failures are logged, not raised.
    """
    filename = file_path.name
    file_info = parse_filename(filename)
    if not file_info:
        logger.warning(f"Could not parse filename: {filename}")
        return False
    logger.info(f"Processing: {filename} (Reel: {file_info['reel_prefix']}, {am_pm})")

    # Scanning mounted volumes touches the filesystem and can fail (e.g. a
    # volume is ejected mid-scan); report that like any other failure.
    try:
        volumes = get_honey_volumes()
        if not volumes:
            logger.error("No HONEY volumes found!")
            return False
        bin_info = find_bin_folder(file_info["reel_prefix"], volumes)
    except OSError as e:
        logger.error(f"Failed to scan HONEY volumes: {e}")
        return False

    if not bin_info:
        logger.error(f"Could not find bin folder for {file_info['reel_prefix']}")
        return False
    logger.info(f"Found bin: {bin_info['bin_folder']} in {bin_info['day_folder']}")

    dest_folder = build_destination_path(bin_info, am_pm, DESTINATION_BASE)
    dest_file = dest_folder / filename
    logger.info(f"Destination: {dest_file}")
    if dry_run:
        logger.info("[DRY RUN] Would create folder and move file")
        return True

    try:
        # Folder creation is inside the try so that a failure here is
        # reported via the return value like a failed move.
        dest_folder.mkdir(parents=True, exist_ok=True)
        if dest_file.exists():
            # shutil.move may replace an existing file; make that visible.
            logger.warning(f"Destination already exists and may be overwritten: {dest_file}")
        shutil.move(str(file_path), str(dest_file))
    except OSError as e:  # shutil.Error is an OSError subclass
        logger.error(f"Failed to move file: {e}")
        return False

    logger.info(f"✓ Moved: {filename}")
    return True
class MXFHandler(FileSystemEventHandler):
    """Watchdog handler that processes MXF files appearing in a watch folder.

    Both newly created files and files moved/renamed into place are handled
    the same way: wait until the file is no longer being written, then hand
    it to ``process_file``.
    """

    def __init__(self, am_pm: str, stop_event: Event, dry_run: bool = False):
        """
        Args:
            am_pm: Label of the watch folder this handler serves.
            stop_event: Event that aborts waiting when set.
            dry_run: Passed through to ``process_file``.
        """
        super().__init__()
        self.am_pm = am_pm
        self.stop_event = stop_event
        self.dry_run = dry_run
        # Paths (as strings) currently being waited on / processed.
        self.processing = set()

    def on_created(self, event):
        """Handle a file-created event."""
        if event.is_directory:
            return
        self._handle_file(Path(event.src_path))

    def on_moved(self, event):
        """Handle a move/rename event; the destination path is what matters."""
        if event.is_directory:
            return
        self._handle_file(Path(event.dest_path))

    def _handle_file(self, file_path: Path) -> None:
        """Wait for *file_path* to be ready and process it, if it is an MXF."""
        if file_path.suffix.lower() != ".mxf":
            return
        key = str(file_path)
        if key in self.processing:
            return
        self.processing.add(key)
        try:
            if wait_for_file_ready(file_path, self.stop_event):
                process_file(file_path, self.am_pm, self.dry_run)
        except Exception:
            # Handler boundary: log instead of letting the error escape.
            # NOTE(review): an uncaught exception here is believed to end
            # watchdog's event dispatch thread - confirm against the
            # installed watchdog version.
            logger.exception(f"Unexpected error while handling {file_path.name}")
        finally:
            self.processing.discard(key)
class PathJuggler:
    """
    Main controller class for Path Juggler.
    Can be used by CLI or GUI.

    Typical use: ``start()`` (or ``start_async()``) to process files already
    in the watch folders and begin watching, ``stop()`` to shut down.
    """

    def __init__(self, dry_run: bool = False):
        """
        Args:
            dry_run: When True, files are never moved; actions are only logged.
        """
        self.dry_run = dry_run
        self.observer: Optional[Observer] = None
        # Set by stop(); checked by the initial scan and by waiting handlers.
        self.stop_event = Event()
        self.running = False
        self._thread: Optional[Thread] = None

    def setup_folders(self):
        """Ensure watch and destination folders exist."""
        for name, folder in WATCH_FOLDERS.items():
            folder.mkdir(parents=True, exist_ok=True)
            logger.info(f"Watch folder ({name}): {folder}")
        DESTINATION_BASE.mkdir(parents=True, exist_ok=True)
        logger.info(f"Destination: {DESTINATION_BASE}")

    def process_existing(self):
        """Process any MXF files already in watch folders.

        Files that are still open by some process are skipped. Returns early
        if a stop has been requested.
        """
        for am_pm, folder in WATCH_FOLDERS.items():
            if not folder.exists():
                continue
            # Match the extension case-insensitively, consistent with the
            # event handler and FILENAME_PATTERN. The listing is materialised
            # up front because processing moves files out of this folder.
            candidates = sorted(
                entry
                for entry in folder.iterdir()
                if entry.is_file() and entry.suffix.lower() == ".mxf"
            )
            for file_path in candidates:
                if self.stop_event.is_set():
                    return
                if is_file_open(file_path):
                    logger.info(f"Skipping (still being written): {file_path.name}")
                    continue
                process_file(file_path, am_pm, self.dry_run)

    def start(self):
        """Start watching for files.

        Creates the folders, processes files already present, then starts a
        watchdog observer on every watch folder. If startup fails, the
        running flag is reset and the exception is re-raised.
        """
        if self.running:
            logger.warning("Already running")
            return
        self.stop_event.clear()
        self.running = True
        try:
            self.setup_folders()
            # Log volumes
            volumes = get_honey_volumes()
            if volumes:
                logger.info(f"Found volumes: {[v.name for v in volumes]}")
            else:
                logger.warning("No HONEY volumes currently mounted")
            # Process existing files
            logger.info("Checking for existing files...")
            self.process_existing()
            # stop() may have been called while the scan was running (e.g.
            # after start_async()); do not start watching in that case.
            if self.stop_event.is_set():
                logger.info("Stop requested during startup; watcher not started")
                self.running = False
                return
            # Start observer
            self.observer = Observer()
            for am_pm, folder in WATCH_FOLDERS.items():
                handler = MXFHandler(am_pm, self.stop_event, self.dry_run)
                self.observer.schedule(handler, str(folder), recursive=False)
                logger.info(f"Watching: {folder}")
            self.observer.start()
        except Exception:
            # Leave the controller in a restartable state.
            self.observer = None
            self.running = False
            raise
        logger.info("Watcher started - waiting for new files...")

    def stop(self):
        """Stop watching for files."""
        if not self.running:
            return
        logger.info("Stopping watcher...")
        self.stop_event.set()
        if self.observer:
            self.observer.stop()
            self.observer.join(timeout=5)
            self.observer = None
        self.running = False
        logger.info("Watcher stopped")

    def start_async(self):
        """Start watching in a background thread."""
        self._thread = Thread(target=self.start, daemon=True)
        self._thread.start()

    def is_running(self) -> bool:
        """Return True while the controller considers itself started."""
        return self.running