⚗️ add load all plugin function

This commit is contained in:
yanyongyu
2021-02-19 15:15:46 +08:00
parent f26fb9d6fb
commit be674c0efc
6 changed files with 95 additions and 34 deletions

View File

@ -18,6 +18,7 @@
- ``Matchergroup`` => ``nonebot.plugin.MatcherGroup``
- ``load_plugin`` => ``nonebot.plugin.load_plugin``
- ``load_plugins`` => ``nonebot.plugin.load_plugins``
- ``load_all_plugins`` => ``nonebot.plugin.load_all_plugins``
- ``load_builtin_plugins`` => ``nonebot.plugin.load_builtin_plugins``
- ``get_plugin`` => ``nonebot.plugin.get_plugin``
- ``get_loaded_plugins`` => ``nonebot.plugin.get_loaded_plugins``
@ -221,5 +222,5 @@ def run(host: Optional[str] = None,
from nonebot.plugin import on_message, on_notice, on_request, on_metaevent, CommandGroup, MatcherGroup
from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_shell_command, on_regex
from nonebot.plugin import load_plugin, load_plugins, load_builtin_plugins
from nonebot.plugin import load_plugin, load_plugins, load_all_plugins, load_builtin_plugins
from nonebot.plugin import export, require, get_plugin, get_loaded_plugins

View File

@ -948,6 +948,32 @@ class MatcherGroup:
return matcher
def _load_plugin(manager: PluginManager, plugin_name: str) -> Optional[Plugin]:
if plugin_name.startswith("_"):
return None
_tmp_matchers.set(set())
_export.set(Export())
if plugin_name in plugins:
return None
try:
module = manager.load_plugin(plugin_name)
for m in _tmp_matchers.get():
m.module = plugin_name
plugin = Plugin(plugin_name, module, _tmp_matchers.get(), _export.get())
plugins[plugin_name] = plugin
logger.opt(
colors=True).info(f'Succeeded to import "<y>{plugin_name}</y>"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{plugin_name}"</bg #f8bbd0></r>')
return None
def load_plugin(module_path: str) -> Optional[Plugin]:
"""
:说明:
@ -963,34 +989,9 @@ def load_plugin(module_path: str) -> Optional[Plugin]:
- ``Optional[Plugin]``
"""
def _load_plugin(module_path: str) -> Optional[Plugin]:
try:
_tmp_matchers.set(set())
_export.set(Export())
if module_path in plugins:
return plugins[module_path]
elif module_path in sys.modules:
logger.warning(
f"Module {module_path} has been loaded by other plugins! Ignored"
)
return None
module = importlib.import_module(module_path)
for m in _tmp_matchers.get():
m.module = module_path
plugin = Plugin(module_path, module, _tmp_matchers.get(),
_export.get())
plugins[module_path] = plugin
logger.opt(
colors=True).info(f'Succeeded to import "<y>{module_path}</y>"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{module_path}"</bg #f8bbd0></r>'
)
return None
context: Context = copy_context()
return context.run(_load_plugin, module_path)
manager = PluginManager(PLUGIN_NAMESPACE, plugins=[module_path])
return context.run(_load_plugin, manager, module_path)
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
@ -1003,6 +1004,32 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
- ``*plugin_dir: str``: 插件路径
:返回:
- ``Set[Plugin]``
"""
loaded_plugins = set()
manager = PluginManager(PLUGIN_NAMESPACE, search_path=plugin_dir)
for plugin_name in manager.list_plugins():
context: Context = copy_context()
result = context.run(_load_plugin, manager, plugin_name)
if result:
loaded_plugins.add(result)
return loaded_plugins
def load_all_plugins(module_path: Set[str],
plugin_dir: Set[str]) -> Set[Plugin]:
"""
:说明:
导入指定列表中的插件以及指定目录下多个插件,以 ``_`` 开头的插件不会被导入!
:参数:
- ``module_path: Set[str]``: 指定插件集合
- ``plugin_dir: Set[str]``: 指定插件路径集合
:返回:
- ``Set[Plugin]``
@ -1036,7 +1063,7 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
return None
loaded_plugins = set()
manager = PluginManager(PLUGIN_NAMESPACE, search_path=plugin_dir)
manager = PluginManager(PLUGIN_NAMESPACE, module_path, plugin_dir)
for plugin_name in manager.list_plugins():
context: Context = copy_context()
result = context.run(_load_plugin, plugin_name)

View File

@ -176,6 +176,11 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
...
def load_all_plugins(module_path: Set[str],
plugin_dir: Set[str]) -> Set[Plugin]:
...
def load_builtin_plugins(name: str = ...):
...