mirror of
https://github.com/LiteyukiStudio/LiteyukiBot.git
synced 2025-07-28 13:50:58 +00:00
🐛 hotfix: from mypy import
This commit is contained in:
@ -225,17 +225,14 @@ class ProcessManager:
|
||||
|
||||
|
||||
class _SubProcessManager:
|
||||
"""
|
||||
子进程管理器
|
||||
若要子进程间通信,请先在子进程A中发送通信事件给主进程,包含当前进程信息及上下文信息,主进程再将信息发送给子进程B,子进程B再根据信息进行操作
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.processes: dict[str, SubProcess] = {}
|
||||
|
||||
def new_process(
|
||||
self, name: str, *args, **kwargs
|
||||
) -> Callable[[ProcessFuncType], None]:
|
||||
def decorator(func: ProcessFuncType):
|
||||
self.processes[name] = SubProcess(name, func, *args, **kwargs)
|
||||
|
||||
return decorator
|
||||
|
||||
def add(self, name: str, func: ProcessFuncType, *args, **kwargs):
|
||||
"""
|
||||
@ -268,5 +265,33 @@ class _SubProcessManager:
|
||||
process.start()
|
||||
logger.debug(f"Starting process {name}")
|
||||
|
||||
def terminate(self, name: str):
|
||||
"""
|
||||
终止指定子进程
|
||||
Args:
|
||||
name: 子进程名称
|
||||
Returns:
|
||||
"""
|
||||
if name not in self.processes:
|
||||
raise KeyError(f"Process {name} not found.")
|
||||
self.processes[name].terminate()
|
||||
|
||||
def terminate_all(self):
|
||||
"""
|
||||
终止所有子进程
|
||||
"""
|
||||
for name, process in self.processes.items():
|
||||
process.terminate()
|
||||
logger.debug(f"Terminating process {name}")
|
||||
|
||||
def get_process(self, name: str) -> SubProcess | None:
|
||||
"""
|
||||
获取指定子进程
|
||||
Args:
|
||||
name: 子进程名称
|
||||
Returns:
|
||||
"""
|
||||
return self.processes.get(name, None)
|
||||
|
||||
|
||||
sub_process_manager = _SubProcessManager()
|
||||
|
Reference in New Issue
Block a user