mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-16 02:50:48 +00:00
♻️ reorganize class and add bot hook di
This commit is contained in:
133
nonebot/internal/permission.py
Normal file
133
nonebot/internal/permission.py
Normal file
@ -0,0 +1,133 @@
|
||||
import asyncio
|
||||
from contextlib import AsyncExitStack
|
||||
from typing import Any, Set, Tuple, Union, NoReturn, Optional, Coroutine
|
||||
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.dependencies import Dependent
|
||||
from nonebot.exception import SkippedException
|
||||
from nonebot.typing import T_DependencyCache, T_PermissionChecker
|
||||
|
||||
from .params import BotParam, EventParam, DependParam, DefaultParam
|
||||
|
||||
|
||||
async def _run_coro_with_catch(coro: Coroutine[Any, Any, Any]):
|
||||
try:
|
||||
return await coro
|
||||
except SkippedException:
|
||||
return False
|
||||
|
||||
|
||||
class Permission:
|
||||
"""{ref}`nonebot.matcher.Matcher` 权限类。
|
||||
|
||||
当事件传递时,在 {ref}`nonebot.matcher.Matcher` 运行前进行检查。
|
||||
|
||||
参数:
|
||||
checkers: PermissionChecker
|
||||
|
||||
用法:
|
||||
```python
|
||||
Permission(async_function) | sync_function
|
||||
# 等价于
|
||||
Permission(async_function, sync_function)
|
||||
```
|
||||
"""
|
||||
|
||||
__slots__ = ("checkers",)
|
||||
|
||||
HANDLER_PARAM_TYPES = [
|
||||
DependParam,
|
||||
BotParam,
|
||||
EventParam,
|
||||
DefaultParam,
|
||||
]
|
||||
|
||||
def __init__(self, *checkers: Union[T_PermissionChecker, Dependent[bool]]) -> None:
|
||||
self.checkers: Set[Dependent[bool]] = set(
|
||||
checker
|
||||
if isinstance(checker, Dependent)
|
||||
else Dependent[bool].parse(
|
||||
call=checker, allow_types=self.HANDLER_PARAM_TYPES
|
||||
)
|
||||
for checker in checkers
|
||||
)
|
||||
"""存储 `PermissionChecker`"""
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
bot: Bot,
|
||||
event: Event,
|
||||
stack: Optional[AsyncExitStack] = None,
|
||||
dependency_cache: Optional[T_DependencyCache] = None,
|
||||
) -> bool:
|
||||
"""检查是否满足某个权限
|
||||
|
||||
参数:
|
||||
bot: Bot 对象
|
||||
event: Event 对象
|
||||
stack: 异步上下文栈
|
||||
dependency_cache: 依赖缓存
|
||||
"""
|
||||
if not self.checkers:
|
||||
return True
|
||||
results = await asyncio.gather(
|
||||
*(
|
||||
_run_coro_with_catch(
|
||||
checker(
|
||||
bot=bot,
|
||||
event=event,
|
||||
stack=stack,
|
||||
dependency_cache=dependency_cache,
|
||||
)
|
||||
)
|
||||
for checker in self.checkers
|
||||
),
|
||||
)
|
||||
return any(results)
|
||||
|
||||
def __and__(self, other) -> NoReturn:
|
||||
raise RuntimeError("And operation between Permissions is not allowed.")
|
||||
|
||||
def __or__(
|
||||
self, other: Optional[Union["Permission", T_PermissionChecker]]
|
||||
) -> "Permission":
|
||||
if other is None:
|
||||
return self
|
||||
elif isinstance(other, Permission):
|
||||
return Permission(*self.checkers, *other.checkers)
|
||||
else:
|
||||
return Permission(*self.checkers, other)
|
||||
|
||||
|
||||
class User:
|
||||
"""检查当前事件是否属于指定会话
|
||||
|
||||
参数:
|
||||
users: 会话 ID 元组
|
||||
perm: 需同时满足的权限
|
||||
"""
|
||||
|
||||
__slots__ = ("users", "perm")
|
||||
|
||||
def __init__(
|
||||
self, users: Tuple[str, ...], perm: Optional[Permission] = None
|
||||
) -> None:
|
||||
self.users = users
|
||||
self.perm = perm
|
||||
|
||||
async def __call__(self, bot: Bot, event: Event) -> bool:
|
||||
return bool(
|
||||
event.get_session_id() in self.users
|
||||
and (self.perm is None or await self.perm(bot, event))
|
||||
)
|
||||
|
||||
|
||||
def USER(*users: str, perm: Optional[Permission] = None):
|
||||
"""匹配当前事件属于指定会话
|
||||
|
||||
参数:
|
||||
user: 会话白名单
|
||||
perm: 需要同时满足的权限
|
||||
"""
|
||||
|
||||
return Permission(User(users, perm))
|
Reference in New Issue
Block a user