mirror of
				https://github.com/LiteyukiStudio/LiteyukiBot.git
				synced 2025-11-04 05:16:23 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			257 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			257 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import json
 | 
						|
from typing import Optional
 | 
						|
 | 
						|
import aiofiles
 | 
						|
import nonebot.plugin
 | 
						|
from nonebot.adapters import satori
 | 
						|
 | 
						|
from liteyuki.utils import event as event_utils
 | 
						|
from liteyuki.utils.base.data import LiteModel
 | 
						|
from liteyuki.utils.base.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db
 | 
						|
from liteyuki.utils.base.ly_typing import T_MessageEvent
 | 
						|
 | 
						|
__group_data = {}  # 群数据缓存, {group_id: Group}
 | 
						|
__user_data = {}  # 用户数据缓存, {user_id: User}
 | 
						|
__default_enable = {}  # 插件默认启用状态缓存, {plugin_name: bool} static
 | 
						|
__global_enable = {}  # 插件全局启用状态缓存, {plugin_name: bool} dynamic
 | 
						|
 | 
						|
 | 
						|
class PluginTag(LiteModel):
 | 
						|
    label: str
 | 
						|
    color: str = '#000000'
 | 
						|
 | 
						|
 | 
						|
class StorePlugin(LiteModel):
 | 
						|
    name: str
 | 
						|
    desc: str
 | 
						|
    module_name: str  # 插件商店中的模块名不等于本地的模块名,前者是文件夹名,后者是点分割模块名
 | 
						|
    project_link: str = ""
 | 
						|
    homepage: str = ""
 | 
						|
    author: str = ""
 | 
						|
    type: str | None = None
 | 
						|
    version: str | None = ""
 | 
						|
    time: str = ""
 | 
						|
    tags: list[PluginTag] = []
 | 
						|
    is_official: bool = False
 | 
						|
 | 
						|
 | 
						|
def get_plugin_exist(plugin_name: str) -> bool:
 | 
						|
    """
 | 
						|
    获取插件是否存在于加载列表
 | 
						|
    Args:
 | 
						|
        plugin_name:
 | 
						|
 | 
						|
    Returns:
 | 
						|
 | 
						|
    """
 | 
						|
    for plugin in nonebot.plugin.get_loaded_plugins():
 | 
						|
        if plugin.name == plugin_name:
 | 
						|
            return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
async def get_store_plugin(plugin_name: str) -> Optional[StorePlugin]:
 | 
						|
    """
 | 
						|
    获取插件信息
 | 
						|
 | 
						|
    Args:
 | 
						|
        plugin_name (str): 插件模块名
 | 
						|
 | 
						|
    Returns:
 | 
						|
        Optional[StorePlugin]: 插件信息
 | 
						|
    """
 | 
						|
    async with aiofiles.open("data/liteyuki/plugins.json", "r", encoding="utf-8") as f:
 | 
						|
        plugins: list[StorePlugin] = [StorePlugin(**pobj) for pobj in json.loads(await f.read())]
 | 
						|
    for plugin in plugins:
 | 
						|
        if plugin.module_name == plugin_name:
 | 
						|
            return plugin
 | 
						|
    return None
 | 
						|
 | 
						|
 | 
						|
def get_plugin_default_enable(plugin_name: str) -> bool:
 | 
						|
    """
 | 
						|
    获取插件默认启用状态,由插件定义,不存在则默认为启用,优先从缓存中获取
 | 
						|
 | 
						|
    Args:
 | 
						|
        plugin_name (str): 插件模块名
 | 
						|
 | 
						|
    Returns:
 | 
						|
        bool: 插件默认状态
 | 
						|
    """
 | 
						|
    if plugin_name not in __default_enable:
 | 
						|
        plug = nonebot.plugin.get_plugin(plugin_name)
 | 
						|
        default_enable = (plug.metadata.extra.get("default_enable", True) if plug.metadata else True) if plug else True
 | 
						|
        __default_enable[plugin_name] = default_enable
 | 
						|
 | 
						|
    return __default_enable[plugin_name]
 | 
						|
 | 
						|
 | 
						|
