mirror of
				https://github.com/nonebot/nonebot2.git
				synced 2025-10-31 06:56:39 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			186 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			186 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from copy import copy
 | ||
| from typing import Any, Dict, Type, Union, Mapping, Iterable
 | ||
| 
 | ||
| from nonebot.typing import overrides
 | ||
| from nonebot.adapters import Message as BaseMessage, MessageSegment as BaseMessageSegment
 | ||
| 
 | ||
| 
 | ||
| class MessageSegment(BaseMessageSegment["Message"]):
 | ||
|     """
 | ||
|     钉钉 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
 | ||
|     """
 | ||
| 
 | ||
|     @classmethod
 | ||
|     @overrides(BaseMessageSegment)
 | ||
|     def get_message_class(cls) -> Type["Message"]:
 | ||
|         return Message
 | ||
| 
 | ||
|     @overrides(BaseMessageSegment)
 | ||
|     def __str__(self) -> str:
 | ||
|         if self.type == "text":
 | ||
|             return str(self.data["content"])
 | ||
|         elif self.type == "markdown":
 | ||
|             return str(self.data["text"])
 | ||
|         return ""
 | ||
| 
 | ||
|     @overrides(BaseMessageSegment)
 | ||
|     def is_text(self) -> bool:
 | ||
|         return self.type == "text"
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def atAll() -> "MessageSegment":
 | ||
|         """@全体"""
 | ||
|         return MessageSegment("at", {"isAtAll": True})
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def atMobiles(*mobileNumber: str) -> "MessageSegment":
 | ||
|         """@指定手机号人员"""
 | ||
|         return MessageSegment("at", {"atMobiles": list(mobileNumber)})
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def atDingtalkIds(*dingtalkIds: str) -> "MessageSegment":
 | ||
|         """@指定 id,@ 默认会在消息段末尾。
 | ||
|         所以你可以在消息中使用 @{senderId} 占位,发送出去之后 @ 就会出现在占位的位置:
 | ||
|         ```python
 | ||
|         message = MessageSegment.text(f"@{event.senderId},你好")
 | ||
|         message += MessageSegment.atDingtalkIds(event.senderId)
 | ||
|         ```
 | ||
|         """
 | ||
|         return MessageSegment("at", {"atDingtalkIds": list(dingtalkIds)})
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def text(text: str) -> "MessageSegment":
 | ||
|         """发送 ``text`` 类型消息"""
 | ||
|         return MessageSegment("text", {"content": text})
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def image(picURL: str) -> "MessageSegment":
 | ||
|         """发送 ``image`` 类型消息"""
 | ||
|         return MessageSegment("image", {"picURL": picURL})
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def extension(dict_: dict) -> "MessageSegment":
 | ||
|         """"标记 text 文本的 extension 属性,需要与 text 消息段相加。"""
 | ||
|         return MessageSegment("extension", dict_)
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def code(code_language: str, code: str) -> "Message":
 | ||
|         """"发送 code 消息段"""
 | ||
|         message = MessageSegment.text(code)
 | ||
|         message += MessageSegment.extension({
 | ||
|             "text_type": "code_snippet",
 | ||
|             "code_language": code_language
 | ||
|         })
 | ||
|         return message
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def markdown(title: str, text: str) -> "MessageSegment":
 | ||
|         """发送 ``markdown`` 类型消息"""
 | ||
|         return MessageSegment(
 | ||
|             "markdown",
 | ||
|             {
 | ||
|                 "title": title,
 | ||
|                 "text": text,
 | ||
|             },
 | ||
|         )
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def actionCardSingleBtn(title: str, text: str, singleTitle: str,
 | ||
|                             singleURL) -> "MessageSegment":
 | ||
|         """发送 ``actionCardSingleBtn`` 类型消息"""
 | ||
