mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-17 19:40:44 +00:00
✨ Feature: 嵌套插件名称作用域优化 (#2665)
This commit is contained in:
@ -26,8 +26,8 @@ from . import (
|
||||
_managers,
|
||||
_new_plugin,
|
||||
_revert_plugin,
|
||||
_current_plugin_chain,
|
||||
_module_name_to_plugin_name,
|
||||
_current_plugin,
|
||||
_module_name_to_plugin_id,
|
||||
)
|
||||
|
||||
|
||||
@ -36,7 +36,7 @@ class PluginManager:
|
||||
|
||||
参数:
|
||||
plugins: 独立插件模块名集合。
|
||||
search_path: 插件搜索路径(文件夹)。
|
||||
search_path: 插件搜索路径(文件夹),相对于当前工作目录。
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -49,29 +49,38 @@ class PluginManager:
|
||||
self.search_path: set[str] = set(search_path or [])
|
||||
|
||||
# cache plugins
|
||||
self._third_party_plugin_names: dict[str, str] = {}
|
||||
self._searched_plugin_names: dict[str, Path] = {}
|
||||
self.prepare_plugins()
|
||||
self._third_party_plugin_ids: dict[str, str] = {}
|
||||
self._searched_plugin_ids: dict[str, str] = {}
|
||||
self._prepare_plugins()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"PluginManager(plugins={self.plugins}, search_path={self.search_path})"
|
||||
return f"PluginManager(available_plugins={self.controlled_modules})"
|
||||
|
||||
@property
|
||||
def third_party_plugins(self) -> set[str]:
|
||||
"""返回所有独立插件名称。"""
|
||||
return set(self._third_party_plugin_names.keys())
|
||||
"""返回所有独立插件标识符。"""
|
||||
return set(self._third_party_plugin_ids.keys())
|
||||
|
||||
@property
|
||||
def searched_plugins(self) -> set[str]:
|
||||
"""返回已搜索到的插件名称。"""
|
||||
return set(self._searched_plugin_names.keys())
|
||||
"""返回已搜索到的插件标识符。"""
|
||||
return set(self._searched_plugin_ids.keys())
|
||||
|
||||
@property
|
||||
def available_plugins(self) -> set[str]:
|
||||
"""返回当前插件管理器中可用的插件名称。"""
|
||||
"""返回当前插件管理器中可用的插件标识符。"""
|
||||
return self.third_party_plugins | self.searched_plugins
|
||||
|
||||
def _previous_plugins(self) -> set[str]:
|
||||
@property
|
||||
def controlled_modules(self) -> dict[str, str]:
|
||||
"""返回当前插件管理器中控制的插件标识符与模块路径映射字典。"""
|
||||
return dict(
|
||||
chain(
|
||||
self._third_party_plugin_ids.items(), self._searched_plugin_ids.items()
|
||||
)
|
||||
)
|
||||
|
||||
def _previous_controlled_modules(self) -> dict[str, str]:
|
||||
_pre_managers: list[PluginManager]
|
||||
if self in _managers:
|
||||
_pre_managers = _managers[: _managers.index(self)]
|
||||
@ -79,26 +88,35 @@ class PluginManager:
|
||||
_pre_managers = _managers[:]
|
||||
|
||||
return {
|
||||
*chain.from_iterable(manager.available_plugins for manager in _pre_managers)
|
||||
plugin_id: module_name
|
||||
for manager in _pre_managers
|
||||
for plugin_id, module_name in manager.controlled_modules.items()
|
||||
}
|
||||
|
||||
def prepare_plugins(self) -> set[str]:
|
||||
def _prepare_plugins(self) -> set[str]:
|
||||
"""搜索插件并缓存插件名称。"""
|
||||
# get all previous ready to load plugins
|
||||
previous_plugins = self._previous_plugins()
|
||||
searched_plugins: dict[str, Path] = {}
|
||||
third_party_plugins: dict[str, str] = {}
|
||||
previous_plugin_ids = self._previous_controlled_modules()
|
||||
|
||||
# if self not in global managers, merge self's controlled modules
|
||||
def get_controlled_modules():
|
||||
return (
|
||||
previous_plugin_ids
|
||||
if self in _managers
|
||||
else {**previous_plugin_ids, **self.controlled_modules}
|
||||
)
|
||||
|
||||
# check third party plugins
|
||||
for plugin in self.plugins:
|
||||
name = _module_name_to_plugin_name(plugin)
|
||||
if name in third_party_plugins or name in previous_plugins:
|
||||
plugin_id = _module_name_to_plugin_id(plugin, get_controlled_modules())
|
||||
if (
|
||||
plugin_id in self._third_party_plugin_ids
|
||||
or plugin_id in previous_plugin_ids
|
||||
):
|
||||
raise RuntimeError(
|
||||
f"Plugin already exists: {name}! Check your plugin name"
|
||||
f"Plugin already exists: {plugin_id}! Check your plugin name"
|
||||
)
|
||||
third_party_plugins[name] = plugin
|
||||
|
||||
self._third_party_plugin_names = third_party_plugins
|
||||
self._third_party_plugin_ids[plugin_id] = plugin
|
||||
|
||||
# check plugins in search path
|
||||
for module_info in pkgutil.iter_modules(self.search_path):
|
||||
@ -106,47 +124,55 @@ class PluginManager:
|
||||
if module_info.name.startswith("_"):
|
||||
continue
|
||||
|
||||
if (
|
||||
module_info.name in searched_plugins
|
||||
or module_info.name in previous_plugins
|
||||
or module_info.name in third_party_plugins
|
||||
):
|
||||
raise RuntimeError(
|
||||
f"Plugin already exists: {module_info.name}! Check your plugin name"
|
||||
)
|
||||
|
||||
if not (
|
||||
module_spec := module_info.module_finder.find_spec(
|
||||
module_info.name, None
|
||||
)
|
||||
):
|
||||
continue
|
||||
if not (module_path := module_spec.origin):
|
||||
continue
|
||||
searched_plugins[module_info.name] = Path(module_path).resolve()
|
||||
|
||||
self._searched_plugin_names = searched_plugins
|
||||
if not module_spec.origin:
|
||||
continue
|
||||
|
||||
# get module name from path, pkgutil does not return the actual module name
|
||||
module_path = Path(module_spec.origin).resolve()
|
||||
module_name = path_to_module_name(module_path)
|
||||
plugin_id = _module_name_to_plugin_id(module_name, get_controlled_modules())
|
||||
|
||||
if (
|
||||
plugin_id in previous_plugin_ids
|
||||
or plugin_id in self._third_party_plugin_ids
|
||||
or plugin_id in self._searched_plugin_ids
|
||||
):
|
||||
raise RuntimeError(
|
||||
f"Plugin already exists: {plugin_id}! Check your plugin name"
|
||||
)
|
||||
|
||||
self._searched_plugin_ids[plugin_id] = module_name
|
||||
|
||||
return self.available_plugins
|
||||
|
||||
def load_plugin(self, name: str) -> Optional[Plugin]:
|
||||
"""加载指定插件。
|
||||
|
||||
对于独立插件,可以使用完整插件模块名或者插件名称。
|
||||
可以使用完整插件模块名或者插件标识符加载。
|
||||
|
||||
参数:
|
||||
name: 插件名称。
|
||||
name: 插件名称或插件标识符。
|
||||
"""
|
||||
|
||||
try:
|
||||
if name in self.plugins:
|
||||
# load using plugin id
|
||||
if name in self._third_party_plugin_ids:
|
||||
module = importlib.import_module(self._third_party_plugin_ids[name])
|
||||
elif name in self._searched_plugin_ids:
|
||||
module = importlib.import_module(self._searched_plugin_ids[name])
|
||||
# load using module name
|
||||
elif (
|
||||
name in self._third_party_plugin_ids.values()
|
||||
or name in self._searched_plugin_ids.values()
|
||||
):
|
||||
module = importlib.import_module(name)
|
||||
elif name in self._third_party_plugin_names:
|
||||
module = importlib.import_module(self._third_party_plugin_names[name])
|
||||
elif name in self._searched_plugin_names:
|
||||
module = importlib.import_module(
|
||||
path_to_module_name(self._searched_plugin_names[name])
|
||||
)
|
||||
else:
|
||||
raise RuntimeError(f"Plugin not found: {name}! Check your plugin name")
|
||||
|
||||
@ -155,13 +181,13 @@ class PluginManager:
|
||||
) is None or not isinstance(plugin, Plugin):
|
||||
raise RuntimeError(
|
||||
f"Module {module.__name__} is not loaded as a plugin! "
|
||||
"Make sure not to import it before loading."
|
||||
f"Make sure not to import it before loading."
|
||||
)
|
||||
logger.opt(colors=True).success(
|
||||
f'Succeeded to load plugin "<y>{escape_tag(plugin.name)}</y>"'
|
||||
f'Succeeded to load plugin "<y>{escape_tag(plugin.id_)}</y>"'
|
||||
+ (
|
||||
f' from "<m>{escape_tag(plugin.module_name)}</m>"'
|
||||
if plugin.module_name != plugin.name
|
||||
if plugin.module_name != plugin.id_
|
||||
else ""
|
||||
)
|
||||
)
|
||||
@ -193,21 +219,16 @@ class PluginFinder(MetaPathFinder):
|
||||
module_origin = module_spec.origin
|
||||
if not module_origin:
|
||||
return
|
||||
module_path = Path(module_origin).resolve()
|
||||
|
||||
for manager in reversed(_managers):
|
||||
# use path instead of name in case of submodule name conflict
|
||||
if (
|
||||
fullname in manager.plugins
|
||||
or module_path in manager._searched_plugin_names.values()
|
||||
):
|
||||
if fullname in manager.controlled_modules.values():
|
||||
module_spec.loader = PluginLoader(manager, fullname, module_origin)
|
||||
return module_spec
|
||||
return
|
||||
|
||||
|
||||
class PluginLoader(SourceFileLoader):
|
||||
def __init__(self, manager: PluginManager, fullname: str, path) -> None:
|
||||
def __init__(self, manager: PluginManager, fullname: str, path: str) -> None:
|
||||
self.manager = manager
|
||||
self.loaded = False
|
||||
super().__init__(fullname, path)
|
||||
@ -227,17 +248,8 @@ class PluginLoader(SourceFileLoader):
|
||||
plugin = _new_plugin(self.name, module, self.manager)
|
||||
setattr(module, "__plugin__", plugin)
|
||||
|
||||
# detect parent plugin before entering current plugin context
|
||||
parent_plugins = _current_plugin_chain.get()
|
||||
for pre_plugin in reversed(parent_plugins):
|
||||
# ensure parent plugin is declared before current plugin
|
||||
if _managers.index(pre_plugin.manager) < _managers.index(self.manager):
|
||||
plugin.parent_plugin = pre_plugin
|
||||
pre_plugin.sub_plugins.add(plugin)
|
||||
break
|
||||
|
||||
# enter plugin context
|
||||
_plugin_token = _current_plugin_chain.set((*parent_plugins, plugin))
|
||||
_plugin_token = _current_plugin.set(plugin)
|
||||
|
||||
try:
|
||||
super().exec_module(module)
|
||||
@ -246,7 +258,7 @@ class PluginLoader(SourceFileLoader):
|
||||
raise
|
||||
finally:
|
||||
# leave plugin context
|
||||
_current_plugin_chain.reset(_plugin_token)
|
||||
_current_plugin.reset(_plugin_token)
|
||||
|
||||
# get plugin metadata
|
||||
metadata: Optional[PluginMetadata] = getattr(module, "__plugin_meta__", None)
|
||||
|
Reference in New Issue
Block a user