import asyncio import aiofiles import yaml from pydantic import BaseModel, field_validator class Config(BaseModel): images: list["Images"] = [] class Images(BaseModel): source: str target: str tags: list[str] = [] async def run_command(command: str) -> int | None: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if stdout: print(f"[STDOUT]\n{stdout.decode()}") if stderr: print(f"[STDERR]\n{stderr.decode()}") return process.returncode async def docker_pull(image: str) -> int | None: print(f"Pulling image {image}...") return await run_command(f"docker pull {image}") async def docker_tag(source: str, target: str): print(f"Tagging image {source} as {target}...") return await run_command(f"docker tag {source} {target}") async def docker_push(image: str): print(f"Pushing image {image}...") return await run_command(f"docker push {image}") async def main(): # Load the YAML file async with aiofiles.open('images.yaml', 'r') as file: config = yaml.safe_load(file) config = Config(**config) # Print the loaded configuration print("Loaded configuration:") print(config) tasks = [] for image in config.images: if len(image.tags) > 0: for tag in image.tags: async def task(): await docker_pull(f"{image.source}:{tag}") await docker_tag(f"{image.source}:{tag}", f"{image.target}:{tag}") await docker_push(f"{image.target}:{tag}") tasks.append(task()) else: async def task(): await docker_pull(image.source) await docker_tag(image.source, image.target) await docker_push(image.target) tasks.append(task()) # Run all tasks concurrently await asyncio.gather(*tasks) if __name__ == "__main__": asyncio.run(main())