♻️ remove plugin namespace

This commit is contained in:
yanyongyu
2021-11-11 17:33:30 +08:00
parent 08f56db385
commit 2ccbc93e48
9 changed files with 210 additions and 204 deletions

View File

@ -2,20 +2,18 @@ import sys
import pkgutil
import importlib
from pathlib import Path
from itertools import chain
from types import ModuleType
from collections import Counter
from importlib.abc import MetaPathFinder
from importlib.machinery import PathFinder, SourceFileLoader
from typing import Set, List, Union, Iterable, Optional, Sequence
from typing import Set, Dict, List, Union, Iterable, Optional, Sequence
from .export import Export
from . import _current_plugin
from nonebot.log import logger
from nonebot.utils import escape_tag
from .plugin import Plugin, _new_plugin
_manager_stack: List["PluginManager"] = []
from . import _managers, _current_plugin
# TODO
class PluginManager:
def __init__(
@ -27,81 +25,79 @@ class PluginManager:
# simple plugin not in search path
self.plugins: Set[str] = set(plugins or [])
self.search_path: Set[str] = set(search_path or [])
# ensure can be loaded
# cache plugins
self.searched_plugins: Dict[str, Path] = {}
self.list_plugins()
def search_plugins(self) -> List[str]:
def _path_to_module_name(self, path: Path) -> str:
rel_path = path.resolve().relative_to(Path(".").resolve())
if rel_path.stem == "__init__":
return ".".join(rel_path.parts[:-1])
else:
return ".".join(rel_path.parts[:-1] + (rel_path.stem,))
def _previous_plugins(self) -> List[str]:
_pre_managers: List[PluginManager]
if self in _managers:
_pre_managers = _managers[:_managers.index(self)]
else:
_pre_managers = _managers[:]
return [
module_info.name
for module_info in pkgutil.iter_modules(self.search_path)
*chain.from_iterable(
[*manager.plugins, *manager.searched_plugins.keys()]
for manager in _pre_managers)
]
def list_plugins(self) -> Set[str]:
_pre_managers: List[PluginManager]
if self in _manager_stack:
_pre_managers = _manager_stack[:_manager_stack.index(self)]
else:
_pre_managers = _manager_stack[:]
# get all previous ready to load plugins
previous_plugins = self._previous_plugins()
searched_plugins: Dict[str, Path] = {}
_search_path: Set[str] = set()
for manager in _pre_managers:
_search_path |= manager.search_path
if _search_path & self.search_path:
raise RuntimeError("Duplicate plugin search path!")
_search_plugins = self.search_plugins()
c = Counter([*_search_plugins, *self.plugins])
conflict = [name for name, num in c.items() if num > 1]
if conflict:
raise RuntimeError(
f"More than one plugin named {' / '.join(conflict)}!")
return set(_search_plugins) | self.plugins
def load_plugin(self, name) -> ModuleType:
if name in self.plugins:
return importlib.import_module(name)
if "." in name:
raise ValueError("Plugin name cannot contain '.'")
return importlib.import_module(f"{self.namespace}.{name}")
def load_all_plugins(self) -> List[ModuleType]:
return [self.load_plugin(name) for name in self.list_plugins()]
def _rewrite_module_name(self, module_name: str) -> Optional[str]:
prefix = f"{self.internal_module.__name__}."
raw_name = module_name[len(self.namespace) +
1:] if module_name.startswith(self.namespace +
".") else None
# dir plugins
if raw_name and raw_name.split(".")[0] in self.search_plugins():
return f"{prefix}{raw_name}"
# third party plugin or renamed dir plugins
elif module_name in self.plugins or module_name.startswith(prefix):
return module_name
# dir plugins
elif module_name in self.search_plugins():
return f"{prefix}{module_name}"
return None
def _check_absolute_import(self, origin_path: str) -> Optional[str]:
if not self.search_path:
return
paths = set([
*self.search_path,
*(str(Path(path).resolve()) for path in self.search_path)
])
for path in paths:
try:
rel_path = Path(origin_path).relative_to(path)
if rel_path.stem == "__init__":
return f"{self.internal_module.__name__}." + ".".join(
rel_path.parts[:-1])
return f"{self.internal_module.__name__}." + ".".join(
rel_path.parts[:-1] + (rel_path.stem,))
except ValueError:
for module_info in pkgutil.iter_modules(self.search_path):
if module_info.name.startswith("_"):
continue
if module_info.name in searched_plugins.keys(
) or module_info.name in previous_plugins:
raise RuntimeError(
f"Plugin already exists: {module_info.name}! Check your plugin name"
)
module_spec = module_info.module_finder.find_spec(
module_info.name, None)
if not module_spec:
continue
module_path = module_spec.origin
if not module_path:
continue
searched_plugins[module_info.name] = Path(module_path).resolve()
self.searched_plugins = searched_plugins
return self.plugins | set(self.searched_plugins.keys())
def load_plugin(self, name) -> Optional[Plugin]:
try:
if name in self.plugins:
module = importlib.import_module(name)
elif name not in self.searched_plugins:
raise RuntimeError(
f"Plugin not found: {name}! Check your plugin name")
else:
module = importlib.import_module(
self._path_to_module_name(self.searched_plugins[name]))
logger.opt(colors=True).success(
f'Succeeded to import "<y>{escape_tag(name)}</y>"')
return getattr(module, "__plugin__", None)
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{escape_tag(name)}"</bg #f8bbd0></r>'
)
def load_all_plugins(self) -> Set[Plugin]:
return set(
filter(None,
(self.load_plugin(name) for name in self.list_plugins())))
class PluginFinder(MetaPathFinder):
@ -110,28 +106,27 @@ class PluginFinder(MetaPathFinder):
fullname: str,
path: Optional[Sequence[Union[bytes, str]]],
target: Optional[ModuleType] = None):
if _manager_stack:
if _managers:
index = -1
origin_spec = PathFinder.find_spec(fullname, path, target)
while -index <= len(_manager_stack):
manager = _manager_stack[index]
module_spec = PathFinder.find_spec(fullname, path, target)
if not module_spec:
return
module_origin = module_spec.origin
if not module_origin:
return
module_path = Path(module_origin).resolve()
rel_name = None
if origin_spec and origin_spec.origin:
rel_name = manager._check_absolute_import(
origin_spec.origin)
while -index <= len(_managers):
manager = _managers[index]
if fullname in manager.plugins or module_path in manager.searched_plugins.values(
):
module_spec.loader = PluginLoader(manager, fullname,
module_origin)
return module_spec
newname = manager._rewrite_module_name(rel_name or fullname)
if newname:
spec = PathFinder.find_spec(
newname, path or [*manager.search_path, *sys.path],
target)
if spec:
spec.loader = PluginLoader( # type: ignore
manager, newname, spec.origin)
return spec
index -= 1
return None
return
class PluginLoader(SourceFileLoader):
@ -152,20 +147,15 @@ class PluginLoader(SourceFileLoader):
if self.loaded:
return
export = Export()
_export_token = _export.set(export)
plugin = _new_plugin(self.name, module)
parent_plugin = _current_plugin.get()
if parent_plugin:
plugin.parent_plugin = parent_plugin
parent_plugin.sub_plugins.add(plugin)
prefix = self.manager.internal_module.__name__
is_dir_plugin = self.name.startswith(prefix + ".")
module_name = self.name[len(prefix) +
1:] if is_dir_plugin else self.name
_plugin_token = _current_plugin.set(module)
_plugin_token = _current_plugin.set(plugin)
setattr(module, "__export__", export)
setattr(module, "__plugin_name__",
module_name.split(".")[0] if is_dir_plugin else module_name)
setattr(module, "__module_name__", module_name)
setattr(module, "__module_prefix__", prefix if is_dir_plugin else "")
setattr(module, "__plugin__", plugin)
# try:
# super().exec_module(module)
@ -176,7 +166,6 @@ class PluginLoader(SourceFileLoader):
super().exec_module(module)
_current_plugin.reset(_plugin_token)
_export.reset(_export_token)
return