1
0
forked from bot/app

📝 文档初步大迁移 vuepress -> vitepress

This commit is contained in:
2024-08-31 19:51:34 +08:00
parent 2f87b06c83
commit 9aade6599c
7 changed files with 103 additions and 141 deletions

View File

@ -9,18 +9,18 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Software: PyCharm
"""
import asyncio
from typing import Any, Awaitable, Callable, TypeAlias
from typing import Any, Awaitable, Callable, TypeAlias, Sequence
from liteyuki.log import logger
from liteyuki.utils import is_coroutine_callable, async_wrapper
SYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Any]
ASYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Awaitable[Any]]
LIFESPAN_FUNC: TypeAlias = SYNC_LIFESPAN_FUNC | ASYNC_LIFESPAN_FUNC
SYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Any] # 同步生命周期函数
ASYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Awaitable[Any]] # 异步生命周期函数
LIFESPAN_FUNC: TypeAlias = SYNC_LIFESPAN_FUNC | ASYNC_LIFESPAN_FUNC # 生命周期函数
SYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Any]
ASYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Awaitable[Any]]
PROCESS_LIFESPAN_FUNC: TypeAlias = SYNC_PROCESS_LIFESPAN_FUNC | ASYNC_PROCESS_LIFESPAN_FUNC
SYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Any] # 同步进程生命周期函数
ASYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Awaitable[Any]] # 异步进程生命周期函数
PROCESS_LIFESPAN_FUNC: TypeAlias = SYNC_PROCESS_LIFESPAN_FUNC | ASYNC_PROCESS_LIFESPAN_FUNC # 进程函数
class Lifespan:
@ -33,21 +33,20 @@ class Lifespan:
self._before_start_funcs: list[LIFESPAN_FUNC] = []
self._after_start_funcs: list[LIFESPAN_FUNC] = []
self._before_process_shutdown_funcs: list[LIFESPAN_FUNC] = []
self._before_process_shutdown_funcs: list[PROCESS_LIFESPAN_FUNC] = []
self._after_shutdown_funcs: list[LIFESPAN_FUNC] = []
self._before_process_restart_funcs: list[LIFESPAN_FUNC] = []
self._before_process_restart_funcs: list[PROCESS_LIFESPAN_FUNC] = []
self._after_restart_funcs: list[LIFESPAN_FUNC] = []
@staticmethod
async def run_funcs(funcs: list[ASYNC_LIFESPAN_FUNC | PROCESS_LIFESPAN_FUNC], *args, **kwargs) -> None:
async def run_funcs(funcs: Sequence[LIFESPAN_FUNC | PROCESS_LIFESPAN_FUNC], *args, **kwargs) -> None:
"""
并发运行异步函数
Args:
funcs:
funcs ([`Sequence`](https%3A//docs.python.org/3/library/typing.html#typing.Sequence)[[`ASYNC_LIFESPAN_FUNC`](#var-lifespan-func) | [`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)]): 函数列表
Returns:
"""
loop = asyncio.get_running_loop()
tasks = [func(*args, **kwargs) if is_coroutine_callable(func) else async_wrapper(func)(*args, **kwargs) for func in funcs]
await asyncio.gather(*tasks)
@ -55,9 +54,9 @@ class Lifespan:
"""
注册启动时的函数
Args:
func:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
LIFESPAN_FUNC:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._before_start_funcs.append(func)
return func
@ -66,20 +65,20 @@ class Lifespan:
"""
注册启动时的函数
Args:
func:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
LIFESPAN_FUNC:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._after_start_funcs.append(func)
return func
def on_before_process_shutdown(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
def on_before_process_shutdown(self, func: PROCESS_LIFESPAN_FUNC) -> PROCESS_LIFESPAN_FUNC:
"""
注册停止前的函数
注册进程停止前的函数
Args:
func:
func ([`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)): 进程生命周期函数
Returns:
LIFESPAN_FUNC:
[`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func): 进程生命周期函数
"""
self._before_process_shutdown_funcs.append(func)
return func
@ -88,22 +87,20 @@ class Lifespan:
"""
注册停止后的函数
Args:
func:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
LIFESPAN_FUNC:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._after_shutdown_funcs.append(func)
return func
def on_before_process_restart(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
def on_before_process_restart(self, func: PROCESS_LIFESPAN_FUNC) -> PROCESS_LIFESPAN_FUNC:
"""
注册重启的函数
注册进程重启的函数
Args:
func:
func ([`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)): 进程生命周期函数
Returns:
LIFESPAN_FUNC:
[`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func): 进程生命周期函数
"""
self._before_process_restart_funcs.append(func)
return func
@ -112,58 +109,51 @@ class Lifespan:
"""
注册重启后的函数
Args:
func:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
LIFESPAN_FUNC:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._after_restart_funcs.append(func)
return func
async def before_start(self) -> None:
"""
启动前
Returns:
启动前钩子
"""
logger.debug("Running before_start functions")
await self.run_funcs(self._before_start_funcs)
async def after_start(self) -> None:
"""
启动后
Returns:
启动后钩子
"""
logger.debug("Running after_start functions")
await self.run_funcs(self._after_start_funcs)
async def before_process_shutdown(self) -> None:
"""
停止前
Returns:
停止前钩子
"""
logger.debug("Running before_shutdown functions")
await self.run_funcs(self._before_process_shutdown_funcs)
async def after_shutdown(self) -> None:
"""
停止后
Returns:
停止后钩子 未实现
"""
logger.debug("Running after_shutdown functions")
await self.run_funcs(self._after_shutdown_funcs)
async def before_process_restart(self) -> None:
"""
重启前
Returns:
重启前钩子
"""
logger.debug("Running before_restart functions")
await self.run_funcs(self._before_process_restart_funcs)
async def after_restart(self) -> None:
"""
重启后
Returns:
重启后钩子 未实现
"""
logger.debug("Running after_restart functions")
await self.run_funcs(self._after_restart_funcs)