⚗️ change permission to use handler

This commit is contained in:
yanyongyu
2021-11-21 12:36:44 +08:00
parent a5948fb5e3
commit b4d12d905d
9 changed files with 207 additions and 122 deletions

View File

@ -10,9 +10,12 @@ r"""
"""
import asyncio
from typing import Union, Callable, NoReturn, Optional, Awaitable
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Type, Union, Callable, NoReturn, Optional
from nonebot.utils import run_sync
from nonebot import params
from nonebot.handler import Handler
from nonebot.dependencies import Param
from nonebot.adapters import Bot, Event
from nonebot.typing import T_PermissionChecker
@ -34,14 +37,23 @@ class Permission:
"""
__slots__ = ("checkers",)
def __init__(self, *checkers: Callable[[Bot, Event],
Awaitable[bool]]) -> None:
HANDLER_PARAM_TYPES: List[Type[Param]] = [
params.BotParam, params.EventParam
]
def __init__(self,
*checkers: T_PermissionChecker,
dependency_overrides_provider: Optional[Any] = None) -> None:
"""
:参数:
* ``*checkers: Callable[[Bot, Event], Awaitable[bool]]``: **异步** PermissionChecker
* ``*checkers: T_PermissionChecker``: PermissionChecker
"""
self.checkers = set(checkers)
self.checkers = set(
Handler(checker,
allow_types=self.HANDLER_PARAM_TYPES,
dependency_overrides_provider=dependency_overrides_provider)
for checker in checkers)
"""
:说明:
@ -49,10 +61,16 @@ class Permission:
:类型:
* ``Set[Callable[[Bot, Event], Awaitable[bool]]]``
* ``Set[Handler]``
"""
async def __call__(self, bot: Bot, event: Event) -> bool:
async def __call__(
self,
bot: Bot,
event: Event,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[Dict[Callable[..., Any],
Any]] = None) -> bool:
"""
:说明:
@ -62,6 +80,8 @@ class Permission:
* ``bot: Bot``: Bot 对象
* ``event: Event``: Event 对象
* ``stack: Optional[AsyncExitStack]``: 异步上下文栈
* ``dependency_cache: Optional[Dict[Callable[..., Any], Any]]``: 依赖缓存
:返回:
@ -70,7 +90,11 @@ class Permission:
if not self.checkers:
return True
results = await asyncio.gather(
*map(lambda c: c(bot, event), self.checkers))
checker(bot=bot,
event=event,
_stack=stack,
_dependency_cache=dependency_cache)
for checker in self.checkers)
return any(results)
def __and__(self, other) -> NoReturn:
@ -79,16 +103,12 @@ class Permission:
def __or__(
self, other: Optional[Union["Permission",
T_PermissionChecker]]) -> "Permission":
checkers = self.checkers.copy()
if other is None:
return self
elif isinstance(other, Permission):
checkers |= other.checkers
elif asyncio.iscoroutinefunction(other):
checkers.add(other) # type: ignore
return Permission(*self.checkers, *other.checkers)
else:
checkers.add(run_sync(other))
return Permission(*checkers)
return Permission(*self.checkers, other)
async def _message(bot: Bot, event: Event) -> bool: