mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-28 00:31:14 +00:00
add message segment for coolq
This commit is contained in:
@ -1,19 +1,24 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from typing import Any, Dict, Optional
|
||||
import abc
|
||||
from functools import reduce
|
||||
from typing import Dict, Union, Iterable, Optional
|
||||
|
||||
from nonebot.config import Config
|
||||
|
||||
|
||||
class BaseBot(object):
|
||||
class BaseBot(abc.ABC):
|
||||
|
||||
@abc.abstractmethod
|
||||
def __init__(self, type: str, config: Config, *, websocket=None):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
async def handle_message(self, message: dict):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
async def call_api(self, api: str, data: dict):
|
||||
raise NotImplementedError
|
||||
|
||||
@ -64,8 +69,73 @@ class BaseMessageSegment(dict):
|
||||
|
||||
class BaseMessage(list):
|
||||
|
||||
def __init__(self, message: str = None):
|
||||
raise NotImplementedError
|
||||
def __init__(self,
|
||||
message: Union[str, BaseMessageSegment, "BaseMessage"] = None,
|
||||
*args,
|
||||
**kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
if isinstance(message, str):
|
||||
self.extend(self._construct(message))
|
||||
elif isinstance(message, BaseMessage):
|
||||
self.extend(message)
|
||||
elif isinstance(message, BaseMessageSegment):
|
||||
self.append(message)
|
||||
|
||||
def __str__(self):
|
||||
return ''.join((str(seg) for seg in self))
|
||||
|
||||
@staticmethod
|
||||
def _construct(msg: str) -> Iterable[BaseMessageSegment]:
|
||||
raise NotImplementedError
|
||||
|
||||
def __add__(
|
||||
self, other: Union[str, BaseMessageSegment,
|
||||
"BaseMessage"]) -> "BaseMessage":
|
||||
result = self.__class__(self)
|
||||
if isinstance(other, str):
|
||||
result.extend(self._construct(other))
|
||||
elif isinstance(other, BaseMessageSegment):
|
||||
result.append(other)
|
||||
elif isinstance(other, BaseMessage):
|
||||
result.extend(other)
|
||||
return result
|
||||
|
||||
def __radd__(self, other: Union[str, BaseMessageSegment, "BaseMessage"]):
|
||||
result = self.__class__(other)
|
||||
return result.__add__(self)
|
||||
|
||||
def append(self, obj: Union[str, BaseMessageSegment]) -> "BaseMessage":
|
||||
if isinstance(obj, BaseMessageSegment):
|
||||
if obj.type == "text" and self and self[-1].type == "text":
|
||||
self[-1].data["text"] += obj.data["text"]
|
||||
else:
|
||||
super().append(obj)
|
||||
elif isinstance(obj, str):
|
||||
self.extend(self._construct(obj))
|
||||
else:
|
||||
raise ValueError(f"Unexpected type: {type(obj)} {obj}")
|
||||
return self
|
||||
|
||||
def extend(
|
||||
self, obj: Union["BaseMessage",
|
||||
Iterable[BaseMessageSegment]]) -> "BaseMessage":
|
||||
for segment in obj:
|
||||
self.append(segment)
|
||||
return self
|
||||
|
||||
def reduce(self) -> None:
|
||||
index = 0
|
||||
while index < len(self):
|
||||
if index > 0 and self[
|
||||
index - 1].type == "text" and self[index].type == "text":
|
||||
self[index - 1].data["text"] += self[index].data["text"]
|
||||
del self[index]
|
||||
else:
|
||||
index += 1
|
||||
|
||||
def extract_plain_text(self) -> str:
|
||||
|
||||
def _concat(x: str, y: BaseMessageSegment) -> str:
|
||||
return f"{x} {y.data['text']}" if y.type == "text" else x
|
||||
|
||||
return reduce(_concat, self, "")
|
||||
|
Reference in New Issue
Block a user