diff --git a/nonebot/dependencies/__init__.py b/nonebot/dependencies/__init__.py deleted file mode 100644 index 95e6a040..00000000 --- a/nonebot/dependencies/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .models import Depends as Depends -from .utils import get_dependent as get_dependent diff --git a/nonebot/dependencies/models.py b/nonebot/dependencies/models.py deleted file mode 100644 index 4232ccec..00000000 --- a/nonebot/dependencies/models.py +++ /dev/null @@ -1,89 +0,0 @@ -from enum import Enum -from typing import Any, List, Callable, Optional - -from pydantic.fields import Required, FieldInfo, ModelField - -from nonebot.utils import get_name - - -class Depends: - - def __init__(self, - dependency: Optional[Callable[..., Any]] = None, - *, - use_cache: bool = True) -> None: - self.dependency = dependency - self.use_cache = use_cache - - def __repr__(self) -> str: - dep = get_name(self.dependency) - cache = "" if self.use_cache else ", use_cache=False" - return f"{self.__class__.__name__}({dep}{cache})" - - -class Dependent: - - def __init__(self, - *, - func: Optional[Callable[..., Any]] = None, - name: Optional[str] = None, - bot_param: Optional[ModelField] = None, - event_param: Optional[ModelField] = None, - state_param: Optional[ModelField] = None, - matcher_param: Optional[ModelField] = None, - simple_params: Optional[List[ModelField]] = None, - dependencies: Optional[List["Dependent"]] = None, - use_cache: bool = True) -> None: - self.func = func - self.name = name - self.bot_param = bot_param - self.event_param = event_param - self.state_param = state_param - self.matcher_param = matcher_param - self.simple_params = simple_params or [] - self.dependencies = dependencies or [] - self.use_cache = use_cache - self.cache_key = (self.func,) - - -class ParamTypes(Enum): - BOT = "bot" - EVENT = "event" - STATE = "state" - MATCHER = "matcher" - SIMPLE = "simple" - - -class Param(FieldInfo): - in_: ParamTypes - - def __init__(self, default: Any): - super().__init__(default=default) - - def __repr__(self) -> str: - return f"{self.__class__.__name__}" - - -class BotParam(Param): - in_ = ParamTypes.BOT - - -class EventParam(Param): - in_ = ParamTypes.EVENT - - -class StateParam(Param): - in_ = ParamTypes.STATE - - -class MatcherParam(Param): - in_ = ParamTypes.MATCHER - - -class SimpleParam(Param): - in_ = ParamTypes.SIMPLE - - def __init__(self, default: Any): - if default is Required: - raise ValueError("SimpleParam should be given a default value") - super().__init__(default) diff --git a/nonebot/dependencies/utils.py b/nonebot/dependencies/utils.py deleted file mode 100644 index 9503fbf1..00000000 --- a/nonebot/dependencies/utils.py +++ /dev/null @@ -1,150 +0,0 @@ -import inspect -from typing import Any, Dict, Type, Union, Callable, Optional, ForwardRef - -from pydantic import BaseConfig -from pydantic.class_validators import Validator -from pydantic.typing import evaluate_forwardref -from pydantic.schema import get_annotation_from_field_info -from pydantic.fields import Required, FieldInfo, ModelField, UndefinedType - -from .models import Param, Depends, Dependent, ParamTypes, SimpleParam - - -def get_typed_signature(call: Callable[..., Any]) -> inspect.Signature: - signature = inspect.signature(call) - globalns = getattr(call, "__globals__", {}) - typed_params = [ - inspect.Parameter( - name=param.name, - kind=param.kind, - default=param.default, - annotation=get_typed_annotation(param, globalns), - ) for param in signature.parameters.values() - ] - typed_signature = inspect.Signature(typed_params) - return typed_signature - - -def get_typed_annotation(param: inspect.Parameter, globalns: Dict[str, - Any]) -> Any: - annotation = param.annotation - if isinstance(annotation, str): - annotation = ForwardRef(annotation) - annotation = evaluate_forwardref(annotation, globalns, globalns) - return annotation - - -def get_param_sub_dependent(*, param: inspect.Parameter) -> Dependent: - depends: Depends = param.default - if depends.dependency: - dependency = depends.dependency - else: - dependency = param.annotation - return get_sub_dependant( - depends=depends, - dependency=dependency, - name=param.name, - ) - - -def get_parameterless_sub_dependant(*, depends: Depends) -> Dependent: - assert callable( - depends.dependency - ), "A parameter-less dependency must have a callable dependency" - return get_sub_dependant(depends=depends, dependency=depends.dependency) - - -def get_sub_dependant( - *, - depends: Depends, - dependency: Callable[..., Any], - name: Optional[str] = None, -) -> Dependent: - sub_dependant = get_dependent( - func=dependency, - name=name, - use_cache=depends.use_cache, - ) - return sub_dependant - - -def get_dependent(*, - func: Callable[..., Any], - name: Optional[str] = None, - use_cache: bool = True) -> Dependent: - signature = get_typed_signature(func) - params = signature.parameters - dependent = Dependent(func=func, name=name, use_cache=use_cache) - for param_name, param in params.items(): - if isinstance(param.default, Depends): - sub_dependent = get_param_sub_dependent(param=param) - dependent.dependencies.append(sub_dependent) - continue - param_field = get_param_field(param=param, - param_name=param_name, - default_field_info=SimpleParam) - - return dependent - - -def get_param_field(*, - param: inspect.Parameter, - param_name: str, - default_field_info: Type[Param] = Param, - force_type: Optional[ParamTypes] = None, - ignore_default: bool = False) -> ModelField: - default_value = Required - if param.default != param.empty and not ignore_default: - default_value = param.default - if isinstance(default_value, FieldInfo): - field_info = default_value - default_value = field_info.default - if (isinstance(field_info, Param) and - getattr(field_info, "in_", None) is None): - field_info.in_ = default_field_info.in_ - if force_type: - field_info.in_ = force_type # type: ignore - else: - field_info = default_field_info(default_value) - required: bool = default_value == Required - annotation: Any = Any - if param.annotation != param.empty: - annotation = param.annotation - annotation = get_annotation_from_field_info(annotation, field_info, - param_name) - if not field_info.alias and getattr(field_info, "convert_underscores", - None): - alias = param.name.replace("_", "-") - else: - alias = field_info.alias or param.name - field = create_field( - name=param.name, - type_=annotation, - default=None if required else default_value, - alias=alias, - required=required, - field_info=field_info, - ) - # field.required = required - - return field - - -def create_field(name: str, - type_: Type[Any], - class_validators: Optional[Dict[str, Validator]] = None, - default: Optional[Any] = None, - required: Union[bool, UndefinedType] = False, - model_config: Type[BaseConfig] = BaseConfig, - field_info: Optional[FieldInfo] = None, - alias: Optional[str] = None) -> ModelField: - class_validators = class_validators or {} - field_info = field_info or FieldInfo(None) - return ModelField(name=name, - type_=type_, - class_validators=class_validators, - model_config=model_config, - default=default, - required=required, - alias=alias, - field_info=field_info) diff --git a/nonebot/handler.py b/nonebot/handler.py deleted file mode 100644 index ee31d4cf..00000000 --- a/nonebot/handler.py +++ /dev/null @@ -1,27 +0,0 @@ -""" -事件处理函数 -============ - -该模块实现事件处理函数的封装,以实现动态参数等功能。 -""" - -import inspect -from typing import Optional - -from pydantic.typing import evaluate_forwardref - -from nonebot.utils import get_name -from nonebot.typing import T_Handler - - -class Handler: - """事件处理函数类""" - - def __init__(self, func: T_Handler, *, name: Optional[str] = None): - """装饰事件处理函数以便根据动态参数运行""" - self.func: T_Handler = func - """ - :类型: ``T_Handler`` - :说明: 事件处理函数 - """ - self.name = get_name(func) if name is None else name diff --git a/nonebot/processor/__init__.py b/nonebot/processor/__init__.py new file mode 100644 index 00000000..73b86bbc --- /dev/null +++ b/nonebot/processor/__init__.py @@ -0,0 +1,77 @@ +import inspect +from typing import Any, Callable, Optional + +from .models import Dependent +from .models import Depends as Depends +from nonebot.adapters import Bot, Event +from .utils import get_typed_signature, generic_check_issubclass + + +def get_param_sub_dependent(*, param: inspect.Parameter) -> Dependent: + depends: Depends = param.default + if depends.dependency: + dependency = depends.dependency + else: + dependency = param.annotation + return get_sub_dependant( + depends=depends, + dependency=dependency, + name=param.name, + ) + + +def get_parameterless_sub_dependant(*, depends: Depends) -> Dependent: + assert callable( + depends.dependency + ), "A parameter-less dependency must have a callable dependency" + return get_sub_dependant(depends=depends, dependency=depends.dependency) + + +def get_sub_dependant( + *, + depends: Depends, + dependency: Callable[..., Any], + name: Optional[str] = None, +) -> Dependent: + sub_dependant = get_dependent( + func=dependency, + name=name, + use_cache=depends.use_cache, + ) + return sub_dependant + + +def get_dependent(*, + func: Callable[..., Any], + name: Optional[str] = None, + use_cache: bool = True) -> Dependent: + signature = get_typed_signature(func) + params = signature.parameters + dependent = Dependent(func=func, name=name, use_cache=use_cache) + for param_name, param in params.items(): + if isinstance(param.default, Depends): + sub_dependent = get_param_sub_dependent(param=param) + dependent.dependencies.append(sub_dependent) + continue + + if generic_check_issubclass(param.annotation, Bot): + dependent.bot_param_name = param_name + continue + elif generic_check_issubclass(param.annotation, Event): + dependent.event_param_name = param_name + continue + elif generic_check_issubclass(param.annotation, dict): + dependent.state_param_name = param_name + continue + elif generic_check_issubclass(param.annotation, Matcher): + dependent.matcher_param_name = param_name + continue + + raise ValueError( + f"Unknown parameter {param_name} with type {param.annotation}") + + return dependent + + +from .handler import Handler as Handler +from .matcher import Matcher as Matcher diff --git a/nonebot/processor/handler.py b/nonebot/processor/handler.py new file mode 100644 index 00000000..66366cb7 --- /dev/null +++ b/nonebot/processor/handler.py @@ -0,0 +1,43 @@ +""" +事件处理函数 +============ + +该模块实现事件处理函数的封装,以实现动态参数等功能。 +""" +from typing import TYPE_CHECKING, List, Optional + +from .models import Depends +from nonebot.utils import get_name +from nonebot.typing import T_State, T_Handler +from . import get_dependent, get_parameterless_sub_dependant + +if TYPE_CHECKING: + from .matcher import Matcher + from nonebot.adapters import Bot, Event + + +class Handler: + """事件处理函数类""" + + def __init__(self, + func: T_Handler, + *, + name: Optional[str] = None, + dependencies: Optional[List[Depends]] = None): + """装饰事件处理函数以便根据动态参数运行""" + self.func: T_Handler = func + """ + :类型: ``T_Handler`` + :说明: 事件处理函数 + """ + self.name = get_name(func) if name is None else name + + self.dependencies = dependencies or [] + self.dependent = get_dependent(func=func) + for depends in self.dependencies[::-1]: + self.dependent.dependencies.insert( + 0, get_parameterless_sub_dependant(depends=depends)) + + def __call__(self, bot: Bot, event: Event, state: T_State, + matcher: "Matcher"): + ... diff --git a/nonebot/matcher.py b/nonebot/processor/matcher.py similarity index 99% rename from nonebot/matcher.py rename to nonebot/processor/matcher.py index b8a874f2..256c787d 100644 --- a/nonebot/matcher.py +++ b/nonebot/processor/matcher.py @@ -13,9 +13,9 @@ from collections import defaultdict from typing import (TYPE_CHECKING, Any, Dict, List, Type, Union, Callable, NoReturn, Optional) +from .handler import Handler from nonebot.rule import Rule from nonebot.log import logger -from nonebot.handler import Handler from nonebot.adapters import MessageTemplate from nonebot.permission import USER, Permission from nonebot.exception import (PausedException, StopPropagation, diff --git a/nonebot/processor/models.py b/nonebot/processor/models.py new file mode 100644 index 00000000..14af1699 --- /dev/null +++ b/nonebot/processor/models.py @@ -0,0 +1,41 @@ +from typing import Any, List, Callable, Optional + +from nonebot.utils import get_name + + +class Depends: + + def __init__(self, + dependency: Optional[Callable[..., Any]] = None, + *, + use_cache: bool = True) -> None: + self.dependency = dependency + self.use_cache = use_cache + + def __repr__(self) -> str: + dep = get_name(self.dependency) + cache = "" if self.use_cache else ", use_cache=False" + return f"{self.__class__.__name__}({dep}{cache})" + + +class Dependent: + + def __init__(self, + *, + func: Optional[Callable[..., Any]] = None, + name: Optional[str] = None, + bot_param_name: Optional[str] = None, + event_param_name: Optional[str] = None, + state_param_name: Optional[str] = None, + matcher_param_name: Optional[str] = None, + dependencies: Optional[List["Dependent"]] = None, + use_cache: bool = True) -> None: + self.func = func + self.name = name + self.bot_param_name = bot_param_name + self.event_param_name = event_param_name + self.state_param_name = state_param_name + self.matcher_param_name = matcher_param_name + self.dependencies = dependencies or [] + self.use_cache = use_cache + self.cache_key = (self.func,) diff --git a/nonebot/processor/utils.py b/nonebot/processor/utils.py new file mode 100644 index 00000000..7c3729da --- /dev/null +++ b/nonebot/processor/utils.py @@ -0,0 +1,43 @@ +import inspect +from typing import Any, Dict, Type, Tuple, Union, Callable + +from pydantic.typing import (ForwardRef, GenericAlias, get_args, get_origin, + evaluate_forwardref) + + +def get_typed_signature(call: Callable[..., Any]) -> inspect.Signature: + signature = inspect.signature(call) + globalns = getattr(call, "__globals__", {}) + typed_params = [ + inspect.Parameter( + name=param.name, + kind=param.kind, + default=param.default, + annotation=get_typed_annotation(param, globalns), + ) for param in signature.parameters.values() + ] + typed_signature = inspect.Signature(typed_params) + return typed_signature + + +def get_typed_annotation(param: inspect.Parameter, globalns: Dict[str, + Any]) -> Any: + annotation = param.annotation + if isinstance(annotation, str): + annotation = ForwardRef(annotation) + annotation = evaluate_forwardref(annotation, globalns, globalns) + return annotation + + +def generic_check_issubclass( + cls: Any, class_or_tuple: Union[Type[Any], Tuple[Type[Any], + ...]]) -> bool: + try: + return isinstance(cls, type) and issubclass(cls, class_or_tuple) + except TypeError: + if get_origin(cls) is Union: + for type_ in get_args(cls): + if not generic_check_issubclass(type_, class_or_tuple): + return False + return True + raise