Compare commits

...

3 Commits
v1.0.8 ... main

Author SHA1 Message Date
Nanaloveyuki
8f9139d8cc 📝 更新并完善文档内容 2025-05-06 14:23:25 +08:00
Nanaloveyuki
d28006cebd 💬 更新代码注释, 删除test_chan.py的无用引用 2025-05-06 13:57:45 +08:00
7c638615ba 优化select函数,减少管道重复映射,改进recv方法返回值类型 2025-02-18 06:33:33 +08:00
6 changed files with 241 additions and 92 deletions

134
README.md
View File

@ -1,27 +1,139 @@
# magicoca
A communication library for Python
# Magicoca
![Logo](./logo.png)
## 支持的通信方式
- 进程通信
## :art: 简介
Magicoca是一个基于MIT协议的简易Python进程通信库提供了一种简单的方式来实现进程间通信。
## 稀奇古怪的语法
## :rocket: 安装
使用pip安装
```bash
pip install magicoca
```
或是使用源码安装(不推荐)
1. 将仓库clone到本地
> 你需要首先安装[Git](https://git-scm.com/)
```bash
git clone https://github.com/LiteyukiStudio/magicoca
```
> 如果你的网络环境受到限制或影响, 也可以尝试使用Liteyuki Gitea镜像仓库:
> ```bash
> https://git.liteyuki.icu/bot/magicoca
> ```
2. 进入仓库目录(取决于你的文件夹路径)
```bash
cd magicoca
```
3. 使用pip本地安装
```bash
pip install -e .
```
## :hammer: 使用
```python
from magicoca.chan import Chan
# 创建一个通道
ch = Chan()
# 发送消息
ch << "hello"
# 接受消息
msg = str << ch
# 通道关闭
ch.close()
```
## :book: 文档
### 通道
通道是一个用于进程间通信的对象。
如:
```python
from magicoca.chan import Chan
ch = Chan()
```
这里的`ch`就是一个通道。
# 发送消息
### 发送消息
使用`<<`操作符可以将消息发送到通道。
如:
```python
ch << "hello"
```
这里的`"hello"`就是一个消息。
# 接受消息
当然你也可以发送其他类型的消息,比如数字、列表等。
### 接受消息
使用`>>`操作符可以从通道中接受消息。
如:
```python
msg = str << ch
```
这里的`msg`就是一个消息。
# 通道关闭
当然你也可以接受其他类型的消息,比如数字、列表等。
### 通道关闭
使用`close`方法可以关闭通道。
如:
```python
ch.close()
```
当通道关闭后,你就不能再向通道中发送消息了。
# 可跨进程通信
```
### 通道状态
你可以使用`is_closed`方法来判断通道是否关闭。
如:
```python
if ch.is_closed():
print("通道已关闭")
else:
print("通道未关闭")
```
或者基于该原理, 你可以使用`if not ch.is_closed():`来判断通道是否未关闭来间接性发送消息并随时关闭通道以避免出现可能的错误。
### 通道容量
实际上Magicoca的通道是没有容量限制的你可以发送任意数量的消息。
当然, 前提是你的机器内存足够大。
### 通道类型
通道的类型是由你自己定义的。
如:
```python
from magicoca.chan import Chan
# 创建一个通道, 类型为str
ch = Chan(str)
```
如果不指定通道类型, 那么通道的类型将默认为`Any`
### 通道超时
通道的超时是指在接受消息时, 如果没有消息, 那么就会等待一段时间, 直到有消息或者超时。
如:
```python
from magicoca.chan import Chan
# 创建一个通道, 类型为str, 超时时间为1秒
ch = Chan(str, timeout=1)
```
如果不指定超时时间, 那么通道的超时时间将默认为`None`
即无限等待。
### 通道支持
支持线程间通信。

BIN
logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 270 KiB

View File

@ -1,6 +1,6 @@
from multiprocessing import set_start_method
from multiprocessing.connection import wait
from typing import Any, Callable, Generator
from multiprocessing.connection import wait, Connection
from typing import Generator
from magicoca.chan import Chan, T, NoRecvValue
@ -13,18 +13,34 @@ set_start_method("spawn", force=True)
def select(*args: Chan[T]) -> Generator[T, None, None]:
"""
Return a yield, when a value is received from one of the channels.
When one of the channels receives data, yield that data.
当其中一个通道接收到数据时, yield 该数据
Args:
args: channels
args: Multiple Chan objects
多个 Chan 对象
Returns:
Generator[T, None, None]: A generator that yields data from the channels.
一个生成器, 用于从通道中 yield 数据
"""
pipes = [ch.recv_conn for ch in args if not ch.is_closed]
# 构造管道到通道列表的映射,避免重复的 recv_conn 对象
pipe_to_chs: dict[Connection, list[Chan[T]]] = {}
for ch in args:
if not ch.is_closed:
pipe: Connection = ch.recv_conn
pipe_to_chs.setdefault(pipe, []).append(ch)
pipes: list[Connection] = list(pipe_to_chs.keys())
while pipes:
ready_pipes = wait(pipes)
ready_pipes: list[Connection] = wait(pipes) # type: ignore
for pipe in ready_pipes:
for ch in args:
if ch.recv_conn == pipe:
if not isinstance(value := ch.recv(0), NoRecvValue):
yield value
if ch.is_closed:
pipes.remove(pipe)
# 遍历所有使用该管道的通道
channels: list[Chan[T]] = list(pipe_to_chs.get(pipe, []))
for ch in channels:
if not isinstance(value := ch.recv(0), NoRecvValue):
yield value
if ch.is_closed:
pipe_to_chs[pipe].remove(ch)
# 如果该管道已没有活跃的通道,则移除
if not pipe_to_chs[pipe]:
pipes.remove(pipe)

View File

@ -1,5 +1,7 @@
"""
Chan is a simple channel implementation using multiprocessing.Pipe.
Chan 是一个简单的通道实现使用 multiprocessing.Pipe
"""
from multiprocessing import Pipe
from typing import TypeVar, Generic, Any
@ -9,6 +11,8 @@ T = TypeVar("T")
class NoRecvValue(Exception):
"""
Exception raised when there is no value to receive.
当没有值可以接收时引发的异常
"""
pass
@ -17,60 +21,62 @@ class Chan(Generic[T]):
"""
Chan is a simple channel implementation using multiprocessing.Pipe.
Chan 是一个简单的通道实现使用 multiprocessing.Pipe
Attributes:
send_conn: The sending end of the pipe.
管道的发送端
recv_conn: The receiving end of the pipe.
Methods:
send: Send a value to the channel.
recv: Receive a value from the channel.
close: Close the channel.
__iter__: Return the channel object.
__next__: Receive a value from the channel.
__lshift__: Send a value to the channel.
__rlshift__: Receive a value from the channel.
管道的接收端
"""
def __init__(self):
"""
Initialize the channel.
初始化通道
"""
self.send_conn, self.recv_conn = Pipe()
self.is_closed = False
def send(self, value: T):
"""
Send a value to the channel.
发送一个值到通道
Args:
value: The value to send.
要发送的值
"""
self.send_conn.send(value)
def recv(self, timeout: float | None = None) -> T | None | NoRecvValue:
"""Receive a value from the channel.
If the timeout is None, it will block until a value is received.
If the timeout is a positive number, it will wait for the specified time, and if no value is received, it will return None.
接收通道中的值
如果超时为None则它将阻塞直到接收到值
如果超时是正数则它将等待指定的时间如果没有接收到值则返回None
def recv(self, timeout: float | None = None) -> T | NoRecvValue:
"""
Receive a value from the channel.
从通道接收一个值
Args:
timeout:
The maximum time to wait for a value.
等待值的最长时间
timeout: The maximum time to wait for a value.
等待值的最长时间
If None, it will block until a value is received.
如果为 None则阻塞直到接收到值
Returns:
T: The value received from the channel.
从通道接收的值
从通道接收的值
"""
if timeout is not None:
if not self.recv_conn.poll(timeout):
return NoRecvValue("No value to receive.")
return self.recv_conn.recv()
def close(self):
"""
Close the channel. destructor
Close the channel.
关闭通道
"""
self.send_conn.close()
self.recv_conn.close()
@ -78,42 +84,66 @@ class Chan(Generic[T]):
def __iter__(self) -> "Chan[T]":
"""
Returns: The channel object.
Return the channel object.
返回通道对象
"""
return self
def __next__(self) -> T:
def __next__(self) -> T | NoRecvValue:
"""
Receive a value from the channel.
从通道接收一个值
"""
return self.recv()
def __lshift__(self, other: T):
"""
chan << obj
Send an object to the channel.
发送一个对象到通道
Args:
other: The object to send.
Returns: self
要发送的对象
Returns:
self: The channel object.
通道对象
"""
self.send(other)
return self
def __rlshift__(self, other: Any) -> T:
def __rlshift__(self, other: Any) -> T | NoRecvValue:
"""
<< chan
Returns: The value received from the channel.
Receive a value from the channel.
从通道接收一个值
Returns:
T: The value received from the channel.
从通道接收的值
"""
return self.recv(None)
def __add__(self, other: "Chan[T]"):
"""
Connect 1 channel.send to another channel.recv.
Connect one channel's send to another channel's recv.
将一个通道的发送端连接到另一个通道的接收端
Args:
other:
Returns:
other: The other channel to connect.
要连接的另一个通道
"""
self.recv_conn, other.recv_conn = other.recv_conn, self.recv_conn
def __del__(self):
"""
Close the channel when the object is deleted.
当对象被删除时关闭通道
"""
self.close()

View File

@ -1,8 +1,5 @@
import time
from select import select
from magicoca.chan import Chan
from multiprocessing import Process, set_start_method
from multiprocessing import Process

View File

@ -4,41 +4,35 @@ from multiprocessing import Process
from magicoca import Chan, select
def sp1(chan: Chan[int]):
for i in range(10):
chan << i << i * 2
def sp2(chan: Chan[int]):
for i in range(10):
chan << i << i * 3
def rp(chans: list[Chan[int]]):
rl = []
for t in select(*chans):
rl.append(t)
if len(rl) == 40:
break
print(rl)
def send_process(chan: Chan[int], _id: int):
while True:
chan << _id
time.sleep(2)
for i in range(10):
chan << i
time.sleep(0.1 * _id)
def recv_process(chan_list: list[Chan[int]]):
c = []
for t in select(*chan_list):
print(t)
c.append(t)
print("Select", t)
if len(c) == 30:
break
class TestSelect:
def test_select(self):
chan_list = []
for i in range(10):
chan = Chan[int]()
chan_list.append(chan)
p = Process(target=send_process, args=(chan, i))
p.start()
p = Process(target=recv_process, args=(chan_list,))
p.start()
ch1 = Chan[int]()
ch2 = Chan[int]()
ch3 = Chan[int]()
p1 = Process(target=send_process, args=(ch1, 1))
p2 = Process(target=send_process, args=(ch2, 2))
p3 = Process(target=send_process, args=(ch3, 3))
p4 = Process(target=recv_process, args=([ch1, ch2, ch3],))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()