def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
 | 
						|
    """
 | 
						|
    获取插件当前会话启用状态
 | 
						|
 | 
						|
    Args:
 | 
						|
        event: 会话事件
 | 
						|
        plugin_name (str): 插件模块名
 | 
						|
 | 
						|
    Returns:
 | 
						|
        bool: 插件当前状态
 | 
						|
    """
 | 
						|
    if isinstance(event, satori.event.Event):
 | 
						|
        if event.guild is not None:
 | 
						|
            message_type = "group"
 | 
						|
        else:
 | 
						|
            message_type = "private"
 | 
						|
    else:
 | 
						|
        message_type = event.message_type
 | 
						|
    if message_type == "group":
 | 
						|
        group_id = str(event.guild.id if isinstance(event, satori.event.Event) else event.group_id)
 | 
						|
        if group_id not in __group_data:
 | 
						|
            group: Group = group_db.where_one(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
 | 
						|
            __group_data[str(group_id)] = group
 | 
						|
 | 
						|
        session = __group_data[group_id]
 | 
						|
    else:
 | 
						|
        # session: User = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
 | 
						|
        user_id = str(event.user.id if isinstance(event, satori.event.Event) else event.user_id)
 | 
						|
        if user_id not in __user_data:
 | 
						|
            user: User = user_db.where_one(User(), "user_id = ?", user_id, default=User(user_id=user_id))
 | 
						|
            __user_data[user_id] = user
 | 
						|
        session = __user_data[user_id]
 | 
						|
    # 默认停用插件在启用列表内表示启用
 | 
						|
    # 默认停用插件不在启用列表内表示停用
 | 
						|
    # 默认启用插件在停用列表内表示停用
 | 
						|
    # 默认启用插件不在停用列表内表示启用
 | 
						|
    default_enable = get_plugin_default_enable(plugin_name)
 | 
						|
    if default_enable:
 | 
						|
        return plugin_name not in session.disabled_plugins
 | 
						|
    else:
 | 
						|
        return plugin_name in session.enabled_plugins
 | 
						|
 | 
						|
 | 
						|
def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: bool):
 | 
						|
    """
 | 
						|
    设置插件会话启用状态,同时更新数据库和缓存
 | 
						|
    Args:
 | 
						|
        event:
 | 
						|
        plugin_name:
 | 
						|
        enable:
 | 
						|
 | 
						|
    Returns:
 | 
						|
 | 
						|
    """
 | 
						|
    if event_utils.get_message_type(event) == "group":
 | 
						|
        session = group_db.where_one(Group(), "group_id = ?", str(event_utils.get_group_id(event)),
 | 
						|
                                     default=Group(group_id=str(event_utils.get_group_id(event))))
 | 
						|
    else:
 | 
						|
        session = user_db.where_one(User(), "user_id = ?", str(event_utils.get_user_id(event)),
 | 
						|
                                    default=User(user_id=str(event_utils.get_user_id(event))))
 | 
						|
    default_enable = get_plugin_default_enable(plugin_name)
 | 
						|
    if default_enable:
 | 
						|
        if enable:
 | 
						|
            session.disabled_plugins.remove(plugin_name)
 | 
						|
        else:
 | 
						|
            session.disabled_plugins.append(plugin_name)
 | 
						|
    else:
 | 
						|
        if enable:
 | 
						|
            session.enabled_plugins.append(plugin_name)
 | 
						|
        else:
 | 
						|
            session.enabled_plugins.remove(plugin_name)
 | 
						|
 | 
						|
    if event_utils.get_message_type(event) == "group":
 | 
						|
        __group_data[str(event_utils.get_group_id(event))] = session
 | 
						|
        print(session)
 | 
						|
        group_db.save(session)
 | 
						|
    else:
 | 
						|
        __user_data[str(event_utils.get_user_id(event))] = session
 | 
						|
        user_db.save(session)
 | 
						|
 | 
						|
 | 
						|
