🎨 Apply black formatting

This commit is contained in:
2024-12-14 04:43:03 +08:00
parent 59e0871840
commit a9938d30ed
16 changed files with 361 additions and 82 deletions

View File

@ -0,0 +1,7 @@
"""该功能目前正在开发中,暂时不可用,受影响的文件夹 `plugin`, `plugins`
"""
from .load import *
from .models import *
from .register import *
from .utils import *

View File

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
本模块为工具加载模块
"""
import os
import traceback
from importlib import import_module
from pathlib import Path
from typing import Optional
from nonebot import logger
from .models import Plugin, PluginMetadata
from .utils import path_to_module_name
_plugins: dict[str, Plugin] = {}
__all__ = [
"load_plugin",
"load_plugins",
"_plugins",
]
def get_plugin(name: str) -> Plugin | None:
"""获取插件对象
Args:
name: 插件名称
Returns:
Optional[Plugin]: 插件对象
"""
return _plugins.get(name)
def get_plugins() -> dict[str, Plugin]:
"""获取所有插件
Returns:
dict[str, Plugin]: 插件集合
"""
return _plugins
def load_plugin(module_path: str | Path) -> Optional[Plugin]:
"""加载单个插件,可以是本地插件或是通过 `pip` 安装的插件。
该函数产生的副作用在于将插件加载到 `_plugins` 中。
Args:
module_path: 插件名称 `path.to.your.plugin`
或插件路径 `pathlib.Path(path/to/your/plugin)`
Returns:
Optional[Plugin]: 插件对象
"""
module_path = (
path_to_module_name(Path(module_path))
if isinstance(module_path, Path)
else module_path
)
try:
module = import_module(module_path) # 导入模块对象
plugin = Plugin(
name=module.__name__,
module=module,
module_name=module_path,
)
_plugins[plugin.name] = plugin
plugin.metadata = getattr(module, "__marsho_meta__", None)
if plugin.metadata is None:
logger.opt(colors=True).warning(
f"成功加载小棉插件 <y>{plugin.name}</y>, 但是没有定义元数据"
)
else:
logger.opt(colors=True).success(
f'成功加载小棉插件 <c>"{plugin.metadata.name}"</c>'
)
return plugin
except Exception as e:
logger.opt(colors=True).success(f'加载小棉插件失败 "<r>{module_path}</r>"')
traceback.print_exc()
return None
def load_plugins(*plugin_dirs: str) -> set[Plugin]:
"""导入文件夹下多个插件
参数:
plugin_dir: 文件夹路径
ignore_warning: 是否忽略警告,通常是目录不存在或目录为空
返回:
set[Plugin]: 插件集合
"""
plugins = set()
for plugin_dir in plugin_dirs:
for f in os.listdir(plugin_dir):
path = Path(os.path.join(plugin_dir, f))
module_name = None
if os.path.isfile(path) and f.endswith(".py"):
"""单文件加载"""
module_name = f"{path_to_module_name(Path(plugin_dir))}.{f[:-3]}"
elif os.path.isdir(path) and os.path.exists(
os.path.join(path, "__init__.py")
):
"""包加载"""
module_name = path_to_module_name(path)
if module_name and (plugin := load_plugin(module_name)):
plugins.add(plugin)
return plugins

View File

@ -0,0 +1,69 @@
from types import ModuleType
from typing import Any
from pydantic import BaseModel
class PluginMetadata(BaseModel):
"""
Marsho 插件 对象元数据
Attributes:
----------
name: str
友好名称: 例如Marsho Test
description: str
插件描述
usage: str
插件使用方法
type: str
插件类型
author: str
插件作者
homepage: str
插件主页
extra: dict[str, Any]
额外信息,自定义键值对
"""
name: str
description: str = ""
usage: str = ""
author: str = ""
homepage: str = ""
extra: dict[str, Any] = {}
class Plugin(BaseModel):
"""
存储插件信息
Attributes:
----------
name: str
包名称 例如marsho_test
module: ModuleType
插件模块对象
module_name: str
点分割模块路径 例如a.b.c
metadata: "PluginMeta" | None
"""
name: str
"""包名称 例如marsho_test"""
module: ModuleType
"""插件模块对象"""
module_name: str
"""点分割模块路径 例如a.b.c"""
metadata: PluginMetadata | None = None
""""""
class Config:
arbitrary_types_allowed = True
def __hash__(self) -> int:
return hash(self.name)
def __eq__(self, other: Any) -> bool:
return self.name == other.name

View File

@ -0,0 +1,55 @@
"""此模块用于获取function call中函数定义信息以及注册函数
"""
import inspect
from typing import Any, Callable, Coroutine, TypeAlias
import nonebot
from .utils import is_coroutine_callable
SYNC_FUNCTION_CALL: TypeAlias = Callable[..., str]
ASYNC_FUNCTION_CALL: TypeAlias = Callable[..., Coroutine[str, Any, str]]
FUNCTION_CALL: TypeAlias = SYNC_FUNCTION_CALL | ASYNC_FUNCTION_CALL
_loaded_functions: dict[str, FUNCTION_CALL] = {}
def async_wrapper(func: SYNC_FUNCTION_CALL) -> ASYNC_FUNCTION_CALL:
"""将同步函数包装为异步函数,但是不会真正异步执行,仅用于统一调用及函数签名
Args:
func: 同步函数
Returns:
ASYNC_FUNCTION_CALL: 异步函数
"""
async def wrapper(*args, **kwargs) -> str:
return func(*args, **kwargs)
return wrapper
def function_call(*funcs: FUNCTION_CALL):
"""返回一个装饰器,装饰一个函数, 使其注册为一个可被AI调用的function call函数
Args:
func: 函数对象,要有完整的 Google Style Docstring
Returns:
str: 函数定义信息
"""
for func in funcs:
if module := inspect.getmodule(func):
module_name = module.__name__ + "."
else:
module_name = ""
name = func.__name__
if not is_coroutine_callable(func):
func = async_wrapper(func) # type: ignore
_loaded_functions[name] = func
nonebot.logger.opt(colors=True).info(
f"加载 function call: <c>{module_name}{name}</c>"
)

View File

@ -0,0 +1,34 @@
import inspect
from pathlib import Path
from typing import Any, Callable
def path_to_module_name(path: Path) -> str:
"""
转换路径为模块名
Args:
path: 路径a/b/c/d -> a.b.c.d
Returns:
str: 模块名
"""
rel_path = path.resolve().relative_to(Path.cwd().resolve())
if rel_path.stem == "__init__":
return ".".join(rel_path.parts[:-1])
else:
return ".".join(rel_path.parts[:-1] + (rel_path.stem,))
def is_coroutine_callable(call: Callable[..., Any]) -> bool:
"""
判断是否为async def 函数
Args:
call: 可调用对象
Returns:
bool: 是否为协程可调用对象
"""
if inspect.isroutine(call):
return inspect.iscoroutinefunction(call)
if inspect.isclass(call):
return False
func_ = getattr(call, "__call__", None)
return inspect.iscoroutinefunction(func_)