211 lines
6.8 KiB
Python
Executable File
211 lines
6.8 KiB
Python
Executable File
#!/usr/bin/python3
|
|
import argparse
|
|
import subprocess
|
|
|
|
from sys import argv
|
|
from pathlib import Path
|
|
|
|
from logger import get_logger
|
|
from progress import get_progress_bar, is_tty
|
|
|
|
COMPRESS_OPTIONS = [
|
|
"-c:v", "libx265", "-crf", "18", "-preset", "medium",
|
|
"-c:a", "copy",
|
|
"-tag:v", "hvc1",
|
|
]
|
|
|
|
itsme = Path(argv[0]).name
|
|
logger = get_logger(itsme)
|
|
|
|
|
|
def check_ffmpeg():
|
|
try:
|
|
subprocess.run(['ffmpeg', '-version'],
|
|
capture_output=True, check=True)
|
|
subprocess.run(['ffprobe', '-version'],
|
|
capture_output=True, check=True)
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
logger.error("ffmpeg not found or not working. Please install ffmpeg.")
|
|
raise SystemExit(1)
|
|
|
|
|
|
def get_duration(file_path: Path):
|
|
cmd = [
|
|
'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
|
|
'-of', 'default=noprint_wrappers=1:nokey=1', str(file_path)
|
|
]
|
|
result = subprocess.run(
|
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
|
|
try:
|
|
dur = float(result.stdout.strip())
|
|
if dur <= 0:
|
|
raise ValueError(f"Invalid duration: {dur}")
|
|
return dur
|
|
except (ValueError, TypeError) as e:
|
|
logger.error(f"Cannot determine duration of {file_path}: {e}")
|
|
raise
|
|
|
|
|
|
def read_progress(ostream_line, dur, pbar):
|
|
if not ostream_line.startswith("out_time_ms="):
|
|
return
|
|
try:
|
|
us = int(ostream_line.split('=')[1])
|
|
if us < 0:
|
|
return
|
|
secs = us / 1_000_000
|
|
if secs > dur * 1.1: # Allow 10% overshoot
|
|
pbar.n = dur
|
|
else:
|
|
pbar.n = secs
|
|
pbar.refresh()
|
|
except (ValueError, OverflowError):
|
|
pass
|
|
|
|
|
|
def hevc_encode(infile: Path, outfile: Path, progress: bool = True):
|
|
# too heavy bro.
|
|
dur = get_duration(infile)
|
|
cmd = [
|
|
"ffmpeg", "-y", "-i", str(infile),
|
|
"-progress", "pipe:1", "-nostats",
|
|
*COMPRESS_OPTIONS, str(outfile)
|
|
]
|
|
logger.debug(f"Duration: {dur:.2f} seconds")
|
|
logger.debug(f"Command: {' '.join(cmd)}")
|
|
proc = subprocess.Popen(
|
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
text=True, encoding='utf-8', bufsize=1)
|
|
progress = progress and is_tty()
|
|
pbar = None if not progress else get_progress_bar(
|
|
total=dur, unit='s',
|
|
bar_format='{l_bar}{bar}| {n:.2f}/{total:.2f}({unit})')
|
|
try:
|
|
while True:
|
|
line = proc.stdout.readline()
|
|
if not line and proc.poll() is not None:
|
|
break
|
|
if not pbar:
|
|
continue
|
|
read_progress(line.strip(), dur, pbar)
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
raise subprocess.CalledProcessError(proc.returncode, cmd)
|
|
except (
|
|
KeyboardInterrupt,
|
|
subprocess.CalledProcessError
|
|
) as e:
|
|
if proc.poll() is None:
|
|
proc.kill()
|
|
proc.wait()
|
|
logger.error(
|
|
f"Encoding failed with code {proc.returncode}:"
|
|
f" {e.__class__.__name__}")
|
|
if outfile.exists():
|
|
try:
|
|
outfile.unlink()
|
|
except OSError:
|
|
pass
|
|
raise
|
|
finally:
|
|
if pbar:
|
|
pbar.close()
|
|
|
|
|
|
def build_file_list(srcdir, extensions):
|
|
exts = {ext.lower().lstrip('.') for ext in extensions}
|
|
return [
|
|
path.absolute() for path in Path(srcdir).rglob('*')
|
|
if path.is_file() and path.suffix.lower().lstrip('.') in exts
|
|
]
|
|
|
|
|
|
def make_output_path(infile: Path, outdir: Path = None, relroot: str = None):
|
|
if not outdir:
|
|
return infile.with_suffix('.hevc.mp4')
|
|
outdir = Path(outdir)
|
|
if not relroot:
|
|
return outdir / infile.with_suffix('.hevc.mp4').name
|
|
rel_path = infile.relative_to(
|
|
Path(relroot).absolute()).with_suffix('.hevc.mp4')
|
|
return outdir / rel_path
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(
|
|
prog=itsme,
|
|
description='Encode video files to HEVC CRF 18.',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
)
|
|
parser.add_argument('-d', '--dir', dest='srcdir',
|
|
help='Source directory')
|
|
parser.add_argument('-o', '--outdir', dest='outdir',
|
|
help='Output directory')
|
|
parser.add_argument('-f', '--ext', dest='extensions', action='append',
|
|
default=[],
|
|
help='Extensions to search for. Can be repeated.')
|
|
parser.add_argument('-r', '--rm-original', dest='remove_original',
|
|
action='store_true',
|
|
help='Remove original after encoding')
|
|
parser.add_argument('-s', '--silent', dest='silent',
|
|
action='store_true',
|
|
help='Keep silent (no progress bar)')
|
|
parser.add_argument('files', nargs='*',
|
|
help='Input files (with no -d/--dir)')
|
|
|
|
args = parser.parse_args()
|
|
if args.srcdir and args.files:
|
|
parser.error('cannot mix -d/--dir with input files')
|
|
if not args.srcdir and not args.files:
|
|
parser.error('either -d/--dir or input files must be provided')
|
|
if args.srcdir and not args.extensions:
|
|
args.extensions = ['mp4']
|
|
return args
|
|
|
|
|
|
def main():
|
|
check_ffmpeg()
|
|
args = parse_args()
|
|
|
|
if args.srcdir:
|
|
files = build_file_list(args.srcdir, args.extensions)
|
|
errmsg = (f'No files found in {args.srcdir} '
|
|
f'with extensions: {args.extensions}')
|
|
else:
|
|
files = [Path(f).absolute() for f in args.files if Path(f).is_file()]
|
|
errmsg = 'No input files provided'
|
|
if not files:
|
|
raise SystemExit(errmsg)
|
|
files.sort(key=lambda x: x.stat().st_size)
|
|
|
|
for idx, infile in enumerate(files, 1):
|
|
logger.info(f"[{idx}/{len(files)}] {infile.name}")
|
|
|
|
outfile = make_output_path(infile, args.outdir, args.srcdir)
|
|
outfile.parent.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
outfile.touch(0o644)
|
|
hevc_encode(infile, outfile, not args.silent)
|
|
except (OSError, PermissionError) as e:
|
|
logger.error(f"Job failed: {e}")
|
|
raise
|
|
final_out = outfile.with_name(
|
|
outfile.name.replace(".hevc.mp4", ".mp4"))
|
|
if final_out.resolve() == infile.resolve():
|
|
logger.warning(
|
|
f"Filename conflict!\n i: {infile}\n o: {final_out}")
|
|
if not args.remove_original:
|
|
logger.debug("renaming original to avoid conflict.")
|
|
infile.rename(infile.with_stem(infile.stem + '.bak'))
|
|
outfile.replace(final_out)
|
|
else:
|
|
outfile.replace(final_out)
|
|
if args.remove_original:
|
|
logger.debug(f"Remove original: {infile}")
|
|
infile.unlink()
|
|
logger.debug(f"Output file: {final_out}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|