|         return MessageSegment(
 | ||
|             "actionCard", {
 | ||
|                 "title": title,
 | ||
|                 "text": text,
 | ||
|                 "singleTitle": singleTitle,
 | ||
|                 "singleURL": singleURL
 | ||
|             })
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def actionCardMultiBtns(
 | ||
|         title: str,
 | ||
|         text: str,
 | ||
|         btns: list,
 | ||
|         hideAvatar: bool = False,
 | ||
|         btnOrientation: str = '1',
 | ||
|     ) -> "MessageSegment":
 | ||
|         """
 | ||
|         发送 ``actionCardMultiBtn`` 类型消息
 | ||
| 
 | ||
|         :参数:
 | ||
| 
 | ||
|             * ``btnOrientation``: 0:按钮竖直排列 1:按钮横向排列
 | ||
|             * ``btns``: [{ "title": title, "actionURL": actionURL }, ...]
 | ||
|         """
 | ||
|         return MessageSegment(
 | ||
|             "actionCard", {
 | ||
|                 "title": title,
 | ||
|                 "text": text,
 | ||
|                 "hideAvatar": "1" if hideAvatar else "0",
 | ||
|                 "btnOrientation": btnOrientation,
 | ||
|                 "btns": btns
 | ||
|             })
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def feedCard(links: list) -> "MessageSegment":
 | ||
|         """
 | ||
|         发送 ``feedCard`` 类型消息
 | ||
| 
 | ||
|         :参数:
 | ||
| 
 | ||
|             * ``links``: [{ "title": xxx, "messageURL": xxx, "picURL": xxx }, ...]
 | ||
|         """
 | ||
|         return MessageSegment("feedCard", {"links": links})
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def raw(data) -> "MessageSegment":
 | ||
|         return MessageSegment('raw', data)
 | ||
| 
 | ||
|     def to_dict(self) -> Dict[str, Any]:
 | ||
|         # 让用户可以直接发送原始的消息格式
 | ||
|         if self.type == "raw":
 | ||
|             return copy(self.data)
 | ||
| 
 | ||
|         # 不属于消息内容,只是作为消息段的辅助
 | ||
|         if self.type in ["at", "extension"]:
 | ||
|             return {self.type: copy(self.data)}
 | ||
| 
 | ||
|         return {"msgtype": self.type, self.type: copy(self.data)}
 | ||
| 
 | ||
| 
 | ||
| class Message(BaseMessage[MessageSegment]):
 | ||
|     """
 | ||
|     钉钉 协议 Message 适配。
 | ||
|     """
 | ||
| 
 | ||
|     @classmethod
 | ||
|     @overrides(BaseMessage)
 | ||
|     def get_segment_class(cls) -> Type[MessageSegment]:
 | ||
|         return MessageSegment
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     @overrides(BaseMessage)
 | ||
|     def _construct(
 | ||
|         msg: Union[str, Mapping,
 | ||
|                    Iterable[Mapping]]) -> Iterable[MessageSegment]:
 | ||
|         if isinstance(msg, Mapping):
 | ||
|             yield MessageSegment(msg["type"], msg.get("data") or {})
 | ||
|         elif isinstance(msg, str):
 | ||
|             yield MessageSegment.text(msg)
 | ||
|         elif isinstance(msg, Iterable):
 | ||
|             for seg in msg:
 | ||
|                 yield MessageSegment(seg["type"], seg.get("data") or {})
 | ||
| 
 | ||
|     def _produce(self) -> dict:
 | ||
|         data = {}
 | ||
|         segment: MessageSegment
 | ||
|         for segment in self:
 | ||
|             # text 可以和 text 合并
 | ||
|             if segment.type == "text" and data.get("msgtype") == 'text':
 | ||
|                 data.setdefault("text", {})
 | ||
|                 data["text"]["content"] = data["text"].setdefault(
 | ||
|                     "content", "") + segment.data["content"]
 | ||
|             else:
 | ||
|                 data.update(segment.to_dict())
 | ||
|         return data
 |