#!/usr/bin/python3
"""Batch-encode video files to HEVC by driving ffmpeg/ffprobe.

Input files are given either explicitly or by scanning a directory for
extensions.  Each input is encoded to a temporary ``*.hevc.mp4`` file and,
once encoding succeeded, moved to its final ``*.mp4`` name.
"""
import argparse
import subprocess
from collections import deque
from pathlib import Path
from sys import argv
from typing import Optional

from logger import get_logger
from progress import get_progress_bar, is_tty

# ffmpeg output options: libx265 video at CRF 18 with the "medium" preset,
# audio streams copied unchanged, video stream tagged "hvc1".
COMPRESS_OPTIONS = [
    "-c:v", "libx265",
    "-crf", "18",
    "-preset", "medium",
    "-c:a", "copy",
    "-tag:v", "hvc1",
]

# Suffix used while encoding, and the suffix the finished file ends up with.
_TEMP_SUFFIX = ".hevc.mp4"
_FINAL_SUFFIX = ".mp4"

# How many trailing ffmpeg diagnostic lines are kept for error reporting.
_MAX_DIAGNOSTIC_LINES = 20

itsme = Path(argv[0]).name
logger = get_logger(itsme)


def check_ffmpeg() -> None:
    """Exit with status 1 unless both ``ffmpeg`` and ``ffprobe`` can run.

    Raises:
        SystemExit: if either tool is missing or ``-version`` fails.
    """
    try:
        for tool in ('ffmpeg', 'ffprobe'):
            subprocess.run([tool, '-version'],
                           capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        logger.error("ffmpeg not found or not working. Please install ffmpeg.")
        raise SystemExit(1)


def get_duration(file_path: Path) -> float:
    """Return the container duration of *file_path* in seconds via ffprobe.

    Raises:
        ValueError: if ffprobe's output is not a positive number.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        str(file_path)
    ]
    # stderr is captured separately so that diagnostics printed by ffprobe
    # cannot corrupt the number parsed from stdout.
    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        dur = float(result.stdout.strip())
        # "not dur > 0" (rather than "dur <= 0") also rejects NaN.
        if not dur > 0:
            raise ValueError(f"Invalid duration: {dur}")
    except (ValueError, TypeError) as e:
        logger.error(f"Cannot determine duration of {file_path}: {e}")
        detail = result.stderr.strip()
        if detail:
            logger.error(f"ffprobe: {detail}")
        raise
    return dur


def set_progress(line: str, dur, pbar) -> None:
    """Advance *pbar* from one line of ffmpeg ``-progress`` output.

    Only ``out_time_ms=`` lines are used; every other line is ignored.
    The value is treated as microseconds (it is divided by 1_000_000 to get
    seconds) and clamped to *dur*.

    Args:
        line: one stripped output line.
        dur: total duration in seconds (upper bound for the bar).
        pbar: progress bar exposing ``n`` and ``refresh()``, or ``None``
            when no bar is shown, in which case this is a no-op.
    """
    # pbar is None in silent / non-TTY mode; dereferencing it would raise
    # AttributeError, which nothing upstream handles.
    if pbar is None or not line.startswith("out_time_ms="):
        return
    try:
        us = int(line.split('=')[1])
        if us < 0:
            return
        pbar.n = min(us / 1_000_000, dur)
        pbar.refresh()
    except (ValueError, IndexError, OverflowError):
        # Unparsable values (e.g. non-numeric) are skipped on purpose.
        return


def shorten_name(filename: str, flag: int = 1) -> str:
    """Return a short upper-case display form of *filename* for logging.

    Spaces are removed from the stem, the stem is cut to at most 8 encoded
    bytes (never splitting a multi-byte character) and ``~<flag>`` is
    appended when anything was cut; the extension is cut to 3 characters.
    For example ``"my long video.mkv"`` becomes ``"MYLONGVI~1.MKV"``.
    """
    if '.' in filename:
        stem, ext = filename.rsplit('.', 1)
    else:
        stem, ext = filename, ''
    stem = stem.replace(' ', '').upper()
    ext = ext[:3].upper()
    encoded = stem.encode(errors='ignore')
    # Try the longest prefix first; shrink until it ends on a character
    # boundary and therefore decodes cleanly.
    for size in range(8, 0, -1):
        try:
            cut = encoded[:size].decode()
        except UnicodeDecodeError:
            continue
        if cut != stem:
            cut += f'~{flag}'
        stem = cut
        break
    return f'{stem}.{ext}' if ext else stem


def start_ffmpeg(infile: Path, outfile: Path) -> subprocess.Popen:
    """Start ffmpeg encoding *infile* to *outfile* and return the process.

    Progress information is requested on stdout (``-progress pipe:1``) and
    stderr is merged into the same pipe.  The command is logged at debug
    level with shortened file names; the real paths are used to run it.
    """
    head = ["ffmpeg", "-y", "-i"]
    options = [
        "-progress", "pipe:1",
        "-nostats",
        "-loglevel", "error",
        *COMPRESS_OPTIONS,
    ]
    shown = [*head, shorten_name(infile.name), *options,
             shorten_name(outfile.name, flag=2)]
    logger.debug(f"Command: {' '.join(shown)}")
    cmd = [*head, str(infile), *options, str(outfile)]
    # errors='replace': a stray non-UTF-8 byte in ffmpeg's output must not
    # abort the read loop with UnicodeDecodeError.
    return subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        bufsize=1, text=True, encoding='utf-8', errors='replace')


def clean_on_failure(proc: subprocess.Popen, outfile: Path) -> None:
    """Kill *proc* if it is still running and delete *outfile* if present.

    Removal is best-effort: a failure to delete is logged at debug level
    and otherwise ignored so the original error keeps propagating.
    """
    if proc.poll() is None:
        proc.kill()
        proc.wait()
    try:
        outfile.unlink(missing_ok=True)
    except OSError as e:
        logger.debug(f"Could not remove {outfile}: {e}")


def _is_progress_line(line: str) -> bool:
    """Tell whether *line* looks like a ``key=value`` progress record."""
    key, sep, _ = line.partition('=')
    return bool(sep) and key.replace('_', '').isalnum()


def hevc_encode(infile: Path, outfile: Path, progress: bool = True) -> None:
    """Encode *infile* to *outfile* with ffmpeg, optionally showing progress.

    Args:
        infile: source video.
        outfile: destination file (overwritten by ffmpeg's ``-y``).
        progress: show a progress bar; only honoured when ``is_tty()`` is
            true.

    Raises:
        ValueError: if the duration of *infile* cannot be determined.
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero
            status; its last diagnostic lines are logged and attached as
            ``output``.

    On any failure or interruption ffmpeg is killed and *outfile* removed.
    """
    dur = get_duration(infile)
    logger.debug(f"Duration: {dur:.2f} seconds")
    proc = start_ffmpeg(infile, outfile)
    pbar = None
    # stderr shares the pipe with the progress records, so whatever is not
    # a progress record is a diagnostic; keep only the most recent ones.
    diagnostics = deque(maxlen=_MAX_DIAGNOSTIC_LINES)
    try:
        if progress and is_tty():
            pbar = get_progress_bar(
                total=dur, unit='s',
                bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})'
            )
        # Read until EOF so trailing diagnostics are not lost.
        for raw_line in proc.stdout:
            line = raw_line.strip()
            if _is_progress_line(line):
                set_progress(line, dur, pbar)
            elif line:
                diagnostics.append(line)
        returncode = proc.wait()
        if returncode != 0:
            for message in diagnostics:
                logger.error(f"ffmpeg: {message}")
            raise subprocess.CalledProcessError(
                returncode, "ffmpeg", output="\n".join(diagnostics))
        for message in diagnostics:
            logger.debug(f"ffmpeg: {message}")
    except BaseException:
        # Covers Ctrl-C and ffmpeg failures as well as unexpected errors:
        # never leave ffmpeg running or a partial file behind.
        clean_on_failure(proc, outfile)
        raise
    finally:
        if pbar is not None:
            pbar.close()
        proc.stdout.close()


def build_file_list(srcdir, extensions) -> list:
    """Recursively list files under *srcdir* whose extension matches.

    Matching is case-insensitive and ignores a leading dot in *extensions*.
    Returned paths are absolute.
    """
    wanted = {ext.lower().lstrip('.') for ext in extensions}
    return [
        path.absolute()
        for path in Path(srcdir).rglob('*')
        if path.is_file() and path.suffix.lower().lstrip('.') in wanted
    ]


def make_output_path(infile: Path, outdir: Optional[Path] = None,
                     relroot: Optional[str] = None) -> Path:
    """Return the temporary ``*.hevc.mp4`` path to encode *infile* into.

    Args:
        infile: absolute path of the source file.
        outdir: output directory (anything accepted by ``Path``); when
            empty the output is placed next to *infile*.
        relroot: when given together with *outdir*, the position of
            *infile* relative to this directory is recreated below
            *outdir*; otherwise only the file name is used.

    Raises:
        ValueError: if *relroot* is given and *infile* is not below it.
    """
    renamed = infile.with_suffix(_TEMP_SUFFIX)
    if not outdir:
        return renamed
    outdir = Path(outdir)
    if not relroot:
        return outdir / renamed.name
    return outdir / renamed.relative_to(Path(relroot).absolute())


def resolve_output(infile: Path, outfile: Path,
                   rm_original: bool = False) -> None:
    """Move the encoded *outfile* to its final ``*.mp4`` name.

    If the final name is the input file itself, the input is overwritten
    when *rm_original* is set and renamed to ``<stem>.bak<suffix>``
    otherwise.  Without such a conflict the input is deleted only when
    *rm_original* is set.
    """
    final_name = outfile.name
    # Swap only the trailing suffix; a plain str.replace would also rewrite
    # ".hevc.mp4" occurring in the middle of the name.
    if final_name.endswith(_TEMP_SUFFIX):
        final_name = final_name[:-len(_TEMP_SUFFIX)] + _FINAL_SUFFIX
    final_out = outfile.with_name(final_name)

    is_path_conflict = final_out.resolve() == infile.resolve()
    if is_path_conflict:
        logger.warning(
            f"Filename conflict!\n i: {infile}\n o: {final_out}")
        if not rm_original:
            logger.debug("Renaming original to avoid conflict.")
            infile.rename(infile.with_stem(infile.stem + '.bak'))
    elif final_out.exists():
        # Path.replace() below overwrites silently; leave a trace.
        logger.warning(f"Overwriting existing file: {final_out}")

    logger.debug(f"Output file: {final_out}")
    outfile.replace(final_out)
    if rm_original and not is_path_conflict:
        logger.debug(f"Remove original: {infile}")
        infile.unlink()


def parse_args() -> argparse.Namespace:
    """Parse and validate the command line.

    Exactly one of ``-d/--dir`` or positional input files must be given.
    In directory mode the extension list defaults to ``['mp4']``.
    """
    parser = argparse.ArgumentParser(
        prog=itsme,
        description='Encode video files to HEVC CRF 18.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '-d', '--dir', dest='srcdir',
        help='Source directory')
    parser.add_argument(
        '-o', '--outdir', dest='outdir',
        help='Output directory')
    parser.add_argument(
        '-f', '--ext', dest='extensions', action='append', default=[],
        help='Extensions to search for. Can be repeated.')
    parser.add_argument(
        '-r', '--rm-original', dest='remove_original', action='store_true',
        help='Remove original after encoding')
    parser.add_argument(
        '-s', '--silent', dest='silent', action='store_true',
        help='Keep silent (no progress bar)')
    parser.add_argument(
        'files', nargs='*',
        help='Input files (with no -d/--dir)')
    args = parser.parse_args()

    if args.srcdir and args.files:
        parser.error('cannot mix -d/--dir with input files')
    if not args.srcdir and not args.files:
        parser.error('either -d/--dir or input files must be provided')
    if args.srcdir and not args.extensions:
        args.extensions = ['mp4']
    return args


def main() -> None:
    """Encode every selected input file, smallest first."""
    # Parse first so that --help and usage errors work without ffmpeg.
    args = parse_args()
    check_ffmpeg()

    if args.srcdir:
        files = build_file_list(args.srcdir, args.extensions)
        errmsg = (f'No files found in {args.srcdir} '
                  f'with extensions: {args.extensions}')
    else:
        # Arguments that are not existing regular files are skipped.
        files = [Path(f).absolute() for f in args.files if Path(f).is_file()]
        errmsg = 'No input files provided'
    if not files:
        raise SystemExit(errmsg)
    if len(files) > 1:
        files.sort(key=lambda x: x.stat().st_size)

    for idx, infile in enumerate(files, 1):
        logger.info(f"[{idx}/{len(files)}] {infile.name}")
        outfile = make_output_path(infile, args.outdir, args.srcdir)
        outfile.parent.mkdir(parents=True, exist_ok=True)
        try:
            outfile.touch(0o644)
            hevc_encode(infile, outfile, not args.silent)
        except (OSError, subprocess.CalledProcessError) as e:
            # PermissionError is an OSError; CalledProcessError is not, so
            # it is listed explicitly to get ffmpeg failures logged too.
            logger.error(f"Job failed: {e}")
            raise
        isize = infile.stat().st_size / (1024 * 1024)
        osize = outfile.stat().st_size / (1024 * 1024)
        logger.info(f"Size deductions: {osize - isize:+.2f} MB")
        logger.debug(f" i: {isize:.2f} MB -> o: {osize:.2f} MB")
        resolve_output(infile, outfile, args.remove_original)


if __name__ == '__main__':
    main()