✨ 支持通道通过通道在进程中传递
This commit is contained in:
@ -15,6 +15,7 @@ from multiprocessing import Pipe
|
||||
from typing import Any, Callable, Coroutine, Generic, Optional, TypeAlias, TypeVar, get_args
|
||||
|
||||
from liteyuki.utils import IS_MAIN_PROCESS, is_coroutine_callable, run_coroutine
|
||||
from liteyuki.log import logger
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
@ -217,6 +218,10 @@ class Channel(Generic[T]):
|
||||
def __next__(self) -> Any:
|
||||
return self.receive()
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
logger.debug(f"Channel {self.name} deleted.")
|
||||
|
||||
|
||||
"""子进程可用的主动和被动通道"""
|
||||
active_channel: Optional["Channel"] = None
|
||||
@ -232,8 +237,20 @@ if IS_MAIN_PROCESS:
|
||||
|
||||
@channel_deliver_passive_channel.on_receive(filter_func=lambda data: data[0] == "set_channel")
|
||||
def on_set_channel(data: tuple[str, dict[str, Any]]):
|
||||
name, channel, temp_channel = data[1]["name"], data[1]["channel"], _channel[data[0]]
|
||||
temp_channel.send(set_channel(name, channel))
|
||||
name, channel = data[1]["name"], data[1]["channel"]
|
||||
set_channel(name, channel)
|
||||
|
||||
|
||||
@channel_deliver_passive_channel.on_receive(filter_func=lambda data: data[0] == "get_channel")
|
||||
def on_get_channel(data: tuple[str, dict[str, Any]]):
|
||||
name, recv_chan = data[1]["name"], data[1]["recv_chan"]
|
||||
recv_chan.send(get_channel(name))
|
||||
|
||||
|
||||
@channel_deliver_passive_channel.on_receive(filter_func=lambda data: data[0] == "get_channels")
|
||||
def on_get_channels(data: tuple[str, dict[str, Any]]):
|
||||
recv_chan = data[1]["recv_chan"]
|
||||
recv_chan.send(get_channels())
|
||||
|
||||
|
||||
def set_channel(name: str, channel: Channel):
|
||||
@ -266,9 +283,6 @@ def set_channels(channels: dict[str, Channel]):
|
||||
Args:
|
||||
channels: 通道名称
|
||||
"""
|
||||
if not IS_MAIN_PROCESS:
|
||||
raise RuntimeError(f"Function {__name__} should only be called in the main process.")
|
||||
|
||||
for name, channel in channels.items():
|
||||
set_channel(name, channel)
|
||||
|
||||
@ -280,10 +294,21 @@ def get_channel(name: str) -> Channel:
|
||||
name: 通道名称
|
||||
Returns:
|
||||
"""
|
||||
if not IS_MAIN_PROCESS:
|
||||
raise RuntimeError(f"Function {__name__} should only be called in the main process.")
|
||||
if IS_MAIN_PROCESS:
|
||||
return _channel[name]
|
||||
|
||||
return _channel[name]
|
||||
else:
|
||||
recv_chan = Channel[Channel[Any]]("recv_chan")
|
||||
channel_deliver_passive_channel.send(
|
||||
(
|
||||
"get_channel",
|
||||
{
|
||||
"name" : name,
|
||||
"recv_chan": recv_chan
|
||||
}
|
||||
)
|
||||
)
|
||||
return recv_chan.receive()
|
||||
|
||||
|
||||
def get_channels() -> dict[str, Channel]:
|
||||
@ -291,7 +316,16 @@ def get_channels() -> dict[str, Channel]:
|
||||
获取通道实例
|
||||
Returns:
|
||||
"""
|
||||
if not IS_MAIN_PROCESS:
|
||||
raise RuntimeError(f"Function {__name__} should only be called in the main process.")
|
||||
|
||||
return _channel
|
||||
if IS_MAIN_PROCESS:
|
||||
return _channel
|
||||
else:
|
||||
recv_chan = Channel[dict[str, Channel[Any]]]("recv_chan")
|
||||
channel_deliver_passive_channel.send(
|
||||
(
|
||||
"get_channels",
|
||||
{
|
||||
"recv_chan": recv_chan
|
||||
}
|
||||
)
|
||||
)
|
||||
return recv_chan.receive()
|
||||
|
Reference in New Issue
Block a user