"""
Path Juggler - Core Logic

This module contains the file-watching and file-processing logic, kept
separate from any UI concerns so it can be driven by either a CLI or a GUI.
"""
|
|
|
|
import logging
import os
import re
import shutil
import subprocess
import time
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Optional

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
|
|
|
|
# --- Configuration -----------------------------------------------------------

# Incoming proxy drop folders, keyed by the AM/PM label that is later appended
# to the day-folder name in the destination tree.
WATCH_FOLDERS = {
    slot: Path.home() / "Movies" / slot
    for slot in ("AM", "PM")
}

# Root of the tree that sorted proxies are moved into.
DESTINATION_BASE = Path.home().joinpath("Movies", "Honey Dailies Transfer")

# Source volumes are looked up under /Volumes by name ("HONEY <number>").
VOLUMES_PATH = Path("/Volumes")
VOLUME_PATTERN = re.compile(r"^HONEY \d+$")

# Folder names used inside each source volume and in the destination tree.
PROJECT_FOLDER = "Honey"
FOOTAGE_FOLDER = "02 FOOTAGE"
PROXIES_FOLDER = "03 EDITORIAL PROXIES"

# Clip filenames such as A080C002_251209R2.mxf, i.e.
#   <camera letter><3-digit reel>C<3-digit clip>_<6 digits><optional suffix>.mxf
# matched case-insensitively.
FILENAME_PATTERN = re.compile(
    r"^([A-Z])(\d{3})C(\d{3})_(\d{6})([A-Z0-9]*)\.mxf$",
    flags=re.IGNORECASE,
)

logger = logging.getLogger(__name__)
|
|
|
|
|
|
def is_file_open(file_path: Path) -> bool:
    """Report whether any process currently has *file_path* open, via ``lsof``.

    The file counts as open when ``lsof <path>`` exits with status 0. If
    ``lsof`` times out (10 s) or cannot be run at all, a warning is logged and
    the file is conservatively reported as open, so callers keep waiting
    rather than touching a file that might still be in use.
    """
    command = ["lsof", str(file_path)]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        logger.warning(f"lsof timed out for {file_path}")
        return True
    except Exception as exc:
        logger.warning(f"lsof failed: {exc}")
        return True

    return completed.returncode == 0
|
|
|
|
|
|
def get_file_owner_process(file_path: Path) -> Optional[str]:
    """Return the command name of a process that has *file_path* open.

    Runs ``lsof -t`` to list the PIDs holding the file, then ``ps`` to look up
    the command name of the first PID listed.

    This is a best-effort lookup. It returns None when no PID is reported,
    when ``ps`` fails, or when either command raises (missing binary, timeout,
    ...). Such failures are logged at debug level rather than silently
    discarded.
    """
    try:
        lsof = subprocess.run(
            ["lsof", "-t", str(file_path)],
            capture_output=True,
            text=True,
            timeout=10
        )
        # `lsof -t` prints one PID per line; only the first is looked up.
        pids = lsof.stdout.split()
        if lsof.returncode != 0 or not pids:
            return None

        ps = subprocess.run(
            ["ps", "-p", pids[0], "-o", "comm="],
            capture_output=True,
            text=True,
            timeout=5
        )
        if ps.returncode == 0:
            return ps.stdout.strip()
    except Exception as e:
        # Never let a diagnostic lookup break the caller, but leave a trace.
        logger.debug(f"Could not determine owner process of {file_path}: {e}")

    return None
