mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-27 08:11:38 +00:00
🔧 change command and add builtin plugin
This commit is contained in:
@ -10,8 +10,9 @@ from importlib._bootstrap import _load
|
||||
from nonebot.log import logger
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.permission import Permission
|
||||
from nonebot.typing import Handler, RuleChecker
|
||||
from nonebot.rule import Rule, startswith, endswith, command, regex
|
||||
from nonebot.typing import Set, Dict, Type, Tuple, Union, Optional, ModuleType, RuleChecker
|
||||
from nonebot.typing import Set, List, Dict, Type, Tuple, Union, Optional, ModuleType
|
||||
|
||||
plugins: Dict[str, "Plugin"] = {}
|
||||
|
||||
@ -20,10 +21,9 @@ _tmp_matchers: Set[Type[Matcher]] = set()
|
||||
|
||||
class Plugin(object):
|
||||
|
||||
# TODO: store plugin informations
|
||||
def __init__(self, module_path: str, module: ModuleType,
|
||||
def __init__(self, name: str, module: ModuleType,
|
||||
matchers: Set[Type[Matcher]]):
|
||||
self.module_path = module_path
|
||||
self.name = name
|
||||
self.module = module
|
||||
self.matchers = matchers
|
||||
|
||||
@ -31,7 +31,7 @@ class Plugin(object):
|
||||
def on(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
permission: Permission = Permission(),
|
||||
*,
|
||||
handlers: Optional[list] = None,
|
||||
handlers: Optional[List[Handler]] = None,
|
||||
temp: bool = False,
|
||||
priority: int = 1,
|
||||
block: bool = False,
|
||||
@ -50,7 +50,7 @@ def on(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
|
||||
def on_metaevent(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
*,
|
||||
handlers: Optional[list] = None,
|
||||
handlers: Optional[List[Handler]] = None,
|
||||
temp: bool = False,
|
||||
priority: int = 1,
|
||||
block: bool = False,
|
||||
@ -70,7 +70,7 @@ def on_metaevent(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
def on_message(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
permission: Permission = Permission(),
|
||||
*,
|
||||
handlers: Optional[list] = None,
|
||||
handlers: Optional[List[Handler]] = None,
|
||||
temp: bool = False,
|
||||
priority: int = 1,
|
||||
block: bool = True,
|
||||
@ -89,7 +89,7 @@ def on_message(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
|
||||
def on_notice(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
*,
|
||||
handlers: Optional[list] = None,
|
||||
handlers: Optional[List[Handler]] = None,
|
||||
temp: bool = False,
|
||||
priority: int = 1,
|
||||
block: bool = False,
|
||||
@ -108,7 +108,7 @@ def on_notice(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
|
||||
def on_request(rule: Union[Rule, RuleChecker] = Rule(),
|
||||
*,
|
||||
handlers: Optional[list] = None,
|
||||
handlers: Optional[List[Handler]] = None,
|
||||
temp: bool = False,
|
||||
priority: int = 1,
|
||||
block: bool = False,
|
||||
@ -149,9 +149,19 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
|
||||
**kwargs) -> Type[Matcher]:
|
||||
if isinstance(cmd, str):
|
||||
cmd = (cmd,)
|
||||
return on_message(command(cmd) &
|
||||
rule, permission, **kwargs) if rule else on_message(
|
||||
command(cmd), permission, **kwargs)
|
||||
|
||||
async def _strip_cmd(bot, event, state: dict):
|
||||
message = event.message
|
||||
event.message = message.__class__(
|
||||
str(message)[len(state["_prefix"]["raw_command"]):].strip())
|
||||
|
||||
handlers = kwargs.pop("handlers", [])
|
||||
handlers.insert(0, _strip_cmd)
|
||||
|
||||
return on_message(
|
||||
command(cmd) &
|
||||
rule, permission, handlers=handlers, **kwargs) if rule else on_message(
|
||||
command(cmd), permission, handlers=handlers, **kwargs)
|
||||
|
||||
|
||||
def on_regex(pattern: str,
|
||||
@ -167,12 +177,20 @@ def on_regex(pattern: str,
|
||||
def load_plugin(module_path: str) -> Optional[Plugin]:
|
||||
try:
|
||||
_tmp_matchers.clear()
|
||||
if module_path in plugins:
|
||||
return plugins[module_path]
|
||||
elif module_path in sys.modules:
|
||||
logger.warning(
|
||||
f"Module {module_path} has been loaded by other plugins! Ignored"
|
||||
)
|
||||
return
|
||||
module = importlib.import_module(module_path)
|
||||
for m in _tmp_matchers:
|
||||
m.module = module_path
|
||||
plugin = Plugin(module_path, module, _tmp_matchers.copy())
|
||||
plugins[module_path] = plugin
|
||||
logger.opt(colors=True).info(f'Succeeded to import "{module_path}"')
|
||||
logger.opt(
|
||||
colors=True).info(f'Succeeded to import "<y>{module_path}</y>"')
|
||||
return plugin
|
||||
except Exception as e:
|
||||
logger.opt(colors=True, exception=e).error(
|
||||
@ -189,7 +207,11 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
|
||||
continue
|
||||
|
||||
spec = module_info.module_finder.find_spec(name)
|
||||
if spec.name in sys.modules:
|
||||
if spec.name in plugins:
|
||||
continue
|
||||
elif spec.name in sys.modules:
|
||||
logger.warning(
|
||||
f"Module {spec.name} has been loaded by other plugin! Ignored")
|
||||
continue
|
||||
|
||||
try:
|
||||
@ -207,5 +229,9 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
|
||||
return loaded_plugins
|
||||
|
||||
|
||||
def load_builtin_plugins():
|
||||
return load_plugin("nonebot.plugins.base")
|
||||
|
||||
|
||||
def get_loaded_plugins() -> Set[Plugin]:
|
||||
return set(plugins.values())
|
||||
|
Reference in New Issue
Block a user