插件管理添加翻页按钮,支持从内存快速获取当前session状态

This commit is contained in:
2024-04-17 20:45:44 +08:00
parent 0e02af59ca
commit 0e47e3c163
7 changed files with 147 additions and 63 deletions

View File

@ -8,6 +8,11 @@ from liteyuki.utils.base.data import LiteModel
from liteyuki.utils.base.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db
from liteyuki.utils.base.ly_typing import T_MessageEvent
__group_data = {} # 群数据缓存, {group_id: Group}
__user_data = {} # 用户数据缓存, {user_id: User}
__default_enable = {} # 插件默认启用状态缓存, {plugin_name: bool} static
__global_enable = {} # 插件全局启用状态缓存, {plugin_name: bool} dynamic
class PluginTag(LiteModel):
label: str
@ -63,7 +68,7 @@ async def get_store_plugin(plugin_name: str) -> Optional[StorePlugin]:
def get_plugin_default_enable(plugin_name: str) -> bool:
"""
获取插件默认启用状态,由插件定义,不存在则默认为启用
获取插件默认启用状态,由插件定义,不存在则默认为启用,优先从缓存中获取
Args:
plugin_name (str): 插件模块名
@ -71,9 +76,12 @@ def get_plugin_default_enable(plugin_name: str) -> bool:
Returns:
bool: 插件默认状态
"""
plug = nonebot.plugin.get_plugin(plugin_name)
return (plug.metadata.extra.get("default_enable", True)
if plug.metadata else True) if plug else True
if plugin_name not in __default_enable:
plug = nonebot.plugin.get_plugin(plugin_name)
default_enable = (plug.metadata.extra.get("default_enable", True) if plug.metadata else True) if plug else True
__default_enable[plugin_name] = default_enable
return __default_enable[plugin_name]
def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
@ -88,9 +96,19 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
bool: 插件当前状态
"""
if event.message_type == "group":
session: Group = group_db.first(Group(), "group_id = ?", event.group_id, default=Group(group_id=str(event.group_id)))
group_id = str(event.group_id)
if group_id not in __group_data:
group: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
__group_data[str(event.group_id)] = group
session = __group_data[group_id]
else:
session: User = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
# session: User = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
user_id = str(event.user_id)
if user_id not in __user_data:
user: User = user_db.first(User(), "user_id = ?", user_id, default=User(user_id=user_id))
__user_data[user_id] = user
session = __user_data[user_id]
# 默认停用插件在启用列表内表示启用
# 默认停用插件不在启用列表内表示停用
# 默认启用插件在停用列表内表示停用
@ -102,13 +120,81 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
return plugin_name in session.enabled_plugins
def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: bool):
"""
设置插件会话启用状态,同时更新数据库和缓存
Args:
event:
plugin_name:
enable:
Returns:
"""
if event.message_type == "group":
session = group_db.first(Group(), "group_id = ?", str(event.group_id), default=Group(group_id=str(event.group_id)))
else:
session = user_db.first(User(), "user_id = ?", str(event.user_id), default=User(user_id=str(event.user_id)))
default_enable = get_plugin_default_enable(plugin_name)
if default_enable:
if enable:
session.disabled_plugins.remove(plugin_name)
else:
session.disabled_plugins.append(plugin_name)
else:
if enable:
session.enabled_plugins.append(plugin_name)
else:
session.enabled_plugins.remove(plugin_name)
if event.message_type == "group":
__group_data[str(event.group_id)] = session
print(session)
group_db.upsert(session)
else:
__user_data[str(event.user_id)] = session
user_db.upsert(session)
def get_plugin_global_enable(plugin_name: str) -> bool:
nonebot.plugin.get_plugin(plugin_name)
return plugin_db.first(
"""
获取插件全局启用状态, 优先从缓存中获取
Args:
plugin_name:
Returns:
"""
if plugin_name not in __global_enable:
plugin = plugin_db.first(
GlobalPlugin(),
"module_name = ?",
plugin_name,
default=GlobalPlugin(module_name=plugin_name, enabled=True))
__global_enable[plugin_name] = plugin.enabled
return __global_enable[plugin_name]
def set_plugin_global_enable(plugin_name: str, enable: bool):
"""
设置插件全局启用状态,同时更新数据库和缓存
Args:
plugin_name:
enable:
Returns:
"""
plugin = plugin_db.first(
GlobalPlugin(),
"module_name = ?",
plugin_name,
default=GlobalPlugin(module_name=plugin_name, enabled=True)).enabled
default=GlobalPlugin(module_name=plugin_name, enabled=True))
plugin.enabled = enable
plugin_db.upsert(plugin)
__global_enable[plugin_name] = enable
def get_plugin_can_be_toggle(plugin_name: str) -> bool:
@ -135,5 +221,25 @@ def get_group_enable(group_id: str) -> bool:
Returns:
bool: 群组是否启用插件
"""
session: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
return session.enable
group_id = str(group_id)
if group_id not in __group_data:
group: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
__group_data[group_id] = group
return __group_data[group_id].enable
def set_group_enable(group_id: str, enable: bool):
"""
设置群组是否启用插机器人
Args:
group_id (str): 群组ID
enable (bool): 是否启用
"""
group_id = str(group_id)
group: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
group.enable = enable
__group_data[group_id] = group
group_db.upsert(group)

View File

