1
0
forked from bot/app

feat: 添加了自动安装插件功能

This commit is contained in:
2024-03-21 12:10:24 +08:00
parent ca997f727a
commit e24c5c912e
8 changed files with 146 additions and 37 deletions

View File

@ -1 +1,13 @@
from src.utils.data import Database, LiteModel
from src.utils.data_manager import plugin_db
LNPM_COMMAND_START = "lnpm"
class InstalledPlugin(LiteModel):
module_name: str
plugin_db.auto_migrate(InstalledPlugin)

View File

@ -1,23 +1,27 @@
import json
import os.path
import shutil
import sys
from io import StringIO
from typing import Optional
import nonebot
from arclet.alconna import Arparma, MultiVar
from nonebot.permission import SUPERUSER
from nonebot.utils import run_sync
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand
import pip
import aiohttp, aiofiles
from typing_extensions import Any
from src.utils.data import LiteModel
from src.utils.language import get_user_lang
from src.utils.message import button, send_markdown
from src.utils.message import Markdown as md, send_markdown
from src.utils.resource import get_res
from src.utils.typing import T_Bot, T_MessageEvent
from .common import *
npm_alc = on_alconna(
Alconna(
"lnpm",
@ -28,17 +32,17 @@ npm_alc = on_alconna(
Subcommand(
"search",
Args["keywords", MultiVar(str)]["page", int, 1],
alias=["s"],
alias=["s", "搜索"],
),
Subcommand(
"install",
Args["plugin_name", str],
alias=["i"],
alias=["i", "安装"],
),
Subcommand(
"remove",
Args["plugin_name", str],
alias=["rm"],
alias=["rm", "移除", "卸载"],
),
),
permission=SUPERUSER
@ -82,20 +86,49 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
elif result.subcommands.get("search"):
keywords: list[str] = result.subcommands["search"].args.get("keywords")
rs = await npm_search(keywords)
max_show = 20
if len(rs):
reply = f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***"
for plugin in rs[:min(10, len(rs))]:
reply += (f"\n{button(ulang.get('npm.install'), 'lnpm install %s' % plugin.module_name)} | **{plugin.name}**\n"
for plugin in rs[:min(max_show, len(rs))]:
btn_install = md.button(ulang.get('npm.install'), 'lnpm install %s' % plugin.module_name)
link_page = md.link(ulang.get('npm.homepage'), plugin.homepage)
reply += (f"\n{btn_install} | **{plugin.name}**\n"
f"\n > **{plugin.desc}**\n"
f"\n > {ulang.get('npm.author')}: {plugin.author} | [🔗{ulang.get('npm.homepage')}]({plugin.homepage})\n\n***\n")
if len(rs) > 10:
reply += (f"\n{ulang.get('npm.too_many_results')}"
f"\n{button(ulang.get('npm.prev_page'), 'lnpm search %s %s' % (' '.join(keywords), 2))} | "
f"{button(ulang.get('npm.next_page'), 'lnpm search %s %s' % (' '.join(keywords), 2))}")
f"\n > {ulang.get('npm.author')}: {plugin.author} | {link_page}\n\n***\n")
if len(rs) > max_show:
reply += f"\n{ulang.get('npm.too_many_results')}"
else:
reply = ulang.get("npm.search_no_result")
await send_markdown(reply, bot, event=event)
elif result.subcommands.get("install"):
plugin_name: str = result.subcommands["install"].args.get("plugin_name")
r, log = npm_install(plugin_name)
log = log.replace("\\", "/")
if r:
nonebot.load_plugin(plugin_name)
installed_plugin = InstalledPlugin(module_name=plugin_name)
store_plugin = await get_store_plugin(plugin_name)
plugin_db.save(installed_plugin)
await send_markdown(
f"**{ulang.get('npm.install_success', NAME=store_plugin.name)}**\n\n"
f"```\n{log}\n```",
bot,
event=event
)
else:
await send_markdown(
f"{ulang.get('npm.install_success', NAME=plugin_name)}\n\n"
f"```\n{log}\n```",
bot,
event=event
)
elif result.subcommands.get("remove"):
plugin_name: str = result.subcommands["remove"].args.get("plugin_name")
await npm_alc.finish(ulang.get("npm.remove_success"))
async def npm_update() -> bool:
"""
@ -107,7 +140,6 @@ async def npm_update() -> bool:
url_list = [
"https://registry.nonebot.dev/plugins.json",
]
# 用aiohttp请求json文件成功就覆盖本地文件否则尝试下一个url
for url in url_list:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
@ -115,7 +147,6 @@ async def npm_update() -> bool:
async with aiofiles.open("data/liteyuki/plugins.json", "wb") as f:
data = await resp.read()
await f.write(data)
return True
return False
@ -150,10 +181,54 @@ async def npm_search(keywords: list[str]) -> list[StorePlugin]:
return results
def install(plugin_name) -> bool:
try:
pip.main(['install', plugin_name])
return True
except Exception as e:
print(e)
return False
async def get_store_plugin(plugin_module_name: str) -> Optional[StorePlugin]:
"""
获取插件信息
Args:
plugin_module_name (str): 插件模块名
Returns:
Optional[StorePlugin]: 插件信息
"""
async with aiofiles.open("data/liteyuki/plugins.json", "r", encoding="utf-8") as f:
plugins: list[StorePlugin] = [StorePlugin(**pobj) for pobj in json.loads(await f.read())]
for plugin in plugins:
if plugin.module_name == plugin_module_name:
return plugin
return None
def npm_install(plugin_module_name) -> tuple[bool, str]:
"""
Args:
plugin_module_name:
Returns:
tuple[bool, str]:
"""
buffer = StringIO()
sys.stdout = buffer
sys.stderr = buffer
mirrors = [
"https://pypi.tuna.tsinghua.edu.cn/simple",
"https://pypi.org/simple",
]
# 使用pip安装包对每个镜像尝试一次成功后返回值
success = False
for mirror in mirrors:
try:
result = pip.main(['install', plugin_module_name, "-i", mirror])
success = result == 0
break
except Exception as e:
success = False
continue
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
return success, buffer.getvalue()

View File

@ -2,7 +2,7 @@ import nonebot.plugin
from nonebot import on_command
from nonebot.permission import SUPERUSER
from src.utils.message import button, send_markdown
from src.utils.message import Markdown as md, send_markdown
from src.utils.typing import T_Bot, T_MessageEvent
from src.utils.language import get_user_lang
@ -17,11 +17,11 @@ async def _(event: T_MessageEvent, bot: T_Bot):
for plugin in nonebot.get_loaded_plugins():
# 检查是否有 metadata 属性
if plugin.metadata:
reply += (f"\n{button(lang.get('npm.help'), 'help %s' % plugin.name, False, False)} "
reply += (f"\n{md.button(lang.get('npm.help'), 'help %s' % plugin.name, False, False)} "
f"**{plugin.metadata.name}**\n"
f"\n > {plugin.metadata.description}\n\n***\n")
else:
reply += (f"\n{button(lang.get('npm.help'), 'help %s' % plugin.name, False, False)} "
reply += (f"\n{md.button(lang.get('npm.help'), 'help %s' % plugin.name, False, False)} "
f"**{plugin.name}**\n"
f"\n > {lang.get('npm.no_description')}\n\n***\n")
await send_markdown(reply, bot, event=event)