#!/usr/bin/python3 import argparse import subprocess from sys import argv from pathlib import Path from logger import get_logger from progress import get_progress_bar, is_tty COMPRESS_OPTIONS = [ "-c:v", "libx265", "-crf", "18", "-preset", "medium", "-c:a", "copy", "-tag:v", "hvc1", ] itsme = Path(argv[0]).name logger = get_logger(itsme) def check_ffmpeg(): try: subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True) subprocess.run(['ffprobe', '-version'], capture_output=True, check=True) except (subprocess.CalledProcessError, FileNotFoundError): logger.error("ffmpeg not found or not working. Please install ffmpeg.") raise SystemExit(1) def get_duration(file_path: Path): cmd = [ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(file_path) ] result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) try: dur = float(result.stdout.strip()) if dur <= 0: raise ValueError(f"Invalid duration: {dur}") return dur except (ValueError, TypeError) as e: logger.error(f"Cannot determine duration of {file_path}: {e}") raise def read_progress(ostream_line, dur, pbar): if not ostream_line.startswith("out_time_ms="): return try: us = int(ostream_line.split('=')[1]) if us < 0: return secs = us / 1_000_000 if secs > dur * 1.1: # Allow 10% overshoot pbar.n = dur else: pbar.n = secs pbar.refresh() except (ValueError, OverflowError): pass def hevc_encode(infile: Path, outfile: Path, progress: bool = True): # too heavy bro. dur = get_duration(infile) cmd = [ "ffmpeg", "-y", "-i", str(infile), "-progress", "pipe:1", "-nostats", *COMPRESS_OPTIONS, str(outfile) ] logger.debug(f"Duration: {dur:.2f} seconds") logger.debug(f"Command: {' '.join(cmd)}") proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', bufsize=1) progress = progress and is_tty() pbar = None if not progress else get_progress_bar( total=dur, unit='s', bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})') try: while True: line = proc.stdout.readline() if not line and proc.poll() is not None: break if not pbar: continue read_progress(line.strip(), dur, pbar) proc.wait() if proc.returncode != 0: raise subprocess.CalledProcessError(proc.returncode, cmd) except ( KeyboardInterrupt, subprocess.CalledProcessError ) as e: if proc.poll() is None: proc.kill() proc.wait() logger.error( f"Encoding failed with code {proc.returncode}:" f" {e.__class__.__name__}") if outfile.exists(): try: outfile.unlink() except OSError: pass raise finally: if pbar: pbar.close() def build_file_list(srcdir, extensions): exts = {ext.lower().lstrip('.') for ext in extensions} return [ path.absolute() for path in Path(srcdir).rglob('*') if path.is_file() and path.suffix.lower().lstrip('.') in exts ] def make_output_path(infile: Path, outdir: Path = None, relroot: str = None): if not outdir: return infile.with_suffix('.hevc.mp4') outdir = Path(outdir) if not relroot: return outdir / infile.with_suffix('.hevc.mp4').name rel_path = infile.relative_to( Path(relroot).absolute()).with_suffix('.hevc.mp4') return outdir / rel_path def parse_args(): parser = argparse.ArgumentParser( prog=itsme, description='Encode video files to HEVC CRF 18.', formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument('-d', '--dir', dest='srcdir', help='Source directory') parser.add_argument('-o', '--outdir', dest='outdir', help='Output directory') parser.add_argument('-f', '--ext', dest='extensions', action='append', default=[], help='Extensions to search for. Can be repeated.') parser.add_argument('-r', '--rm-original', dest='remove_original', action='store_true', help='Remove original after encoding') parser.add_argument('-s', '--silent', dest='silent', action='store_true', help='Keep silent (no progress bar)') parser.add_argument('files', nargs='*', help='Input files (with no -d/--dir)') args = parser.parse_args() if args.srcdir and args.files: parser.error('cannot mix -d/--dir with input files') if not args.srcdir and not args.files: parser.error('either -d/--dir or input files must be provided') if args.srcdir and not args.extensions: args.extensions = ['mp4'] return args def main(): check_ffmpeg() args = parse_args() if args.srcdir: files = build_file_list(args.srcdir, args.extensions) errmsg = (f'No files found in {args.srcdir} ' f'with extensions: {args.extensions}') else: files = [Path(f).absolute() for f in args.files if Path(f).is_file()] errmsg = 'No input files provided' if not files: raise SystemExit(errmsg) files.sort(key=lambda x: x.stat().st_size) for idx, infile in enumerate(files, 1): logger.info(f"[{idx}/{len(files)}] {infile.name}") outfile = make_output_path(infile, args.outdir, args.srcdir) outfile.parent.mkdir(parents=True, exist_ok=True) try: outfile.touch(0o644) hevc_encode(infile, outfile, not args.silent) except (OSError, PermissionError) as e: logger.error(f"Job failed: {e}") raise final_out = outfile.with_name( outfile.name.replace(".hevc.mp4", ".mp4")) if final_out.resolve() == infile.resolve(): logger.warning( f"Filename conflict!\n i: {infile}\n o: {final_out}") if not args.remove_original: logger.debug("renaming original to avoid conflict.") infile.rename(infile.with_stem(infile.stem + '.bak')) outfile.replace(final_out) else: outfile.replace(final_out) if args.remove_original: logger.debug(f"Remove original: {infile}") infile.unlink() logger.debug(f"Output file: {final_out}") if __name__ == '__main__': main()