mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-30 01:30:02 +00:00
🎉 init feishu adapter
This commit is contained in:
0
packages/nonebot-adapter-feishu/nonebot/__init__.py
Normal file
0
packages/nonebot-adapter-feishu/nonebot/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
from .bot import Bot
|
||||
from .message import Message, MessageSegment
|
||||
from .event import Event
|
@ -0,0 +1,45 @@
|
||||
from typing import Any, Union, Optional, TYPE_CHECKING
|
||||
|
||||
from nonebot.log import logger
|
||||
from nonebot.message import handle_event
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.exception import RequestDenied
|
||||
|
||||
from .event import Event
|
||||
from .message import Message, MessageSegment
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nonebot.drivers import Driver
|
||||
|
||||
|
||||
class Bot(BaseBot):
|
||||
"""
|
||||
飞书 协议 Bot 适配。继承属性参考 `BaseBot <./#class-basebot>`_ 。
|
||||
"""
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
return "feishu"
|
||||
|
||||
@classmethod
|
||||
async def check_permission(cls, driver: "Driver", connection_type: str,
|
||||
headers: dict, body: Optional[dict]) -> str:
|
||||
# raise RequestDenied(401, "reason")
|
||||
return "bot id"
|
||||
|
||||
async def handle_message(self, message: dict):
|
||||
try:
|
||||
event = Event.parse_obj(message)
|
||||
await handle_event(self, event)
|
||||
except Exception as e:
|
||||
logger.opt(colors=True, exception=e).error(
|
||||
f"<r><bg #f8bbd0>Failed to handle event. Raw: {message}</bg #f8bbd0></r>"
|
||||
)
|
||||
|
||||
async def _call_api(self, api: str, **data) -> Any:
|
||||
raise NotImplementedError
|
||||
|
||||
async def send(self, event: Event, message: Union[str, Message,
|
||||
MessageSegment],
|
||||
**kwargs) -> Any:
|
||||
raise NotImplementedError
|
@ -0,0 +1,31 @@
|
||||
from pydantic import BaseModel
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
|
||||
from .message import Message
|
||||
|
||||
|
||||
class Event(BaseEvent):
|
||||
|
||||
def get_type(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_event_name(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_event_description(self) -> str:
|
||||
return str(self.dict())
|
||||
|
||||
def get_message(self) -> Message:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_plaintext(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_user_id(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_session_id(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def is_tome(self) -> bool:
|
||||
return False
|
@ -0,0 +1,26 @@
|
||||
from typing import Union, Mapping, Iterable
|
||||
from nonebot.adapters import Message as BaseMessage, MessageSegment as BaseMessageSegment
|
||||
|
||||
|
||||
class MessageSegment(BaseMessageSegment):
|
||||
|
||||
def __str__(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def __add__(self, other) -> "Message":
|
||||
return Message(self) + other
|
||||
|
||||
def __radd__(self, other) -> "Message":
|
||||
return Message(other) + self
|
||||
|
||||
def is_text(self) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class Message(BaseMessage):
|
||||
|
||||
@staticmethod
|
||||
def _construct(
|
||||
msg: Union[str, Mapping,
|
||||
Iterable[Mapping]]) -> Iterable[MessageSegment]:
|
||||
raise NotImplementedError
|
Reference in New Issue
Block a user