mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-09-07 04:26:45 +00:00
@ -3,14 +3,15 @@ import sys
|
||||
import hmac
|
||||
import json
|
||||
import asyncio
|
||||
from typing import Any, Dict, Union, Optional, TYPE_CHECKING
|
||||
from typing import Any, Dict, Tuple, Union, Optional, TYPE_CHECKING
|
||||
|
||||
import httpx
|
||||
from nonebot.log import logger
|
||||
from nonebot.typing import overrides
|
||||
from nonebot.message import handle_event
|
||||
from nonebot.utils import DataclassEncoder
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.exception import RequestDenied
|
||||
from nonebot.drivers import Driver, HTTPConnection, HTTPRequest, HTTPResponse, WebSocket
|
||||
|
||||
from .utils import log, escape
|
||||
from .config import Config as CQHTTPConfig
|
||||
@ -20,7 +21,6 @@ from .exception import NetworkError, ApiNotAvailable, ActionFailed
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nonebot.config import Config
|
||||
from nonebot.drivers import Driver, WebSocket
|
||||
|
||||
|
||||
def get_auth_bearer(access_token: Optional[str] = None) -> Optional[str]:
|
||||
@ -28,7 +28,7 @@ def get_auth_bearer(access_token: Optional[str] = None) -> Optional[str]:
|
||||
return None
|
||||
scheme, _, param = access_token.partition(" ")
|
||||
if scheme.lower() not in ["bearer", "token"]:
|
||||
raise RequestDenied(401, "Not authenticated")
|
||||
return None
|
||||
return param
|
||||
|
||||
|
||||
@ -225,14 +225,6 @@ class Bot(BaseBot):
|
||||
"""
|
||||
cqhttp_config: CQHTTPConfig
|
||||
|
||||
def __init__(self,
|
||||
connection_type: str,
|
||||
self_id: str,
|
||||
*,
|
||||
websocket: Optional["WebSocket"] = None):
|
||||
|
||||
super().__init__(connection_type, self_id, websocket=websocket)
|
||||
|
||||
@property
|
||||
@overrides(BaseBot)
|
||||
def type(self) -> str:
|
||||
@ -242,84 +234,84 @@ class Bot(BaseBot):
|
||||
return "cqhttp"
|
||||
|
||||
@classmethod
|
||||
def register(cls, driver: "Driver", config: "Config"):
|
||||
def register(cls, driver: Driver, config: "Config"):
|
||||
super().register(driver, config)
|
||||
cls.cqhttp_config = CQHTTPConfig(**config.dict())
|
||||
|
||||
@classmethod
|
||||
@overrides(BaseBot)
|
||||
async def check_permission(cls, driver: "Driver", connection_type: str,
|
||||
headers: dict, body: Optional[bytes]) -> str:
|
||||
async def check_permission(
|
||||
cls, driver: Driver,
|
||||
request: HTTPConnection) -> Tuple[Optional[str], HTTPResponse]:
|
||||
"""
|
||||
:说明:
|
||||
|
||||
CQHTTP (OneBot) 协议鉴权。参考 `鉴权 <https://github.com/howmanybots/onebot/blob/master/v11/specs/communication/authorization.md>`_
|
||||
"""
|
||||
x_self_id = headers.get("x-self-id")
|
||||
x_signature = headers.get("x-signature")
|
||||
token = get_auth_bearer(headers.get("authorization"))
|
||||
x_self_id = request.headers.get("x-self-id")
|
||||
x_signature = request.headers.get("x-signature")
|
||||
token = get_auth_bearer(request.headers.get("authorization"))
|
||||
cqhttp_config = CQHTTPConfig(**driver.config.dict())
|
||||
|
||||
# 检查连接方式
|
||||
if connection_type not in ["http", "websocket"]:
|
||||
log("WARNING", "Unsupported connection type")
|
||||
raise RequestDenied(405, "Unsupported connection type")
|
||||
|
||||
# 检查self_id
|
||||
if not x_self_id:
|
||||
log("WARNING", "Missing X-Self-ID Header")
|
||||
raise RequestDenied(400, "Missing X-Self-ID Header")
|
||||
return None, HTTPResponse(400, b"Missing X-Self-ID Header")
|
||||
|
||||
# 检查签名
|
||||
secret = cqhttp_config.secret
|
||||
if secret and connection_type == "http":
|
||||
if secret and isinstance(request, HTTPRequest):
|
||||
if not x_signature:
|
||||
log("WARNING", "Missing Signature Header")
|
||||
raise RequestDenied(401, "Missing Signature")
|
||||
sig = hmac.new(secret.encode("utf-8"), body, "sha1").hexdigest()
|
||||
return None, HTTPResponse(401, b"Missing Signature")
|
||||
sig = hmac.new(secret.encode("utf-8"), request.body,
|
||||
"sha1").hexdigest()
|
||||
if x_signature != "sha1=" + sig:
|
||||
log("WARNING", "Signature Header is invalid")
|
||||
raise RequestDenied(403, "Signature is invalid")
|
||||
return None, HTTPResponse(403, b"Signature is invalid")
|
||||
|
||||
access_token = cqhttp_config.access_token
|
||||
if access_token and access_token != token and connection_type == "websocket":
|
||||
if access_token and access_token != token and isinstance(
|
||||
request, WebSocket):
|
||||
log(
|
||||
"WARNING", "Authorization Header is invalid"
|
||||
if token else "Missing Authorization Header")
|
||||
raise RequestDenied(
|
||||
403, "Authorization Header is invalid"
|
||||
if token else "Missing Authorization Header")
|
||||
return str(x_self_id)
|
||||
return None, HTTPResponse(
|
||||
403, b"Authorization Header is invalid"
|
||||
if token else b"Missing Authorization Header")
|
||||
return str(x_self_id), HTTPResponse(204, b'')
|
||||
|
||||
@overrides(BaseBot)
|
||||
async def handle_message(self, message: dict):
|
||||
async def handle_message(self, message: bytes):
|
||||
"""
|
||||
:说明:
|
||||
|
||||
调用 `_check_reply <#async-check-reply-bot-event>`_, `_check_at_me <#check-at-me-bot-event>`_, `_check_nickname <#check-nickname-bot-event>`_ 处理事件并转换为 `Event <#class-event>`_
|
||||
"""
|
||||
if not message:
|
||||
data = json.loads(message)
|
||||
|
||||
if not data:
|
||||
return
|
||||
|
||||
if "post_type" not in message:
|
||||
ResultStore.add_result(message)
|
||||
if "post_type" not in data:
|
||||
ResultStore.add_result(data)
|
||||
return
|
||||
|
||||
try:
|
||||
post_type = message['post_type']
|
||||
detail_type = message.get(f"{post_type}_type")
|
||||
post_type = data['post_type']
|
||||
detail_type = data.get(f"{post_type}_type")
|
||||
detail_type = f".{detail_type}" if detail_type else ""
|
||||
sub_type = message.get("sub_type")
|
||||
sub_type = data.get("sub_type")
|
||||
sub_type = f".{sub_type}" if sub_type else ""
|
||||
models = get_event_model(post_type + detail_type + sub_type)
|
||||
for model in models:
|
||||
try:
|
||||
event = model.parse_obj(message)
|
||||
event = model.parse_obj(data)
|
||||
break
|
||||
except Exception as e:
|
||||
log("DEBUG", "Event Parser Error", e)
|
||||
else:
|
||||
event = Event.parse_obj(message)
|
||||
event = Event.parse_obj(data)
|
||||
|
||||
# Check whether user is calling me
|
||||
await _check_reply(self, event)
|
||||
@ -329,25 +321,28 @@ class Bot(BaseBot):
|
||||
await handle_event(self, event)
|
||||
except Exception as e:
|
||||
logger.opt(colors=True, exception=e).error(
|
||||
f"<r><bg #f8bbd0>Failed to handle event. Raw: {message}</bg #f8bbd0></r>"
|
||||
f"<r><bg #f8bbd0>Failed to handle event. Raw: {data}</bg #f8bbd0></r>"
|
||||
)
|
||||
|
||||
@overrides(BaseBot)
|
||||
async def _call_api(self, api: str, **data) -> Any:
|
||||
log("DEBUG", f"Calling API <y>{api}</y>")
|
||||
if self.connection_type == "websocket":
|
||||
if isinstance(self.request, WebSocket):
|
||||
seq = ResultStore.get_seq()
|
||||
await self.websocket.send({
|
||||
"action": api,
|
||||
"params": data,
|
||||
"echo": {
|
||||
"seq": seq
|
||||
}
|
||||
})
|
||||
json_data = json.dumps(
|
||||
{
|
||||
"action": api,
|
||||
"params": data,
|
||||
"echo": {
|
||||
"seq": seq
|
||||
}
|
||||
},
|
||||
cls=DataclassEncoder)
|
||||
await self.request.send(json_data)
|
||||
return _handle_api_result(await ResultStore.fetch(
|
||||
seq, self.config.api_timeout))
|
||||
|
||||
elif self.connection_type == "http":
|
||||
elif isinstance(self.request, HTTPRequest):
|
||||
api_root = self.config.api_root.get(self.self_id)
|
||||
if not api_root:
|
||||
raise ApiNotAvailable
|
||||
@ -431,7 +426,7 @@ class Bot(BaseBot):
|
||||
message, str) else message
|
||||
msg = message if isinstance(message, Message) else Message(message)
|
||||
|
||||
at_sender = at_sender and getattr(event, "user_id", None)
|
||||
at_sender = at_sender and bool(getattr(event, "user_id", None))
|
||||
|
||||
params = {}
|
||||
if getattr(event, "user_id", None):
|
||||
@ -449,8 +444,7 @@ class Bot(BaseBot):
|
||||
raise ValueError("Cannot guess message type to reply!")
|
||||
|
||||
if at_sender and params["message_type"] != "private":
|
||||
params["message"] = MessageSegment.at(params["user_id"]) + \
|
||||
MessageSegment.text(" ") + msg
|
||||
params["message"] = MessageSegment.at(params["user_id"]) + " " + msg
|
||||
else:
|
||||
params["message"] = msg
|
||||
return await self.send_msg(**params)
|
||||
|
@ -1,16 +1,17 @@
|
||||
import json
|
||||
import urllib.parse
|
||||
|
||||
from datetime import datetime
|
||||
import time
|
||||
from typing import Any, Union, Optional, TYPE_CHECKING
|
||||
from datetime import datetime
|
||||
from typing import Any, Tuple, Union, Optional, TYPE_CHECKING
|
||||
|
||||
import httpx
|
||||
|
||||
from nonebot.log import logger
|
||||
from nonebot.typing import overrides
|
||||
from nonebot.message import handle_event
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.exception import RequestDenied
|
||||
from nonebot.drivers import Driver, HTTPConnection, HTTPRequest, HTTPResponse
|
||||
|
||||
from .utils import calc_hmac_base64, log
|
||||
from .config import Config as DingConfig
|
||||
@ -20,7 +21,6 @@ from .event import MessageEvent, PrivateMessageEvent, GroupMessageEvent, Convers
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nonebot.config import Config
|
||||
from nonebot.drivers import Driver
|
||||
|
||||
SEND = "send"
|
||||
|
||||
@ -31,10 +31,6 @@ class Bot(BaseBot):
|
||||
"""
|
||||
ding_config: DingConfig
|
||||
|
||||
def __init__(self, connection_type: str, self_id: str, **kwargs):
|
||||
|
||||
super().__init__(connection_type, self_id, **kwargs)
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
"""
|
||||
@ -43,57 +39,61 @@ class Bot(BaseBot):
|
||||
return "ding"
|
||||
|
||||
@classmethod
|
||||
def register(cls, driver: "Driver", config: "Config"):
|
||||
def register(cls, driver: Driver, config: "Config"):
|
||||
super().register(driver, config)
|
||||
cls.ding_config = DingConfig(**config.dict())
|
||||
|
||||
@classmethod
|
||||
@overrides(BaseBot)
|
||||
async def check_permission(cls, driver: "Driver", connection_type: str,
|
||||
headers: dict, body: Optional[bytes]) -> str:
|
||||
async def check_permission(
|
||||
cls, driver: Driver,
|
||||
request: HTTPConnection) -> Tuple[Optional[str], HTTPResponse]:
|
||||
"""
|
||||
:说明:
|
||||
|
||||
钉钉协议鉴权。参考 `鉴权 <https://ding-doc.dingtalk.com/doc#/serverapi2/elzz1p>`_
|
||||
"""
|
||||
timestamp = headers.get("timestamp")
|
||||
sign = headers.get("sign")
|
||||
timestamp = request.headers.get("timestamp")
|
||||
sign = request.headers.get("sign")
|
||||
|
||||
# 检查连接方式
|
||||
if connection_type not in ["http"]:
|
||||
raise RequestDenied(
|
||||
405, "Unsupported connection type, available type: `http`")
|
||||
if not isinstance(request, HTTPRequest):
|
||||
return None, HTTPResponse(
|
||||
405, b"Unsupported connection type, available type: `http`")
|
||||
|
||||
# 检查 timestamp
|
||||
if not timestamp:
|
||||
raise RequestDenied(400, "Missing `timestamp` Header")
|
||||
return None, HTTPResponse(400, b"Missing `timestamp` Header")
|
||||
|
||||
# 检查 sign
|
||||
secret = cls.ding_config.secret
|
||||
if secret:
|
||||
if not sign:
|
||||
log("WARNING", "Missing Signature Header")
|
||||
raise RequestDenied(400, "Missing `sign` Header")
|
||||
return None, HTTPResponse(400, b"Missing `sign` Header")
|
||||
sign_base64 = calc_hmac_base64(str(timestamp), secret)
|
||||
if sign != sign_base64.decode('utf-8'):
|
||||
log("WARNING", "Signature Header is invalid")
|
||||
raise RequestDenied(403, "Signature is invalid")
|
||||
return None, HTTPResponse(403, b"Signature is invalid")
|
||||
else:
|
||||
log("WARNING", "Ding signature check ignored!")
|
||||
return json.loads(body.decode())["chatbotUserId"]
|
||||
return (json.loads(request.body.decode())["chatbotUserId"],
|
||||
HTTPResponse(204, b''))
|
||||
|
||||
@overrides(BaseBot)
|
||||
async def handle_message(self, message: dict):
|
||||
if not message:
|
||||
async def handle_message(self, message: bytes):
|
||||
data = json.loads(message)
|
||||
|
||||
if not data:
|
||||
return
|
||||
|
||||
# 判断消息类型,生成不同的 Event
|
||||
try:
|
||||
conversation_type = message["conversationType"]
|
||||
conversation_type = data["conversationType"]
|
||||
if conversation_type == ConversationType.private:
|
||||
event = PrivateMessageEvent.parse_obj(message)
|
||||
event = PrivateMessageEvent.parse_obj(data)
|
||||
elif conversation_type == ConversationType.group:
|
||||
event = GroupMessageEvent.parse_obj(message)
|
||||
event = GroupMessageEvent.parse_obj(data)
|
||||
else:
|
||||
raise ValueError("Unsupported conversation type")
|
||||
except Exception as e:
|
||||
@ -104,7 +104,7 @@ class Bot(BaseBot):
|
||||
await handle_event(self, event)
|
||||
except Exception as e:
|
||||
logger.opt(colors=True, exception=e).error(
|
||||
f"<r><bg #f8bbd0>Failed to handle event. Raw: {message}</bg #f8bbd0></r>"
|
||||
f"<r><bg #f8bbd0>Failed to handle event. Raw: {data}</bg #f8bbd0></r>"
|
||||
)
|
||||
return
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
"""
|
||||
r"""
|
||||
Mirai-API-HTTP 协议适配
|
||||
============================
|
||||
|
||||
协议详情请看: `mirai-api-http 文档`_
|
||||
协议详情请看: `mirai-api-http 文档`_
|
||||
|
||||
\:\:\: tip
|
||||
该Adapter目前仍然处在早期实验性阶段, 并未经过充分测试
|
||||
|
@ -5,11 +5,11 @@ from typing import Any, Dict, List, NoReturn, Optional, Tuple, Union
|
||||
|
||||
import httpx
|
||||
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.config import Config
|
||||
from nonebot.drivers import Driver, WebSocket
|
||||
from nonebot.exception import ApiNotAvailable, RequestDenied
|
||||
from nonebot.typing import overrides
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.exception import ApiNotAvailable
|
||||
from nonebot.drivers import Driver, HTTPConnection, HTTPResponse, WebSocket
|
||||
|
||||
from .config import Config as MiraiConfig
|
||||
from .event import Event, FriendMessage, GroupMessage, TempMessage
|
||||
@ -140,7 +140,7 @@ class SessionManager:
|
||||
|
||||
|
||||
class Bot(BaseBot):
|
||||
"""
|
||||
r"""
|
||||
mirai-api-http 协议 Bot 适配。
|
||||
|
||||
\:\:\: warning
|
||||
@ -151,14 +151,6 @@ class Bot(BaseBot):
|
||||
|
||||
"""
|
||||
|
||||
@overrides(BaseBot)
|
||||
def __init__(self,
|
||||
connection_type: str,
|
||||
self_id: str,
|
||||
*,
|
||||
websocket: Optional[WebSocket] = None):
|
||||
super().__init__(connection_type, self_id, websocket=websocket)
|
||||
|
||||
@property
|
||||
@overrides(BaseBot)
|
||||
def type(self) -> str:
|
||||
@ -166,7 +158,8 @@ class Bot(BaseBot):
|
||||
|
||||
@property
|
||||
def alive(self) -> bool:
|
||||
return not self.websocket.closed
|
||||
assert isinstance(self.request, WebSocket)
|
||||
return not self.request.closed
|
||||
|
||||
@property
|
||||
def api(self) -> SessionManager:
|
||||
@ -177,27 +170,26 @@ class Bot(BaseBot):
|
||||
|
||||
@classmethod
|
||||
@overrides(BaseBot)
|
||||
async def check_permission(cls, driver: "Driver", connection_type: str,
|
||||
headers: dict, body: Optional[bytes]) -> str:
|
||||
if connection_type == 'ws':
|
||||
raise RequestDenied(
|
||||
status_code=501,
|
||||
reason='Websocket connection is not implemented')
|
||||
self_id: Optional[str] = headers.get('bot')
|
||||
async def check_permission(
|
||||
cls, driver: Driver,
|
||||
request: HTTPConnection) -> Tuple[Optional[str], HTTPResponse]:
|
||||
if isinstance(request, WebSocket):
|
||||
return None, HTTPResponse(
|
||||
501, b'Websocket connection is not implemented')
|
||||
self_id: Optional[str] = request.headers.get('bot')
|
||||
if self_id is None:
|
||||
raise RequestDenied(status_code=400,
|
||||
reason='Header `Bot` is required.')
|
||||
return None, HTTPResponse(400, b'Header `Bot` is required.')
|
||||
self_id = str(self_id).strip()
|
||||
await SessionManager.new(
|
||||
int(self_id),
|
||||
host=cls.mirai_config.host, # type: ignore
|
||||
port=cls.mirai_config.port, #type: ignore
|
||||
auth_key=cls.mirai_config.auth_key) # type: ignore
|
||||
return self_id
|
||||
return self_id, HTTPResponse(204, b'')
|
||||
|
||||
@classmethod
|
||||
@overrides(BaseBot)
|
||||
def register(cls, driver: "Driver", config: "Config"):
|
||||
def register(cls, driver: Driver, config: "Config"):
|
||||
cls.mirai_config = MiraiConfig(**config.dict())
|
||||
if (cls.mirai_config.auth_key and cls.mirai_config.host and
|
||||
cls.mirai_config.port) is None:
|
||||
@ -224,7 +216,7 @@ class Bot(BaseBot):
|
||||
|
||||
@overrides(BaseBot)
|
||||
async def call_api(self, api: str, **data) -> NoReturn:
|
||||
"""
|
||||
r"""
|
||||
\:\:\: danger
|
||||
由于Mirai的HTTP API特殊性, 该API暂时无法实现
|
||||
\:\:\:
|
||||
|
@ -1,18 +1,16 @@
|
||||
import asyncio
|
||||
import json
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from ipaddress import IPv4Address
|
||||
from typing import (Any, Callable, Coroutine, Dict, NoReturn, Optional, Set,
|
||||
TypeVar)
|
||||
from typing import Any, Set, Dict, Tuple, TypeVar, Optional, Callable, Coroutine
|
||||
|
||||
import httpx
|
||||
import websockets
|
||||
|
||||
from nonebot.config import Config
|
||||
from nonebot.drivers import Driver
|
||||
from nonebot.drivers import WebSocket as BaseWebSocket
|
||||
from nonebot.exception import RequestDenied
|
||||
from nonebot.log import logger
|
||||
from nonebot.config import Config
|
||||
from nonebot.typing import overrides
|
||||
from nonebot.drivers import Driver, HTTPConnection, HTTPResponse, WebSocket as BaseWebSocket
|
||||
|
||||
from .bot import SessionManager, Bot
|
||||
|
||||
@ -21,7 +19,9 @@ WebsocketHandler_T = TypeVar('WebsocketHandler_T',
|
||||
bound=WebsocketHandlerFunction)
|
||||
|
||||
|
||||
@dataclass
|
||||
class WebSocket(BaseWebSocket):
|
||||
websocket: websockets.WebSocketClientProtocol = None # type: ignore
|
||||
|
||||
@classmethod
|
||||
async def new(cls, *, host: IPv4Address, port: int,
|
||||
@ -37,24 +37,26 @@ class WebSocket(BaseWebSocket):
|
||||
self.event_handlers: Set[WebsocketHandlerFunction] = set()
|
||||
super().__init__(websocket)
|
||||
|
||||
@property
|
||||
@overrides(BaseWebSocket)
|
||||
def websocket(self) -> websockets.WebSocketClientProtocol:
|
||||
return self._websocket
|
||||
|
||||
@property
|
||||
@overrides(BaseWebSocket)
|
||||
def closed(self) -> bool:
|
||||
return self.websocket.closed
|
||||
|
||||
@overrides(BaseWebSocket)
|
||||
async def send(self, data: Dict[str, Any]):
|
||||
return await self.websocket.send(json.dumps(data))
|
||||
async def send(self, data: str):
|
||||
return await self.websocket.send(data)
|
||||
|
||||
@overrides(BaseWebSocket)
|
||||
async def receive(self) -> Dict[str, Any]:
|
||||
received = await self.websocket.recv()
|
||||
return json.loads(received)
|
||||
async def send_bytes(self, data: str):
|
||||
return await self.websocket.send(data)
|
||||
|
||||
@overrides(BaseWebSocket)
|
||||
async def receive(self) -> str:
|
||||
return await self.websocket.recv() # type: ignore
|
||||
|
||||
@overrides(BaseWebSocket)
|
||||
async def receive_bytes(self) -> bytes:
|
||||
return await self.websocket.recv() # type: ignore
|
||||
|
||||
async def _dispatcher(self):
|
||||
while not self.closed:
|
||||
@ -93,11 +95,6 @@ class WebsocketBot(Bot):
|
||||
mirai-api-http 正向 Websocket 协议 Bot 适配。
|
||||
"""
|
||||
|
||||
@overrides(Bot)
|
||||
def __init__(self, connection_type: str, self_id: str, *,
|
||||
websocket: WebSocket):
|
||||
super().__init__(connection_type, self_id, websocket=websocket)
|
||||
|
||||
@property
|
||||
@overrides(Bot)
|
||||
def type(self) -> str:
|
||||
@ -105,7 +102,8 @@ class WebsocketBot(Bot):
|
||||
|
||||
@property
|
||||
def alive(self) -> bool:
|
||||
return not self.websocket.closed
|
||||
assert isinstance(self.request, WebSocket)
|
||||
return not self.request.closed
|
||||
|
||||
@property
|
||||
def api(self) -> SessionManager:
|
||||
@ -115,16 +113,14 @@ class WebsocketBot(Bot):
|
||||
|
||||
@classmethod
|
||||
@overrides(Bot)
|
||||
async def check_permission(cls, driver: "Driver", connection_type: str,
|
||||
headers: dict,
|
||||
body: Optional[bytes]) -> NoReturn:
|
||||
raise RequestDenied(
|
||||
status_code=501,
|
||||
reason=f'Connection {connection_type} not implented')
|
||||
async def check_permission(
|
||||
cls, driver: Driver,
|
||||
request: HTTPConnection) -> Tuple[None, HTTPResponse]:
|
||||
return None, HTTPResponse(501, b'Connection not implented')
|
||||
|
||||
@classmethod
|
||||
@overrides(Bot)
|
||||
def register(cls, driver: "Driver", config: "Config", qq: int):
|
||||
def register(cls, driver: Driver, config: "Config", qq: int):
|
||||
"""
|
||||
:说明:
|
||||
|
||||
|
Reference in New Issue
Block a user