mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-15 10:31:04 +00:00
💥 Remove: 移除 Python 3.8 支持 (#2641)
This commit is contained in:
101
nonebot/utils.py
101
nonebot/utils.py
@ -10,36 +10,23 @@ import json
|
||||
import asyncio
|
||||
import inspect
|
||||
import importlib
|
||||
import contextlib
|
||||
import dataclasses
|
||||
from pathlib import Path
|
||||
from collections import deque
|
||||
from contextvars import copy_context
|
||||
from functools import wraps, partial
|
||||
from contextlib import asynccontextmanager
|
||||
from contextlib import AbstractContextManager, asynccontextmanager
|
||||
from typing_extensions import ParamSpec, get_args, override, get_origin
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
Type,
|
||||
Tuple,
|
||||
Union,
|
||||
Generic,
|
||||
Mapping,
|
||||
TypeVar,
|
||||
Callable,
|
||||
Optional,
|
||||
Sequence,
|
||||
Coroutine,
|
||||
AsyncGenerator,
|
||||
ContextManager,
|
||||
overload,
|
||||
)
|
||||
from collections.abc import Mapping, Sequence, Coroutine, AsyncGenerator
|
||||
from typing import Any, Union, Generic, TypeVar, Callable, Optional, overload
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from nonebot.log import logger
|
||||
from nonebot.typing import (
|
||||
is_none_type,
|
||||
type_has_args,
|
||||
origin_is_union,
|
||||
origin_is_literal,
|
||||
all_literal_values,
|
||||
@ -64,8 +51,8 @@ def escape_tag(s: str) -> str:
|
||||
|
||||
|
||||
def deep_update(
|
||||
mapping: Dict[K, Any], *updating_mappings: Dict[K, Any]
|
||||
) -> Dict[K, Any]:
|
||||
mapping: dict[K, Any], *updating_mappings: dict[K, Any]
|
||||
) -> dict[K, Any]:
|
||||
"""深度更新合并字典"""
|
||||
updated_mapping = mapping.copy()
|
||||
for updating_mapping in updating_mappings:
|
||||
@ -82,7 +69,7 @@ def deep_update(
|
||||
|
||||
|
||||
def lenient_issubclass(
|
||||
cls: Any, class_or_tuple: Union[Type[Any], Tuple[Type[Any], ...]]
|
||||
cls: Any, class_or_tuple: Union[type[Any], tuple[type[Any], ...]]
|
||||
) -> bool:
|
||||
"""检查 cls 是否是 class_or_tuple 中的一个类型子类并忽略类型错误。"""
|
||||
try:
|
||||
@ -92,7 +79,7 @@ def lenient_issubclass(
|
||||
|
||||
|
||||
def generic_check_issubclass(
|
||||
cls: Any, class_or_tuple: Union[Type[Any], Tuple[Type[Any], ...]]
|
||||
cls: Any, class_or_tuple: Union[type[Any], tuple[type[Any], ...]]
|
||||
) -> bool:
|
||||
"""检查 cls 是否是 class_or_tuple 中的一个类型子类。
|
||||
|
||||
@ -106,46 +93,46 @@ def generic_check_issubclass(
|
||||
则会检查其 `__bound__` 或 `__constraints__`
|
||||
是否是 class_or_tuple 中一个类型的子类或 None。
|
||||
"""
|
||||
try:
|
||||
return issubclass(cls, class_or_tuple)
|
||||
except TypeError:
|
||||
origin = get_origin(cls)
|
||||
if origin_is_union(origin):
|
||||
if not type_has_args(cls):
|
||||
with contextlib.suppress(TypeError):
|
||||
return issubclass(cls, class_or_tuple)
|
||||
|
||||
origin = get_origin(cls)
|
||||
if origin_is_union(origin):
|
||||
return all(
|
||||
is_none_type(type_) or generic_check_issubclass(type_, class_or_tuple)
|
||||
for type_ in get_args(cls)
|
||||
)
|
||||
elif origin_is_literal(origin):
|
||||
return all(
|
||||
is_none_type(value) or isinstance(value, class_or_tuple)
|
||||
for value in all_literal_values(cls)
|
||||
)
|
||||
# ensure generic List, Dict can be checked
|
||||
elif origin:
|
||||
# avoid class check error (typing.Final, typing.ClassVar, etc...)
|
||||
try:
|
||||
return issubclass(origin, class_or_tuple)
|
||||
except TypeError:
|
||||
return False
|
||||
elif isinstance(cls, TypeVar):
|
||||
if cls.__constraints__:
|
||||
return all(
|
||||
is_none_type(type_) or generic_check_issubclass(type_, class_or_tuple)
|
||||
for type_ in get_args(cls)
|
||||
for type_ in cls.__constraints__
|
||||
)
|
||||
elif origin_is_literal(origin):
|
||||
return all(
|
||||
is_none_type(value) or isinstance(value, class_or_tuple)
|
||||
for value in all_literal_values(cls)
|
||||
)
|
||||
# ensure generic List, Dict can be checked
|
||||
elif origin:
|
||||
# avoid class check error (typing.Final, typing.ClassVar, etc...)
|
||||
try:
|
||||
return issubclass(origin, class_or_tuple)
|
||||
except TypeError:
|
||||
return False
|
||||
elif isinstance(cls, TypeVar):
|
||||
if cls.__constraints__:
|
||||
return all(
|
||||
is_none_type(type_)
|
||||
or generic_check_issubclass(type_, class_or_tuple)
|
||||
for type_ in cls.__constraints__
|
||||
)
|
||||
elif cls.__bound__:
|
||||
return generic_check_issubclass(cls.__bound__, class_or_tuple)
|
||||
return False
|
||||
elif cls.__bound__:
|
||||
return generic_check_issubclass(cls.__bound__, class_or_tuple)
|
||||
return False
|
||||
|
||||
|
||||
def type_is_complex(type_: Type[Any]) -> bool:
|
||||
def type_is_complex(type_: type[Any]) -> bool:
|
||||
"""检查 type_ 是否是复杂类型"""
|
||||
origin = get_origin(type_)
|
||||
return _type_is_complex_inner(type_) or _type_is_complex_inner(origin)
|
||||
|
||||
|
||||
def _type_is_complex_inner(type_: Optional[Type[Any]]) -> bool:
|
||||
def _type_is_complex_inner(type_: Optional[type[Any]]) -> bool:
|
||||
if lenient_issubclass(type_, (str, bytes)):
|
||||
return False
|
||||
|
||||
@ -200,7 +187,7 @@ def run_sync(call: Callable[P, R]) -> Callable[P, Coroutine[None, None, R]]:
|
||||
|
||||
@asynccontextmanager
|
||||
async def run_sync_ctx_manager(
|
||||
cm: ContextManager[T],
|
||||
cm: AbstractContextManager[T],
|
||||
) -> AsyncGenerator[T, None]:
|
||||
"""一个用于包装 sync context manager 为 async context manager 的执行函数"""
|
||||
try:
|
||||
@ -216,7 +203,7 @@ async def run_sync_ctx_manager(
|
||||
@overload
|
||||
async def run_coro_with_catch(
|
||||
coro: Coroutine[Any, Any, T],
|
||||
exc: Tuple[Type[Exception], ...],
|
||||
exc: tuple[type[Exception], ...],
|
||||
return_on_err: None = None,
|
||||
) -> Union[T, None]: ...
|
||||
|
||||
@ -224,14 +211,14 @@ async def run_coro_with_catch(
|
||||
@overload
|
||||
async def run_coro_with_catch(
|
||||
coro: Coroutine[Any, Any, T],
|
||||
exc: Tuple[Type[Exception], ...],
|
||||
exc: tuple[type[Exception], ...],
|
||||
return_on_err: R,
|
||||
) -> Union[T, R]: ...
|
||||
|
||||
|
||||
async def run_coro_with_catch(
|
||||
coro: Coroutine[Any, Any, T],
|
||||
exc: Tuple[Type[Exception], ...],
|
||||
exc: tuple[type[Exception], ...],
|
||||
return_on_err: Optional[R] = None,
|
||||
) -> Optional[Union[T, R]]:
|
||||
"""运行协程并当遇到指定异常时返回指定值。
|
||||
@ -289,7 +276,7 @@ class classproperty(Generic[T]):
|
||||
def __init__(self, func: Callable[[Any], T]) -> None:
|
||||
self.func = func
|
||||
|
||||
def __get__(self, instance: Any, owner: Optional[Type[Any]] = None) -> T:
|
||||
def __get__(self, instance: Any, owner: Optional[type[Any]] = None) -> T:
|
||||
return self.func(type(instance) if owner is None else owner)
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user