image-manager/sync.py
Snowykami 4d671a283f
Some checks failed
Sync Container Images / sync-images (push) Failing after 43s
修正限流任务的实现,确保在调用时传递信号量参数
2025-04-17 22:07:06 +08:00

92 lines
2.7 KiB
Python

import asyncio
import shlex
from typing import Any, Callable, Coroutine

import aiofiles
import yaml
from pydantic import BaseModel
class Config(BaseModel):
    """Top-level model that the parsed YAML configuration is loaded into."""

    # Image entries to process; defaults to an empty list when the key is
    # omitted.
    images: list["Images"] = []
class Images(BaseModel):
    """A single image entry of the configuration."""

    # Image reference on the source side.
    source: str
    # Image reference on the target side (the one that gets pushed).
    target: str
    # Optional tags; main() queues one task per tag, or a single task using
    # source/target as given when this list is empty.
    tags: list[str] = []
async def run_command(command: str) -> int | None:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if stdout:
print(f"[STDOUT]\n{stdout.decode()}")
if stderr:
print(f"[STDERR]\n{stderr.decode()}")
return process.returncode
async def docker_pull(image: str) -> int | None:
    """Pull *image* with ``docker pull``.

    Args:
        image: Image reference to pull.

    Returns:
        The value returned by ``run_command`` for the ``docker pull`` call.
    """
    print(f"Pulling image {image}...")
    # The reference is spliced into a shell command line, so quote it. For
    # ordinary image references shlex.quote() returns the text unchanged.
    return await run_command(f"docker pull {shlex.quote(image)}")
async def docker_tag(source: str, target: str) -> int | None:
    """Tag *source* as *target* with ``docker tag``.

    Args:
        source: Existing image reference.
        target: New reference to attach to the same image.

    Returns:
        The value returned by ``run_command`` for the ``docker tag`` call.
    """
    print(f"Tagging image {source} as {target}...")
    # Both references end up on a shell command line, so quote them. For
    # ordinary image references shlex.quote() returns the text unchanged.
    return await run_command(
        f"docker tag {shlex.quote(source)} {shlex.quote(target)}"
    )
async def docker_push(image: str) -> int | None:
    """Push *image* with ``docker push``.

    Args:
        image: Image reference to push.

    Returns:
        The value returned by ``run_command`` for the ``docker push`` call.
    """
    print(f"Pushing image {image}...")
    # The reference is spliced into a shell command line, so quote it. For
    # ordinary image references shlex.quote() returns the text unchanged.
    return await run_command(f"docker push {shlex.quote(image)}")
async def docker_task(source: str, target: str) -> int | None:
    """Pull *source*, tag it as *target*, then push *target*.

    The three steps run in order and the sequence stops at the first step
    that reports a non-zero status.

    Args:
        source: Image reference to pull.
        target: Image reference to tag the pulled image as and to push.

    Returns:
        The non-zero status of the first failing step, or ``0`` when no step
        reported a failure.
    """
    print(f"Pulling image {source}...")

    pull_status = await docker_pull(source)
    if pull_status:
        return pull_status

    tag_status = await docker_tag(source, target)
    if tag_status:
        return tag_status

    push_status = await docker_push(target)
    if push_status:
        return push_status

    return 0
# Shared semaphore with 50 permits; main() passes it to limited_task() for
# every docker task it queues.
semaphore = asyncio.Semaphore(50)
async def limited_task[T: Any](semaphore: asyncio.Semaphore, task: Callable[[], Coroutine[None, None, T]]) -> T:
async with semaphore:
return task()
async def main():
    """Load ``images.yaml``, run one docker task per image/tag, and report.

    Every configured image yields one task per tag (``source:tag`` ->
    ``target:tag``), or a single task using source and target as given when
    no tags are listed. All tasks are gathered concurrently and a summary
    line is printed afterwards.

    Raises:
        RuntimeError: If at least one task finished with a non-zero status.
    """
    async with aiofiles.open('images.yaml', 'r') as file:
        raw_config = await file.read()
    # An empty YAML document parses to None; treat that as "no images"
    # instead of failing on Config(**None).
    config = Config(**(yaml.safe_load(raw_config) or {}))
    print("Loaded configuration:")
    print(config)

    tasks = []
    for image in config.images:
        if image.tags:
            for tag in image.tags:
                # Pull the tag from the *source* repository; pulling
                # target:tag would never mirror anything from the source.
                tasks.append(limited_task(semaphore, docker_task(f"{image.source}:{tag}", f"{image.target}:{tag}")))
        else:
            tasks.append(limited_task(semaphore, docker_task(image.source, image.target)))

    results = await asyncio.gather(*tasks)
    # A task counts as failed when its status is neither None nor 0.
    failed_tasks = sum(1 for result in results if result)
    print(f"{len(results)} tasks completed. {len(results) - failed_tasks} succeed, {failed_tasks} failed.")
    if failed_tasks > 0:
        raise RuntimeError(f"{failed_tasks} tasks failed.")
# Script entry point: run main() to completion when executed directly.
if __name__ == "__main__":
    asyncio.run(main())