From fcfd425981bc8216c32fff6d1a81de9994ec40c9 Mon Sep 17 00:00:00 2001 From: "SilverAg.L" Date: Mon, 13 Apr 2026 21:05:23 +0800 Subject: [PATCH] rewrite for robustness. --- README.md | 56 ++++++------- cmps_hevc_crf18.py | 201 +++++++++++++++++++++------------------------ logger.py | 44 ++++++++++ progress.py | 65 +++++++++++++++ 4 files changed, 228 insertions(+), 138 deletions(-) create mode 100644 logger.py create mode 100644 progress.py diff --git a/README.md b/README.md index f3454df..5a0901f 100644 --- a/README.md +++ b/README.md @@ -5,44 +5,42 @@ 唯一不可或缺的只有`ffmpeg`。无论 Windows 还是 Linux,获取这个软件包并不难。 -可选第三方 Python 库:`tqdm`和`colorlog`,让终端输出更“摩登”一些。 +可选第三方 Python 库:`tqdm`和`colorlog`。没有也没关系,接口是兼容的。 -
+## 应用 +有两种命令行调用。当模块导入挑着用也没问题,只是没必要。 -fallback +- `cmps_hevc_crf18 [-d SRCDIR] [-o OUTDIR] [-f EXT1] [-f EXT2] [-r] [-s]` +- `cmps_hevc_crf18 [-r] [-s] [-o OUTDIR] files` -我的模块旨在作为脚本\*单文件运行\*,随意加第三方依赖自然意味着没办法开箱即用。因此,哪怕是部署上我的 Ubuntu Server,我也得考虑 ImportError 时的回滚方案。 +剩下的直接翻译 argparse 帮助文本得了: +``` +编码视频文件为 HEVC CRF 18(MP4)格式。 -- `colorlog`:其仓库文档就介绍了跟标准库组合使用的案例。仔细读就不难发现:传入不同`Formatter`即可。 -- `tqdm`:那自然是手撕一个`RawProgressBar`了。毕竟使用场景简单,我撕出来的进度条只实现了用到的方法和`__init__`参数。 +位置参数: + files 单独的视频文件(与 -d 互斥,二选一) -
+选项: + -h, --help 鹰文帮助 + -d, --dir SRCDIR 源文件夹(会在里面搜索指定后缀的文件,与 files 互斥) + -o, --outdir OUTDIR 输出文件夹(默认原地,跟 -d 合用会保留相对路径) + -f, --ext EXTENSIONS 指定的后缀名(.mp4 和 mp4 均可,可重复指定多个后缀,不区分大小写) + -r, --rm-original 跑完是否把原文件删了 + -s, --silent 静默(不显示进度条) +``` ## 背景 -最初看到类似的实践其实是在某个涩涩论坛上。原帖将近 180GB 的原始资源压制到 30+GB 且几乎不怎么影响画质,巨大的体积差引起了我的兴趣。 - -后来在 QQ 群里有人给出了这么个命令行: +实际就是命令行: ```sh ffmpeg -i input.mp4 -c:v libx265 -crf 18 -preset medium -c:a copy -tag:v hvc1 output.mp4 ``` -这一行命令参数就是窝脚本的核心逻辑了。和大多数在 Python 里封装 ffmpeg 调用的第三方库一样,我也只是包装一下。 +写那么多 Python 封装只是为了路径处理和好看一点的终端输出。 -## 后记 -前身也是个 Bash 脚本。奈何在 Bash 里折腾路径实在是有些疲累。有一次出现了我没考虑到的边界情况: -```bash -abs_src="/home/agxcoy/Downloads/blabla" -out_dir="~/Videos/blabla" - -relpath="subdir/sub.mp4" -# => mkdir -p "/home/agxcoy/Videos//home/agxcoy/Downloads/blabla/subdir/" +而在 Windows 里文件所有权并不复杂(当前用户通常都可以读写),我更倾向于用 PowerShell: +```powershell +get-childitem path -File -Recurse -Filter *.mp4 | foreach { + cmps_hevc $_.FullName # 依旧 ffmpeg,也就变换一下 input.mp4 output.mp4 + rename-item $_.FullName "$($_.Name).bak" +} ``` -气笑了。 - -然后最初是打算`os.walk`,但问了下 Google AI、VSCode Copilot,发现`pathlib`还真挺好用,就整个“翻译”完替换掉 Bash 版了。 - -## 已知问题 -事实上边界情况还没测试完全,现在也不过能正常走完我的基本流程罢了。 - -除此之外,进度条本身也有点问题。我应该先在外面四舍五入`current_time` `total_time`再丢进进度条里显示的。因为在`tqdm`那看到了`327.099...9/660.377007`这么长的文字。 - -再者就是性能问题了。我处理的视频文件基本以 GB 计。同时跑太多 ffmpeg 进程,内存不够分(我在 Server 上`tmux`挂后台,似乎日志写多了还出现了掉盘)。逐个逐个跑吧,性能使用率不高。我看有些 ffmpeg 封装库利用了线程池和 asyncio,值得参考。 +就算后续还要筛选(好比有的视频压制完比原文件还大),也比 Python 方便得多。 diff --git a/cmps_hevc_crf18.py b/cmps_hevc_crf18.py index 3897687..990a7d6 100755 --- a/cmps_hevc_crf18.py +++ b/cmps_hevc_crf18.py @@ -1,72 +1,32 @@ #!/usr/bin/python3 import argparse import subprocess -import re -import logging -from shutil import get_terminal_size -from sys import argv, stdout + +from sys import argv from pathlib import Path -try: - import colorlog - fmt_arr = [ - "%(cyan)s%(asctime)s.%(msecs)03d%(reset)s", - "%(log_color)s%(levelname)-7s%(reset)s", - "%(blue)s%(message)s%(reset)s", - ] - fmt = colorlog.ColoredFormatter( - " | ".join(fmt_arr), - datefmt="%Y-%m-%d %H:%M:%S" - ) -except ImportError: - fmt = logging.Formatter( - "%(asctime)s.%(msecs)03d | %(levelname)-7s | %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" - ) +from logger import get_logger +from progress import get_progress_bar, is_tty + +COMPRESS_OPTIONS = [ + "-c:v", "libx265", "-crf", "18", "-preset", "medium", + "-c:a", "copy", + "-tag:v", "hvc1", +] -fmt.default_msec_format = "%s.%03d" -handler = logging.StreamHandler() -handler.setFormatter(fmt) -logger = logging.getLogger('example') -logger.addHandler(handler) -logger.setLevel('DEBUG') itsme = Path(argv[0]).name +logger = get_logger(itsme) -class RawProgressBar: - def __init__(self, total=100, unit='%'): - self.total = total - self.unit = unit - self.current = 0 - self.cols = get_terminal_size((40, 20)).columns - - def update(self, delta): - self.current += delta - self._print_bar() - - def _print_bar(self): - cur = min(self.current, self.total) - pct = min(cur / self.total if self.total > 0 else 0, 1.0) - pct_str = f"{pct * 100:3.1f}%" - rate_str = f"{cur:.2f}/{self.total:.2f}({self.unit})" - fixed_len = len(pct_str) + len(rate_str) + 10 - bar_width = max(self.cols - fixed_len, 10) - filled_len = int(bar_width * pct) - bar = '>' * filled_len + '-' * (bar_width - filled_len) - output = f"\r {pct_str} [{bar}] {rate_str}" - stdout.write(output) - stdout.flush() - - def close(self): - stdout.write('\n') - stdout.flush() - - -try: - from tqdm import tqdm - ProgressBar = tqdm -except ImportError: - ProgressBar = RawProgressBar +def check_ffmpeg(): + try: + subprocess.run(['ffmpeg', '-version'], + capture_output=True, check=True) + subprocess.run(['ffprobe', '-version'], + capture_output=True, check=True) + except (subprocess.CalledProcessError, FileNotFoundError): + logger.error("ffmpeg not found or not working. Please install ffmpeg.") + raise SystemExit(1) def get_duration(file_path: Path): @@ -75,57 +35,72 @@ def get_duration(file_path: Path): '-of', 'default=noprint_wrappers=1:nokey=1', str(file_path) ] result = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True - ) - return float(result.stdout.strip()) + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) + try: + dur = float(result.stdout.strip()) + if dur <= 0: + raise ValueError(f"Invalid duration: {dur}") + return dur + except (ValueError, TypeError) as e: + logger.error(f"Cannot determine duration of {file_path}: {e}") + raise -def time_to_seconds(time_str): - h, m, s = map(float, time_str.split(':')) - return h * 3600 + m * 60 + s +def read_progress(ostream_line, dur, pbar): + if not ostream_line.startswith("out_time_ms="): + return + try: + us = int(ostream_line.split('=')[1]) + if us < 0: + return + secs = us / 1_000_000 + if secs > dur * 1.1: # Allow 10% overshoot + pbar.n = dur + else: + pbar.n = secs + pbar.refresh() + except (ValueError, OverflowError): + pass def hevc_encode(infile: Path, outfile: Path, progress: bool = True): + # too heavy bro. dur = get_duration(infile) cmd = [ - "ffmpeg", "-y", - "-i", str(infile), - "-c:v", "libx265", "-crf", "18", "-preset", "medium", - "-c:a", "copy", - "-tag:v", "hvc1", - str(outfile) + "ffmpeg", "-y", "-i", str(infile), + "-progress", "pipe:1", "-nostats", + *COMPRESS_OPTIONS, str(outfile) ] - logger.info(f"Encoding {Path(infile).name} to HEVC CRF 18...") logger.debug(f"Duration: {dur:.2f} seconds") logger.debug(f"Command: {' '.join(cmd)}") proc = subprocess.Popen( - cmd, - stderr=subprocess.PIPE, - text=True, - encoding='utf-8' - ) - - pbar = ProgressBar(total=dur, unit='s') if progress else None - last_time = 0 + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True, encoding='utf-8', bufsize=1) + progress = progress and is_tty() + pbar = None if not progress else get_progress_bar( + total=dur, unit='s', + bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})') try: while True: - line = proc.stderr.readline() + line = proc.stdout.readline() if not line and proc.poll() is not None: break if not pbar: continue - match = re.search(r"time=(\d{2}:\d{2}:\d{2}.\d{2})", line) - if match: - current_time = time_to_seconds(match.group(1)) - pbar.update(current_time - last_time) - last_time = current_time - except KeyboardInterrupt: - logger.warning("SIGINT received, terminating ffmpeg ...") - proc.kill() + read_progress(line.strip(), dur, pbar) proc.wait() + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, cmd) + except ( + KeyboardInterrupt, + subprocess.CalledProcessError + ) as e: + if proc.poll() is None: + proc.kill() + proc.wait() + logger.error( + f"Encoding failed with code {proc.returncode}:" + f" {e.__class__.__name__}") if outfile.exists(): try: outfile.unlink() @@ -136,11 +111,6 @@ def hevc_encode(infile: Path, outfile: Path, progress: bool = True): if pbar: pbar.close() - if proc.returncode != 0: - # logger.error(proc.stdout.read()) - # logger.error(proc.stderr.readlines()) - raise subprocess.CalledProcessError(proc.returncode, cmd) - def build_file_list(srcdir, extensions): exts = {ext.lower().lstrip('.') for ext in extensions} @@ -177,7 +147,8 @@ def parse_args(): parser.add_argument('-r', '--rm-original', dest='remove_original', action='store_true', help='Remove original after encoding') - parser.add_argument('-s', '--silent', dest='silent', action='store_true', + parser.add_argument('-s', '--silent', dest='silent', + action='store_true', help='Keep silent (no progress bar)') parser.add_argument('files', nargs='*', help='Input files (with no -d/--dir)') @@ -193,6 +164,7 @@ def parse_args(): def main(): + check_ffmpeg() args = parse_args() if args.srcdir: @@ -205,21 +177,32 @@ def main(): if not files: raise SystemExit(errmsg) - for infile in files: - # print(f"Processing: {infile}") + for idx, infile in enumerate(files, 1): + logger.info(f"[{idx}/{len(files)}] {infile.name}") + outfile = make_output_path(infile, args.outdir, args.srcdir) outfile.parent.mkdir(parents=True, exist_ok=True) - hevc_encode(infile, outfile, not args.silent) + try: + outfile.touch(0o644) + hevc_encode(infile, outfile, not args.silent) + except (OSError, PermissionError) as e: + logger.error(f"Job failed: {e}") + raise final_out = outfile.with_name( outfile.name.replace(".hevc.mp4", ".mp4")) - if args.remove_original: - logger.debug(f"Remove original: {infile}") - infile.unlink() - elif final_out.resolve() == infile.resolve(): - logger.warning(f"Filename conflict: {infile} <= {final_out}") - infile.rename(infile.with_suffix('.bak.mp4')) + if final_out.resolve() == infile.resolve(): + logger.warning( + f"Filename conflict!\n i: {infile}\n o: {final_out}") + if not args.remove_original: + logger.debug("renaming original to avoid conflict.") + infile.rename(infile.with_stem(infile.stem + '.bak')) + outfile.replace(final_out) + else: + outfile.replace(final_out) + if args.remove_original: + logger.debug(f"Remove original: {infile}") + infile.unlink() logger.debug(f"Output file: {final_out}") - outfile.replace(final_out) if __name__ == '__main__': diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..114c4ea --- /dev/null +++ b/logger.py @@ -0,0 +1,44 @@ +# @File : logger.py +# @Time : 2026/04/12 22:18:00 +# @Author : SilverAg.L + +import logging + +try: + import colorlog + fmt_arr = [ + "%(cyan)s%(asctime)s.%(msecs)03d%(reset)s", + "%(log_color)s%(levelname)-7s%(reset)s", + "%(blue)s%(message)s%(reset)s", + ] + fmt = colorlog.ColoredFormatter( + " | ".join(fmt_arr), + datefmt="%Y-%m-%d %H:%M:%S" + ) +except ImportError: + fmt = logging.Formatter( + "%(asctime)s.%(msecs)03d | %(levelname)-7s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + +fmt.default_msec_format = "%s.%03d" +handler = logging.StreamHandler() +handler.setFormatter(fmt) + + +def get_logger(name=None): + logger = logging.getLogger(name or __name__) + if not logger.handlers: + logger.addHandler(handler) + logger.setLevel('DEBUG') + return logger + + +def test_logger(): + log1 = get_logger("test1") + _ = get_logger("test1") + log1.info("Should print once") + + +if __name__ == "__main__": + test_logger() diff --git a/progress.py b/progress.py new file mode 100644 index 0000000..168cd63 --- /dev/null +++ b/progress.py @@ -0,0 +1,65 @@ +# @File : progress.py +# @Time : 2026/04/12 22:20:53 +# @Author : SilverAg.L + +from shutil import get_terminal_size +from sys import stdout + + +class RawProgressBar: + def __init__(self, total=100, unit='%', bar_format=None, **kwargs): + self.total = total + self.unit = unit + self.n = 0 + self.cols = get_terminal_size((40, 20)).columns + self.bar_format = bar_format # unused + + def update(self, delta): + self.n += delta + self.refresh() + + def refresh(self): + cur = min(self.n, self.total) + pct = min(cur / self.total if self.total > 0 else 0, 1.0) + pct_str = f"{pct * 100:3.0f}%" + rate_str = f"{cur:.2f}/{self.total:.2f}({self.unit})" + fixed_len = len(pct_str) + len(rate_str) + 10 + bar_width = max(self.cols - fixed_len, 10) + filled_len = int(bar_width * pct) + bar = '>' * filled_len + '-' * (bar_width - filled_len) + output = f"\r {pct_str} [{bar}] {rate_str}".ljust(self.cols) + stdout.write(output) + stdout.flush() + + def close(self): + stdout.write('\n') + stdout.flush() + + +try: + from tqdm import tqdm + + def get_progress_bar(total, unit, bar_format=None): + return tqdm(file=stdout, total=total, unit=unit, bar_format=bar_format) +except ImportError: + def get_progress_bar(total, unit, bar_format=None): + return RawProgressBar(total=total, unit=unit, bar_format=bar_format) + + +def is_tty(): + return stdout.isatty() + + +def test_progress_bar(): + from time import sleep + pbar = get_progress_bar(total=100, unit='%') + for i in range(101): + sleep(0.1) + pbar.n = i + pbar.refresh() + pbar.close() + print("Done!") + + +if __name__ == "__main__": + test_progress_bar()