Compare commits

...

5 Commits

Author SHA1 Message Date
AgxCOy 5c94030067 bugfix: 无进度条时仍试图刷新进度 2026-05-11 01:23:10 +08:00
AgxCOy fe236a2cc0 改进日志表述 2026-04-21 19:59:13 +08:00
AgxCOy b05289d71d 简化日志中给出的命令行 2026-04-20 20:24:21 +08:00
AgxCOy b01bc0145b feat: 压缩前后体积对比
顺便也分离文件名冲突处理逻辑
2026-04-20 13:57:00 +08:00
AgxCOy ef7965d274 fix: Windows环境下进度条因stdout阻塞迟迟未能显示
顺便重构一下`hevc_encode`,这样就“轻”多了。
2026-04-20 13:50:49 +08:00
4 changed files with 194 additions and 123 deletions
+16
View File
@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File with Arguments",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"args": "${command:pickArgs}"
}
]
}
+28 -123
View File
@@ -1,115 +1,12 @@
#!/usr/bin/python3 #!/usr/bin/python3
import argparse import argparse
import subprocess
from sys import argv
from pathlib import Path from pathlib import Path
from ffwrapper import check_ffmpeg, hevc_encode
from logger import get_logger from logger import get_logger
from progress import get_progress_bar, is_tty from utils import format_size
COMPRESS_OPTIONS = [ logger = get_logger('cmps_hevc_crf18')
"-c:v", "libx265", "-crf", "18", "-preset", "medium",
"-c:a", "copy",
"-tag:v", "hvc1",
]
itsme = Path(argv[0]).name
logger = get_logger(itsme)
def check_ffmpeg():
try:
subprocess.run(['ffmpeg', '-version'],
capture_output=True, check=True)
subprocess.run(['ffprobe', '-version'],
capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
logger.error("ffmpeg not found or not working. Please install ffmpeg.")
raise SystemExit(1)
def get_duration(file_path: Path):
cmd = [
'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', str(file_path)
]
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
try:
dur = float(result.stdout.strip())
if dur <= 0:
raise ValueError(f"Invalid duration: {dur}")
return dur
except (ValueError, TypeError) as e:
logger.error(f"Cannot determine duration of {file_path}: {e}")
raise
def read_progress(ostream_line, dur, pbar):
if not ostream_line.startswith("out_time_ms="):
return
try:
us = int(ostream_line.split('=')[1])
if us < 0:
return
secs = us / 1_000_000
if secs > dur * 1.1: # Allow 10% overshoot
pbar.n = dur
else:
pbar.n = secs
pbar.refresh()
except (ValueError, OverflowError):
pass
def hevc_encode(infile: Path, outfile: Path, progress: bool = True):
# too heavy bro.
dur = get_duration(infile)
cmd = [
"ffmpeg", "-y", "-i", str(infile),
"-progress", "pipe:1", "-nostats",
*COMPRESS_OPTIONS, str(outfile)
]
logger.debug(f"Duration: {dur:.2f} seconds")
logger.debug(f"Command: {' '.join(cmd)}")
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, encoding='utf-8', bufsize=1)
progress = progress and is_tty()
pbar = None if not progress else get_progress_bar(
total=dur, unit='s',
bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})')
try:
while True:
line = proc.stdout.readline()
if not line and proc.poll() is not None:
break
if not pbar:
continue
read_progress(line.strip(), dur, pbar)
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, cmd)
except (
KeyboardInterrupt,
subprocess.CalledProcessError
) as e:
if proc.poll() is None:
proc.kill()
proc.wait()
logger.error(
f"Encoding failed with code {proc.returncode}:"
f" {e.__class__.__name__}")
if outfile.exists():
try:
outfile.unlink()
except OSError:
pass
raise
finally:
if pbar:
pbar.close()
def build_file_list(srcdir, extensions): def build_file_list(srcdir, extensions):
@@ -131,9 +28,26 @@ def make_output_path(infile: Path, outdir: Path = None, relroot: str = None):
return outdir / rel_path return outdir / rel_path
def resolve_output(infile: Path, outfile: Path, rm_original: bool = False):
final_out = outfile.with_name(
outfile.name.replace(".hevc.mp4", ".mp4"))
is_path_conflict = final_out.resolve() == infile.resolve()
if is_path_conflict:
logger.warning("Path conflict!")
logger.debug(f" original: {infile}")
logger.debug(f" conflict: {outfile.name} -> {final_out.name}")
if not rm_original:
logger.debug("Renaming original to avoid overwriting.")
infile.rename(infile.with_stem(infile.stem + '.bak'))
logger.info(f"Resolved output: {final_out}")
outfile.replace(final_out)
if rm_original and not is_path_conflict:
logger.debug(f"Remove original: {infile}")
infile.unlink()
def parse_args(): def parse_args():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog=itsme,
description='Encode video files to HEVC CRF 18.', description='Encode video files to HEVC CRF 18.',
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
) )
@@ -176,7 +90,8 @@ def main():
errmsg = 'No input files provided' errmsg = 'No input files provided'
if not files: if not files:
raise SystemExit(errmsg) raise SystemExit(errmsg)
files.sort(key=lambda x: x.stat().st_size) if len(files) > 1:
files.sort(key=lambda x: x.stat().st_size)
for idx, infile in enumerate(files, 1): for idx, infile in enumerate(files, 1):
logger.info(f"[{idx}/{len(files)}] {infile.name}") logger.info(f"[{idx}/{len(files)}] {infile.name}")
@@ -189,21 +104,11 @@ def main():
except (OSError, PermissionError) as e: except (OSError, PermissionError) as e:
logger.error(f"Job failed: {e}") logger.error(f"Job failed: {e}")
raise raise
final_out = outfile.with_name( isize, osize = infile.stat().st_size, outfile.stat().st_size
outfile.name.replace(".hevc.mp4", ".mp4")) delta = (osize - isize) / 1024 ** 2
if final_out.resolve() == infile.resolve(): logger.info(f"Δ: {delta:+.2f} MB"
logger.warning( f" ({format_size(isize)} -> {format_size(osize)})")
f"Filename conflict!\n i: {infile}\n o: {final_out}") resolve_output(infile, outfile, args.remove_original)
if not args.remove_original:
logger.debug("renaming original to avoid conflict.")
infile.rename(infile.with_stem(infile.stem + '.bak'))
outfile.replace(final_out)
else:
outfile.replace(final_out)
if args.remove_original:
logger.debug(f"Remove original: {infile}")
infile.unlink()
logger.debug(f"Output file: {final_out}")
if __name__ == '__main__': if __name__ == '__main__':
+123
View File
@@ -0,0 +1,123 @@
# @File : ffwrapper.py
# @Time : 2026/04/21 17:34:20
# @Author : SilverAg.L
import subprocess
from pathlib import Path
from logger import get_logger
from progress import get_progress_bar, is_tty
from utils import shorten_name
logger = get_logger('cmps_hevc_crf18')
def check_ffmpeg():
try:
subprocess.run(['ffmpeg', '-version'],
capture_output=True, check=True)
subprocess.run(['ffprobe', '-version'],
capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
logger.error("ffmpeg not found or not working. Please install ffmpeg.")
raise SystemExit(1)
def get_duration(file_path: Path):
cmd = [
'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', str(file_path)
]
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
try:
dur = float(result.stdout.strip())
if dur <= 0:
raise ValueError(f"Invalid duration: {dur}")
return dur
except (ValueError, TypeError) as e:
logger.error(f"Cannot determine duration of {file_path}: {e}")
raise
COMPRESS_OPTIONS = [
"-c:v", "libx265", "-crf", "18", "-preset", "medium",
"-c:a", "copy",
"-tag:v", "hvc1",
]
def start_ffmpeg(infile: Path, outfile: Path):
cmd = [
"ffmpeg", "-y", "-i", shorten_name(infile.name),
"-progress", "pipe:1", "-nostats", "-loglevel", "error",
*COMPRESS_OPTIONS, shorten_name(outfile.name, flag=2)
]
logger.debug(f"Command: {' '.join(cmd)}")
cmd[3] = str(infile)
cmd[-1] = str(outfile)
return subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
text=True,
encoding='utf-8')
def set_progress(line: str, dur, pbar):
if not pbar or not line.startswith("out_time_ms="):
return
try:
us = int(line.split('=')[1])
if us < 0:
return
current_sec = us / 1_000_000
pbar.n = min(current_sec, dur)
pbar.refresh()
except (ValueError, IndexError, OverflowError):
return
def clean_on_failure(proc: subprocess.Popen, outfile: Path):
if proc.poll() is None:
proc.kill()
proc.wait()
if outfile.exists():
try:
outfile.unlink()
except OSError:
pass
def hevc_encode(infile: Path, outfile: Path, progress: bool = True):
dur = get_duration(infile)
logger.debug(f"Duration: {dur:.2f} seconds")
proc = start_ffmpeg(infile, outfile)
pbar = get_progress_bar(
total=dur, unit='s',
bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})'
) if progress and is_tty() else None
try:
for line in iter(proc.stdout.readline, ''):
set_progress(line.strip(), dur, pbar)
if proc.poll() is not None:
break
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, "ffmpeg")
except (
KeyboardInterrupt,
subprocess.CalledProcessError
):
# Issue: cleaning may also be interrupted by CtrlC
# (especially in Windows)
clean_on_failure(proc, outfile)
raise
finally:
if pbar:
pbar.close()
proc.stdout.close()
+27
View File
@@ -0,0 +1,27 @@
# @File : utils.py
# @Time : 2026/04/21 17:24:21
# @Author : SilverAg.L
def shorten_name(filename: str, flag: int = 1):
stem, ext = filename.rsplit('.', 1) if '.' in filename else (filename, '')
stem = stem.replace(' ', '').upper()
ext = ext[:3].upper()
bstem = stem.encode(errors='ignore')
for i in range(8, 0, -1):
try:
cut = bstem[:i].decode()
if cut != stem:
cut += f'~{flag}'
stem = cut
break
except UnicodeDecodeError:
continue
return f'{stem}.{ext}' if ext else stem
def format_size(bytes: int) -> str:
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes < 1024:
return f"{bytes:.2f} {unit}"
bytes /= 1024
return f"{bytes:.2f} PB"