⚗️ change import hook

This commit is contained in:
yanyongyu
2021-02-19 14:58:26 +08:00
parent 6cd6750729
commit f26fb9d6fb
5 changed files with 205 additions and 27 deletions

1119
nonebot/plugin/__init__.py Normal file

File diff suppressed because it is too large Load Diff

407
nonebot/plugin/__init__.pyi Normal file
View File

@ -0,0 +1,407 @@
import re
from types import ModuleType
from contextvars import ContextVar
from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.rule import Rule, ArgumentParser
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
plugins: Dict[str, "Plugin"] = ...
_export: ContextVar["Export"] = ...
_tmp_matchers: ContextVar[Set[Type[Matcher]]] = ...
class Export(dict):
def __call__(self, func, **kwargs):
...
def __setattr__(self, name, value):
...
def __getattr__(self, name):
...
class Plugin(object):
name: str
module: ModuleType
matcher: Set[Type[Matcher]]
export: Export
def on(type: str = ...,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
permission: Optional[Permission] = ...,
*,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_metaevent(
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
*,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = ...,
permission: Optional[Permission] = ...,
*,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = ...,
*,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = ...,
*,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_startswith(
msg: str,
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = ...,
*,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_endswith(msg: str,
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = ...,
*,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_keyword(keywords: Set[str],
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = ...,
*,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
*,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_shell_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
...
def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = ...,
*,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def load_plugin(module_path: str) -> Optional[Plugin]:
...
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
...
def load_builtin_plugins(name: str = ...):
...
def get_plugin(name: str) -> Optional[Plugin]:
...
def get_loaded_plugins() -> Set[Plugin]:
...
def export() -> Export:
...
def require(name: str) -> Export:
...
class CommandGroup:
def __init__(self,
cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
permission: Optional[Permission] = ...,
*,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...):
self.basecmd: Tuple[str, ...] = ...
self.base_kwargs: Dict[str, Any] = ...
def command(self,
cmd: Union[str, Tuple[str, ...]],
*,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def shell_command(
self,
cmd: Union[str, Tuple[str, ...]],
*,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
parser: Optional[ArgumentParser] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
class MatcherGroup:
def __init__(self,
*,
type: str = ...,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...):
...
def on(self,
*,
type: str = ...,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_metaevent(
self,
*,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
handlers: Optional[List[T_Handler]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_message(
self,
*,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
permission: Optional[Permission] = None,
handlers: Optional[List[T_Handler]] = None,
temp: bool = False,
priority: int = 1,
block: bool = True,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_notice(
self,
*,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
handlers: Optional[List[T_Handler]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_request(
self,
*,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
handlers: Optional[List[T_Handler]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_startswith(
self,
msg: str,
*,
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_endswith(
self,
msg: str,
*,
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_keyword(
self,
keywords: Set[str],
*,
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_command(
self,
cmd: Union[str, Tuple[str, ...]],
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
*,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_shell_command(
self,
cmd: Union[str, Tuple[str, ...]],
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
parser: Optional[ArgumentParser] = ...,
*,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_regex(
self,
pattern: str,
flags: Union[int, re.RegexFlag] = 0,
*,
rule: Optional[Rule] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...

177
nonebot/plugin/manager.py Normal file
View File

@ -0,0 +1,177 @@
import sys
import uuid
import pkgutil
import importlib
from hashlib import md5
from types import ModuleType
from collections import Counter
from importlib.abc import MetaPathFinder
from importlib.machinery import PathFinder
from typing import Set, List, Optional, Iterable
_internal_space = ModuleType(__name__ + "._internal")
_internal_space.__path__ = [] # type: ignore
sys.modules[_internal_space.__name__] = _internal_space
_manager_stack: List["PluginManager"] = []
class _NamespaceModule(ModuleType):
"""Simple namespace module to store plugins."""
@property
def __path__(self):
return []
def __getattr__(self, name: str):
try:
return super().__getattr__(name) # type: ignore
except AttributeError:
if name.startswith("__"):
raise
raise RuntimeError("Plugin manager not activated!")
class _InternalModule(ModuleType):
"""Internal module for each plugin manager."""
def __init__(self, plugin_manager: "PluginManager"):
super().__init__(
f"{_internal_space.__name__}.{plugin_manager.internal_id}")
self.__plugin_manager__ = plugin_manager
@property
def __path__(self) -> List[str]:
return list(self.__plugin_manager__.search_path)
class PluginManager:
def __init__(self,
namespace: Optional[str] = None,
plugins: Optional[Iterable[str]] = None,
search_path: Optional[Iterable[str]] = None,
*,
id: Optional[str] = None):
self.namespace: Optional[str] = namespace
self.namespace_module: Optional[ModuleType] = self._setup_namespace(
namespace)
self.id: str = id or str(uuid.uuid4())
self.internal_id: str = md5(
((self.namespace or "") + self.id).encode()).hexdigest()
self.internal_module = self._setup_internal_module(self.internal_id)
# simple plugin not in search path
self.plugins: Set[str] = set(plugins or [])
self.search_path: Set[str] = set(search_path or [])
# ensure can be loaded
self.list_plugins()
def _setup_namespace(self,
namespace: Optional[str] = None
) -> Optional[ModuleType]:
if not namespace:
return None
try:
module = importlib.import_module(namespace)
except ImportError:
module = _NamespaceModule(namespace)
if "." in namespace:
parent = importlib.import_module(namespace.rsplit(".", 1)[0])
setattr(parent, namespace.rsplit(".", 1)[1], module)
sys.modules[namespace] = module
return module
def _setup_internal_module(self, internal_id: str) -> ModuleType:
if hasattr(_internal_space, internal_id):
raise RuntimeError("Plugin manager already exists!")
module = _InternalModule(self)
sys.modules[module.__name__] = module
setattr(_internal_space, internal_id, module)
return module
def __enter__(self):
if self in _manager_stack:
raise RuntimeError("Plugin manager already activated!")
_manager_stack.append(self)
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
_manager_stack.pop()
except IndexError:
pass
def search_plugins(self) -> List[str]:
return [
module_info.name
for module_info in pkgutil.iter_modules(self.search_path)
]
def list_plugins(self) -> Set[str]:
_pre_managers: List[PluginManager]
if self in _manager_stack:
_pre_managers = _manager_stack[:_manager_stack.index(self)]
else:
_pre_managers = _manager_stack[:]
_search_path: Set[str] = set()
for manager in _pre_managers:
_search_path |= manager.search_path
if _search_path & self.search_path:
raise RuntimeError("Duplicate plugin search path!")
_search_plugins = self.search_plugins()
c = Counter([*_search_plugins, *self.plugins])
conflict = [name for name, num in c.items() if num > 1]
if conflict:
raise RuntimeError(
f"More than one plugin named {' / '.join(conflict)}!")
return set(_search_plugins) | self.plugins
def load_plugin(self, name) -> ModuleType:
if name in self.plugins:
return importlib.import_module(name)
if "." in name:
raise ValueError("Plugin name cannot contain '.'")
with self:
return importlib.import_module(f"{self.namespace}.{name}")
def load_all_plugins(self) -> List[ModuleType]:
return [self.load_plugin(name) for name in self.list_plugins()]
def _rewrite_module_name(self, module_name) -> Optional[str]:
if module_name == self.namespace:
return self.internal_module.__name__
elif module_name.startswith(self.namespace + "."):
path = module_name.split(".")
length = self.namespace.count(".") + 1
return f"{self.internal_module.__name__}.{'.'.join(path[length:])}"
elif module_name in self.search_plugins():
return f"{self.internal_module.__name__}.{module_name}"
return None
class PluginFinder(MetaPathFinder):
def find_spec(self, fullname: str, path, target):
if _manager_stack:
index = -1
while -index <= len(_manager_stack):
manager = _manager_stack[index]
newname = manager._rewrite_module_name(fullname)
if newname:
spec = PathFinder.find_spec(newname,
list(manager.search_path),
target)
if spec:
return spec
index -= 1
return None
sys.meta_path.insert(0, PluginFinder())