@ -101,8 +101,6 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
session_enable = get_plugin_session_enable(event, plugin_name) # 获取插件当前状态
default_enable = get_plugin_default_enable(plugin_name) # 获取插件默认状态
can_be_toggled = get_plugin_can_be_toggle(plugin_name) # 获取插件是否可以被启用/停用
if not plugin_exist:
@ -123,22 +121,9 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
else:
raise FinishedException(ulang.get("Permission Denied"))
try:
if toggle:
if default_enable:
session.disabled_plugins.remove(plugin_name)
else:
session.enabled_plugins.append(plugin_name)
else:
if default_enable:
session.disabled_plugins.append(plugin_name)
else:
session.enabled_plugins.remove(plugin_name)
if event.message_type == "private":
user_db.upsert(session)
else:
group_db.upsert(session)
set_plugin_session_enable(event, plugin_name, toggle)
except Exception as e:
print(e)
nonebot.logger.error(e)
await npm.finish(
ulang.get(
"npm.toggle_failed",
@ -173,14 +158,8 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
ulang.get("npm.plugin_already", NAME=plugin_name, STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable")))
try:
storePlugin = plugin_db.first(GlobalPlugin(), "module_name = ?", plugin_name, default=GlobalPlugin(module_name=plugin_name))
if toggle:
storePlugin.enabled = True
else:
storePlugin.enabled = False
plugin_db.upsert(storePlugin)
set_plugin_global_enable(plugin_name, toggle)
except Exception as e:
print(e)
await npm.finish(
ulang.get(
"npm.toggle_failed",
@ -348,6 +327,10 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
reply += f" {btn_uninstall} {btn_toggle_global}"
reply += "\n\n***\n"
# 根据页数添加翻页按钮。第一页显示上一页文本而不是按钮,最后一页显示下一页文本而不是按钮
btn_prev = md.btn_cmd(ulang.get("npm.prev_page"), f"npm list {page - 1} {num_per_page}") if page > 1 else ulang.get("npm.prev_page")
btn_next = md.btn_cmd(ulang.get("npm.next_page"), f"npm list {page + 1} {num_per_page}") if page < total else ulang.get("npm.next_page")
reply += f"\n{btn_prev} {page}/{total} {btn_next}"
await md.send_md(reply, bot, event=event)
else:
@ -385,16 +368,12 @@ async def _(bot: T_Bot, event: T_MessageEvent, gm: Matcher, result: Arparma):
await gm.finish(ulang.get("liteyuki.invalid_command"), liteyuki_pass=True)
enabled = get_group_enable(group_id)
print(enabled, to_enable)
if enabled == to_enable:
await gm.finish(ulang.get("liteyuki.group_already", STATUS=ulang.get("npm.enable") if to_enable else ulang.get("npm.disable"), GROUP=group_id),
liteyuki_pass=True)
else:
group: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
if to_enable:
group.enable = True
else:
group.enable = False
group_db.upsert(group)
set_group_enable(group_id, to_enable)
await gm.finish(
ulang.get("liteyuki.group_success", STATUS=ulang.get("npm.enable") if to_enable else ulang.get("npm.disable"), GROUP=group_id),
liteyuki_pass=True
@ -438,10 +417,10 @@ async def _(result: Arparma, matcher: Matcher, event: T_MessageEvent, bot: T_Bot
reply = [
mdc.heading(escape_md(loaded_plugin.metadata.name)),
mdc.quote(mdc.bold(ulang.get("npm.author")) + " " +
mdc.link(store_plugin.author, f"https://github/{store_plugin.author}") if store_plugin.author else "Unknown"),
(mdc.link(store_plugin.author, f"https://github.com/{store_plugin.author}") if store_plugin.author else "Unknown")),
mdc.quote(mdc.bold(ulang.get("npm.description")) + " " + mdc.paragraph(max(loaded_plugin.metadata.description, store_plugin.desc))),
mdc.heading(ulang.get("npm.usage"), 2),
mdc.paragraph(loaded_plugin.metadata.usage),
mdc.paragraph(escape_md(loaded_plugin.metadata.usage)),
]
await md.send_md(compile_md(reply), bot, event=event)
else:

View File

@ -5,7 +5,7 @@ from nonebot import require
from liteyuki.utils.base.data import LiteModel
from liteyuki.utils.base.data_manager import User, user_db
from liteyuki.utils.base.language import Language, get_all_lang, get_user_lang
from liteyuki.utils.base.language import Language, change_user_lang, get_all_lang, get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md
from .const import representative_timezones_list
@ -46,7 +46,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
if result.subcommands.get("set"):
if result.subcommands["set"].args.get("value"):
# 对合法性进行校验后设置
r = set_profile(result.args["key"], result.args["value"])
r = set_profile(result.args["key"], result.args["value"], str(event.user_id))
if r:
user.profile[result.args["key"]] = result.args["value"]
user_db.upsert(user) # 数据库保存
@ -126,9 +126,10 @@ def get_profile_menu(key: str, ulang: Language) -> Optional[str]:
return reply
def set_profile(key: str, value: str) -> bool:
def set_profile(key: str, value: str, user_id: str) -> bool:
"""设置属性使用if分支对每一个合法性进行检查
Args:
user_id:
key:
value:
@ -138,6 +139,7 @@ def set_profile(key: str, value: str) -> bool:
"""
if key == "lang":
if value in get_all_lang():
change_user_lang(user_id, value)
return True
elif key == "timezone":
if value in pytz.all_timezones: