#!/usr/bin/python3
"""Batch-encode video files to HEVC (libx265, CRF 18) using ffmpeg.

Inputs are either an explicit list of files or every file below a source
directory whose extension matches ``-f/--ext``.  Each input is encoded to a
temporary ``<name>.hevc.mp4`` file and then moved to ``<name>.mp4``.

Requires the ``ffmpeg`` and ``ffprobe`` executables on ``PATH``.  ``colorlog``
and ``tqdm`` are used when importable; plain fallbacks are used otherwise.
"""
import argparse
import logging
import re
import subprocess
from collections import deque
from pathlib import Path
from shutil import get_terminal_size
from sys import argv, stdout
from typing import List, Optional, Sequence

try:
    import colorlog
    fmt_arr = [
        "%(cyan)s%(asctime)s.%(msecs)03d%(reset)s",
        "%(log_color)s%(levelname)-7s%(reset)s",
        "%(blue)s%(message)s%(reset)s",
    ]
    fmt = colorlog.ColoredFormatter(
        " | ".join(fmt_arr), datefmt="%Y-%m-%d %H:%M:%S"
    )
except ImportError:
    fmt = logging.Formatter(
        "%(asctime)s.%(msecs)03d | %(levelname)-7s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

fmt.default_msec_format = "%s.%03d"
handler = logging.StreamHandler()
handler.setFormatter(fmt)
logger = logging.getLogger('example')
logger.addHandler(handler)
logger.setLevel('DEBUG')

itsme = Path(argv[0]).name

# Suffix of the intermediate file written by ffmpeg before the final rename.
HEVC_SUFFIX = '.hevc.mp4'
# Number of trailing non-progress ffmpeg stderr lines kept for error reports.
STDERR_TAIL_LINES = 20
# Matches the "time=HH:MM:SS.ff" field of an ffmpeg progress line.
PROGRESS_TIME_RE = re.compile(r"time=(\d{2,}:\d{2}:\d{2}\.\d+)")


class RawProgressBar:
    """Minimal text progress bar used when ``tqdm`` is not installed.

    Only the subset of the tqdm interface used by this script is provided:
    the ``total``/``unit`` constructor arguments, ``update`` and ``close``.
    """

    def __init__(self, total=100, unit='%'):
        self.total = total
        self.unit = unit
        self.current = 0
        # Terminal width is sampled once; (40, 20) is the fallback size.
        self.cols = get_terminal_size((40, 20)).columns

    def update(self, delta):
        """Advance the bar by *delta* units and redraw it."""
        self.current += delta
        self._print_bar()

    def _print_bar(self):
        """Redraw the bar in place on the current terminal line."""
        cur = min(self.current, self.total)
        # Guard against a zero total and clamp the ratio to 100 %.
        pct = min(cur / self.total if self.total > 0 else 0, 1.0)
        pct_str = f"{pct * 100:3.1f}%"
        rate_str = f"{cur:.2f}/{self.total:.2f}({self.unit})"
        # 10 columns are reserved for brackets, spaces and the leading "\r".
        fixed_len = len(pct_str) + len(rate_str) + 10
        bar_width = max(self.cols - fixed_len, 10)
        filled_len = int(bar_width * pct)
        bar = '>' * filled_len + '-' * (bar_width - filled_len)
        output = f"\r {pct_str} [{bar}] {rate_str}"
        stdout.write(output)
        stdout.flush()

    def close(self):
        """Terminate the progress line so later output starts on a new line."""
        stdout.write('\n')
        stdout.flush()


try:
    from tqdm import tqdm
    ProgressBar = tqdm
except ImportError:
    ProgressBar = RawProgressBar


def get_duration(file_path: Path) -> float:
    """Return the duration of *file_path* in seconds as reported by ffprobe.

    Raises:
        ValueError: if ffprobe does not print a parsable duration; the
            message carries ffprobe's own diagnostics.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        str(file_path)
    ]
    # stderr is captured separately so diagnostics cannot corrupt the value
    # printed on stdout.
    result = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError:
        detail = (result.stderr.strip() or raw
                  or f"ffprobe exited with status {result.returncode}")
        raise ValueError(
            f"cannot determine duration of {file_path}: {detail}"
        ) from None


def time_to_seconds(time_str):
    """Convert an ``HH:MM:SS[.fraction]`` string to seconds (float)."""
    h, m, s = map(float, time_str.split(':'))
    return h * 3600 + m * 60 + s


def parse_progress_time(line: str) -> Optional[float]:
    """Return the ``time=`` position of an ffmpeg progress line in seconds.

    Returns ``None`` when *line* contains no ``time=HH:MM:SS.ff`` field.
    """
    match = PROGRESS_TIME_RE.search(line)
    if match is None:
        return None
    return time_to_seconds(match.group(1))


def _discard_partial_output(outfile: Path) -> None:
    """Best-effort removal of an incomplete output file."""
    try:
        Path(outfile).unlink()
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning("Could not remove partial output %s: %s", outfile, exc)


def hevc_encode(infile: Path, outfile: Path, progress: bool = True) -> None:
    """Encode *infile* to HEVC (CRF 18, preset medium) at *outfile*.

    Audio is stream-copied.  ffmpeg's stderr is read line by line to drive
    the progress bar; non-progress lines are retained so they can be logged
    if the encode fails.

    Args:
        infile: source video.
        outfile: destination file; overwritten if present (``-y``).
        progress: show a progress bar while encoding.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
            The partial output file is removed first.
        KeyboardInterrupt: re-raised after ffmpeg has been killed and the
            partial output file removed.
    """
    dur = get_duration(infile)
    cmd = [
        "ffmpeg", "-y", "-i", str(infile),
        "-c:v", "libx265", "-crf", "18", "-preset", "medium",
        "-c:a", "copy", "-tag:v", "hvc1",
        str(outfile)
    ]
    logger.info("Encoding %s to HEVC CRF 18...", Path(infile).name)
    logger.debug("Duration: %.2f seconds", dur)
    logger.debug("Command: %s", ' '.join(cmd))

    # errors='replace': ffmpeg may echo metadata that is not valid UTF-8; a
    # decode error must not abort the read loop while ffmpeg keeps running.
    proc = subprocess.Popen(
        cmd, stderr=subprocess.PIPE, text=True,
        encoding='utf-8', errors='replace'
    )
    # Compare against None explicitly: a tqdm bar is falsy when total == 0.
    pbar = ProgressBar(total=dur, unit='s') if progress else None
    last_time = 0
    stderr_tail = deque(maxlen=STDERR_TAIL_LINES)

    try:
        # Iteration ends at EOF on stderr, i.e. when ffmpeg closes the pipe.
        for line in proc.stderr:
            current_time = parse_progress_time(line)
            if current_time is None:
                if line.strip():
                    stderr_tail.append(line.rstrip())
                continue
            if pbar is not None:
                pbar.update(current_time - last_time)
                last_time = current_time
        proc.wait()
    except KeyboardInterrupt:
        logger.warning("SIGINT received, terminating ffmpeg ...")
        proc.kill()
        proc.wait()
        _discard_partial_output(outfile)
        raise
    finally:
        if pbar is not None:
            pbar.close()
        # Never leave ffmpeg running if the loop was left by an exception.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        proc.stderr.close()

    if proc.returncode != 0:
        captured = "\n".join(stderr_tail)
        if captured:
            logger.error("ffmpeg failed (exit %d):\n%s",
                         proc.returncode, captured)
        # A leftover partial file would be picked up as input by a later
        # directory scan, so remove it.
        _discard_partial_output(outfile)
        raise subprocess.CalledProcessError(
            proc.returncode, cmd, stderr=captured)


def build_file_list(srcdir, extensions) -> List[Path]:
    """Return absolute paths of all files below *srcdir* matching *extensions*.

    Extensions are compared case-insensitively, with or without a leading
    dot.  The result is sorted so the processing order is deterministic.
    """
    exts = {ext.lower().lstrip('.') for ext in extensions}
    return sorted(
        path.absolute()
        for path in Path(srcdir).rglob('*')
        if path.is_file() and path.suffix.lower().lstrip('.') in exts
    )


def make_output_path(infile: Path, outdir: Optional[Path] = None,
                     relroot: Optional[str] = None) -> Path:
    """Return the intermediate ``.hevc.mp4`` path for *infile*.

    Args:
        infile: source file.
        outdir: output directory; when omitted the output is placed next to
            *infile*.
        relroot: when given together with *outdir*, the path of *infile*
            relative to *relroot* is reproduced below *outdir*.
    """
    if not outdir:
        return infile.with_suffix(HEVC_SUFFIX)
    outdir = Path(outdir)
    if not relroot:
        return outdir / infile.with_suffix(HEVC_SUFFIX).name
    rel_path = infile.relative_to(
        Path(relroot).absolute()).with_suffix(HEVC_SUFFIX)
    return outdir / rel_path


def final_output_path(outfile: Path) -> Path:
    """Return *outfile* with its trailing ``.hevc.mp4`` replaced by ``.mp4``.

    Only the trailing suffix is rewritten; the same text elsewhere in the
    file name is left untouched.
    """
    name = outfile.name
    if name.endswith(HEVC_SUFFIX):
        name = name[:-len(HEVC_SUFFIX)] + '.mp4'
    return outfile.with_name(name)


def parse_args(cli_args: Optional[Sequence[str]] = None):
    """Parse and validate command-line arguments.

    Args:
        cli_args: argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        The parsed namespace.  ``extensions`` defaults to ``['mp4']`` when a
        source directory is given without ``-f/--ext``.
    """
    parser = argparse.ArgumentParser(
        prog=itsme,
        description='Encode video files to HEVC CRF 18.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('-d', '--dir', dest='srcdir',
                        help='Source directory')
    parser.add_argument('-o', '--outdir', dest='outdir',
                        help='Output directory')
    parser.add_argument('-f', '--ext', dest='extensions',
                        action='append', default=[],
                        help='Extensions to search for. Can be repeated.')
    parser.add_argument('-r', '--rm-original', dest='remove_original',
                        action='store_true',
                        help='Remove original after encoding')
    parser.add_argument('-s', '--silent', dest='silent',
                        action='store_true',
                        help='Keep silent (no progress bar)')
    parser.add_argument('files', nargs='*',
                        help='Input files (with no -d/--dir)')
    args = parser.parse_args(cli_args)

    if args.srcdir and args.files:
        parser.error('cannot mix -d/--dir with input files')
    if not args.srcdir and not args.files:
        parser.error('either -d/--dir or input files must be provided')
    if args.srcdir and not args.extensions:
        args.extensions = ['mp4']
    return args


def main(cli_args: Optional[Sequence[str]] = None):
    """Encode every selected input file and move the result into place.

    Args:
        cli_args: argument list to parse; defaults to ``sys.argv[1:]``.
    """
    args = parse_args(cli_args)
    if args.srcdir:
        files = build_file_list(args.srcdir, args.extensions)
        errmsg = (f'No files found in {args.srcdir} '
                  f'with extensions: {args.extensions}')
    else:
        files = []
        for name in args.files:
            candidate = Path(name)
            if candidate.is_file():
                files.append(candidate.absolute())
            else:
                logger.warning("Skipping %s: not a regular file", name)
        # parse_args guarantees at least one path was given, so an empty
        # list here means none of them was usable.
        errmsg = 'No usable input files: none of the given paths is a file'

    if not files:
        raise SystemExit(errmsg)

    for infile in files:
        outfile = make_output_path(infile, args.outdir, args.srcdir)
        outfile.parent.mkdir(parents=True, exist_ok=True)
        hevc_encode(infile, outfile, not args.silent)

        final_out = final_output_path(outfile)
        replaces_input = final_out.resolve() == infile.resolve()
        if not replaces_input and final_out.exists():
            logger.warning("Overwriting existing file: %s", final_out)

        if args.remove_original:
            logger.debug("Remove original: %s", infile)
            infile.unlink()
        elif replaces_input:
            # Keep the original under a .bak.mp4 name instead of losing it.
            logger.warning("Filename conflict: %s <= %s", infile, final_out)
            infile.rename(infile.with_suffix('.bak.mp4'))
        logger.debug("Output file: %s", final_out)
        outfile.replace(final_out)


if __name__ == '__main__':
    main()