🐛 fix: Channel的接收者过滤器的问题,优化重启部分

This commit is contained in:
2024-07-31 02:28:25 +08:00
parent 0fb5b84392
commit ca34f9c2a1
21 changed files with 386 additions and 329 deletions

View File

@ -10,12 +10,8 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
本模块定义了一个通用的通道类,用于进程间通信
"""
import threading
from multiprocessing import Queue
from queue import Empty, Full
from typing import Any, Awaitable, Callable, List, Optional, TypeAlias
from nonebot import logger
from multiprocessing import Pipe
from typing import Any, Optional, Callable, Awaitable, List, TypeAlias
from liteyuki.utils import is_coroutine_callable, run_coroutine
@ -29,85 +25,66 @@ FILTER_FUNC: TypeAlias = SYNC_FILTER_FUNC | ASYNC_FILTER_FUNC
class Channel:
def __init__(self, buffer_size: int = 0):
self._queue = Queue(buffer_size)
"""
通道类,用于进程间通信
有两种接收工作方式,但是只能选择一种,主动接收和被动接收,主动接收使用 `receive` 方法,被动接收使用 `on_receive` 装饰器
"""
def __init__(self):
self.parent_conn, self.child_conn = Pipe()
self._closed = False
self._on_receive_funcs: List[ON_RECEIVE_FUNC] = []
self._on_receive_funcs_with_receiver: dict[str, List[ON_RECEIVE_FUNC]] = {}
self._receiving_thread = threading.Thread(target=self._start_receiver, daemon=True)
self._receiving_thread.start()
def send(
self,
data: Any,
receiver: Optional[str] = None,
block: bool = True,
timeout: Optional[float] = None
):
def send(self, data: Any, receiver: Optional[str] = None):
"""
发送数据
Args:
data: 数据
receiver: 接收者如果为None则广播
block: 是否阻塞
timeout: 超时时间
Returns:
"""
print(f"send {data} -> {receiver}")
if self._closed:
raise RuntimeError("Cannot send to a closed channel")
try:
self._queue.put((data, receiver), block, timeout)
except Full:
logger.warning("Channel buffer is full, send operation is blocked")
self.child_conn.send((data, receiver))
def receive(
self,
receiver: str = None,
block: bool = True,
timeout: Optional[float] = None
) -> Any:
def receive(self, receiver: str = None) -> Any:
"""
接收数据
Args:
receiver: 接收者如果为None则接收任意数据
block: 是否阻塞
timeout: 超时时间
Returns:
"""
if self._closed:
raise RuntimeError("Cannot receive from a closed channel")
try:
while True:
data, data_receiver = self._queue.get(block, timeout)
if receiver is None or receiver == data_receiver:
return data
except Empty:
if not block:
return None
raise
while True:
# 判断receiver是否为None或者receiver是否等于接收者是则接收数据否则不动数据
if self.parent_conn.poll():
data, receiver = self.parent_conn.recv()
self.parent_conn.send((data, receiver))
self._run_on_receive_funcs(data, receiver)
return data
def peek(self) -> Optional[Any]:
"""
查看管道中的数据,不移除
Returns:
"""
if self._closed:
raise RuntimeError("Cannot peek from a closed channel")
if self.parent_conn.poll():
data, receiver = self.parent_conn.recv()
self.parent_conn.send((data, receiver))
return data
return None
def close(self):
"""
关闭通道
Returns:
"""
self._closed = True
self._queue.close()
while not self._queue.empty():
self._queue.get()
self.parent_conn.close()
self.child_conn.close()
def on_receive(
self,
filter_func: Optional[FILTER_FUNC] = None,
receiver: Optional[str] = None,
) -> Callable[[ON_RECEIVE_FUNC], ON_RECEIVE_FUNC]:
def on_receive(self, filter_func: Optional[FILTER_FUNC] = None, receiver: Optional[str] = None) -> Callable[[ON_RECEIVE_FUNC], ON_RECEIVE_FUNC]:
"""
接收数据并执行函数
Args:
@ -138,22 +115,11 @@ class Channel:
return decorator
def _start_receiver(self):
"""
使用多线程启动接收循环,在通道实例化时自动启动
Returns:
"""
while True:
data, receiver = self._queue.get(block=True, timeout=None)
self._run_on_receive_funcs(data, receiver)
def _run_on_receive_funcs(self, data: Any, receiver: Optional[str] = None):
"""
运行接收函数
Args:
data: 数据
Returns:
"""
if receiver is None:
for func in self._on_receive_funcs:
@ -165,8 +131,8 @@ class Channel:
def __iter__(self):
return self
def __next__(self, timeout: Optional[float] = None) -> Any:
return self.receive(block=True, timeout=timeout)
def __next__(self) -> Any:
return self.receive()
"""默认通道实例,可直接从模块导入使用"""