mirror of
https://github.com/LiteyukiStudio/magicoca.git
synced 2025-06-02 18:25:26 +00:00
Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
8f9139d8cc | ||
|
d28006cebd | ||
7c638615ba |
134
README.md
134
README.md
@ -1,27 +1,139 @@
|
||||
# magicoca
|
||||
A communication library for Python
|
||||
# Magicoca
|
||||

|
||||
|
||||
## 支持的通信方式
|
||||
- 进程通信
|
||||
## :art: 简介
|
||||
Magicoca是一个基于MIT协议的简易Python进程通信库,提供了一种简单的方式来实现进程间通信。
|
||||
|
||||
## 稀奇古怪的语法
|
||||
## :rocket: 安装
|
||||
使用pip安装:
|
||||
```bash
|
||||
pip install magicoca
|
||||
```
|
||||
|
||||
或是使用源码安装(不推荐):
|
||||
|
||||
1. 将仓库clone到本地
|
||||
> 你需要首先安装[Git](https://git-scm.com/)
|
||||
|
||||
```bash
|
||||
git clone https://github.com/LiteyukiStudio/magicoca
|
||||
```
|
||||
|
||||
> 如果你的网络环境受到限制或影响, 也可以尝试使用Liteyuki Gitea镜像仓库:
|
||||
> ```bash
|
||||
> https://git.liteyuki.icu/bot/magicoca
|
||||
> ```
|
||||
|
||||
|
||||
2. 进入仓库目录(取决于你的文件夹路径)
|
||||
|
||||
```bash
|
||||
cd magicoca
|
||||
```
|
||||
|
||||
3. 使用pip本地安装
|
||||
```bash
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
## :hammer: 使用
|
||||
```python
|
||||
from magicoca.chan import Chan
|
||||
|
||||
# 创建一个通道
|
||||
ch = Chan()
|
||||
|
||||
# 发送消息
|
||||
ch << "hello"
|
||||
|
||||
# 接受消息
|
||||
msg = str << ch
|
||||
|
||||
# 通道关闭
|
||||
ch.close()
|
||||
```
|
||||
|
||||
## :book: 文档
|
||||
|
||||
### 通道
|
||||
通道是一个用于进程间通信的对象。
|
||||
如:
|
||||
```python
|
||||
from magicoca.chan import Chan
|
||||
|
||||
ch = Chan()
|
||||
```
|
||||
这里的`ch`就是一个通道。
|
||||
|
||||
# 发送消息
|
||||
|
||||
### 发送消息
|
||||
使用`<<`操作符可以将消息发送到通道。
|
||||
如:
|
||||
```python
|
||||
ch << "hello"
|
||||
```
|
||||
这里的`"hello"`就是一个消息。
|
||||
|
||||
# 接受消息
|
||||
当然你也可以发送其他类型的消息,比如数字、列表等。
|
||||
|
||||
### 接受消息
|
||||
使用`>>`操作符可以从通道中接受消息。
|
||||
如:
|
||||
```python
|
||||
msg = str << ch
|
||||
```
|
||||
这里的`msg`就是一个消息。
|
||||
|
||||
# 通道关闭
|
||||
当然你也可以接受其他类型的消息,比如数字、列表等。
|
||||
|
||||
### 通道关闭
|
||||
使用`close`方法可以关闭通道。
|
||||
如:
|
||||
```python
|
||||
ch.close()
|
||||
```
|
||||
当通道关闭后,你就不能再向通道中发送消息了。
|
||||
|
||||
# 可跨进程通信
|
||||
```
|
||||
### 通道状态
|
||||
你可以使用`is_closed`方法来判断通道是否关闭。
|
||||
如:
|
||||
```python
|
||||
if ch.is_closed():
|
||||
print("通道已关闭")
|
||||
else:
|
||||
print("通道未关闭")
|
||||
```
|
||||
|
||||
或者基于该原理, 你可以使用`if not ch.is_closed():`来判断通道是否未关闭来间接性发送消息并随时关闭通道以避免出现可能的错误。
|
||||
|
||||
### 通道容量
|
||||
实际上Magicoca的通道是没有容量限制的,你可以发送任意数量的消息。
|
||||
当然, 前提是你的机器内存足够大。
|
||||
|
||||
### 通道类型
|
||||
通道的类型是由你自己定义的。
|
||||
如:
|
||||
```python
|
||||
from magicoca.chan import Chan
|
||||
|
||||
# 创建一个通道, 类型为str
|
||||
ch = Chan(str)
|
||||
```
|
||||
|
||||
如果不指定通道类型, 那么通道的类型将默认为`Any`。
|
||||
|
||||
### 通道超时
|
||||
通道的超时是指在接受消息时, 如果没有消息, 那么就会等待一段时间, 直到有消息或者超时。
|
||||
如:
|
||||
```python
|
||||
from magicoca.chan import Chan
|
||||
|
||||
# 创建一个通道, 类型为str, 超时时间为1秒
|
||||
ch = Chan(str, timeout=1)
|
||||
```
|
||||
|
||||
如果不指定超时时间, 那么通道的超时时间将默认为`None`。
|
||||
|
||||
即无限等待。
|
||||
|
||||
### 通道支持
|
||||
支持线程间通信。
|
||||
|
@ -1,6 +1,6 @@
|
||||
from multiprocessing import set_start_method
|
||||
from multiprocessing.connection import wait
|
||||
from typing import Any, Callable, Generator
|
||||
from multiprocessing.connection import wait, Connection
|
||||
from typing import Generator
|
||||
|
||||
from magicoca.chan import Chan, T, NoRecvValue
|
||||
|
||||
@ -13,18 +13,34 @@ set_start_method("spawn", force=True)
|
||||
|
||||
def select(*args: Chan[T]) -> Generator[T, None, None]:
|
||||
"""
|
||||
Return a yield, when a value is received from one of the channels.
|
||||
When one of the channels receives data, yield that data.
|
||||
当其中一个通道接收到数据时, yield 该数据
|
||||
|
||||
Args:
|
||||
args: channels
|
||||
args: Multiple Chan objects
|
||||
多个 Chan 对象
|
||||
Returns:
|
||||
Generator[T, None, None]: A generator that yields data from the channels.
|
||||
一个生成器, 用于从通道中 yield 数据
|
||||
"""
|
||||
pipes = [ch.recv_conn for ch in args if not ch.is_closed]
|
||||
# 构造管道到通道列表的映射,避免重复的 recv_conn 对象
|
||||
pipe_to_chs: dict[Connection, list[Chan[T]]] = {}
|
||||
for ch in args:
|
||||
if not ch.is_closed:
|
||||
pipe: Connection = ch.recv_conn
|
||||
pipe_to_chs.setdefault(pipe, []).append(ch)
|
||||
pipes: list[Connection] = list(pipe_to_chs.keys())
|
||||
|
||||
while pipes:
|
||||
ready_pipes = wait(pipes)
|
||||
ready_pipes: list[Connection] = wait(pipes) # type: ignore
|
||||
for pipe in ready_pipes:
|
||||
for ch in args:
|
||||
if ch.recv_conn == pipe:
|
||||
if not isinstance(value := ch.recv(0), NoRecvValue):
|
||||
yield value
|
||||
if ch.is_closed:
|
||||
pipes.remove(pipe)
|
||||
# 遍历所有使用该管道的通道
|
||||
channels: list[Chan[T]] = list(pipe_to_chs.get(pipe, []))
|
||||
for ch in channels:
|
||||
if not isinstance(value := ch.recv(0), NoRecvValue):
|
||||
yield value
|
||||
if ch.is_closed:
|
||||
pipe_to_chs[pipe].remove(ch)
|
||||
# 如果该管道已没有活跃的通道,则移除
|
||||
if not pipe_to_chs[pipe]:
|
||||
pipes.remove(pipe)
|
@ -1,5 +1,7 @@
|
||||
"""
|
||||
Chan is a simple channel implementation using multiprocessing.Pipe.
|
||||
|
||||
Chan 是一个简单的通道实现,使用 multiprocessing.Pipe。
|
||||
"""
|
||||
from multiprocessing import Pipe
|
||||
from typing import TypeVar, Generic, Any
|
||||
@ -9,6 +11,8 @@ T = TypeVar("T")
|
||||
class NoRecvValue(Exception):
|
||||
"""
|
||||
Exception raised when there is no value to receive.
|
||||
|
||||
当没有值可以接收时引发的异常。
|
||||
"""
|
||||
pass
|
||||
|
||||
@ -17,60 +21,62 @@ class Chan(Generic[T]):
|
||||
"""
|
||||
Chan is a simple channel implementation using multiprocessing.Pipe.
|
||||
|
||||
Chan 是一个简单的通道实现,使用 multiprocessing.Pipe。
|
||||
|
||||
Attributes:
|
||||
send_conn: The sending end of the pipe.
|
||||
管道的发送端。
|
||||
recv_conn: The receiving end of the pipe.
|
||||
|
||||
Methods:
|
||||
send: Send a value to the channel.
|
||||
recv: Receive a value from the channel.
|
||||
close: Close the channel.
|
||||
__iter__: Return the channel object.
|
||||
__next__: Receive a value from the channel.
|
||||
__lshift__: Send a value to the channel.
|
||||
__rlshift__: Receive a value from the channel.
|
||||
管道的接收端。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the channel.
|
||||
|
||||
初始化通道。
|
||||
"""
|
||||
self.send_conn, self.recv_conn = Pipe()
|
||||
|
||||
self.is_closed = False
|
||||
|
||||
def send(self, value: T):
|
||||
"""
|
||||
Send a value to the channel.
|
||||
|
||||
发送一个值到通道。
|
||||
|
||||
Args:
|
||||
value: The value to send.
|
||||
要发送的值。
|
||||
"""
|
||||
self.send_conn.send(value)
|
||||
|
||||
def recv(self, timeout: float | None = None) -> T | None | NoRecvValue:
|
||||
"""Receive a value from the channel.
|
||||
If the timeout is None, it will block until a value is received.
|
||||
If the timeout is a positive number, it will wait for the specified time, and if no value is received, it will return None.
|
||||
接收通道中的值。
|
||||
如果超时为None,则它将阻塞,直到接收到值。
|
||||
如果超时是正数,则它将等待指定的时间,如果没有接收到值,则返回None。
|
||||
def recv(self, timeout: float | None = None) -> T | NoRecvValue:
|
||||
"""
|
||||
Receive a value from the channel.
|
||||
|
||||
从通道接收一个值。
|
||||
|
||||
Args:
|
||||
timeout:
|
||||
The maximum time to wait for a value.
|
||||
等待值的最长时间。
|
||||
timeout: The maximum time to wait for a value.
|
||||
等待值的最长时间。
|
||||
If None, it will block until a value is received.
|
||||
如果为 None,则阻塞直到接收到值。
|
||||
|
||||
Returns:
|
||||
T: The value received from the channel.
|
||||
从通道接收的值。
|
||||
从通道接收的值。
|
||||
"""
|
||||
if timeout is not None:
|
||||
if not self.recv_conn.poll(timeout):
|
||||
return NoRecvValue("No value to receive.")
|
||||
return self.recv_conn.recv()
|
||||
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close the channel. destructor
|
||||
Close the channel.
|
||||
|
||||
关闭通道。
|
||||
"""
|
||||
self.send_conn.close()
|
||||
self.recv_conn.close()
|
||||
@ -78,42 +84,66 @@ class Chan(Generic[T]):
|
||||
|
||||
def __iter__(self) -> "Chan[T]":
|
||||
"""
|
||||
Returns: The channel object.
|
||||
Return the channel object.
|
||||
|
||||
返回通道对象。
|
||||
"""
|
||||
return self
|
||||
|
||||
def __next__(self) -> T:
|
||||
def __next__(self) -> T | NoRecvValue:
|
||||
"""
|
||||
Receive a value from the channel.
|
||||
|
||||
从通道接收一个值。
|
||||
"""
|
||||
return self.recv()
|
||||
|
||||
def __lshift__(self, other: T):
|
||||
"""
|
||||
chan << obj
|
||||
Send an object to the channel.
|
||||
|
||||
发送一个对象到通道。
|
||||
|
||||
Args:
|
||||
other: The object to send.
|
||||
Returns: self
|
||||
要发送的对象。
|
||||
|
||||
Returns:
|
||||
self: The channel object.
|
||||
通道对象。
|
||||
"""
|
||||
self.send(other)
|
||||
return self
|
||||
|
||||
def __rlshift__(self, other: Any) -> T:
|
||||
def __rlshift__(self, other: Any) -> T | NoRecvValue:
|
||||
"""
|
||||
<< chan
|
||||
Returns: The value received from the channel.
|
||||
Receive a value from the channel.
|
||||
|
||||
从通道接收一个值。
|
||||
|
||||
Returns:
|
||||
T: The value received from the channel.
|
||||
从通道接收的值。
|
||||
"""
|
||||
return self.recv(None)
|
||||
|
||||
def __add__(self, other: "Chan[T]"):
|
||||
"""
|
||||
Connect 1 channel.send to another channel.recv.
|
||||
Connect one channel's send to another channel's recv.
|
||||
|
||||
将一个通道的发送端连接到另一个通道的接收端。
|
||||
|
||||
Args:
|
||||
other:
|
||||
Returns:
|
||||
other: The other channel to connect.
|
||||
要连接的另一个通道。
|
||||
"""
|
||||
self.recv_conn, other.recv_conn = other.recv_conn, self.recv_conn
|
||||
|
||||
def __del__(self):
|
||||
"""
|
||||
Close the channel when the object is deleted.
|
||||
|
||||
当对象被删除时关闭通道。
|
||||
"""
|
||||
self.close()
|
||||
|
||||
|
@ -1,8 +1,5 @@
|
||||
import time
|
||||
from select import select
|
||||
|
||||
from magicoca.chan import Chan
|
||||
from multiprocessing import Process, set_start_method
|
||||
from multiprocessing import Process
|
||||
|
||||
|
||||
|
||||
|
@ -4,41 +4,35 @@ from multiprocessing import Process
|
||||
from magicoca import Chan, select
|
||||
|
||||
|
||||
def sp1(chan: Chan[int]):
|
||||
for i in range(10):
|
||||
chan << i << i * 2
|
||||
|
||||
|
||||
def sp2(chan: Chan[int]):
|
||||
for i in range(10):
|
||||
chan << i << i * 3
|
||||
|
||||
|
||||
def rp(chans: list[Chan[int]]):
|
||||
rl = []
|
||||
for t in select(*chans):
|
||||
rl.append(t)
|
||||
if len(rl) == 40:
|
||||
break
|
||||
print(rl)
|
||||
|
||||
def send_process(chan: Chan[int], _id: int):
|
||||
while True:
|
||||
chan << _id
|
||||
time.sleep(2)
|
||||
for i in range(10):
|
||||
chan << i
|
||||
time.sleep(0.1 * _id)
|
||||
|
||||
def recv_process(chan_list: list[Chan[int]]):
|
||||
c = []
|
||||
for t in select(*chan_list):
|
||||
print(t)
|
||||
|
||||
|
||||
c.append(t)
|
||||
print("Select", t)
|
||||
if len(c) == 30:
|
||||
break
|
||||
class TestSelect:
|
||||
def test_select(self):
|
||||
chan_list = []
|
||||
for i in range(10):
|
||||
chan = Chan[int]()
|
||||
chan_list.append(chan)
|
||||
p = Process(target=send_process, args=(chan, i))
|
||||
p.start()
|
||||
p = Process(target=recv_process, args=(chan_list,))
|
||||
p.start()
|
||||
ch1 = Chan[int]()
|
||||
ch2 = Chan[int]()
|
||||
ch3 = Chan[int]()
|
||||
|
||||
p1 = Process(target=send_process, args=(ch1, 1))
|
||||
p2 = Process(target=send_process, args=(ch2, 2))
|
||||
p3 = Process(target=send_process, args=(ch3, 3))
|
||||
p4 = Process(target=recv_process, args=([ch1, ch2, ch3],))
|
||||
|
||||
p1.start()
|
||||
p2.start()
|
||||
p3.start()
|
||||
p4.start()
|
||||
|
||||
p1.join()
|
||||
p2.join()
|
||||
p3.join()
|
||||
p4.join()
|
||||
|
Loading…
x
Reference in New Issue
Block a user