|
|
|
|
|
|
def wait_for_file_ready(
    file_path: Path,
    stop_event: Optional[Event] = None,
    check_interval: float = 2.0,
    stable_duration: float = 5.0,
    timeout: float = 3600.0
) -> bool:
    """Wait until a file is no longer being written to.

    Polls every *check_interval* seconds until ``is_file_open`` reports the
    file closed. While the file is still open, the owning process is logged
    once (when it can be determined) and the file size is tracked. Once the
    size has stayed unchanged for *stable_duration* seconds, an extra
    open-check is made after a one-second pause.

    Args:
        file_path: File to wait on.
        stop_event: Optional event. When it is set, waiting is abandoned, and
            any pause in progress ends immediately.
        check_interval: Seconds between polls.
        stable_duration: Seconds the size must stay unchanged before the extra
            re-check is made.
        timeout: Maximum total seconds to wait.

    Returns:
        True if the file was observed closed. False if a stop was requested,
        the file disappeared, or *timeout* elapsed.
    """
    def _pause(seconds: float) -> None:
        # Sleep for `seconds`, but wake up at once if a stop is requested.
        if stop_event is not None:
            stop_event.wait(seconds)
        else:
            time.sleep(seconds)

    # monotonic() is unaffected by wall-clock adjustments (NTP, manual changes),
    # so the timeout and stability window measure real elapsed time.
    start_time = time.monotonic()
    last_size = -1
    size_stable_since = None
    logged_process = False

    logger.info(f"Waiting for file to be ready: {file_path.name}")

    while time.monotonic() - start_time < timeout:
        # Check if we should stop
        if stop_event is not None and stop_event.is_set():
            logger.info(f"Stop requested, abandoning: {file_path.name}")
            return False

        if not file_path.exists():
            logger.warning(f"File disappeared: {file_path.name}")
            return False

        if not is_file_open(file_path):
            logger.info(f"File closed by all processes: {file_path.name}")
            return True

        if not logged_process:
            owner = get_file_owner_process(file_path)
            if owner:
                logger.info(f"File held by: {owner}")
                logged_process = True

        try:
            current_size = file_path.stat().st_size
        except FileNotFoundError:
            # Removed or renamed between the exists() check and stat().
            logger.warning(f"File disappeared: {file_path.name}")
            return False

        now = time.monotonic()
        if current_size == last_size:
            if size_stable_since is None:
                size_stable_since = now
            elif now - size_stable_since >= stable_duration:
                _pause(1)
                if not is_file_open(file_path):
                    logger.info(f"File ready (closed): {file_path.name}")
                    return True
        else:
            size_stable_since = None
            size_mb = current_size / (1024 * 1024)
            logger.debug(f"File growing: {file_path.name} ({size_mb:.1f} MB)")

        last_size = current_size
        _pause(check_interval)

    logger.error(f"Timeout waiting for file: {file_path.name}")
    return False
|
|
|
|
|
|
def get_honey_volumes() -> list[Path]:
    """Return the mounted HONEY volumes, sorted.

    A volume is any directory directly under ``VOLUMES_PATH`` whose name
    matches ``VOLUME_PATTERN``. Returns an empty list when ``VOLUMES_PATH``
    does not exist.
    """
    if VOLUMES_PATH.exists():
        return sorted(
            entry
            for entry in VOLUMES_PATH.iterdir()
            if entry.is_dir() and VOLUME_PATTERN.match(entry.name)
        )
    return []
|
|
|
|
|
|
def parse_filename(filename: str) -> Optional[dict]:
    """Split an MXF clip filename into its naming components.

    Returns None when *filename* does not match ``FILENAME_PATTERN``.
    Otherwise it returns a dict with these keys:

    - ``camera``: the camera letter, upper-cased.
    - ``reel``, ``clip``, ``date``, ``suffix``: the remaining captured groups,
      as strings.
    - ``reel_prefix``: camera letter followed by reel number (e.g. ``"A080"``
      for ``A080C002_251209R2.mxf``).
    - ``original_filename``: the input unchanged.
    """
    parsed = FILENAME_PATTERN.match(filename)
    if parsed is None:
        return None

    cam = parsed.group(1).upper()
    reel_no = parsed.group(2)
    return dict(
        camera=cam,
        reel=reel_no,
        clip=parsed.group(3),
        date=parsed.group(4),
        suffix=parsed.group(5),
        reel_prefix=cam + reel_no,
        original_filename=filename,
    )
|
|
|
|
|
|
def _sorted_subdirs(folder: Path) -> list[Path]:
    """Return the immediate subdirectories of *folder* in sorted order ([] if unreadable)."""
    try:
        return sorted(child for child in folder.iterdir() if child.is_dir())
    except OSError as e:
        # e.g. volume unmounted mid-scan or permission denied: skip this branch
        # instead of aborting the whole search.
        logger.warning(f"Cannot list {folder}: {e}")
        return []


def find_bin_folder(reel_prefix: str, volumes: list[Path]) -> Optional[dict]:
    """Search HONEY volumes for a bin folder matching the reel prefix.

    Walks ``<volume>/PROJECT_FOLDER/<day>/FOOTAGE_FOLDER/<camera>/<bin>``. It
    returns the first bin folder whose name starts with *reel_prefix*,
    compared case-insensitively. Volumes are visited in the order given;
    directories within each level are visited in sorted order, so the result
    is deterministic when several bins match. Unreadable directories are
    skipped with a warning.

    Returns:
        A dict with ``volume``, ``day_folder``, ``camera_folder`` and
        ``bin_folder`` (names) and ``full_path`` (the bin folder Path), or
        None when no bin matches.
    """
    wanted = reel_prefix.upper()  # loop-invariant

    for volume in volumes:
        project_path = volume / PROJECT_FOLDER
        if not project_path.exists():
            continue

        for day_folder in _sorted_subdirs(project_path):
            footage_path = day_folder / FOOTAGE_FOLDER
            if not footage_path.exists():
                continue

            for cam_folder in _sorted_subdirs(footage_path):
                for bin_folder in _sorted_subdirs(cam_folder):
                    if bin_folder.name.upper().startswith(wanted):
                        return {
                            "volume": volume,
                            "day_folder": day_folder.name,
                            "camera_folder": cam_folder.name,
                            "bin_folder": bin_folder.name,
                            "full_path": bin_folder,
                        }

    return None
