mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-16 11:00:54 +00:00
⬆️ auto update by pre-commit hooks (#2565)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Ju4tCode <42488585+yanyongyu@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
07e6c3f977
commit
4dae23d3bb
@ -296,9 +296,11 @@ def init(*, _env_file: Optional[DOTENV_TYPE] = None, **kwargs: Any) -> None:
|
||||
_env_file = _env_file or f".env.{env.environment}"
|
||||
config = Config(
|
||||
**kwargs,
|
||||
_env_file=(".env", _env_file)
|
||||
if isinstance(_env_file, (str, os.PathLike))
|
||||
else _env_file,
|
||||
_env_file=(
|
||||
(".env", _env_file)
|
||||
if isinstance(_env_file, (str, os.PathLike))
|
||||
else _env_file
|
||||
),
|
||||
)
|
||||
|
||||
logger.configure(
|
||||
|
@ -8,7 +8,7 @@ FrontMatter:
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, is_dataclass
|
||||
from typing_extensions import Self, Annotated, is_typeddict
|
||||
from typing_extensions import Self, Annotated, get_args, get_origin, is_typeddict
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
@ -25,6 +25,8 @@ from typing import (
|
||||
|
||||
from pydantic import VERSION, BaseModel
|
||||
|
||||
from nonebot.typing import origin_is_annotated
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
PYDANTIC_V2 = int(VERSION.split(".", 1)[0]) == 2
|
||||
@ -33,8 +35,7 @@ if TYPE_CHECKING:
|
||||
|
||||
class _CustomValidationClass(Protocol):
|
||||
@classmethod
|
||||
def __get_validators__(cls) -> Generator[Callable[..., Any], None, None]:
|
||||
...
|
||||
def __get_validators__(cls) -> Generator[Callable[..., Any], None, None]: ...
|
||||
|
||||
CVC = TypeVar("CVC", bound=_CustomValidationClass)
|
||||
|
||||
@ -131,11 +132,15 @@ if PYDANTIC_V2: # pragma: pydantic-v2
|
||||
TypeAdapter raise error when annotation has config
|
||||
and given config is not None.
|
||||
"""
|
||||
type_is_annotated = origin_is_annotated(get_origin(self.annotation))
|
||||
inner_type = (
|
||||
get_args(self.annotation)[0] if type_is_annotated else self.annotation
|
||||
)
|
||||
try:
|
||||
return (
|
||||
issubclass(self.annotation, BaseModel)
|
||||
or is_dataclass(self.annotation)
|
||||
or is_typeddict(self.annotation)
|
||||
issubclass(inner_type, BaseModel)
|
||||
or is_dataclass(inner_type)
|
||||
or is_typeddict(inner_type)
|
||||
)
|
||||
except TypeError:
|
||||
return False
|
||||
|
@ -55,8 +55,7 @@ DOTENV_TYPE: TypeAlias = Union[
|
||||
ENV_FILE_SENTINEL = Path("")
|
||||
|
||||
|
||||
class SettingsError(ValueError):
|
||||
...
|
||||
class SettingsError(ValueError): ...
|
||||
|
||||
|
||||
class BaseSettingsSource(abc.ABC):
|
||||
|
@ -179,8 +179,7 @@ class WebSocket(BaseWebSocket):
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
class Driver(Mixin, NoneDriver):
|
||||
...
|
||||
class Driver(Mixin, NoneDriver): ...
|
||||
|
||||
else:
|
||||
Driver = combine_driver(NoneDriver, Mixin)
|
||||
|
@ -15,7 +15,6 @@ FrontMatter:
|
||||
description: nonebot.drivers.fastapi 模块
|
||||
"""
|
||||
|
||||
|
||||
import logging
|
||||
import contextlib
|
||||
from functools import wraps
|
||||
|
@ -72,8 +72,7 @@ class Mixin(HTTPClientMixin):
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
class Driver(Mixin, NoneDriver):
|
||||
...
|
||||
class Driver(Mixin, NoneDriver): ...
|
||||
|
||||
else:
|
||||
Driver = combine_driver(NoneDriver, Mixin)
|
||||
|
@ -128,8 +128,7 @@ class WebSocket(BaseWebSocket):
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
class Driver(Mixin, NoneDriver):
|
||||
...
|
||||
class Driver(Mixin, NoneDriver): ...
|
||||
|
||||
else:
|
||||
Driver = combine_driver(NoneDriver, Mixin)
|
||||
|
@ -14,8 +14,7 @@ if TYPE_CHECKING:
|
||||
from .message import Message, MessageSegment
|
||||
|
||||
class _ApiCall(Protocol):
|
||||
async def __call__(self, **kwargs: Any) -> Any:
|
||||
...
|
||||
async def __call__(self, **kwargs: Any) -> Any: ...
|
||||
|
||||
|
||||
class Bot(abc.ABC):
|
||||
|
@ -27,8 +27,7 @@ if TYPE_CHECKING:
|
||||
|
||||
def formatter_field_name_split( # noqa: F811
|
||||
field_name: str,
|
||||
) -> Tuple[str, List[Tuple[bool, str]]]:
|
||||
...
|
||||
) -> Tuple[str, List[Tuple[bool, str]]]: ...
|
||||
|
||||
|
||||
TM = TypeVar("TM", bound="Message")
|
||||
@ -53,8 +52,7 @@ class MessageTemplate(Formatter, Generic[TF]):
|
||||
template: str,
|
||||
factory: Type[str] = str,
|
||||
private_getattr: bool = False,
|
||||
) -> None:
|
||||
...
|
||||
) -> None: ...
|
||||
|
||||
@overload
|
||||
def __init__(
|
||||
@ -62,8 +60,7 @@ class MessageTemplate(Formatter, Generic[TF]):
|
||||
template: Union[str, TM],
|
||||
factory: Type[TM],
|
||||
private_getattr: bool = False,
|
||||
) -> None:
|
||||
...
|
||||
) -> None: ...
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -6,18 +6,15 @@ D = TypeVar("D", bound="Driver")
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
class CombinedDriver(Driver, Mixin):
|
||||
...
|
||||
class CombinedDriver(Driver, Mixin): ...
|
||||
|
||||
|
||||
@overload
|
||||
def combine_driver(driver: Type[D]) -> Type[D]:
|
||||
...
|
||||
def combine_driver(driver: Type[D]) -> Type[D]: ...
|
||||
|
||||
|
||||
@overload
|
||||
def combine_driver(driver: Type[D], *mixins: Type[Mixin]) -> Type["CombinedDriver"]:
|
||||
...
|
||||
def combine_driver(driver: Type[D], *mixins: Type[Mixin]) -> Type["CombinedDriver"]: ...
|
||||
|
||||
|
||||
def combine_driver(
|
||||
|
@ -65,12 +65,10 @@ class MatcherManager(MutableMapping[int, List[Type["Matcher"]]]):
|
||||
return self.provider.items()
|
||||
|
||||
@overload
|
||||
def get(self, key: int) -> Optional[List[Type["Matcher"]]]:
|
||||
...
|
||||
def get(self, key: int) -> Optional[List[Type["Matcher"]]]: ...
|
||||
|
||||
@overload
|
||||
def get(self, key: int, default: T) -> Union[List[Type["Matcher"]], T]:
|
||||
...
|
||||
def get(self, key: int, default: T) -> Union[List[Type["Matcher"]], T]: ...
|
||||
|
||||
def get(
|
||||
self, key: int, default: Optional[T] = None
|
||||
|
@ -262,16 +262,20 @@ class Matcher(metaclass=MatcherMeta):
|
||||
"type": type_,
|
||||
"rule": rule or Rule(),
|
||||
"permission": permission or Permission(),
|
||||
"handlers": [
|
||||
handler
|
||||
if isinstance(handler, Dependent)
|
||||
else Dependent[Any].parse(
|
||||
call=handler, allow_types=cls.HANDLER_PARAM_TYPES
|
||||
)
|
||||
for handler in handlers
|
||||
]
|
||||
if handlers
|
||||
else [],
|
||||
"handlers": (
|
||||
[
|
||||
(
|
||||
handler
|
||||
if isinstance(handler, Dependent)
|
||||
else Dependent[Any].parse(
|
||||
call=handler, allow_types=cls.HANDLER_PARAM_TYPES
|
||||
)
|
||||
)
|
||||
for handler in handlers
|
||||
]
|
||||
if handlers
|
||||
else []
|
||||
),
|
||||
"temp": temp,
|
||||
"expire_time": (
|
||||
expire_time
|
||||
@ -658,12 +662,10 @@ class Matcher(metaclass=MatcherMeta):
|
||||
raise SkippedException
|
||||
|
||||
@overload
|
||||
def get_receive(self, id: str) -> Union[Event, None]:
|
||||
...
|
||||
def get_receive(self, id: str) -> Union[Event, None]: ...
|
||||
|
||||
@overload
|
||||
def get_receive(self, id: str, default: T) -> Union[Event, T]:
|
||||
...
|
||||
def get_receive(self, id: str, default: T) -> Union[Event, T]: ...
|
||||
|
||||
def get_receive(
|
||||
self, id: str, default: Optional[T] = None
|
||||
@ -680,12 +682,10 @@ class Matcher(metaclass=MatcherMeta):
|
||||
self.state[LAST_RECEIVE_KEY] = event
|
||||
|
||||
@overload
|
||||
def get_last_receive(self) -> Union[Event, None]:
|
||||
...
|
||||
def get_last_receive(self) -> Union[Event, None]: ...
|
||||
|
||||
@overload
|
||||
def get_last_receive(self, default: T) -> Union[Event, T]:
|
||||
...
|
||||
def get_last_receive(self, default: T) -> Union[Event, T]: ...
|
||||
|
||||
def get_last_receive(
|
||||
self, default: Optional[T] = None
|
||||
@ -697,12 +697,10 @@ class Matcher(metaclass=MatcherMeta):
|
||||
return self.state.get(LAST_RECEIVE_KEY, default)
|
||||
|
||||
@overload
|
||||
def get_arg(self, key: str) -> Union[Message, None]:
|
||||
...
|
||||
def get_arg(self, key: str) -> Union[Message, None]: ...
|
||||
|
||||
@overload
|
||||
def get_arg(self, key: str, default: T) -> Union[Message, T]:
|
||||
...
|
||||
def get_arg(self, key: str, default: T) -> Union[Message, T]: ...
|
||||
|
||||
def get_arg(
|
||||
self, key: str, default: Optional[T] = None
|
||||
@ -724,12 +722,10 @@ class Matcher(metaclass=MatcherMeta):
|
||||
self.state[REJECT_TARGET] = target
|
||||
|
||||
@overload
|
||||
def get_target(self) -> Union[str, None]:
|
||||
...
|
||||
def get_target(self) -> Union[str, None]: ...
|
||||
|
||||
@overload
|
||||
def get_target(self, default: T) -> Union[str, T]:
|
||||
...
|
||||
def get_target(self, default: T) -> Union[str, T]: ...
|
||||
|
||||
def get_target(self, default: Optional[T] = None) -> Optional[Union[str, T]]:
|
||||
return self.state.get(REJECT_TARGET, default)
|
||||
|
@ -39,10 +39,12 @@ class Permission:
|
||||
|
||||
def __init__(self, *checkers: Union[T_PermissionChecker, Dependent[bool]]) -> None:
|
||||
self.checkers: Set[Dependent[bool]] = {
|
||||
checker
|
||||
if isinstance(checker, Dependent)
|
||||
else Dependent[bool].parse(
|
||||
call=checker, allow_types=self.HANDLER_PARAM_TYPES
|
||||
(
|
||||
checker
|
||||
if isinstance(checker, Dependent)
|
||||
else Dependent[bool].parse(
|
||||
call=checker, allow_types=self.HANDLER_PARAM_TYPES
|
||||
)
|
||||
)
|
||||
for checker in checkers
|
||||
}
|
||||
|
@ -38,10 +38,12 @@ class Rule:
|
||||
|
||||
def __init__(self, *checkers: Union[T_RuleChecker, Dependent[bool]]) -> None:
|
||||
self.checkers: Set[Dependent[bool]] = {
|
||||
checker
|
||||
if isinstance(checker, Dependent)
|
||||
else Dependent[bool].parse(
|
||||
call=checker, allow_types=self.HANDLER_PARAM_TYPES
|
||||
(
|
||||
checker
|
||||
if isinstance(checker, Dependent)
|
||||
else Dependent[bool].parse(
|
||||
call=checker, allow_types=self.HANDLER_PARAM_TYPES
|
||||
)
|
||||
)
|
||||
for checker in checkers
|
||||
}
|
||||
|
@ -170,20 +170,17 @@ def _regex_str(
|
||||
|
||||
|
||||
@overload
|
||||
def RegexStr(__group: Literal[0] = 0) -> str:
|
||||
...
|
||||
def RegexStr(__group: Literal[0] = 0) -> str: ...
|
||||
|
||||
|
||||
@overload
|
||||
def RegexStr(__group: Union[str, int]) -> Union[str, Any]:
|
||||
...
|
||||
def RegexStr(__group: Union[str, int]) -> Union[str, Any]: ...
|
||||
|
||||
|
||||
@overload
|
||||
def RegexStr(
|
||||
__group1: Union[str, int], __group2: Union[str, int], *groups: Union[str, int]
|
||||
) -> Tuple[Union[str, Any], ...]:
|
||||
...
|
||||
) -> Tuple[Union[str, Any], ...]: ...
|
||||
|
||||
|
||||
def RegexStr(*groups: Union[str, int]) -> Union[str, Tuple[Union[str, Any], ...], Any]:
|
||||
|
@ -213,8 +213,10 @@ def inherit_supported_adapters(*names: str) -> Optional[Set[str]]:
|
||||
)
|
||||
|
||||
return final_supported and {
|
||||
f"nonebot.adapters.{adapter_name[1:]}"
|
||||
if adapter_name.startswith("~")
|
||||
else adapter_name
|
||||
(
|
||||
f"nonebot.adapters.{adapter_name[1:]}"
|
||||
if adapter_name.startswith("~")
|
||||
else adapter_name
|
||||
)
|
||||
for adapter_name in final_supported
|
||||
}
|
||||
|
@ -460,45 +460,38 @@ class ArgumentParser(ArgParser):
|
||||
self,
|
||||
args: Optional[Sequence[Union[str, MessageSegment]]] = None,
|
||||
namespace: None = None,
|
||||
) -> Tuple[Namespace, List[Union[str, MessageSegment]]]:
|
||||
...
|
||||
) -> Tuple[Namespace, List[Union[str, MessageSegment]]]: ...
|
||||
|
||||
@overload
|
||||
def parse_known_args(
|
||||
self, args: Optional[Sequence[Union[str, MessageSegment]]], namespace: T
|
||||
) -> Tuple[T, List[Union[str, MessageSegment]]]:
|
||||
...
|
||||
) -> Tuple[T, List[Union[str, MessageSegment]]]: ...
|
||||
|
||||
@overload
|
||||
def parse_known_args(
|
||||
self, *, namespace: T
|
||||
) -> Tuple[T, List[Union[str, MessageSegment]]]:
|
||||
...
|
||||
) -> Tuple[T, List[Union[str, MessageSegment]]]: ...
|
||||
|
||||
def parse_known_args(
|
||||
self,
|
||||
args: Optional[Sequence[Union[str, MessageSegment]]] = None,
|
||||
namespace: Optional[T] = None,
|
||||
) -> Tuple[Union[Namespace, T], List[Union[str, MessageSegment]]]:
|
||||
...
|
||||
) -> Tuple[Union[Namespace, T], List[Union[str, MessageSegment]]]: ...
|
||||
|
||||
@overload
|
||||
def parse_args(
|
||||
self,
|
||||
args: Optional[Sequence[Union[str, MessageSegment]]] = None,
|
||||
namespace: None = None,
|
||||
) -> Namespace:
|
||||
...
|
||||
) -> Namespace: ...
|
||||
|
||||
@overload
|
||||
def parse_args(
|
||||
self, args: Optional[Sequence[Union[str, MessageSegment]]], namespace: T
|
||||
) -> T:
|
||||
...
|
||||
) -> T: ...
|
||||
|
||||
@overload
|
||||
def parse_args(self, *, namespace: T) -> T:
|
||||
...
|
||||
def parse_args(self, *, namespace: T) -> T: ...
|
||||
|
||||
def parse_args(
|
||||
self,
|
||||
|
@ -13,6 +13,7 @@ FrontMatter:
|
||||
import sys
|
||||
import types
|
||||
import warnings
|
||||
import contextlib
|
||||
import typing as t
|
||||
import typing_extensions as t_ext
|
||||
from typing import TYPE_CHECKING, TypeVar
|
||||
@ -71,6 +72,13 @@ def all_literal_values(type_: t.Type[t.Any]) -> t.List[t.Any]:
|
||||
return [x for value in _literal_values(type_) for x in all_literal_values(value)]
|
||||
|
||||
|
||||
def origin_is_annotated(origin: t.Optional[t.Type[t.Any]]) -> bool:
|
||||
"""判断是否是 Annotated 类型"""
|
||||
with contextlib.suppress(TypeError):
|
||||
return origin is not None and issubclass(origin, t_ext.Annotated)
|
||||
return False
|
||||
|
||||
|
||||
NONE_TYPES = {None, type(None), t.Literal[None], t_ext.Literal[None]}
|
||||
if sys.version_info >= (3, 10):
|
||||
NONE_TYPES.add(types.NoneType)
|
||||
|
@ -218,8 +218,7 @@ async def run_coro_with_catch(
|
||||
coro: Coroutine[Any, Any, T],
|
||||
exc: Tuple[Type[Exception], ...],
|
||||
return_on_err: None = None,
|
||||
) -> Union[T, None]:
|
||||
...
|
||||
) -> Union[T, None]: ...
|
||||
|
||||
|
||||
@overload
|
||||
@ -227,8 +226,7 @@ async def run_coro_with_catch(
|
||||
coro: Coroutine[Any, Any, T],
|
||||
exc: Tuple[Type[Exception], ...],
|
||||
return_on_err: R,
|
||||
) -> Union[T, R]:
|
||||
...
|
||||
) -> Union[T, R]: ...
|
||||
|
||||
|
||||
async def run_coro_with_catch(
|
||||
|
Reference in New Issue
Block a user