Feature: 为消息类添加 has join include exclude 方法 (#1895)

This commit is contained in:
Ju4tCode
2023-04-04 21:42:01 +08:00
committed by GitHub
parent 20820e72ad
commit 1817102a7c
6 changed files with 356 additions and 54 deletions

View File

@ -1,5 +1,6 @@
import abc
from copy import deepcopy
from typing_extensions import Self
from dataclasses import field, asdict, dataclass
from typing import (
Any,
@ -12,6 +13,7 @@ from typing import (
TypeVar,
Iterable,
Optional,
SupportsIndex,
overload,
)
@ -19,7 +21,6 @@ from pydantic import parse_obj_as
from .template import MessageTemplate
T = TypeVar("T")
TMS = TypeVar("TMS", bound="MessageSegment")
TM = TypeVar("TM", bound="Message")
@ -47,7 +48,7 @@ class MessageSegment(abc.ABC, Generic[TM]):
def __len__(self) -> int:
return len(str(self))
def __ne__(self: T, other: T) -> bool:
def __ne__(self, other: Self) -> bool:
return not self == other
def __add__(self: TMS, other: Union[str, TMS, Iterable[TMS]]) -> TM:
@ -61,7 +62,7 @@ class MessageSegment(abc.ABC, Generic[TM]):
yield cls._validate
@classmethod
def _validate(cls, value):
def _validate(cls, value) -> Self:
if isinstance(value, cls):
return value
if not isinstance(value, dict):
@ -84,7 +85,10 @@ class MessageSegment(abc.ABC, Generic[TM]):
def items(self):
return asdict(self).items()
def copy(self: T) -> T:
def join(self: TMS, iterable: Iterable[Union[TMS, TM]]) -> TM:
return self.get_message_class()(self).join(iterable)
def copy(self) -> Self:
return deepcopy(self)
@abc.abstractmethod
@ -117,7 +121,7 @@ class Message(List[TMS], abc.ABC):
self.extend(self._construct(message)) # pragma: no cover
@classmethod
def template(cls: Type[TM], format_string: Union[str, TM]) -> MessageTemplate[TM]:
def template(cls, format_string: Union[str, TM]) -> MessageTemplate[Self]:
"""创建消息模板。
用法和 `str.format` 大致相同, 但是可以输出消息对象, 并且支持以 `Message` 对象作为消息模板
@ -146,7 +150,7 @@ class Message(List[TMS], abc.ABC):
yield cls._validate
@classmethod
def _validate(cls, value):
def _validate(cls, value) -> Self:
if isinstance(value, cls):
return value
elif isinstance(value, Message):
@ -169,16 +173,16 @@ class Message(List[TMS], abc.ABC):
"""构造消息数组"""
raise NotImplementedError
def __add__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
def __add__(self, other: Union[str, TMS, Iterable[TMS]]) -> Self:
result = self.copy()
result += other
return result
def __radd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
def __radd__(self, other: Union[str, TMS, Iterable[TMS]]) -> Self:
result = self.__class__(other)
return result + self
def __iadd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
def __iadd__(self, other: Union[str, TMS, Iterable[TMS]]) -> Self:
if isinstance(other, str):
self.extend(self._construct(other))
elif isinstance(other, MessageSegment):
@ -190,57 +194,62 @@ class Message(List[TMS], abc.ABC):
return self
@overload
def __getitem__(self: TM, __args: str) -> TM:
"""
def __getitem__(self, args: str) -> Self:
"""获取仅包含指定消息段类型的消息
参数:
__args: 消息段类型
args: 消息段类型
返回:
所有类型为 `__args` 的消息段
所有类型为 `args` 的消息段
"""
@overload
def __getitem__(self, __args: Tuple[str, int]) -> TMS:
"""
def __getitem__(self, args: Tuple[str, int]) -> TMS:
"""索引指定类型的消息段
参数:
__args: 消息段类型和索引
args: 消息段类型和索引
返回:
类型为 `__args[0]` 的消息段第 `__args[1]` 个
类型为 `args[0]` 的消息段第 `args[1]` 个
"""
@overload
def __getitem__(self: TM, __args: Tuple[str, slice]) -> TM:
"""
def __getitem__(self, args: Tuple[str, slice]) -> Self:
"""切片指定类型的消息段
参数:
__args: 消息段类型和切片
args: 消息段类型和切片
返回:
类型为 `__args[0]` 的消息段切片 `__args[1]`
类型为 `args[0]` 的消息段切片 `args[1]`
"""
@overload
def __getitem__(self, __args: int) -> TMS:
"""
def __getitem__(self, args: int) -> TMS:
"""索引消息段
参数:
__args: 索引
args: 索引
返回:
第 `__args` 个消息段
第 `args` 个消息段
"""
@overload
def __getitem__(self: TM, __args: slice) -> TM:
"""
def __getitem__(self, args: slice) -> Self:
"""切片消息段
参数:
__args: 切片
args: 切片
返回:
消息切片 `__args`
消息切片 `args`
"""
def __getitem__(
self: TM,
self,
args: Union[
str,
Tuple[str, int],
@ -248,7 +257,7 @@ class Message(List[TMS], abc.ABC):
int,
slice,
],
) -> Union[TMS, TM]:
) -> Union[TMS, Self]:
arg1, arg2 = args if isinstance(args, tuple) else (args, None)
if isinstance(arg1, int) and arg2 is None:
return super().__getitem__(arg1)
@ -263,15 +272,52 @@ class Message(List[TMS], abc.ABC):
else:
raise ValueError("Incorrect arguments to slice") # pragma: no cover
def index(self, value: Union[TMS, str], *args) -> int:
def __contains__(self, value: Union[TMS, str]) -> bool:
"""检查消息段是否存在
参数:
value: 消息段或消息段类型
返回:
消息内是否存在给定消息段或给定类型的消息段
"""
if isinstance(value, str):
return bool(next((seg for seg in self if seg.type == value), None))
return super().__contains__(value)
def has(self, value: Union[TMS, str]) -> bool:
"""{ref}``__contains__` <nonebot.adapters.Message.__contains__>` 相同"""
return value in self
def index(self, value: Union[TMS, str], *args: SupportsIndex) -> int:
"""索引消息段
参数:
value: 消息段或者消息段类型
arg: start 与 end
返回:
索引 index
异常:
ValueError: 消息段不存在
"""
if isinstance(value, str):
first_segment = next((seg for seg in self if seg.type == value), None)
if first_segment is None:
raise ValueError(f"Segment with type {value} is not in message")
raise ValueError(f"Segment with type {value!r} is not in message")
return super().index(first_segment, *args)
return super().index(value, *args)
def get(self: TM, type_: str, count: Optional[int] = None) -> TM:
def get(self, type_: str, count: Optional[int] = None) -> Self:
"""获取指定类型的消息段
参数:
type_: 消息段类型
count: 获取个数
返回:
构建的新消息
"""
if count is None:
return self[type_]
@ -286,9 +332,30 @@ class Message(List[TMS], abc.ABC):
return filtered
def count(self, value: Union[TMS, str]) -> int:
"""计算指定消息段的个数
参数:
value: 消息段或消息段类型
返回:
个数
"""
return len(self[value]) if isinstance(value, str) else super().count(value)
def append(self: TM, obj: Union[str, TMS]) -> TM:
def only(self, value: Union[TMS, str]) -> bool:
"""检查消息中是否仅包含指定消息段
参数:
value: 指定消息段或消息段类型
返回:
是否仅包含指定消息段
"""
if isinstance(value, str):
return all(seg.type == value for seg in self)
return all(seg == value for seg in self)
def append(self, obj: Union[str, TMS]) -> Self:
"""添加一个消息段到消息数组末尾。
参数:
@ -302,7 +369,7 @@ class Message(List[TMS], abc.ABC):
raise ValueError(f"Unexpected type: {type(obj)} {obj}") # pragma: no cover
return self
def extend(self: TM, obj: Union[TM, Iterable[TMS]]) -> TM:
def extend(self, obj: Union[Self, Iterable[TMS]]) -> Self:
"""拼接一个消息数组或多个消息段到消息数组末尾。
参数:
@ -312,18 +379,52 @@ class Message(List[TMS], abc.ABC):
self.append(segment)
return self
def copy(self: TM) -> TM:
def join(self, iterable: Iterable[Union[TMS, Self]]) -> Self:
"""将多个消息连接并将自身作为分割
参数:
iterable: 要连接的消息
返回:
连接后的消息
"""
ret = self.__class__()
for index, msg in enumerate(iterable):
if index != 0:
ret.extend(self)
if isinstance(msg, MessageSegment):
ret.append(msg.copy())
else:
ret.extend(msg.copy())
return ret
def copy(self) -> Self:
"""深拷贝消息"""
return deepcopy(self)
def include(self, *types: str) -> Self:
"""过滤消息
参数:
types: 包含的消息段类型
返回:
新构造的消息
"""
return self.__class__(seg for seg in self if seg.type in types)
def exclude(self, *types: str) -> Self:
"""过滤消息
参数:
types: 不包含的消息段类型
返回:
新构造的消息
"""
return self.__class__(seg for seg in self if seg.type not in types)
def extract_plain_text(self) -> str:
"""提取消息内纯文本消息"""
return "".join(str(seg) for seg in self if seg.is_text())
__autodoc__ = {
"MessageSegment.__str__": True,
"MessageSegment.__add__": True,
"Message.__getitem__": True,
"Message._construct": True,
}