💬 更新代码注释, 删除test_chan.py的无用引用

This commit is contained in:
Nanaloveyuki 2025-05-06 13:57:45 +08:00
parent 7c638615ba
commit d28006cebd
3 changed files with 69 additions and 37 deletions

View File

@ -13,10 +13,15 @@ set_start_method("spawn", force=True)
def select(*args: Chan[T]) -> Generator[T, None, None]:
"""
当其中一个通道接收到数据时yield 该数据
When one of the channels receives data, yield that data.
当其中一个通道接收到数据时, yield 该数据
参数:
args: 多个 Chan 对象
Args:
args: Multiple Chan objects
多个 Chan 对象
Returns:
Generator[T, None, None]: A generator that yields data from the channels.
一个生成器, 用于从通道中 yield 数据
"""
# 构造管道到通道列表的映射,避免重复的 recv_conn 对象
pipe_to_chs: dict[Connection, list[Chan[T]]] = {}

View File

@ -1,5 +1,7 @@
"""
Chan is a simple channel implementation using multiprocessing.Pipe.
Chan 是一个简单的通道实现使用 multiprocessing.Pipe
"""
from multiprocessing import Pipe
from typing import TypeVar, Generic, Any
@ -9,6 +11,8 @@ T = TypeVar("T")
class NoRecvValue(Exception):
"""
Exception raised when there is no value to receive.
当没有值可以接收时引发的异常
"""
pass
@ -17,60 +21,62 @@ class Chan(Generic[T]):
"""
Chan is a simple channel implementation using multiprocessing.Pipe.
Chan 是一个简单的通道实现使用 multiprocessing.Pipe
Attributes:
send_conn: The sending end of the pipe.
管道的发送端
recv_conn: The receiving end of the pipe.
Methods:
send: Send a value to the channel.
recv: Receive a value from the channel.
close: Close the channel.
__iter__: Return the channel object.
__next__: Receive a value from the channel.
__lshift__: Send a value to the channel.
__rlshift__: Receive a value from the channel.
管道的接收端
"""
def __init__(self):
"""
Initialize the channel.
初始化通道
"""
self.send_conn, self.recv_conn = Pipe()
self.is_closed = False
def send(self, value: T):
"""
Send a value to the channel.
发送一个值到通道
Args:
value: The value to send.
要发送的值
"""
self.send_conn.send(value)
def recv(self, timeout: float | None = None) -> T | NoRecvValue:
"""Receive a value from the channel.
If the timeout is None, it will block until a value is received.
If the timeout is a positive number, it will wait for the specified time, and if no value is received, it will return None.
接收通道中的值
如果超时为None则它将阻塞直到接收到值
如果超时是正数则它将等待指定的时间如果没有接收到值则返回NoRecvValue
"""
Receive a value from the channel.
从通道接收一个值
Args:
timeout:
The maximum time to wait for a value.
等待值的最长时间
timeout: The maximum time to wait for a value.
等待值的最长时间
If None, it will block until a value is received.
如果为 None则阻塞直到接收到值
Returns:
T: The value received from the channel.
从通道接收的值
从通道接收的值
"""
if timeout is not None:
if not self.recv_conn.poll(timeout):
return NoRecvValue("No value to receive.")
return self.recv_conn.recv()
def close(self):
"""
Close the channel. destructor
Close the channel.
关闭通道
"""
self.send_conn.close()
self.recv_conn.close()
@ -78,42 +84,66 @@ class Chan(Generic[T]):
def __iter__(self) -> "Chan[T]":
"""
Returns: The channel object.
Return the channel object.
返回通道对象
"""
return self
def __next__(self) -> T | NoRecvValue:
"""
Receive a value from the channel.
从通道接收一个值
"""
return self.recv()
def __lshift__(self, other: T):
"""
chan << obj
Send an object to the channel.
发送一个对象到通道
Args:
other: The object to send.
Returns: self
要发送的对象
Returns:
self: The channel object.
通道对象
"""
self.send(other)
return self
def __rlshift__(self, other: Any) -> T | NoRecvValue:
"""
<< chan
Returns: The value received from the channel.
Receive a value from the channel.
从通道接收一个值
Returns:
T: The value received from the channel.
从通道接收的值
"""
return self.recv(None)
def __add__(self, other: "Chan[T]"):
"""
Connect 1 channel.send to another channel.recv.
Connect one channel's send to another channel's recv.
将一个通道的发送端连接到另一个通道的接收端
Args:
other:
Returns:
other: The other channel to connect.
要连接的另一个通道
"""
self.recv_conn, other.recv_conn = other.recv_conn, self.recv_conn
def __del__(self):
"""
Close the channel when the object is deleted.
当对象被删除时关闭通道
"""
self.close()

View File

@ -1,8 +1,5 @@
import time
from select import select
from magicoca.chan import Chan
from multiprocessing import Process, set_start_method
from multiprocessing import Process