|
|
|
|
|
|
def build_destination_path(bin_info: dict, am_pm: str, destination_base: Path) -> Path:
    """Return the folder a proxy file should be moved into.

    The path is
    ``<destination_base>/<day_folder> <am_pm>/PROXIES_FOLDER/<camera_folder>/<bin_folder>``,
    taking ``day_folder``, ``camera_folder`` and ``bin_folder`` from
    *bin_info*.
    """
    day_label = f"{bin_info['day_folder']} {am_pm}"
    return destination_base.joinpath(
        day_label,
        PROXIES_FOLDER,
        bin_info["camera_folder"],
        bin_info["bin_folder"],
    )
|
|
|
|
|
|
def process_file(file_path: Path, am_pm: str, dry_run: bool = False) -> bool:
    """Process a single MXF file: find its matching bin and move it to the destination.

    The filename is parsed for its reel prefix, and mounted HONEY volumes are
    searched for a bin folder with that prefix. The file is then moved into
    the folder built by ``build_destination_path`` under ``DESTINATION_BASE``.

    Args:
        file_path: The MXF file to move.
        am_pm: Label appended to the day-folder name (e.g. "AM" or "PM").
        dry_run: If True, log what would happen without creating or moving
            anything.

    Returns:
        True if the file was moved (or would have been, in a dry run). False
        on any failure; every failure is logged.
    """
    filename = file_path.name

    file_info = parse_filename(filename)
    if not file_info:
        logger.warning(f"Could not parse filename: {filename}")
        return False

    logger.info(f"Processing: {filename} (Reel: {file_info['reel_prefix']}, {am_pm})")

    volumes = get_honey_volumes()
    if not volumes:
        logger.error("No HONEY volumes found!")
        return False

    bin_info = find_bin_folder(file_info["reel_prefix"], volumes)
    if not bin_info:
        logger.error(f"Could not find bin folder for {file_info['reel_prefix']}")
        return False

    logger.info(f"Found bin: {bin_info['bin_folder']} in {bin_info['day_folder']}")

    dest_folder = build_destination_path(bin_info, am_pm, DESTINATION_BASE)
    dest_file = dest_folder / filename

    logger.info(f"Destination: {dest_file}")

    if dry_run:
        logger.info("[DRY RUN] Would create folder and move file")
        return True

    try:
        # mkdir is inside the try so that a failure here (permissions, full
        # disk, ...) is reported via the False return like a failed move,
        # instead of raising into the caller.
        dest_folder.mkdir(parents=True, exist_ok=True)
        if dest_file.exists():
            # shutil.move -> os.rename replaces an existing file on POSIX;
            # make that visible instead of silently clobbering it.
            logger.warning(f"Destination already exists, overwriting: {dest_file}")
        shutil.move(str(file_path), str(dest_file))
    except Exception as e:
        logger.error(f"Failed to move file: {e}")
        return False

    logger.info(f"✓ Moved: {filename}")
    return True
|
|
|
|
|
|
class MXFHandler(FileSystemEventHandler):
    """Watchdog handler for new MXF files in one watch folder.

    Each handler is bound to a single AM/PM label. An MXF file that appears,
    either created in place or moved/renamed into the folder, is waited on
    until ready and then passed to ``process_file``.
    """

    def __init__(self, am_pm: str, stop_event: Event, dry_run: bool = False):
        self.am_pm = am_pm
        self.stop_event = stop_event
        self.dry_run = dry_run
        # String paths currently being waited on/processed; a duplicate event
        # for a path already in this set is ignored.
        self.processing = set()

    def on_created(self, event):
        if event.is_directory:
            return
        self._handle(Path(event.src_path))

    def on_moved(self, event):
        if event.is_directory:
            return
        # For a move/rename, the file of interest is the new location.
        self._handle(Path(event.dest_path))

    def _handle(self, file_path: Path) -> None:
        """Wait for an MXF *file_path* to be ready, then process it; ignore other files."""
        if file_path.suffix.lower() != ".mxf":
            return

        key = str(file_path)
        if key in self.processing:
            return

        self.processing.add(key)
        try:
            if wait_for_file_ready(file_path, self.stop_event):
                process_file(file_path, self.am_pm, self.dry_run)
        except Exception:
            # This is the boundary with watchdog: an exception escaping the
            # callback propagates into the observer's dispatch thread and
            # could stop event delivery for every later file. Log it instead.
            logger.exception(f"Unexpected error handling {file_path.name}")
        finally:
            self.processing.discard(key)
