From fe236a2cc03cab4c40997371388a1e4033b291cb Mon Sep 17 00:00:00 2001 From: "SilverAg.L" Date: Tue, 21 Apr 2026 19:59:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9B=E6=97=A5=E5=BF=97=E8=A1=A8?= =?UTF-8?q?=E8=BF=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmps_hevc_crf18.py | 155 ++++----------------------------------------- ffwrapper.py | 123 +++++++++++++++++++++++++++++++++++ utils.py | 27 ++++++++ 3 files changed, 162 insertions(+), 143 deletions(-) mode change 100644 => 100755 cmps_hevc_crf18.py create mode 100644 ffwrapper.py create mode 100644 utils.py diff --git a/cmps_hevc_crf18.py b/cmps_hevc_crf18.py old mode 100644 new mode 100755 index ade7d4e..837e5a1 --- a/cmps_hevc_crf18.py +++ b/cmps_hevc_crf18.py @@ -1,141 +1,12 @@ #!/usr/bin/python3 import argparse -import subprocess - -from sys import argv from pathlib import Path +from ffwrapper import check_ffmpeg, hevc_encode from logger import get_logger -from progress import get_progress_bar, is_tty +from utils import format_size -COMPRESS_OPTIONS = [ - "-c:v", "libx265", "-crf", "18", "-preset", "medium", - "-c:a", "copy", - "-tag:v", "hvc1", -] - -itsme = Path(argv[0]).name -logger = get_logger(itsme) - - -def check_ffmpeg(): - try: - subprocess.run(['ffmpeg', '-version'], - capture_output=True, check=True) - subprocess.run(['ffprobe', '-version'], - capture_output=True, check=True) - except (subprocess.CalledProcessError, FileNotFoundError): - logger.error("ffmpeg not found or not working. Please install ffmpeg.") - raise SystemExit(1) - - -def get_duration(file_path: Path): - cmd = [ - 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', - '-of', 'default=noprint_wrappers=1:nokey=1', str(file_path) - ] - result = subprocess.run( - cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) - try: - dur = float(result.stdout.strip()) - if dur <= 0: - raise ValueError(f"Invalid duration: {dur}") - return dur - except (ValueError, TypeError) as e: - logger.error(f"Cannot determine duration of {file_path}: {e}") - raise - - -def set_progress(line: str, dur, pbar): - if not line.startswith("out_time_ms="): - return - try: - us = int(line.split('=')[1]) - if us < 0: - return - current_sec = us / 1_000_000 - pbar.n = min(current_sec, dur) - pbar.refresh() - except (ValueError, IndexError, OverflowError): - return - - -def shorten_name(filename: str, flag: int = 1): - stem, ext = filename.rsplit('.', 1) if '.' in filename else (filename, '') - stem = stem.replace(' ', '').upper() - ext = ext[:3].upper() - bstem = stem.encode(errors='ignore') - for i in range(8, 0, -1): - try: - cut = bstem[:i].decode() - if cut != stem: - cut += f'~{flag}' - stem = cut - break - except UnicodeDecodeError: - continue - return f'{stem}.{ext}' if ext else stem - - -def start_ffmpeg(infile: Path, outfile: Path): - cmd = [ - "ffmpeg", "-y", "-i", shorten_name(infile.name), - "-progress", "pipe:1", "-nostats", "-loglevel", "error", - *COMPRESS_OPTIONS, shorten_name(outfile.name, flag=2) - ] - logger.debug(f"Command: {' '.join(cmd)}") - cmd[3] = str(infile) - cmd[-1] = str(outfile) - return subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - bufsize=1, - text=True, - encoding='utf-8') - - -def clean_on_failure(proc: subprocess.Popen, outfile: Path): - if proc.poll() is None: - proc.kill() - proc.wait() - if outfile.exists(): - try: - outfile.unlink() - except OSError: - pass - - -def hevc_encode(infile: Path, outfile: Path, progress: bool = True): - dur = get_duration(infile) - logger.debug(f"Duration: {dur:.2f} seconds") - - proc = start_ffmpeg(infile, outfile) - pbar = get_progress_bar( - total=dur, unit='s', - bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})' - ) if progress and is_tty() else None - - try: - for line in iter(proc.stdout.readline, ''): - set_progress(line.strip(), dur, pbar) - if proc.poll() is not None: - break - - proc.wait() - if proc.returncode != 0: - raise subprocess.CalledProcessError(proc.returncode, "ffmpeg") - except ( - KeyboardInterrupt, - subprocess.CalledProcessError - ): - # Issue: maybe also interrupted by CtrlC - clean_on_failure(proc, outfile) - raise - finally: - if pbar: - pbar.close() - proc.stdout.close() +logger = get_logger('cmps_hevc_crf18') def build_file_list(srcdir, extensions): @@ -162,12 +33,13 @@ def resolve_output(infile: Path, outfile: Path, rm_original: bool = False): outfile.name.replace(".hevc.mp4", ".mp4")) is_path_conflict = final_out.resolve() == infile.resolve() if is_path_conflict: - logger.warning( - f"Filename conflict!\n i: {infile}\n o: {final_out}") + logger.warning("Path conflict!") + logger.debug(f" original: {infile}") + logger.debug(f" conflict: {outfile.name} -> {final_out.name}") if not rm_original: - logger.debug("Renaming original to avoid conflict.") + logger.debug("Renaming original to avoid overwriting.") infile.rename(infile.with_stem(infile.stem + '.bak')) - logger.debug(f"Output file: {final_out}") + logger.info(f"Resolved output: {final_out}") outfile.replace(final_out) if rm_original and not is_path_conflict: logger.debug(f"Remove original: {infile}") @@ -176,7 +48,6 @@ def resolve_output(infile: Path, outfile: Path, rm_original: bool = False): def parse_args(): parser = argparse.ArgumentParser( - prog=itsme, description='Encode video files to HEVC CRF 18.', formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -233,12 +104,10 @@ def main(): except (OSError, PermissionError) as e: logger.error(f"Job failed: {e}") raise - - isize = infile.stat().st_size / (1024 * 1024) - osize = outfile.stat().st_size / (1024 * 1024) - logger.info(f"Size deductions: {osize - isize:+.2f} MB") - logger.debug(f" i: {isize:.2f} MB -> o: {osize:.2f} MB") - + isize, osize = infile.stat().st_size, outfile.stat().st_size + delta = (osize - isize) / 1024 ** 2 + logger.info(f"Δ: {delta:+.2f} MB" + f" ({format_size(isize)} -> {format_size(osize)})") resolve_output(infile, outfile, args.remove_original) diff --git a/ffwrapper.py b/ffwrapper.py new file mode 100644 index 0000000..48fc85e --- /dev/null +++ b/ffwrapper.py @@ -0,0 +1,123 @@ +# @File : ffwrapper.py +# @Time : 2026/04/21 17:34:20 +# @Author : SilverAg.L + +import subprocess +from pathlib import Path + +from logger import get_logger +from progress import get_progress_bar, is_tty +from utils import shorten_name + +logger = get_logger('cmps_hevc_crf18') + + +def check_ffmpeg(): + try: + subprocess.run(['ffmpeg', '-version'], + capture_output=True, check=True) + subprocess.run(['ffprobe', '-version'], + capture_output=True, check=True) + except (subprocess.CalledProcessError, FileNotFoundError): + logger.error("ffmpeg not found or not working. Please install ffmpeg.") + raise SystemExit(1) + + +def get_duration(file_path: Path): + cmd = [ + 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', str(file_path) + ] + result = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) + try: + dur = float(result.stdout.strip()) + if dur <= 0: + raise ValueError(f"Invalid duration: {dur}") + return dur + except (ValueError, TypeError) as e: + logger.error(f"Cannot determine duration of {file_path}: {e}") + raise + + +COMPRESS_OPTIONS = [ + "-c:v", "libx265", "-crf", "18", "-preset", "medium", + "-c:a", "copy", + "-tag:v", "hvc1", +] + + +def start_ffmpeg(infile: Path, outfile: Path): + cmd = [ + "ffmpeg", "-y", "-i", shorten_name(infile.name), + "-progress", "pipe:1", "-nostats", "-loglevel", "error", + *COMPRESS_OPTIONS, shorten_name(outfile.name, flag=2) + ] + logger.debug(f"Command: {' '.join(cmd)}") + cmd[3] = str(infile) + cmd[-1] = str(outfile) + return subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + bufsize=1, + text=True, + encoding='utf-8') + + +def set_progress(line: str, dur, pbar): + if not line.startswith("out_time_ms="): + return + try: + us = int(line.split('=')[1]) + if us < 0: + return + current_sec = us / 1_000_000 + pbar.n = min(current_sec, dur) + pbar.refresh() + except (ValueError, IndexError, OverflowError): + return + + +def clean_on_failure(proc: subprocess.Popen, outfile: Path): + if proc.poll() is None: + proc.kill() + proc.wait() + if outfile.exists(): + try: + outfile.unlink() + except OSError: + pass + + +def hevc_encode(infile: Path, outfile: Path, progress: bool = True): + dur = get_duration(infile) + logger.debug(f"Duration: {dur:.2f} seconds") + + proc = start_ffmpeg(infile, outfile) + pbar = get_progress_bar( + total=dur, unit='s', + bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})' + ) if progress and is_tty() else None + + try: + for line in iter(proc.stdout.readline, ''): + set_progress(line.strip(), dur, pbar) + if proc.poll() is not None: + break + + proc.wait() + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, "ffmpeg") + except ( + KeyboardInterrupt, + subprocess.CalledProcessError + ): + # Issue: cleaning may also be interrupted by CtrlC + # (especially in Windows) + clean_on_failure(proc, outfile) + raise + finally: + if pbar: + pbar.close() + proc.stdout.close() diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..e1e8d9e --- /dev/null +++ b/utils.py @@ -0,0 +1,27 @@ +# @File : utils.py +# @Time : 2026/04/21 17:24:21 +# @Author : SilverAg.L + +def shorten_name(filename: str, flag: int = 1): + stem, ext = filename.rsplit('.', 1) if '.' in filename else (filename, '') + stem = stem.replace(' ', '').upper() + ext = ext[:3].upper() + bstem = stem.encode(errors='ignore') + for i in range(8, 0, -1): + try: + cut = bstem[:i].decode() + if cut != stem: + cut += f'~{flag}' + stem = cut + break + except UnicodeDecodeError: + continue + return f'{stem}.{ext}' if ext else stem + + +def format_size(bytes: int) -> str: + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes < 1024: + return f"{bytes:.2f} {unit}" + bytes /= 1024 + return f"{bytes:.2f} PB"