8 Commits

7 changed files with 253 additions and 55 deletions

View File

@@ -3,7 +3,7 @@ name: Publish
on:
push:
tags:
- '*'
- 'v*'
jobs:
pypi-publish:

View File

@@ -0,0 +1,25 @@
from multiprocessing import set_start_method
from typing import Any, Callable, Generator
from magicoca.chan import Chan, T, NoRecvValue
__all__ = [
"Chan",
"select"
]
set_start_method("spawn", force=True)
def select(*args: Chan[T]) -> Generator[T, None, None]:
"""
Return a yield, when a value is received from one of the channels.
Args:
args: channels
"""
while True:
for ch in args:
if ch.is_closed:
continue
if not isinstance(value := ch.recv(0), NoRecvValue):
yield value

View File

@@ -1,28 +1,84 @@
"""
Chan is a simple channel implementation using multiprocessing.Pipe.
"""
from multiprocessing import Pipe
from typing import TypeVar, Generic, Any
T = TypeVar("T")
class NoRecvValue(Exception):
"""
Exception raised when there is no value to receive.
"""
pass
class Chan(Generic[T]):
def __init__(self, buffer: int = 0):
self._buffer = buffer
self._send_conn, self._recv_conn = Pipe()
"""
Chan is a simple channel implementation using multiprocessing.Pipe.
Attributes:
send_conn: The sending end of the pipe.
recv_conn: The receiving end of the pipe.
Methods:
send: Send a value to the channel.
recv: Receive a value from the channel.
close: Close the channel.
__iter__: Return the channel object.
__next__: Receive a value from the channel.
__lshift__: Send a value to the channel.
__rlshift__: Receive a value from the channel.
"""
def __init__(self):
"""
Initialize the channel.
"""
self.send_conn, self.recv_conn = Pipe()
self.is_closed = False
def send(self, value: T):
self._send_conn.send(value)
"""
Send a value to the channel.
Args:
value: The value to send.
"""
self.send_conn.send(value)
def recv(self, timeout: float | None = None) -> T | None | NoRecvValue:
"""Receive a value from the channel.
If the timeout is None, it will block until a value is received.
If the timeout is a positive number, it will wait for the specified time, and if no value is received, it will return None.
接收通道中的值。
如果超时为None则它将阻塞直到接收到值。
如果超时是正数则它将等待指定的时间如果没有接收到值则返回None。
Args:
timeout:
The maximum time to wait for a value.
等待值的最长时间。
Returns:
T: The value received from the channel.
从通道接收的值。
"""
if timeout is not None:
if not self.recv_conn.poll(timeout):
return NoRecvValue("No value to receive.")
return self.recv_conn.recv()
def recv(self) -> T:
return self._recv_conn.recv()
def close(self):
self._send_conn.close()
self._recv_conn.close()
def __iter__(self):
"""
Close the channel. destructor
"""
self.send_conn.close()
self.recv_conn.close()
self.is_closed = True
Returns:
def __iter__(self) -> "Chan[T]":
"""
Returns: The channel object.
"""
return self
@@ -33,9 +89,8 @@ class Chan(Generic[T]):
"""
chan << obj
Args:
other:
Returns:
other: The object to send.
Returns: self
"""
self.send(other)
return self
@@ -43,7 +98,21 @@ class Chan(Generic[T]):
def __rlshift__(self, other: Any) -> T:
"""
<< chan
Returns:
Returns: The value received from the channel.
"""
return self.recv()
return self.recv(None)
def __add__(self, other: "Chan[T]"):
"""
Connect 1 channel.send to another channel.recv.
Args:
other:
Returns:
"""
self.recv_conn, other.recv_conn = other.recv_conn, self.recv_conn
def __del__(self):
"""
Close the channel when the object is deleted.
"""
self.close()

55
pdm.lock generated
View File

