mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-17 03:20:54 +00:00
🎨 format code using black and isort
This commit is contained in:
101
nonebot/rule.py
101
nonebot/rule.py
@ -17,8 +17,18 @@ from argparse import Namespace
|
||||
from contextlib import AsyncExitStack
|
||||
from typing_extensions import TypedDict
|
||||
from argparse import ArgumentParser as ArgParser
|
||||
from typing import (Any, Dict, List, Type, Tuple, Union, Callable, NoReturn,
|
||||
Optional, Sequence)
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Type,
|
||||
Tuple,
|
||||
Union,
|
||||
Callable,
|
||||
NoReturn,
|
||||
Optional,
|
||||
Sequence,
|
||||
)
|
||||
|
||||
from pygtrie import CharTrie
|
||||
|
||||
@ -33,10 +43,9 @@ PREFIX_KEY = "_prefix"
|
||||
SUFFIX_KEY = "_suffix"
|
||||
CMD_KEY = "command"
|
||||
RAW_CMD_KEY = "raw_command"
|
||||
CMD_RESULT = TypedDict("CMD_RESULT", {
|
||||
"command": Optional[Tuple[str, ...]],
|
||||
"raw_command": Optional[str]
|
||||
})
|
||||
CMD_RESULT = TypedDict(
|
||||
"CMD_RESULT", {"command": Optional[Tuple[str, ...]], "raw_command": Optional[str]}
|
||||
)
|
||||
|
||||
SHELL_ARGS = "_args"
|
||||
SHELL_ARGV = "_argv"
|
||||
@ -61,11 +70,14 @@ class Rule:
|
||||
from nonebot.utils import run_sync
|
||||
Rule(async_function, run_sync(sync_function))
|
||||
"""
|
||||
|
||||
__slots__ = ("checkers",)
|
||||
|
||||
HANDLER_PARAM_TYPES = [
|
||||
params.BotParam, params.EventParam, params.StateParam,
|
||||
params.DefaultParam
|
||||
params.BotParam,
|
||||
params.EventParam,
|
||||
params.StateParam,
|
||||
params.DefaultParam,
|
||||
]
|
||||
|
||||
def __init__(self, *checkers: Union[T_RuleChecker, Handler]) -> None:
|
||||
@ -76,9 +88,11 @@ class Rule:
|
||||
|
||||
"""
|
||||
self.checkers = set(
|
||||
checker if isinstance(checker, Handler) else Handler(
|
||||
checker, allow_types=self.HANDLER_PARAM_TYPES)
|
||||
for checker in checkers)
|
||||
checker
|
||||
if isinstance(checker, Handler)
|
||||
else Handler(checker, allow_types=self.HANDLER_PARAM_TYPES)
|
||||
for checker in checkers
|
||||
)
|
||||
"""
|
||||
:说明:
|
||||
|
||||
@ -95,8 +109,8 @@ class Rule:
|
||||
event: Event,
|
||||
state: T_State,
|
||||
stack: Optional[AsyncExitStack] = None,
|
||||
dependency_cache: Optional[Dict[Callable[..., Any],
|
||||
Any]] = None) -> bool:
|
||||
dependency_cache: Optional[Dict[Callable[..., Any], Any]] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
:说明:
|
||||
|
||||
@ -117,12 +131,17 @@ class Rule:
|
||||
if not self.checkers:
|
||||
return True
|
||||
results = await asyncio.gather(
|
||||
*(checker(bot=bot,
|
||||
event=event,
|
||||
state=state,
|
||||
_stack=stack,
|
||||
_dependency_cache=dependency_cache)
|
||||
for checker in self.checkers))
|
||||
*(
|
||||
checker(
|
||||
bot=bot,
|
||||
event=event,
|
||||
state=state,
|
||||
_stack=stack,
|
||||
_dependency_cache=dependency_cache,
|
||||
)
|
||||
for checker in self.checkers
|
||||
)
|
||||
)
|
||||
return all(results)
|
||||
|
||||
def __and__(self, other: Optional[Union["Rule", T_RuleChecker]]) -> "Rule":
|
||||
@ -156,8 +175,9 @@ class TrieRule:
|
||||
cls.suffix[suffix[::-1]] = value
|
||||
|
||||
@classmethod
|
||||
def get_value(cls, bot: Bot, event: Event,
|
||||
state: T_State) -> Tuple[CMD_RESULT, CMD_RESULT]:
|
||||
def get_value(
|
||||
cls, bot: Bot, event: Event, state: T_State
|
||||
) -> Tuple[CMD_RESULT, CMD_RESULT]:
|
||||
prefix = CMD_RESULT(command=None, raw_command=None)
|
||||
suffix = CMD_RESULT(command=None, raw_command=None)
|
||||
state[PREFIX_KEY] = prefix
|
||||
@ -180,8 +200,7 @@ class TrieRule:
|
||||
return prefix, suffix
|
||||
|
||||
|
||||
def startswith(msg: Union[str, Tuple[str, ...]],
|
||||
ignorecase: bool = False) -> Rule:
|
||||
def startswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
|
||||
"""
|
||||
:说明:
|
||||
|
||||
@ -196,7 +215,8 @@ def startswith(msg: Union[str, Tuple[str, ...]],
|
||||
|
||||
pattern = re.compile(
|
||||
f"^(?:{'|'.join(re.escape(prefix) for prefix in msg)})",
|
||||
re.IGNORECASE if ignorecase else 0)
|
||||
re.IGNORECASE if ignorecase else 0,
|
||||
)
|
||||
|
||||
async def _startswith(bot: Bot, event: Event, state: T_State) -> bool:
|
||||
if event.get_type() != "message":
|
||||
@ -207,8 +227,7 @@ def startswith(msg: Union[str, Tuple[str, ...]],
|
||||
return Rule(_startswith)
|
||||
|
||||
|
||||
def endswith(msg: Union[str, Tuple[str, ...]],
|
||||
ignorecase: bool = False) -> Rule:
|
||||
def endswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
|
||||
"""
|
||||
:说明:
|
||||
|
||||
@ -223,7 +242,8 @@ def endswith(msg: Union[str, Tuple[str, ...]],
|
||||
|
||||
pattern = re.compile(
|
||||
f"(?:{'|'.join(re.escape(prefix) for prefix in msg)})$",
|
||||
re.IGNORECASE if ignorecase else 0)
|
||||
re.IGNORECASE if ignorecase else 0,
|
||||
)
|
||||
|
||||
async def _endswith(bot: Bot, event: Event, state: T_State) -> bool:
|
||||
if event.get_type() != "message":
|
||||
@ -314,19 +334,22 @@ class ArgumentParser(ArgParser):
|
||||
setattr(self, "message", old_message)
|
||||
|
||||
def exit(self, status: int = 0, message: Optional[str] = None):
|
||||
raise ParserExit(status=status,
|
||||
message=message or getattr(self, "message", None))
|
||||
raise ParserExit(
|
||||
status=status, message=message or getattr(self, "message", None)
|
||||
)
|
||||
|
||||
def parse_args(self,
|
||||
args: Optional[Sequence[str]] = None,
|
||||
namespace: Optional[Namespace] = None) -> Namespace:
|
||||
def parse_args(
|
||||
self,
|
||||
args: Optional[Sequence[str]] = None,
|
||||
namespace: Optional[Namespace] = None,
|
||||
) -> Namespace:
|
||||
setattr(self, "message", "")
|
||||
return super().parse_args(args=args,
|
||||
namespace=namespace) # type: ignore
|
||||
return super().parse_args(args=args, namespace=namespace) # type: ignore
|
||||
|
||||
|
||||
def shell_command(*cmds: Union[str, Tuple[str, ...]],
|
||||
parser: Optional[ArgumentParser] = None) -> Rule:
|
||||
def shell_command(
|
||||
*cmds: Union[str, Tuple[str, ...]], parser: Optional[ArgumentParser] = None
|
||||
) -> Rule:
|
||||
r"""
|
||||
:说明:
|
||||
|
||||
@ -361,8 +384,7 @@ def shell_command(*cmds: Union[str, Tuple[str, ...]],
|
||||
\:\:\:
|
||||
"""
|
||||
if not isinstance(parser, ArgumentParser):
|
||||
raise TypeError(
|
||||
"`parser` must be an instance of nonebot.rule.ArgumentParser")
|
||||
raise TypeError("`parser` must be an instance of nonebot.rule.ArgumentParser")
|
||||
|
||||
config = get_driver().config
|
||||
command_start = config.command_start
|
||||
@ -382,8 +404,7 @@ def shell_command(*cmds: Union[str, Tuple[str, ...]],
|
||||
async def _shell_command(event: Event, state: T_State) -> bool:
|
||||
if state[PREFIX_KEY][CMD_KEY] in commands:
|
||||
message = str(event.get_message())
|
||||
strip_message = message[len(state[PREFIX_KEY][RAW_CMD_KEY]
|
||||
):].lstrip()
|
||||
strip_message = message[len(state[PREFIX_KEY][RAW_CMD_KEY]) :].lstrip()
|
||||
state[SHELL_ARGV] = shlex.split(strip_message)
|
||||
if parser:
|
||||
try:
|
||||
|
Reference in New Issue
Block a user