🧑‍💻 Develop: 添加 ruff RUF 规则 (#2598)

Co-authored-by: Ju4tCode <42488585+yanyongyu@users.noreply.github.com>
This commit is contained in:
uy/sun
2024-03-07 14:57:26 +08:00
committed by GitHub
parent 92ba99c34c
commit 9ff7f4baba
19 changed files with 118 additions and 99 deletions

View File

@ -1,7 +1,7 @@
import abc
import asyncio
from functools import partial
from typing import TYPE_CHECKING, Any, Set, Union, Optional, Protocol
from typing import TYPE_CHECKING, Any, Set, Union, ClassVar, Optional, Protocol
from nonebot.log import logger
from nonebot.config import Config
@ -27,9 +27,9 @@ class Bot(abc.ABC):
self_id: 机器人 ID
"""
_calling_api_hook: Set[T_CallingAPIHook] = set()
_calling_api_hook: ClassVar[Set[T_CallingAPIHook]] = set()
"""call_api 时执行的函数"""
_called_api_hook: Set[T_CalledAPIHook] = set()
_called_api_hook: ClassVar[Set[T_CalledAPIHook]] = set()
"""call_api 后执行的函数"""
def __init__(self, adapter: "Adapter", self_id: str):

View File

@ -20,7 +20,7 @@ class Event(abc.ABC, BaseModel):
class Config(ConfigDict):
extra = "allow" # type: ignore
json_encoders = {Message: DataclassEncoder}
json_encoders = {Message: DataclassEncoder} # noqa: RUF012
if not PYDANTIC_V2: # pragma: pydantic-v1

View File

@ -25,7 +25,7 @@ from _string import formatter_field_name_split # type: ignore
if TYPE_CHECKING:
from .message import Message, MessageSegment
def formatter_field_name_split( # noqa: F811
def formatter_field_name_split(
field_name: str,
) -> Tuple[str, List[Tuple[bool, str]]]: ...

View File

@ -2,7 +2,7 @@ import abc
import asyncio
from typing_extensions import TypeAlias
from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING, Any, Set, Dict, Type, AsyncGenerator
from typing import TYPE_CHECKING, Any, Set, Dict, Type, ClassVar, AsyncGenerator
from nonebot.log import logger
from nonebot.config import Env, Config
@ -36,11 +36,11 @@ class Driver(abc.ABC):
config: 包含配置信息的 Config 对象
"""
_adapters: Dict[str, "Adapter"] = {}
_adapters: ClassVar[Dict[str, "Adapter"]] = {}
"""已注册的适配器列表"""
_bot_connection_hook: Set[Dependent[Any]] = set()
_bot_connection_hook: ClassVar[Set[Dependent[Any]]] = set()
"""Bot 连接建立时执行的函数"""
_bot_disconnection_hook: Set[Dependent[Any]] = set()
_bot_disconnection_hook: ClassVar[Set[Dependent[Any]]] = set()
"""Bot 连接断开时执行的函数"""
def __init__(self, env: Env, config: Config):

View File

@ -141,7 +141,7 @@ class Matcher(metaclass=MatcherMeta):
"""事件响应器匹配规则"""
permission: ClassVar[Permission] = Permission()
"""事件响应器触发权限"""
handlers: List[Dependent[Any]] = []
handlers: ClassVar[List[Dependent[Any]]] = []
"""事件响应器拥有的事件处理函数列表"""
priority: ClassVar[int] = 1
"""事件响应器优先级"""
@ -171,7 +171,7 @@ class Matcher(metaclass=MatcherMeta):
)
def __init__(self):
self.handlers = self.handlers.copy()
self.remain_handlers: List[Dependent[Any]] = self.handlers.copy()
self.state = self._default_state.copy()
def __repr__(self) -> str:
@ -457,7 +457,7 @@ class Matcher(metaclass=MatcherMeta):
parameterless: 非参数类型依赖列表
"""
async def _receive(event: Event, matcher: "Matcher") -> Union[None, NoReturn]:
async def _receive(event: Event, matcher: "Matcher") -> None:
matcher.set_target(RECEIVE_KEY.format(id=id))
if matcher.get_target() == RECEIVE_KEY.format(id=id):
matcher.set_receive(id, event)
@ -775,7 +775,7 @@ class Matcher(metaclass=MatcherMeta):
async def resolve_reject(self):
handler = current_handler.get()
self.handlers.insert(0, handler)
self.remain_handlers.insert(0, handler)
if REJECT_CACHE_TARGET in self.state:
self.state[REJECT_TARGET] = self.state[REJECT_CACHE_TARGET]
@ -809,8 +809,8 @@ class Matcher(metaclass=MatcherMeta):
# Refresh preprocess state
self.state.update(state)
while self.handlers:
handler = self.handlers.pop(0)
while self.remain_handlers:
handler = self.remain_handlers.pop(0)
current_handler.set(handler)
logger.debug(f"Running handler {handler}")
try:
@ -852,7 +852,7 @@ class Matcher(metaclass=MatcherMeta):
type_,
Rule(),
permission,
self.handlers,
self.remain_handlers,
temp=True,
priority=0,
block=True,
@ -872,7 +872,7 @@ class Matcher(metaclass=MatcherMeta):
type_,
Rule(),
permission,
self.handlers,
self.remain_handlers,
temp=True,
priority=0,
block=True,

View File

@ -1,7 +1,7 @@
import asyncio
from typing_extensions import Self
from contextlib import AsyncExitStack
from typing import Set, Tuple, Union, NoReturn, Optional
from typing import Set, List, Type, Tuple, Union, ClassVar, NoReturn, Optional
from nonebot.dependencies import Dependent
from nonebot.utils import run_coro_with_catch
@ -9,7 +9,7 @@ from nonebot.exception import SkippedException
from nonebot.typing import T_DependencyCache, T_PermissionChecker
from .adapter import Bot, Event
from .params import BotParam, EventParam, DependParam, DefaultParam
from .params import Param, BotParam, EventParam, DependParam, DefaultParam
class Permission:
@ -30,7 +30,7 @@ class Permission:
__slots__ = ("checkers",)
HANDLER_PARAM_TYPES = [
HANDLER_PARAM_TYPES: ClassVar[List[Type[Param]]] = [
DependParam,
BotParam,
EventParam,
@ -146,7 +146,7 @@ class User:
@classmethod
def _clean_permission(cls, perm: Permission) -> Optional[Permission]:
if len(perm.checkers) == 1 and isinstance(
user_perm := tuple(perm.checkers)[0].call, cls
user_perm := next(iter(perm.checkers)).call, cls
):
return user_perm.perm
return perm

View File

@ -1,13 +1,13 @@
import asyncio
from contextlib import AsyncExitStack
from typing import Set, Union, NoReturn, Optional
from typing import Set, List, Type, Union, ClassVar, NoReturn, Optional
from nonebot.dependencies import Dependent
from nonebot.exception import SkippedException
from nonebot.typing import T_State, T_RuleChecker, T_DependencyCache
from .adapter import Bot, Event
from .params import BotParam, EventParam, StateParam, DependParam, DefaultParam
from .params import Param, BotParam, EventParam, StateParam, DependParam, DefaultParam
class Rule:
@ -28,7 +28,7 @@ class Rule:
__slots__ = ("checkers",)
HANDLER_PARAM_TYPES = [
HANDLER_PARAM_TYPES: ClassVar[List[Type[Param]]] = [
DependParam,
BotParam,
EventParam,

View File

@ -236,7 +236,7 @@ class PluginLoader(SourceFileLoader):
break
# enter plugin context
_plugin_token = _current_plugin_chain.set(parent_plugins + (plugin,))
_plugin_token = _current_plugin_chain.set((*parent_plugins, plugin))
try:
super().exec_module(module)