|
|
|
|
|
|
class PathJuggler:
    """
    Main controller class for Path Juggler.
    Can be used by CLI or GUI.

    ``start()`` creates the folders, processes files already waiting in the
    watch folders, and then starts a watchdog observer. ``start_async()``
    runs the same startup on a daemon thread. ``stop()`` may be called from
    another thread at any time, including while that startup is still in
    progress.
    """

    def __init__(self, dry_run: bool = False):
        self.dry_run = dry_run
        self.observer: Optional[Observer] = None
        self.stop_event = Event()
        self.running = False
        self._thread: Optional[Thread] = None
        # Serializes start()/stop() bookkeeping (running flag, observer) so a
        # stop() issued while start_async() is still scanning cannot be
        # followed by start() launching an observer anyway.
        self._lock = Lock()

    def setup_folders(self):
        """Ensure watch and destination folders exist."""
        for name, folder in WATCH_FOLDERS.items():
            folder.mkdir(parents=True, exist_ok=True)
            logger.info(f"Watch folder ({name}): {folder}")

        DESTINATION_BASE.mkdir(parents=True, exist_ok=True)
        logger.info(f"Destination: {DESTINATION_BASE}")

    def process_existing(self):
        """Process MXF files already sitting in the watch folders.

        Files are visited in sorted order per folder. A file that
        ``is_file_open`` reports as open is skipped. The scan returns early
        once ``stop_event`` is set.
        """
        for am_pm, folder in WATCH_FOLDERS.items():
            if not folder.exists():
                continue

            # Match the extension case-insensitively, as MXFHandler does.
            # Path.glob("*.mxf") is case-sensitive for POSIX paths (macOS
            # included), so it silently missed clips named "*.MXF".
            candidates = sorted(
                entry for entry in folder.iterdir()
                if entry.suffix.lower() == ".mxf" and entry.is_file()
            )
            for file_path in candidates:
                if self.stop_event.is_set():
                    return
                if is_file_open(file_path):
                    # NOTE(review): a file skipped here is not revisited by this
                    # scan; it is only picked up if a later create/move event
                    # reaches MXFHandler for it.
                    logger.info(f"Skipping (still being written): {file_path.name}")
                    continue
                process_file(file_path, am_pm, self.dry_run)

    def start(self):
        """Start watching for files (blocks while existing files are processed)."""
        with self._lock:
            if self.running:
                logger.warning("Already running")
                return
            self.stop_event.clear()
            self.running = True

        try:
            self._startup()
        except Exception:
            # Don't leave the instance stuck reporting "running" (which would
            # make every later start() a no-op) after a failed startup.
            self.running = False
            raise

    def _startup(self):
        """Run start()'s work: folders, volume log, existing-file scan, observer."""
        self.setup_folders()

        # Log volumes
        volumes = get_honey_volumes()
        if volumes:
            logger.info(f"Found volumes: {[v.name for v in volumes]}")
        else:
            logger.warning("No HONEY volumes currently mounted")

        # Process existing files
        logger.info("Checking for existing files...")
        self.process_existing()

        # Start observer -- under the lock, and only if no stop() arrived
        # during the scan and no other start() already installed an observer.
        with self._lock:
            if self.stop_event.is_set() or self.observer is not None:
                logger.info("Startup aborted before watching began")
                return

            observer = Observer()
            for am_pm, folder in WATCH_FOLDERS.items():
                handler = MXFHandler(am_pm, self.stop_event, self.dry_run)
                observer.schedule(handler, str(folder), recursive=False)
                logger.info(f"Watching: {folder}")

            observer.start()
            self.observer = observer

        logger.info("Watcher started - waiting for new files...")

    def stop(self):
        """Stop watching for files; safe to call from another thread."""
        with self._lock:
            if not self.running:
                return
            logger.info("Stopping watcher...")
            self.stop_event.set()
            # Take ownership of the observer so start() can't reuse it.
            observer, self.observer = self.observer, None

        if observer is not None:
            observer.stop()
            observer.join(timeout=5)

        self.running = False
        logger.info("Watcher stopped")

    def start_async(self):
        """Start watching in a background thread."""
        self._thread = Thread(target=self.start, daemon=True)
        self._thread.start()

    def is_running(self) -> bool:
        """Return the ``running`` flag (True between start() and stop())."""
        return self.running
|