import asyncio from typing import Any, Callable, Coroutine from functools import partial import aiofiles import yaml from pydantic import BaseModel class Config(BaseModel): images: list["Images"] = [] class Images(BaseModel): dockerfile: str | None = None source: str target: str tags: list[str] = [] async def run_command(command: str) -> int | None: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if stdout: print(f"[STDOUT]\n{stdout.decode()}") if stderr: print(f"[STDERR]\n{stderr.decode()}") return process.returncode async def docker_pull(image: str) -> int | None: print(f"Pulling image {image}...") return await run_command(f"docker pull {image}") async def docker_tag(source: str, target: str) -> int | None: print(f"Tagging image {source} as {target}...") return await run_command(f"docker tag {source} {target}") async def docker_push(image: str) -> int | None: print(f"Pushing image {image}...") return await run_command(f"docker push {image}") async def docker_build(dockerfile: str, target: str) -> int | None: print(f"Building image from {dockerfile}...") return await run_command(f"docker build -t {target} -f {dockerfile} .") async def docker_sync_task(source: str, target: str) -> int | None: print(f"Pulling image {source}...") if r := await docker_pull(source): if r != 0: return r if r := await docker_tag(source, target): if r != 0: return r if r := await docker_push(target): if r != 0: return r return 0 async def docker_build_task(dockerfile: str, target: str) -> int | None: print(f"Building image from {dockerfile}...") if r := await docker_build(dockerfile, target): if r != 0: return r if r := await docker_push(target): if r != 0: return r return 0 semaphore = asyncio.Semaphore(5) async def limited_task[T: Any](semaphore: asyncio.Semaphore, task: Callable[[], Coroutine[None, None, T]]) -> T: async with semaphore: return await task() async def main(): async with aiofiles.open('images.yaml', 'r') as file: config = await file.read() config = yaml.safe_load(config) config = Config(**config) print("Loaded configuration:") print(config) tasks = [] for image in config.images: if image.dockerfile: # 若有dockerfile提供,則使用docker build if len(image.tags) > 0: # 若有tags提供,則使用docker build和docker tag for tag in image.tags: tasks.append(limited_task(semaphore, partial(docker_build_task, image.dockerfile, f"{image.target}:{tag}"))) else: tasks.append(limited_task(semaphore, partial(docker_build_task, image.dockerfile, image.target))) elif len(image.tags) > 0: # 若有tags提供,則使用docker pull和docker tag for tag in image.tags: tasks.append(limited_task(semaphore, partial(docker_sync_task, f"{image.source}:{tag}", f"{image.target}:{tag}"))) else: tasks.append(limited_task(semaphore, partial(docker_sync_task, image.source, image.target))) results = await asyncio.gather(*tasks) failed_tasks = sum(1 for result in results if result is not None and result != 0) print(f"{len(results)} tasks completed. {len(results) - failed_tasks} succeed, {failed_tasks} failed.") if failed_tasks > 0: raise Exception(f"{failed_tasks} tasks failed.") if __name__ == "__main__": asyncio.run(main())