improve command

This commit is contained in:
yanyongyu 2020-12-30 12:42:10 +08:00
parent 5028f6408a
commit d4f344d247
2 changed files with 144 additions and 134 deletions

View File

@ -14,6 +14,7 @@ from typing import Any, Dict, Union, TypeVar, Optional, Callable, Iterable, Awai
from pydantic import BaseModel from pydantic import BaseModel
from nonebot.config import Config from nonebot.config import Config
from nonebot.utils import DataclassEncoder
if TYPE_CHECKING: if TYPE_CHECKING:
from nonebot.drivers import Driver, WebSocket from nonebot.drivers import Driver, WebSocket
@ -138,135 +139,6 @@ class Bot(abc.ABC):
raise NotImplementedError raise NotImplementedError
class Event(abc.ABC, BaseModel):
"""Event 基类。提供获取关键信息的方法,其余信息可直接获取。"""
class Config:
extra = "allow"
@abc.abstractmethod
def get_type(self) -> Literal["message", "notice", "request", "meta_event"]:
"""
:说明:
获取事件类型的方法类型通常为 NoneBot 内置的四种类型
:返回:
* ``Literal["message", "notice", "request", "meta_event"]``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_name(self) -> str:
"""
:说明:
获取事件名称的方法
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_description(self) -> str:
"""
:说明:
获取事件描述的方法通常为事件具体内容
:返回:
* ``str``
"""
raise NotImplementedError
def __str__(self) -> str:
return f"[{self.get_event_name()}]: {self.get_event_description()}"
def get_log_string(self) -> str:
"""
:说明:
获取事件日志信息的方法通常你不需要修改这个方法只有当希望 NoneBot 隐藏该事件日志时可以抛出 ``NoLogException`` 异常
:返回:
* ``str``
:异常:
- ``NoLogException``
"""
return f"[{self.get_event_name()}]: {self.get_event_description()}"
@abc.abstractmethod
def get_user_id(self) -> str:
"""
:说明:
获取事件主体 id 的方法通常是用户 id
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_session_id(self) -> str:
"""
:说明:
获取会话 id 的方法用于判断当前事件属于哪一个会话通常是用户 id群组 id 组合
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_message(self) -> "Message":
"""
:说明:
获取事件消息内容的方法
:返回:
* ``Message``
"""
raise NotImplementedError
def get_plaintext(self) -> str:
"""
:说明:
获取消息纯文本的方法通常不需要修改默认通过 ``get_message().extract_plain_text`` 获取
:返回:
* ``str``
"""
return self.get_message().extract_plain_text()
@abc.abstractmethod
def is_tome(self) -> bool:
"""
:说明:
获取事件是否与机器人有关的方法
:返回:
* ``bool``
"""
raise NotImplementedError
T_Message = TypeVar("T_Message", bound="Message") T_Message = TypeVar("T_Message", bound="Message")
T_MessageSegment = TypeVar("T_MessageSegment", bound="MessageSegment") T_MessageSegment = TypeVar("T_MessageSegment", bound="MessageSegment")
@ -446,3 +318,133 @@ class Message(list, abc.ABC):
plain_text = reduce(_concat, self, "") plain_text = reduce(_concat, self, "")
return plain_text[1:] if plain_text else plain_text return plain_text[1:] if plain_text else plain_text
class Event(abc.ABC, BaseModel):
"""Event 基类。提供获取关键信息的方法,其余信息可直接获取。"""
class Config:
extra = "allow"
json_encoders = {Message: DataclassEncoder}
@abc.abstractmethod
def get_type(self) -> Literal["message", "notice", "request", "meta_event"]:
"""
:说明:
获取事件类型的方法类型通常为 NoneBot 内置的四种类型
:返回:
* ``Literal["message", "notice", "request", "meta_event"]``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_name(self) -> str:
"""
:说明:
获取事件名称的方法
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_description(self) -> str:
"""
:说明:
获取事件描述的方法通常为事件具体内容
:返回:
* ``str``
"""
raise NotImplementedError
def __str__(self) -> str:
return f"[{self.get_event_name()}]: {self.get_event_description()}"
def get_log_string(self) -> str:
"""
:说明:
获取事件日志信息的方法通常你不需要修改这个方法只有当希望 NoneBot 隐藏该事件日志时可以抛出 ``NoLogException`` 异常
:返回:
* ``str``
:异常:
- ``NoLogException``
"""
return f"[{self.get_event_name()}]: {self.get_event_description()}"
@abc.abstractmethod
def get_user_id(self) -> str:
"""
:说明:
获取事件主体 id 的方法通常是用户 id
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_session_id(self) -> str:
"""
:说明:
获取会话 id 的方法用于判断当前事件属于哪一个会话通常是用户 id群组 id 组合
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_message(self) -> "Message":
"""
:说明:
获取事件消息内容的方法
:返回:
* ``Message``
"""
raise NotImplementedError
def get_plaintext(self) -> str:
"""
:说明:
获取消息纯文本的方法通常不需要修改默认通过 ``get_message().extract_plain_text`` 获取
:返回:
* ``str``
"""
return self.get_message().extract_plain_text()
@abc.abstractmethod
def is_tome(self) -> bool:
"""
:说明:
获取事件是否与机器人有关的方法
:返回:
* ``bool``
"""
raise NotImplementedError

View File

@ -13,7 +13,7 @@ from types import ModuleType
from dataclasses import dataclass from dataclasses import dataclass
from importlib._bootstrap import _load from importlib._bootstrap import _load
from contextvars import Context, ContextVar, copy_context from contextvars import Context, ContextVar, copy_context
from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional, TYPE_CHECKING
from nonebot.log import logger from nonebot.log import logger
from nonebot.matcher import Matcher from nonebot.matcher import Matcher
@ -21,6 +21,9 @@ from nonebot.permission import Permission
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
from nonebot.rule import Rule, startswith, endswith, keyword, command, regex from nonebot.rule import Rule, startswith, endswith, keyword, command, regex
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
plugins: Dict[str, "Plugin"] = {} plugins: Dict[str, "Plugin"] = {}
""" """
:类型: ``Dict[str, Plugin]`` :类型: ``Dict[str, Plugin]``
@ -417,10 +420,15 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
- ``Type[Matcher]`` - ``Type[Matcher]``
""" """
async def _strip_cmd(bot, event, state: T_State): async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.message print(event.dict())
event.message = message.__class__( message = event.get_message()
str(message)[len(state["_prefix"]["raw_command"]):].strip()) segment = message.pop(0)
new_message = message.__class__(
str(segment)
[len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", []) handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd) handlers.insert(0, _strip_cmd)