@@ -5,7 +5,7 @@
groups = ["default", "dev"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:7448dc54658d779bbdd51b39ae72faf632280702d8719337a0fff377415332ba"
content_hash = "sha256:0036f04f76c33e040729d2763c99d933269bae8a2417a2f5846a78887dd196c9"
[[metadata.targets]]
requires_python = ">=3.10"
@@ -45,6 +45,48 @@ files = [
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
]
[[package]]
name = "mypy"
version = "1.11.2"
requires_python = ">=3.8"
summary = "Optional static typing for Python"
groups = ["dev"]
dependencies = [
"mypy-extensions>=1.0.0",
"tomli>=1.1.0; python_version < \"3.11\"",
"typing-extensions>=4.6.0",
]
files = [
{file = "mypy-1.11.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d42a6dd818ffce7be66cce644f1dff482f1d97c53ca70908dff0b9ddc120b77a"},
{file = "mypy-1.11.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:801780c56d1cdb896eacd5619a83e427ce436d86a3bdf9112527f24a66618fef"},
{file = "mypy-1.11.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:41ea707d036a5307ac674ea172875f40c9d55c5394f888b168033177fce47383"},
{file = "mypy-1.11.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:6e658bd2d20565ea86da7d91331b0eed6d2eee22dc031579e6297f3e12c758c8"},
{file = "mypy-1.11.2-cp310-cp310-win_amd64.whl", hash = "sha256:478db5f5036817fe45adb7332d927daa62417159d49783041338921dcf646fc7"},
{file = "mypy-1.11.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:75746e06d5fa1e91bfd5432448d00d34593b52e7e91a187d981d08d1f33d4385"},
{file = "mypy-1.11.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a976775ab2256aadc6add633d44f100a2517d2388906ec4f13231fafbb0eccca"},
{file = "mypy-1.11.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:cd953f221ac1379050a8a646585a29574488974f79d8082cedef62744f0a0104"},
{file = "mypy-1.11.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:57555a7715c0a34421013144a33d280e73c08df70f3a18a552938587ce9274f4"},
{file = "mypy-1.11.2-cp311-cp311-win_amd64.whl", hash = "sha256:36383a4fcbad95f2657642a07ba22ff797de26277158f1cc7bd234821468b1b6"},
{file = "mypy-1.11.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e8960dbbbf36906c5c0b7f4fbf2f0c7ffb20f4898e6a879fcf56a41a08b0d318"},
{file = "mypy-1.11.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:06d26c277962f3fb50e13044674aa10553981ae514288cb7d0a738f495550b36"},
{file = "mypy-1.11.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6e7184632d89d677973a14d00ae4d03214c8bc301ceefcdaf5c474866814c987"},
{file = "mypy-1.11.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:3a66169b92452f72117e2da3a576087025449018afc2d8e9bfe5ffab865709ca"},
{file = "mypy-1.11.2-cp312-cp312-win_amd64.whl", hash = "sha256:969ea3ef09617aff826885a22ece0ddef69d95852cdad2f60c8bb06bf1f71f70"},
{file = "mypy-1.11.2-py3-none-any.whl", hash = "sha256:b499bc07dbdcd3de92b0a8b29fdf592c111276f6a12fe29c30f6c417dd546d12"},
{file = "mypy-1.11.2.tar.gz", hash = "sha256:7f9993ad3e0ffdc95c2a14b66dee63729f021968bff8ad911867579c65d13a79"},
]
[[package]]
name = "mypy-extensions"
version = "1.0.0"
requires_python = ">=3.5"
summary = "Type system extensions for programs checked with the mypy type checker."
groups = ["dev"]
files = [
{file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
{file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
]
[[package]]
name = "packaging"
version = "24.1"
@@ -97,3 +139,14 @@ files = [
{file = "tomli-2.0.2-py3-none-any.whl", hash = "sha256:2ebe24485c53d303f690b0ec092806a085f07af5a5aa1464f3931eec36caaa38"},
{file = "tomli-2.0.2.tar.gz", hash = "sha256:d46d457a85337051c36524bc5349dd91b1877838e2979ac5ced3e710ed8a60ed"},
]
[[package]]
name = "typing-extensions"
version = "4.12.2"
requires_python = ">=3.8"
summary = "Backported and Experimental Type Hints for Python 3.8+"
groups = ["dev"]
files = [
{file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
{file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
]

View File

@@ -23,6 +23,7 @@ tag_regex = '^v(?:\D*)?(?P<version>([1-9][0-9]*!)?(0|[1-9][0-9]*)(\.(0|[1-9][0-9
[tool.pdm.dev-dependencies]
dev = [
"pytest>=8.3.3",
"mypy>=1.11.2",
]
[tool.pdm]
distribution = true

View File

@@ -1,30 +1,36 @@
import time
from select import select
from magicoca.chan import Chan
from multiprocessing import Process
from multiprocessing import Process, set_start_method
def p1f(chan: Chan[int]):
for i in range(10):
chan << i
chan << -1
def p2f(chan: Chan[int]):
recv_ans = []
while True:
a = int << chan
print("Recv", a)
recv_ans.append(a)
if a == -1:
break
if recv_ans != list(range(10)) + [-1]:
raise ValueError("Chan Shift Test Failed")
class TestChan:
def test_test(self):
print("Test is running")
def test_chan_shift(self):
"""测试运算符"""
ch = Chan[int]()
def p1f(chan: Chan[int]):
for i in range(10):
time.sleep(1)
chan << i
chan << -1
def p2f(chan: Chan[int]):
recv_ans = []
while True:
a = int << chan
print("Recv", a)
recv_ans.append(a)
if a == -1:
break
if recv_ans != list(range(10)) + [-1]:
raise ValueError("Chan Shift Test Failed")
print("Test Chan Shift")
p1 = Process(target=p1f, args=(ch,))
p2 = Process(target=p2f, args=(ch,))
@@ -34,25 +40,9 @@ class TestChan:
p2.join()
def test_chan_sr(self):
"""测试收发"""
ch = Chan[int]()
def p1f(chan: Chan[int]):
for i in range(10):
time.sleep(1)
chan.send(i)
chan.send(-1)
def p2f(chan: Chan[int]):
recv_ans = []
while True:
a = chan.recv()
recv_ans.append(a)
print("Recv2", a)
if a == -1:
break
if recv_ans != list(range(10)) + [-1]:
raise ValueError("Chan SR Test Failed")
print("Test Chan SR")
p1 = Process(target=p1f, args=(ch,))
p2 = Process(target=p2f, args=(ch,))
@@ -60,3 +50,21 @@ class TestChan:
p2.start()
p1.join()
p2.join()
def test_connect(self):
"""测试双通道连接"""
chan1 = Chan[int]()
chan2 = Chan[int]()
chan1 + chan2
print("Test Chan Connect")
p1 = Process(target=p1f, args=(chan1,))
p2 = Process(target=p2f, args=(chan2,))
p1.start()
p2.start()
p1.join()
p2.join()

42
tests/test_select.py Normal file
View File

@@ -0,0 +1,42 @@
from multiprocessing import Process
from magicoca import Chan, select
def sp1(chan: Chan[int]):
for i in range(10):
chan << i << i * 2
def sp2(chan: Chan[int]):
for i in range(10):
chan << i << i * 3
def rp(chans: list[Chan[int]]):
rl = []
for t in select(*chans):
rl.append(t)
if len(rl) == 40:
break
print(rl)
class TestSelect:
def test_select(self):
chan1 = Chan[int]()
chan2 = Chan[int]()
print("Test Chan Select")
p1 = Process(target=sp1, args=(chan1,))
p2 = Process(target=sp2, args=(chan2,))
p3 = Process(target=rp, args=([chan1, chan2],))
p3.start()
p1.start()
p2.start()
p1.join()
p2.join()
p3.join()