Compare commits
9 Commits
b6f71289e2
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
5c94030067
|
|||
|
fe236a2cc0
|
|||
| b05289d71d | |||
| b01bc0145b | |||
| ef7965d274 | |||
|
2013d2e61d
|
|||
|
fcfd425981
|
|||
|
6a5c70d6fe
|
|||
|
fbec536fcd
|
Vendored
+16
@@ -0,0 +1,16 @@
|
|||||||
|
{
|
||||||
|
// Use IntelliSense to learn about possible attributes.
|
||||||
|
// Hover to view descriptions of existing attributes.
|
||||||
|
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Python Debugger: Current File with Arguments",
|
||||||
|
"type": "debugpy",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "${file}",
|
||||||
|
"console": "integratedTerminal",
|
||||||
|
"args": "${command:pickArgs}"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -1,2 +1,46 @@
|
|||||||
# python-ffmpeg-hevc-crf18
|
# python-ffmpeg-hevc-crf18
|
||||||
|
正如其名,按照固定配置(HEVC CRF 18)压制视频,“以尽可能不影响画面的情况下压缩体积”。
|
||||||
|
|
||||||
|
## 依赖
|
||||||
|
|
||||||
|
唯一不可或缺的只有`ffmpeg`。无论 Windows 还是 Linux,获取这个软件包并不难。
|
||||||
|
|
||||||
|
可选第三方 Python 库:`tqdm`和`colorlog`。没有也没关系,接口是兼容的。
|
||||||
|
|
||||||
|
## 应用
|
||||||
|
有两种命令行调用。当模块导入挑着用也没问题,只是没必要。
|
||||||
|
|
||||||
|
- `cmps_hevc_crf18 [-d SRCDIR] [-o OUTDIR] [-f EXT1] [-f EXT2] [-r] [-s]`
|
||||||
|
- `cmps_hevc_crf18 [-r] [-s] [-o OUTDIR] files`
|
||||||
|
|
||||||
|
剩下的直接翻译 argparse 帮助文本得了:
|
||||||
|
```
|
||||||
|
编码视频文件为 HEVC CRF 18(MP4)格式。
|
||||||
|
|
||||||
|
位置参数:
|
||||||
|
files 单独的视频文件(与 -d 互斥,二选一)
|
||||||
|
|
||||||
|
选项:
|
||||||
|
-h, --help 鹰文帮助
|
||||||
|
-d, --dir SRCDIR 源文件夹(会在里面搜索指定后缀的文件,与 files 互斥)
|
||||||
|
-o, --outdir OUTDIR 输出文件夹(默认原地,跟 -d 合用会保留相对路径)
|
||||||
|
-f, --ext EXTENSIONS 指定的后缀名(.mp4 和 mp4 均可,可重复指定多个后缀,不区分大小写)
|
||||||
|
-r, --rm-original 跑完是否把原文件删了
|
||||||
|
-s, --silent 静默(不显示进度条)
|
||||||
|
```
|
||||||
|
|
||||||
|
## 背景
|
||||||
|
实际就是命令行:
|
||||||
|
```sh
|
||||||
|
ffmpeg -i input.mp4 -c:v libx265 -crf 18 -preset medium -c:a copy -tag:v hvc1 output.mp4
|
||||||
|
```
|
||||||
|
写那么多 Python 封装只是为了路径处理和好看一点的终端输出。
|
||||||
|
|
||||||
|
而在 Windows 里文件所有权并不复杂(当前用户通常都可以读写),我更倾向于用 PowerShell:
|
||||||
|
```powershell
|
||||||
|
get-childitem path -File -Recurse -Filter *.mp4 | foreach {
|
||||||
|
cmps_hevc $_.FullName # 依旧 ffmpeg,也就变换一下 input.mp4 output.mp4
|
||||||
|
rename-item $_.FullName "$($_.Name).bak"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
就算后续还要筛选(好比有的视频压制完比原文件还大),也比 Python 方便得多。
|
||||||
|
|||||||
Executable
+115
@@ -0,0 +1,115 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
import argparse
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from ffwrapper import check_ffmpeg, hevc_encode
|
||||||
|
from logger import get_logger
|
||||||
|
from utils import format_size
|
||||||
|
|
||||||
|
logger = get_logger('cmps_hevc_crf18')
|
||||||
|
|
||||||
|
|
||||||
|
def build_file_list(srcdir, extensions):
|
||||||
|
exts = {ext.lower().lstrip('.') for ext in extensions}
|
||||||
|
return [
|
||||||
|
path.absolute() for path in Path(srcdir).rglob('*')
|
||||||
|
if path.is_file() and path.suffix.lower().lstrip('.') in exts
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def make_output_path(infile: Path, outdir: Path = None, relroot: str = None):
|
||||||
|
if not outdir:
|
||||||
|
return infile.with_suffix('.hevc.mp4')
|
||||||
|
outdir = Path(outdir)
|
||||||
|
if not relroot:
|
||||||
|
return outdir / infile.with_suffix('.hevc.mp4').name
|
||||||
|
rel_path = infile.relative_to(
|
||||||
|
Path(relroot).absolute()).with_suffix('.hevc.mp4')
|
||||||
|
return outdir / rel_path
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_output(infile: Path, outfile: Path, rm_original: bool = False):
|
||||||
|
final_out = outfile.with_name(
|
||||||
|
outfile.name.replace(".hevc.mp4", ".mp4"))
|
||||||
|
is_path_conflict = final_out.resolve() == infile.resolve()
|
||||||
|
if is_path_conflict:
|
||||||
|
logger.warning("Path conflict!")
|
||||||
|
logger.debug(f" original: {infile}")
|
||||||
|
logger.debug(f" conflict: {outfile.name} -> {final_out.name}")
|
||||||
|
if not rm_original:
|
||||||
|
logger.debug("Renaming original to avoid overwriting.")
|
||||||
|
infile.rename(infile.with_stem(infile.stem + '.bak'))
|
||||||
|
logger.info(f"Resolved output: {final_out}")
|
||||||
|
outfile.replace(final_out)
|
||||||
|
if rm_original and not is_path_conflict:
|
||||||
|
logger.debug(f"Remove original: {infile}")
|
||||||
|
infile.unlink()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description='Encode video files to HEVC CRF 18.',
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
)
|
||||||
|
parser.add_argument('-d', '--dir', dest='srcdir',
|
||||||
|
help='Source directory')
|
||||||
|
parser.add_argument('-o', '--outdir', dest='outdir',
|
||||||
|
help='Output directory')
|
||||||
|
parser.add_argument('-f', '--ext', dest='extensions', action='append',
|
||||||
|
default=[],
|
||||||
|
help='Extensions to search for. Can be repeated.')
|
||||||
|
parser.add_argument('-r', '--rm-original', dest='remove_original',
|
||||||
|
action='store_true',
|
||||||
|
help='Remove original after encoding')
|
||||||
|
parser.add_argument('-s', '--silent', dest='silent',
|
||||||
|
action='store_true',
|
||||||
|
help='Keep silent (no progress bar)')
|
||||||
|
parser.add_argument('files', nargs='*',
|
||||||
|
help='Input files (with no -d/--dir)')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
if args.srcdir and args.files:
|
||||||
|
parser.error('cannot mix -d/--dir with input files')
|
||||||
|
if not args.srcdir and not args.files:
|
||||||
|
parser.error('either -d/--dir or input files must be provided')
|
||||||
|
if args.srcdir and not args.extensions:
|
||||||
|
args.extensions = ['mp4']
|
||||||
|
return args
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
check_ffmpeg()
|
||||||
|
args = parse_args()
|
||||||
|
|
||||||
|
if args.srcdir:
|
||||||
|
files = build_file_list(args.srcdir, args.extensions)
|
||||||
|
errmsg = (f'No files found in {args.srcdir} '
|
||||||
|
f'with extensions: {args.extensions}')
|
||||||
|
else:
|
||||||
|
files = [Path(f).absolute() for f in args.files if Path(f).is_file()]
|
||||||
|
errmsg = 'No input files provided'
|
||||||
|
if not files:
|
||||||
|
raise SystemExit(errmsg)
|
||||||
|
if len(files) > 1:
|
||||||
|
files.sort(key=lambda x: x.stat().st_size)
|
||||||
|
|
||||||
|
for idx, infile in enumerate(files, 1):
|
||||||
|
logger.info(f"[{idx}/{len(files)}] {infile.name}")
|
||||||
|
|
||||||
|
outfile = make_output_path(infile, args.outdir, args.srcdir)
|
||||||
|
outfile.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
try:
|
||||||
|
outfile.touch(0o644)
|
||||||
|
hevc_encode(infile, outfile, not args.silent)
|
||||||
|
except (OSError, PermissionError) as e:
|
||||||
|
logger.error(f"Job failed: {e}")
|
||||||
|
raise
|
||||||
|
isize, osize = infile.stat().st_size, outfile.stat().st_size
|
||||||
|
delta = (osize - isize) / 1024 ** 2
|
||||||
|
logger.info(f"Δ: {delta:+.2f} MB"
|
||||||
|
f" ({format_size(isize)} -> {format_size(osize)})")
|
||||||
|
resolve_output(infile, outfile, args.remove_original)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
+123
@@ -0,0 +1,123 @@
|
|||||||
|
# @File : ffwrapper.py
|
||||||
|
# @Time : 2026/04/21 17:34:20
|
||||||
|
# @Author : SilverAg.L
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from logger import get_logger
|
||||||
|
from progress import get_progress_bar, is_tty
|
||||||
|
from utils import shorten_name
|
||||||
|
|
||||||
|
logger = get_logger('cmps_hevc_crf18')
|
||||||
|
|
||||||
|
|
||||||
|
def check_ffmpeg():
|
||||||
|
try:
|
||||||
|
subprocess.run(['ffmpeg', '-version'],
|
||||||
|
capture_output=True, check=True)
|
||||||
|
subprocess.run(['ffprobe', '-version'],
|
||||||
|
capture_output=True, check=True)
|
||||||
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||||
|
logger.error("ffmpeg not found or not working. Please install ffmpeg.")
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def get_duration(file_path: Path):
|
||||||
|
cmd = [
|
||||||
|
'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
|
||||||
|
'-of', 'default=noprint_wrappers=1:nokey=1', str(file_path)
|
||||||
|
]
|
||||||
|
result = subprocess.run(
|
||||||
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
|
||||||
|
try:
|
||||||
|
dur = float(result.stdout.strip())
|
||||||
|
if dur <= 0:
|
||||||
|
raise ValueError(f"Invalid duration: {dur}")
|
||||||
|
return dur
|
||||||
|
except (ValueError, TypeError) as e:
|
||||||
|
logger.error(f"Cannot determine duration of {file_path}: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
COMPRESS_OPTIONS = [
|
||||||
|
"-c:v", "libx265", "-crf", "18", "-preset", "medium",
|
||||||
|
"-c:a", "copy",
|
||||||
|
"-tag:v", "hvc1",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def start_ffmpeg(infile: Path, outfile: Path):
|
||||||
|
cmd = [
|
||||||
|
"ffmpeg", "-y", "-i", shorten_name(infile.name),
|
||||||
|
"-progress", "pipe:1", "-nostats", "-loglevel", "error",
|
||||||
|
*COMPRESS_OPTIONS, shorten_name(outfile.name, flag=2)
|
||||||
|
]
|
||||||
|
logger.debug(f"Command: {' '.join(cmd)}")
|
||||||
|
cmd[3] = str(infile)
|
||||||
|
cmd[-1] = str(outfile)
|
||||||
|
return subprocess.Popen(
|
||||||
|
cmd,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.STDOUT,
|
||||||
|
bufsize=1,
|
||||||
|
text=True,
|
||||||
|
encoding='utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
def set_progress(line: str, dur, pbar):
|
||||||
|
if not pbar or not line.startswith("out_time_ms="):
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
us = int(line.split('=')[1])
|
||||||
|
if us < 0:
|
||||||
|
return
|
||||||
|
current_sec = us / 1_000_000
|
||||||
|
pbar.n = min(current_sec, dur)
|
||||||
|
pbar.refresh()
|
||||||
|
except (ValueError, IndexError, OverflowError):
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def clean_on_failure(proc: subprocess.Popen, outfile: Path):
|
||||||
|
if proc.poll() is None:
|
||||||
|
proc.kill()
|
||||||
|
proc.wait()
|
||||||
|
if outfile.exists():
|
||||||
|
try:
|
||||||
|
outfile.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def hevc_encode(infile: Path, outfile: Path, progress: bool = True):
|
||||||
|
dur = get_duration(infile)
|
||||||
|
logger.debug(f"Duration: {dur:.2f} seconds")
|
||||||
|
|
||||||
|
proc = start_ffmpeg(infile, outfile)
|
||||||
|
pbar = get_progress_bar(
|
||||||
|
total=dur, unit='s',
|
||||||
|
bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})'
|
||||||
|
) if progress and is_tty() else None
|
||||||
|
|
||||||
|
try:
|
||||||
|
for line in iter(proc.stdout.readline, ''):
|
||||||
|
set_progress(line.strip(), dur, pbar)
|
||||||
|
if proc.poll() is not None:
|
||||||
|
break
|
||||||
|
|
||||||
|
proc.wait()
|
||||||
|
if proc.returncode != 0:
|
||||||
|
raise subprocess.CalledProcessError(proc.returncode, "ffmpeg")
|
||||||
|
except (
|
||||||
|
KeyboardInterrupt,
|
||||||
|
subprocess.CalledProcessError
|
||||||
|
):
|
||||||
|
# Issue: cleaning may also be interrupted by CtrlC
|
||||||
|
# (especially in Windows)
|
||||||
|
clean_on_failure(proc, outfile)
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if pbar:
|
||||||
|
pbar.close()
|
||||||
|
proc.stdout.close()
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
# @File : logger.py
|
||||||
|
# @Time : 2026/04/12 22:18:00
|
||||||
|
# @Author : SilverAg.L
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
try:
|
||||||
|
import colorlog
|
||||||
|
fmt_arr = [
|
||||||
|
"%(cyan)s%(asctime)s.%(msecs)03d%(reset)s",
|
||||||
|
"%(log_color)s%(levelname)-7s%(reset)s",
|
||||||
|
"%(blue)s%(message)s%(reset)s",
|
||||||
|
]
|
||||||
|
fmt = colorlog.ColoredFormatter(
|
||||||
|
" | ".join(fmt_arr),
|
||||||
|
datefmt="%Y-%m-%d %H:%M:%S"
|
||||||
|
)
|
||||||
|
except ImportError:
|
||||||
|
fmt = logging.Formatter(
|
||||||
|
"%(asctime)s.%(msecs)03d | %(levelname)-7s | %(message)s",
|
||||||
|
datefmt="%Y-%m-%d %H:%M:%S"
|
||||||
|
)
|
||||||
|
|
||||||
|
fmt.default_msec_format = "%s.%03d"
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
handler.setFormatter(fmt)
|
||||||
|
|
||||||
|
|
||||||
|
def get_logger(name=None):
|
||||||
|
logger = logging.getLogger(name or __name__)
|
||||||
|
if not logger.handlers:
|
||||||
|
logger.addHandler(handler)
|
||||||
|
logger.setLevel('DEBUG')
|
||||||
|
return logger
|
||||||
|
|
||||||
|
|
||||||
|
def test_logger():
|
||||||
|
log1 = get_logger("test1")
|
||||||
|
_ = get_logger("test1")
|
||||||
|
log1.info("Should print once")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_logger()
|
||||||
+65
@@ -0,0 +1,65 @@
|
|||||||
|
# @File : progress.py
|
||||||
|
# @Time : 2026/04/12 22:20:53
|
||||||
|
# @Author : SilverAg.L
|
||||||
|
|
||||||
|
from shutil import get_terminal_size
|
||||||
|
from sys import stdout
|
||||||
|
|
||||||
|
|
||||||
|
class RawProgressBar:
|
||||||
|
def __init__(self, total=100, unit='%', bar_format=None, **kwargs):
|
||||||
|
self.total = total
|
||||||
|
self.unit = unit
|
||||||
|
self.n = 0
|
||||||
|
self.cols = get_terminal_size((40, 20)).columns
|
||||||
|
self.bar_format = bar_format # unused
|
||||||
|
|
||||||
|
def update(self, delta):
|
||||||
|
self.n += delta
|
||||||
|
self.refresh()
|
||||||
|
|
||||||
|
def refresh(self):
|
||||||
|
cur = min(self.n, self.total)
|
||||||
|
pct = min(cur / self.total if self.total > 0 else 0, 1.0)
|
||||||
|
pct_str = f"{pct * 100:3.0f}%"
|
||||||
|
rate_str = f"{cur:.2f}/{self.total:.2f}({self.unit})"
|
||||||
|
fixed_len = len(pct_str) + len(rate_str) + 10
|
||||||
|
bar_width = max(self.cols - fixed_len, 10)
|
||||||
|
filled_len = int(bar_width * pct)
|
||||||
|
bar = '>' * filled_len + '-' * (bar_width - filled_len)
|
||||||
|
output = f"\r {pct_str} [{bar}] {rate_str}".ljust(self.cols)
|
||||||
|
stdout.write(output)
|
||||||
|
stdout.flush()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
stdout.write('\n')
|
||||||
|
stdout.flush()
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
def get_progress_bar(total, unit, bar_format=None):
|
||||||
|
return tqdm(file=stdout, total=total, unit=unit, bar_format=bar_format)
|
||||||
|
except ImportError:
|
||||||
|
def get_progress_bar(total, unit, bar_format=None):
|
||||||
|
return RawProgressBar(total=total, unit=unit, bar_format=bar_format)
|
||||||
|
|
||||||
|
|
||||||
|
def is_tty():
|
||||||
|
return stdout.isatty()
|
||||||
|
|
||||||
|
|
||||||
|
def test_progress_bar():
|
||||||
|
from time import sleep
|
||||||
|
pbar = get_progress_bar(total=100, unit='%')
|
||||||
|
for i in range(101):
|
||||||
|
sleep(0.1)
|
||||||
|
pbar.n = i
|
||||||
|
pbar.refresh()
|
||||||
|
pbar.close()
|
||||||
|
print("Done!")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_progress_bar()
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
# @File : utils.py
|
||||||
|
# @Time : 2026/04/21 17:24:21
|
||||||
|
# @Author : SilverAg.L
|
||||||
|
|
||||||
|
def shorten_name(filename: str, flag: int = 1):
|
||||||
|
stem, ext = filename.rsplit('.', 1) if '.' in filename else (filename, '')
|
||||||
|
stem = stem.replace(' ', '').upper()
|
||||||
|
ext = ext[:3].upper()
|
||||||
|
bstem = stem.encode(errors='ignore')
|
||||||
|
for i in range(8, 0, -1):
|
||||||
|
try:
|
||||||
|
cut = bstem[:i].decode()
|
||||||
|
if cut != stem:
|
||||||
|
cut += f'~{flag}'
|
||||||
|
stem = cut
|
||||||
|
break
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
continue
|
||||||
|
return f'{stem}.{ext}' if ext else stem
|
||||||
|
|
||||||
|
|
||||||
|
def format_size(bytes: int) -> str:
|
||||||
|
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
|
||||||
|
if bytes < 1024:
|
||||||
|
return f"{bytes:.2f} {unit}"
|
||||||
|
bytes /= 1024
|
||||||
|
return f"{bytes:.2f} PB"
|
||||||
Reference in New Issue
Block a user