mirror of
				https://github.com/LiteyukiStudio/LiteyukiBot.git
				synced 2025-10-31 05:36:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			204 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			204 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import base64
 | ||
| from typing import Any
 | ||
| 
 | ||
| import nonebot
 | ||
| import pip
 | ||
| from git import Repo
 | ||
| from nonebot import Bot, require, get_driver
 | ||
| from nonebot.exception import MockApiException
 | ||
| from nonebot.internal.matcher import Matcher
 | ||
| from nonebot.permission import SUPERUSER
 | ||
| 
 | ||
| from liteyuki.utils.config import config, load_from_yaml
 | ||
| from liteyuki.utils.data_manager import StoredConfig, common_db
 | ||
| from liteyuki.utils.language import get_user_lang
 | ||
| from liteyuki.utils.ly_typing import T_Bot, T_MessageEvent
 | ||
| from liteyuki.utils.message import Markdown as md
 | ||
| from liteyuki.utils.reloader import Reloader
 | ||
| from liteyuki.utils.resource import get_loaded_resource_packs, load_resources
 | ||
| 
 | ||
| require("nonebot_plugin_alconna"), require("nonebot_plugin_htmlrender")
 | ||
| from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma
 | ||
| 
 | ||
| driver = get_driver()
 | ||
| 
 | ||
| markdown_image = common_db.first(StoredConfig(), default=StoredConfig()).config.get("markdown_image", False)
 | ||
| 
 | ||
| @on_alconna(
 | ||
|     command=Alconna(
 | ||
|         "liteecho",
 | ||
|     ),
 | ||
|     permission=SUPERUSER
 | ||
| ).handle()
 | ||
| async def _(bot: T_Bot, matcher: Matcher):
 | ||
|     await matcher.finish(f"Hello, Liteyuki!\nBot {bot.self_id}")
 | ||
| 
 | ||
| 
 | ||
| @on_alconna(
 | ||
|     aliases={"更新轻雪"},
 | ||
|     command=Alconna(
 | ||
|         "update-liteyuki"
 | ||
|     ),
 | ||
|     permission=SUPERUSER
 | ||
| ).handle()
 | ||
| async def _(bot: T_Bot, event: T_MessageEvent):
 | ||
|     # 使用git pull更新
 | ||
|     ulang = get_user_lang(str(event.user_id))
 | ||
|     origins = ["origin", "origin2"]
 | ||
|     repo = Repo(".")
 | ||
| 
 | ||
|     # Get the current HEAD commit
 | ||
|     current_head_commit = repo.head.commit
 | ||
| 
 | ||
|     # Fetch the latest information from the cloud
 | ||
|     repo.remotes.origin.fetch()
 | ||
| 
 | ||
|     # Get the latest HEAD commit
 | ||
|     new_head_commit = repo.commit('origin/main')
 | ||
| 
 | ||
|     # If the new HEAD commit is different from the current HEAD commit, there is a new commit
 | ||
|     diffs = current_head_commit.diff(new_head_commit)
 | ||
|     logs = ""
 | ||
|     for diff in diffs.iter_change_type('M'):
 | ||
|         logs += f"\n{diff.a_path}"
 | ||
| 
 | ||
|     for origin in origins:
 | ||
|         try:
 | ||
|             repo.remotes[origin].pull()
 | ||
|             break
 | ||
|         except Exception as e:
 | ||
|             nonebot.logger.error(f"Pull from {origin} failed: {e}")
 | ||
|     reply = "Liteyuki updated!\n"
 | ||
|     reply += f"```\n{logs}\n```\n"
 | ||
|     btn_restart = md.btn_cmd(ulang.get("liteyuki.restart_now"), "reload-liteyuki")
 | ||
|     pip.main(["install", "-r", "requirements.txt"])
 | ||
|     reply += f"{ulang.get('liteyuki.update_restart', RESTART=btn_restart)}"
 | ||
|     await md.send_md(reply, bot, event=event, at_sender=False)
 | ||
| 
 | ||
| 
 | ||
| @on_alconna(
 | ||
|     aliases={"重启轻雪"},
 | ||
|     command=Alconna(
 | ||
|         "reload-liteyuki"
 | ||
|     ),
 | ||
|     permission=SUPERUSER
 | ||
| ).handle()
 | ||
| async def _(matcher: Matcher):
 | ||
|     await matcher.send("Liteyuki reloading")
 | ||
|     Reloader.reload(3)
 | ||
| 
 | ||
| 
 | ||
| @on_alconna(
 | ||
|     aliases={"配置"},
 | ||
|     command=Alconna(
 | ||
|         "config",
 | ||
|         Subcommand(
 | ||
|             "set",
 | ||
|             Args["key", str]["value", Any],
 | ||
|             alias=["设置"],
 | ||
| 
 | ||
|         ),
 | ||
|         Subcommand(
 | ||
|             "get",
 | ||
|             Args["key", str, None],
 | ||
|             alias=["查询"]
 | ||
|         )
 | ||
|     ),
 | ||
|     permission=SUPERUSER
 | ||
| ).handle()
 | ||
| async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher):
 | ||
|     ulang = get_user_lang(str(event.user_id))
 | ||