def get_plugin_global_enable(plugin_name: str) -> bool:
 | 
						|
    """
 | 
						|
    获取插件全局启用状态, 优先从缓存中获取
 | 
						|
    Args:
 | 
						|
        plugin_name:
 | 
						|
 | 
						|
    Returns:
 | 
						|
 | 
						|
    """
 | 
						|
    if plugin_name not in __global_enable:
 | 
						|
        plugin = plugin_db.where_one(
 | 
						|
            GlobalPlugin(),
 | 
						|
            "module_name = ?",
 | 
						|
            plugin_name,
 | 
						|
            default=GlobalPlugin(module_name=plugin_name, enabled=True))
 | 
						|
        __global_enable[plugin_name] = plugin.enabled
 | 
						|
 | 
						|
    return __global_enable[plugin_name]
 | 
						|
 | 
						|
 | 
						|
def set_plugin_global_enable(plugin_name: str, enable: bool):
 | 
						|
    """
 | 
						|
    设置插件全局启用状态,同时更新数据库和缓存
 | 
						|
    Args:
 | 
						|
        plugin_name:
 | 
						|
        enable:
 | 
						|
 | 
						|
    Returns:
 | 
						|
 | 
						|
    """
 | 
						|
    plugin = plugin_db.where_one(
 | 
						|
        GlobalPlugin(),
 | 
						|
        "module_name = ?",
 | 
						|
        plugin_name,
 | 
						|
        default=GlobalPlugin(module_name=plugin_name, enabled=True))
 | 
						|
    plugin.enabled = enable
 | 
						|
 | 
						|
    plugin_db.save(plugin)
 | 
						|
    __global_enable[plugin_name] = enable
 | 
						|
 | 
						|
 | 
						|
def get_plugin_can_be_toggle(plugin_name: str) -> bool:
 | 
						|
    """
 | 
						|
    获取插件是否可以被启用/停用
 | 
						|
 | 
						|
    Args:
 | 
						|
        plugin_name (str): 插件模块名
 | 
						|
 | 
						|
    Returns:
 | 
						|
        bool: 插件是否可以被启用/停用
 | 
						|
    """
 | 
						|
    plug = nonebot.plugin.get_plugin(plugin_name)
 | 
						|
    return plug.metadata.extra.get("toggleable", True) if plug and plug.metadata else True
 | 
						|
 | 
						|
 | 
						|
def get_group_enable(group_id: str) -> bool:
 | 
						|
    """
 | 
						|
    获取群组是否启用插机器人
 | 
						|
 | 
						|
    Args:
 | 
						|
        group_id (str): 群组ID
 | 
						|
 | 
						|
    Returns:
 | 
						|
        bool: 群组是否启用插件
 | 
						|
    """
 | 
						|
    group_id = str(group_id)
 | 
						|
    if group_id not in __group_data:
 | 
						|
        group: Group = group_db.where_one(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
 | 
						|
        __group_data[group_id] = group
 | 
						|
 | 
						|
    return __group_data[group_id].enable
 | 
						|
 | 
						|
 | 
						|
def set_group_enable(group_id: str, enable: bool):
 | 
						|
    """
 | 
						|
    设置群组是否启用插机器人
 | 
						|
 | 
						|
    Args:
 | 
						|
        group_id (str): 群组ID
 | 
						|
        enable (bool): 是否启用
 | 
						|
    """
 | 
						|
    group_id = str(group_id)
 | 
						|
    group: Group = group_db.where_one(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
 | 
						|
    group.enable = enable
 | 
						|
 | 
						|
    __group_data[group_id] = group
 | 
						|
    group_db.save(group)
 |