mirror of
https://github.com/LiteyukiStudio/LiteyukiBot.git
synced 2025-07-30 08:49:51 +00:00
✨ 对通道类添加类型检查和泛型
This commit is contained in:
@ -4,7 +4,7 @@
|
||||
"""
|
||||
|
||||
import threading
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from liteyuki.comm.channel import Channel
|
||||
from liteyuki.utils import IS_MAIN_PROCESS
|
||||
@ -28,11 +28,10 @@ def _get_lock(key) -> threading.Lock:
|
||||
class KeyValueStore:
|
||||
def __init__(self):
|
||||
self._store = {}
|
||||
self.active_chan = Channel[tuple[str, Optional[dict[str, Any]]]](_id="shared_memory-active")
|
||||
self.passive_chan = Channel[tuple[str, Optional[dict[str, Any]]]](_id="shared_memory-passive")
|
||||
|
||||
self.active_chan = Channel(_id="shared_memory-active")
|
||||
self.passive_chan = Channel(_id="shared_memory-passive")
|
||||
|
||||
def set(self, key: str, value: any) -> None:
|
||||
def set(self, key: str, value: Any) -> None:
|
||||
"""
|
||||
设置键值对
|
||||
Args:
|
||||
@ -46,9 +45,17 @@ class KeyValueStore:
|
||||
self._store[key] = value
|
||||
else:
|
||||
# 向主进程发送请求拿取
|
||||
self.passive_chan.send(("set", key, value))
|
||||
self.passive_chan.send(
|
||||
(
|
||||
"set",
|
||||
{
|
||||
"key" : key,
|
||||
"value": value
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
def get(self, key: str, default: Optional[any] = None) -> any:
|
||||
def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
|
||||
"""
|
||||
获取键值对
|
||||
Args:
|
||||
@ -56,15 +63,26 @@ class KeyValueStore:
|
||||
default: 默认值
|
||||
|
||||
Returns:
|
||||
any: 值
|
||||
Any: 值
|
||||
"""
|
||||
if IS_MAIN_PROCESS:
|
||||
lock = _get_lock(key)
|
||||
with lock:
|
||||
return self._store.get(key, default)
|
||||
else:
|
||||
self.passive_chan.send(("get", key, default))
|
||||
return self.active_chan.receive()
|
||||
recv_chan = Channel[Optional[Any]]("recv_chan")
|
||||
self.passive_chan.send(
|
||||
(
|
||||
"get",
|
||||
{
|
||||
"key" : key,
|
||||
"default" : default,
|
||||
"recv_chan": recv_chan
|
||||
}
|
||||
|
||||
)
|
||||
)
|
||||
return recv_chan.receive()
|
||||
|
||||
def delete(self, key: str, ignore_key_error: bool = True) -> None:
|
||||
"""
|
||||
@ -87,92 +105,34 @@ class KeyValueStore:
|
||||
raise e
|
||||
else:
|
||||
# 向主进程发送请求删除
|
||||
self.passive_chan.send(("delete", key))
|
||||
self.passive_chan.send(
|
||||
(
|
||||
"delete",
|
||||
{
|
||||
"key": key
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
def get_all(self) -> dict[str, any]:
|
||||
def get_all(self) -> dict[str, Any]:
|
||||
"""
|
||||
获取所有键值对
|
||||
Returns:
|
||||
dict[str, any]: 键值对
|
||||
dict[str, Any]: 键值对
|
||||
"""
|
||||
if IS_MAIN_PROCESS:
|
||||
return self._store
|
||||
else:
|
||||
self.passive_chan.send(("get_all",))
|
||||
return self.active_chan.receive()
|
||||
|
||||
|
||||
class KeyValueStoreNoLock:
|
||||
def __init__(self):
|
||||
self._store = {}
|
||||
|
||||
self.active_chan = Channel(_id="shared_memory-active")
|
||||
self.passive_chan = Channel(_id="shared_memory-passive")
|
||||
|
||||
def set(self, key: str, value: any) -> None:
|
||||
"""
|
||||
设置键值对
|
||||
Args:
|
||||
key: 键
|
||||
value: 值
|
||||
|
||||
"""
|
||||
if IS_MAIN_PROCESS:
|
||||
self._store[key] = value
|
||||
else:
|
||||
# 向主进程发送请求拿取
|
||||
self.passive_chan.send(("set", key, value))
|
||||
|
||||
def get(self, key: str, default: Optional[any] = None) -> any:
|
||||
"""
|
||||
获取键值对
|
||||
Args:
|
||||
key: 键
|
||||
default: 默认值
|
||||
|
||||
Returns:
|
||||
any: 值
|
||||
"""
|
||||
if IS_MAIN_PROCESS:
|
||||
return self._store.get(key, default)
|
||||
else:
|
||||
temp_chan = Channel("temp_chan")
|
||||
self.passive_chan.send(("get", key, default, temp_chan))
|
||||
return temp_chan.receive()
|
||||
|
||||
def delete(self, key: str, ignore_key_error: bool = True) -> None:
|
||||
"""
|
||||
删除键值对
|
||||
Args:
|
||||
key: 键
|
||||
ignore_key_error: 是否忽略键不存在的错误
|
||||
|
||||
Returns:
|
||||
"""
|
||||
if IS_MAIN_PROCESS:
|
||||
if key in self._store:
|
||||
try:
|
||||
del self._store[key]
|
||||
del _locks[key]
|
||||
except KeyError as e:
|
||||
if not ignore_key_error:
|
||||
raise e
|
||||
else:
|
||||
# 向主进程发送请求删除
|
||||
self.passive_chan.send(("delete", key))
|
||||
|
||||
def get_all(self) -> dict[str, any]:
|
||||
"""
|
||||
获取所有键值对
|
||||
Returns:
|
||||
dict[str, any]: 键值对
|
||||
"""
|
||||
if IS_MAIN_PROCESS:
|
||||
return self._store
|
||||
else:
|
||||
temp_chan = Channel("temp_chan")
|
||||
self.passive_chan.send(("get_all", temp_chan))
|
||||
return temp_chan.receive()
|
||||
recv_chan = Channel[dict[str, Any]]("recv_chan")
|
||||
self.passive_chan.send(
|
||||
(
|
||||
"get_all",
|
||||
{
|
||||
"recv_chan": recv_chan
|
||||
}
|
||||
)
|
||||
)
|
||||
return recv_chan.receive()
|
||||
|
||||
|
||||
class GlobalKeyValueStore:
|
||||
@ -191,19 +151,18 @@ class GlobalKeyValueStore:
|
||||
raise RuntimeError("Cannot get instance in sub process.")
|
||||
|
||||
|
||||
shared_memory: Optional[KeyValueStore] = None
|
||||
|
||||
# 全局单例访问点
|
||||
if IS_MAIN_PROCESS:
|
||||
shared_memory = GlobalKeyValueStore.get_instance()
|
||||
shared_memory: KeyValueStore = GlobalKeyValueStore.get_instance()
|
||||
|
||||
@shared_memory.passive_chan.on_receive(lambda d: d[0] == "get")
|
||||
def on_get(data: tuple[str, str, any, Channel]):
|
||||
data[3].send(shared_memory.get(data[1], data[2]))
|
||||
def on_get():
|
||||
# TODO
|
||||
pass
|
||||
|
||||
|
||||
@shared_memory.passive_chan.on_receive(lambda d: d[0] == "set")
|
||||
def on_set(data: tuple[str, str, any]):
|
||||
def on_set(data: tuple[str, str, Any]):
|
||||
shared_memory.set(data[1], data[2])
|
||||
|
||||
|
||||
@ -218,7 +177,7 @@ if IS_MAIN_PROCESS:
|
||||
data[1].send(shared_memory.get_all())
|
||||
else:
|
||||
# 子进程在入口函数中对shared_memory进行初始化
|
||||
shared_memory = None
|
||||
shared_memory: Optional[KeyValueStore] = None # type: ignore
|
||||
|
||||
_ref_count = 0 # import 引用计数, 防止获取空指针
|
||||
if not IS_MAIN_PROCESS:
|
||||
|
Reference in New Issue
Block a user