|     stored_config: StoredConfig = common_db.first(StoredConfig(), default=StoredConfig())
 | ||
|     if result.subcommands.get("set"):
 | ||
|         key, value = result.subcommands.get("set").args.get("key"), result.subcommands.get("set").args.get("value")
 | ||
|         try:
 | ||
|             value = eval(value)
 | ||
|         except:
 | ||
|             pass
 | ||
|         stored_config.config[key] = value
 | ||
|         common_db.upsert(stored_config)
 | ||
|         await matcher.finish(f"{ulang.get('liteyuki.config_set_success', KEY=key, VAL=value)}")
 | ||
|     elif result.subcommands.get("get"):
 | ||
|         key = result.subcommands.get("get").args.get("key")
 | ||
|         file_config = load_from_yaml("config.yml")
 | ||
|         reply = f"{ulang.get('liteyuki.current_config')}"
 | ||
|         if key:
 | ||
|             reply += f"```dotenv\n{key}={file_config.get(key, stored_config.config.get(key))}\n```"
 | ||
|         else:
 | ||
|             reply = f"{ulang.get('liteyuki.current_config')}"
 | ||
|             reply += f"\n{ulang.get('liteyuki.static_config')}\n```dotenv"
 | ||
|             for k, v in file_config.items():
 | ||
|                 reply += f"\n{k}={v}"
 | ||
|             reply += "\n```"
 | ||
|             if len(stored_config.config) > 0:
 | ||
|                 reply += f"\n{ulang.get('liteyuki.stored_config')}\n```dotenv"
 | ||
|                 for k, v in stored_config.config.items():
 | ||
|                     reply += f"\n{k}={v}"
 | ||
|                 reply += "\n```"
 | ||
|         await md.send_md(reply, bot, event=event)
 | ||
| 
 | ||
| 
 | ||
| @on_alconna(
 | ||
|     aliases={"切换图片模式"},
 | ||
|     command=Alconna(
 | ||
|         "switch-image-mode"
 | ||
|     ),
 | ||
|     permission=SUPERUSER
 | ||
| ).handle()
 | ||
| async def _(event: T_MessageEvent, matcher: Matcher):
 | ||
|     global markdown_image
 | ||
|     # 切换图片模式,False以图片形式发送,True以markdown形式发送
 | ||
|     ulang = get_user_lang(str(event.user_id))
 | ||
|     stored_config: StoredConfig = common_db.first(StoredConfig(), default=StoredConfig())
 | ||
|     stored_config.config["markdown_image"] = not stored_config.config.get("markdown_image", False)
 | ||
|     markdown_image = stored_config.config["markdown_image"]
 | ||
|     common_db.upsert(stored_config)
 | ||
|     await matcher.finish(ulang.get("liteyuki.image_mode_on" if stored_config.config["markdown_image"] else "liteyuki.image_mode_off"))
 | ||
| 
 | ||
| 
 | ||
| @on_alconna(
 | ||
|     command=Alconna(
 | ||
|         "liteyuki-docs",
 | ||
|     ),
 | ||
|     aliases={"轻雪文档"},
 | ||
| ).handle()
 | ||
| async def _(matcher: Matcher):
 | ||
|     matcher.finish("https://bot.liteyuki.icu/usage")
 | ||
| 
 | ||
| # system hook
 | ||
| @Bot.on_calling_api     # 图片模式检测
 | ||
| async def test_for_md_image(bot: T_Bot, api: str, data: dict):
 | ||
|     if api in ["send_msg", "send_private_msg", "send_group_msg"] and markdown_image and data.get("user_id") != bot.self_id:
 | ||
|         if api == "send_msg" and data.get("message_type") == "private" or api == "send_private_msg":
 | ||
|             session_type = "private"
 | ||
|             session_id = data.get("user_id")
 | ||
|         elif api == "send_msg" and data.get("message_type") == "group" or api == "send_group_msg":
 | ||
|             session_type = "group"
 | ||
|             session_id = data.get("group_id")
 | ||
|         else:
 | ||
|             return
 | ||
|         if len(data.get("message", [])) == 1 and data["message"][0].get("type") == "image":
 | ||
|             file: str = data["message"][0].data.get("file")
 | ||
|             # file:// http:// base64://
 | ||
|             if file.startswith("http"):
 | ||
|                 result = await md.send_md(await md.image_async(file), bot, message_type=session_type, session_id=session_id)
 | ||
|             elif file.startswith("file"):
 | ||
|                 file = file.replace("file://", "")
 | ||
|                 result = await md.send_image(open(file, "rb").read(), bot, message_type=session_type, session_id=session_id)
 | ||
|             elif file.startswith("base64"):
 | ||
|                 file_bytes = base64.b64decode(file.replace("base64://", ""))
 | ||
|                 result = await md.send_image(file_bytes, bot, message_type=session_type, session_id=session_id)
 | ||
|             else:
 | ||
|                 return
 | ||
|             raise MockApiException(result=result)
 | ||
| 
 | ||
| 
 | ||
| @driver.on_startup
 | ||
| async def on_startup():
 | ||
|     pass
 | ||
| 
 | ||
| 
 | ||
| @driver.on_shutdown
 | ||
| async def on_shutdown():
 | ||
|     pass
 |