🔀手动Merge轻雪主仓库a77f97f

This commit is contained in:
2024-10-06 02:39:10 +08:00
parent 4cc2ae61db
commit f8b57bfe9a
108 changed files with 3131 additions and 3574 deletions

View File

@ -2,7 +2,6 @@ from nonebot.plugin import PluginMetadata
from .core import *
from .loader import *
from .dev import *
__author__ = "snowykami"
__plugin_meta__ = PluginMetadata(
@ -18,29 +17,6 @@ __plugin_meta__ = PluginMetadata(
from ..utils.base.language import Language, get_default_lang_code
print(
"\033[34m"
+ r"""
▅▅▅▅▅▅▅▅▅▅▅▅▅▅██ ▅▅▅▅▅▅▅▅▅▅▅▅▅▅██ ██ ▅▅▅▅▅▅▅▅▅▅█™
▛ ██ ██ ▛ ██ ███ ██ ██
██ ██ ███████████████ ██ ████████▅ ██
███████████████ ██ ███ ██ ██
██ ██ ▅██████████████▛ ██ ████████████
██ ██ ███ ███
████████████████ ██▅ ███ ██ ▅▅▅▅▅▅▅▅▅▅▅██
███ █ ▜███████ ██ ███ ██ ██ ██ ██
███ ███ █████▛ ██ ██ ██ ██ ██
███ ██ ███ █ ██ ██ ██ ██ ██
███ █████ ██████ ███ ██████████████
商标标记 © 2024 金羿Eilles
版权所有 © 2020-2024 神羽SnowyKami & 金羿Eilles\\
with LiteyukiStudio & TriM Org.
保留所有权利
"""
+ "\033[0m"
)
sys_lang = Language(get_default_lang_code())
nonebot.logger.info(
sys_lang.get("main.current_language", LANG=sys_lang.get("language.name"))

View File

@ -1,44 +1,43 @@
import base64
import time
from typing import Any, AnyStr
from typing import AnyStr
import time
from typing import AnyStr
import nonebot
import pip
from nonebot import Bot, get_driver, require # type: ignore
from nonebot import get_driver, require
from nonebot.adapters import onebot, satori
from nonebot.adapters.onebot.v11 import Message, escape, unescape
from nonebot.exception import MockApiException
from nonebot.adapters.onebot.v11 import Message, unescape
from nonebot.internal.matcher import Matcher
from nonebot.permission import SUPERUSER
# from src.liteyuki.core import Reloader
from src.utils import event as event_utils, satori_utils
from src.utils.base.config import get_config, load_from_yaml
from src.utils.base.data_manager import StoredConfig, TempConfig, common_db
from src.utils.base.config import get_config
from src.utils.base.data_manager import TempConfig, common_db
from src.utils.base.language import get_user_lang
from src.utils.base.ly_typing import T_Bot, T_MessageEvent
from src.utils.message.message import MarkdownMessage as md, broadcast_to_superusers
from .api import update_liteyuki
from ..utils.base import reload
from ..utils.base.ly_function import get_function
from .api import update_liteyuki # type: ignore
from ..utils.base import reload # type: ignore
from ..utils.base.ly_function import get_function # type: ignore
from ..utils.message.html_tool import md_to_pic
require("nonebot_plugin_alconna")
require("nonebot_plugin_apscheduler")
from nonebot_plugin_alconna import (
UniMessage,
on_alconna,
Alconna,
Args,
Subcommand,
Arparma,
MultiVar,
)
from nonebot_plugin_apscheduler import scheduler
driver = get_driver()
markdown_image = common_db.where_one(StoredConfig(), default=StoredConfig()).config.get(
"markdown_image", False
)
driver = get_driver()
@on_alconna(
@ -50,8 +49,8 @@ markdown_image = common_db.where_one(StoredConfig(), default=StoredConfig()).con
).handle()
# Satori OK
async def _(bot: T_Bot, matcher: Matcher, result: Arparma):
if result.main_args.get("text"):
await matcher.finish(Message(unescape(result.main_args.get("text")))) # type: ignore
if text := result.main_args.get("text"):
await matcher.finish(Message(unescape(text)))
else:
await matcher.finish(f"君安!灵温向你问好~\n此机 {bot.self_id}")
@ -72,7 +71,7 @@ async def _(bot: T_Bot, matcher: Matcher, result: Arparma):
aliases={"更新灵温"}, command=Alconna("update-ryoun"), permission=SUPERUSER
).handle()
# Satori OK
async def _(bot: T_Bot, event: T_MessageEvent):
async def _(bot: T_Bot, event: T_MessageEvent, matcher: Matcher):
# 使用git pull更新
ulang = get_user_lang(
@ -84,7 +83,9 @@ async def _(bot: T_Bot, event: T_MessageEvent):
btn_restart = md.btn_cmd(ulang.get("liteyuki.restart_now"), "reload-liteyuki")
pip.main(["install", "-r", "requirements.txt"])
reply += f"{ulang.get('liteyuki.update_restart', RESTART=btn_restart)}"
await md.send_md(reply, bot, event=event, at_sender=False)
# await md.send_md(reply, bot)
img_bytes = await md_to_pic(reply)
await UniMessage.send(UniMessage.image(raw=img_bytes))
@on_alconna(
@ -115,108 +116,9 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
)
common_db.save(temp_data)
reload()
@on_alconna(
aliases={"配置"},
command=Alconna(
"config",
Subcommand(
"set",
Args["key", str]["value", Any],
alias=["设置"],
),
Subcommand("get", Args["key", str, None], alias=["查询", "获取"]),
Subcommand("remove", Args["key", str], alias=["删除"]),
),
permission=SUPERUSER,
).handle()
# Satori OK
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher):
ulang = get_user_lang(str(event_utils.get_user_id(event)))
stored_config: StoredConfig = common_db.where_one(
StoredConfig(), default=StoredConfig()
)
if result.subcommands.get("set"):
key, value = result.subcommands.get("set").args.get(
"key"
), result.subcommands.get("set").args.get("value")
try:
value = eval(value)
except:
pass
stored_config.config[key] = value
common_db.save(stored_config)
await matcher.finish(
f"{ulang.get('liteyuki.config_set_success', KEY=key, VAL=value)}"
)
elif result.subcommands.get("get"):
key = result.subcommands.get("get").args.get("key")
file_config = load_from_yaml("config.yml")
reply = f"{ulang.get('liteyuki.current_config')}"
if key:
reply += f"```dotenv\n{key}={file_config.get(key, stored_config.config.get(key))}\n```"
else:
reply = f"{ulang.get('liteyuki.current_config')}"
reply += f"\n{ulang.get('liteyuki.static_config')}\n```dotenv"
for k, v in file_config.items():
reply += f"\n{k}={v}"
reply += "\n```"
if len(stored_config.config) > 0:
reply += f"\n{ulang.get('liteyuki.stored_config')}\n```dotenv"
for k, v in stored_config.config.items():
reply += f"\n{k}={v} {type(v)}"
reply += "\n```"
await md.send_md(reply, bot, event=event)
elif result.subcommands.get("remove"):
key = result.subcommands.get("remove").args.get("key")
if key in stored_config.config:
stored_config.config.pop(key)
common_db.save(stored_config)
await matcher.finish(
f"{ulang.get('liteyuki.config_remove_success', KEY=key)}"
)
else:
await matcher.finish(f"{ulang.get('liteyuki.invalid_command', TEXT=key)}")
@on_alconna(
aliases={"切换图片模式"}, command=Alconna("switch-image-mode"), permission=SUPERUSER
).handle()
# Satori OK
async def _(event: T_MessageEvent, matcher: Matcher):
global markdown_image
# 切换图片模式False以图片形式发送True以markdown形式发送
ulang = get_user_lang(str(event_utils.get_user_id(event)))
stored_config: StoredConfig = common_db.where_one(
StoredConfig(), default=StoredConfig()
)
stored_config.config["markdown_image"] = not stored_config.config.get(
"markdown_image", False
)
markdown_image = stored_config.config["markdown_image"]
common_db.save(stored_config)
await matcher.finish(
ulang.get(
"liteyuki.image_mode_on"
if stored_config.config["markdown_image"]
else "liteyuki.image_mode_off"
)
)
# @on_alconna(
# command=Alconna(
# "liteyuki-docs",
# ),
# aliases={"轻雪文档"},
# ).handle()
# # Satori OK
# async def _(matcher: Matcher):
# await matcher.finish("https://bot.liteyuki.icu/usage")
@on_alconna(
command=Alconna(
@ -318,62 +220,6 @@ async def _(result: Arparma, bot: T_Bot, event: T_MessageEvent, matcher: Matcher
await matcher.finish(f"API: {api_name}\n\nArgs: \n{args_show}\n\nResult: {result}")
# system hook
@Bot.on_calling_api # 图片模式检测
async def test_for_md_image(bot: T_Bot, api: str, data: dict):
# 截获大图发送转换为markdown发送
if (
api in ["send_msg", "send_private_msg", "send_group_msg"]
and markdown_image
and data.get("user_id") != bot.self_id
):
if (
api == "send_msg"
and data.get("message_type") == "private"
or api == "send_private_msg"
):
session_type = "private"
session_id = data.get("user_id")
elif (
api == "send_msg"
and data.get("message_type") == "group"
or api == "send_group_msg"
):
session_type = "group"
session_id = data.get("group_id")
else:
return
if (
len(data.get("message", [])) == 1
and data["message"][0].get("type") == "image"
):
file: str = data["message"][0].data.get("file")
# file:// http:// base64://
if file.startswith("http"):
result = await md.send_md(
await md.image_async(file),
bot,
message_type=session_type,
session_id=session_id,
)
elif file.startswith("file"):
file = file.replace("file://", "")
result = await md.send_image(
open(file, "rb").read(),
bot,
message_type=session_type,
session_id=session_id,
)
elif file.startswith("base64"):
file_bytes = base64.b64decode(file.replace("base64://", ""))
result = await md.send_image(
file_bytes, bot, message_type=session_type, session_id=session_id
)
else:
return
raise MockApiException(result=result)
@driver.on_startup
async def on_startup():
temp_data = common_db.where_one(TempConfig(), default=TempConfig())
@ -426,6 +272,14 @@ async def _(bot: T_Bot):
group_id=reload_session_id,
message=return_msg,
)
elif isinstance(bot, onebot.v12.Bot):
await bot.send_msg(
message_type=reload_session_type,
user_id=reload_session_id,
group_id=reload_session_id,
message=return_msg,
detail_type="group",
)
else:
await bot.call_api(
"send_msg",
@ -445,7 +299,6 @@ async def every_day_update():
if result:
await broadcast_to_superusers(f"灵温已更新:```\n{logs}\n```")
nonebot.logger.info(f"灵温已更新:{logs}")
# ProcessingManager.restart(3)
reload()
else:
nonebot.logger.info(logs)

View File

@ -1,33 +0,0 @@
import nonebot
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from liteyuki.bot import get_bot
from src.utils.base import reload
from src.utils.base.config import get_config
from src.utils.base.resource import load_resources
if get_config("debug", False):
liteyuki_bot = get_bot()
res_directories = (
"src/resources",
"resources",
)
class ResourceModifiedHandler(FileSystemEventHandler):
"""
Handler for resource file changes
"""
def on_modified(self, event):
nonebot.logger.info(f"资源 {event.src_path} 变更,重载资源包……")
load_resources()
resource_modified_handle = ResourceModifiedHandler()
observer = Observer()
for directory in res_directories:
observer.schedule(resource_modified_handle, directory, recursive=True)
observer.start()

View File

@ -8,15 +8,10 @@ from src.utils.base.data_manager import InstalledPlugin, plugin_db
from src.utils.base.resource import load_resources
from src.utils.message.tools import check_for_package
from liteyuki import get_bot, chan
from nonebot_plugin_apscheduler import scheduler
load_resources()
init_log()
driver = get_driver()
liteyuki_bot = get_bot()
@driver.on_startup
@ -32,7 +27,7 @@ async def load_plugins():
for installed_plugin in installed_plugins:
if not check_for_package(installed_plugin.module_name):
nonebot.logger.error(
f"插件 {installed_plugin.module_name} 在加载列表中但未安装。请使用超管账户对机器人发送 `npm fixup` 以重新安装。"
f"插件 {installed_plugin.module_name} 在加载列表中但未安装。"
)
else:
nonebot.load_plugin(installed_plugin.module_name)

View File

@ -1,3 +0,0 @@
# 说明
此目录为**轻雪插件**目录,非其他插件目录。

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/20 上午5:12
@Author : snowykami
@Email : snowykami@outlook.com
@File : liteyuki_reply.py
@Software: PyCharm
"""
from liteyuki.plugin import PluginMetadata, PluginType
from liteyuki.message.on import on_message
from liteyuki.message.event import MessageEvent
__plugin_meta__ = PluginMetadata(
name="你好轻雪",
type=PluginType.APPLICATION
)
@on_message().handle()
async def _(event: MessageEvent):
if str(event.raw_message) == "你好轻雪":
event.reply("你好呀")

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
#
# @Time : 2024/7/22 上午11:25
# @Author : snowykami
# @Email : snowykami@outlook.com
# @File : asa.py
# @Software: PyCharm
import asyncio
import multiprocessing
from liteyuki.plugin import PluginMetadata, PluginType
from liteyuki import get_bot, logger
from liteyuki.comm.channel import get_channel
__plugin_meta__ = PluginMetadata(
name="生命周期日志",
type=PluginType.SERVICE,
)
bot = get_bot()
@bot.on_before_start
def _():
logger.info("生命周期监控器:准备启动")
@bot.on_before_process_shutdown
def _(name="name"):
logger.info("生命周期监控器:准备停止")
@bot.on_before_process_restart
def _(name="name"):
logger.info("生命周期监控器:准备重启")
@bot.on_after_start
async def _():
logger.info("生命周期监控器:启动完成")

View File

@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/11 下午5:24
@Author : snowykami
@Email : snowykami@outlook.com
@File : __init__.py.py
@Software: PyCharm
"""
import nonebot
from liteyuki.utils import IS_MAIN_PROCESS
from liteyuki.plugin import PluginMetadata, PluginType
from .nb_utils import adapter_manager, driver_manager # type: ignore
from liteyuki.log import logger
__plugin_meta__ = PluginMetadata(
name="NoneBot2启动器",
type=PluginType.APPLICATION,
)
def nb_run(*args, **kwargs):
"""
初始化NoneBot并运行在子进程
Args:
**kwargs:
Returns:
"""
# 给子进程传递通道对象
kwargs.update(kwargs.get("nonebot", {})) # nonebot配置优先
nonebot.init(**kwargs)
driver_manager.init(config=kwargs)
adapter_manager.init(kwargs)
adapter_manager.register()
try:
# nonebot.load_plugin("nonebot-plugin-lnpm") # 尝试加载轻雪NoneBot插件加载器Nonebot插件
nonebot.load_plugin("src.liteyuki_main") # 尝试加载轻雪主插件Nonebot插件
except Exception as e:
pass
nonebot.run()
if IS_MAIN_PROCESS:
from liteyuki import get_bot
from .dev_reloader import *
liteyuki = get_bot()
liteyuki.process_manager.add_target(name="nonebot", target=nb_run, args=(), kwargs=liteyuki.config)

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
"""
NoneBot 开发环境重载监视器
"""
import os.path
from liteyuki.dev import observer
from liteyuki import get_bot, logger
from liteyuki.utils import IS_MAIN_PROCESS
from watchdog.events import FileSystemEvent
liteyuki = get_bot()
exclude_extensions = (".pyc", ".pyo")
@observer.on_file_system_event(
directories=("src/nonebot_plugins",),
event_filter=lambda event: not event.src_path.endswith(exclude_extensions) and ("__pycache__" not in event.src_path ) and os.path.isfile(event.src_path)
)
def restart_nonebot_process(event: FileSystemEvent):
logger.debug(f"文件 {event.src_path} 已更新,正在重载 nonebot")
liteyuki.restart_process("nonebot")

View File

@ -0,0 +1,14 @@
from . import (
satori,
onebot
)
def init(config: dict):
onebot.init()
satori.init(config)
def register():
onebot.register()
satori.register()

View File

@ -0,0 +1,12 @@
import nonebot
from nonebot.adapters.onebot import v11, v12
def init():
pass
def register():
driver = nonebot.get_driver()
driver.register_adapter(v11.Adapter)
driver.register_adapter(v12.Adapter)

View File

@ -0,0 +1,26 @@
import json
import os
import nonebot
from nonebot.adapters import satori
def init(config: dict):
if config.get("satori", None) is None:
nonebot.logger.info("未寻得 Satori 设定信息,跳过初始化")
return None
satori_config = config.get("satori")
if not satori_config.get("enable", False):
nonebot.logger.info("Satori 未启用,跳过初始化")
return None
if os.getenv("SATORI_CLIENTS", None) is not None:
nonebot.logger.info("Satori 客户端已在环境变量中配置,跳过初始化")
os.environ["SATORI_CLIENTS"] = json.dumps(satori_config.get("hosts", []), ensure_ascii=False)
config['satori_clients'] = satori_config.get("hosts", [])
return
def register():
if os.getenv("SATORI_CLIENTS", None) is not None:
driver = nonebot.get_driver()
driver.register_adapter(satori.Adapter)

View File

@ -0,0 +1,6 @@
from .auto_set_env import auto_set_env
def init(config: dict):
auto_set_env(config)
return

View File

@ -0,0 +1,20 @@
import os
import dotenv
import nonebot
from .defines import *
def auto_set_env(config: dict):
dotenv.load_dotenv(".env")
if os.getenv("DRIVER", None) is not None:
nonebot.logger.info("Driver 已在环境变量中配置,跳过自动设定")
return
if config.get("satori", {'enable': False}).get("enable", False):
os.environ["DRIVER"] = get_driver_string(ASGI_DRIVER, HTTPX_DRIVER, WEBSOCKETS_DRIVER)
nonebot.logger.info("启用 Satori已设定 Driver 为 ASGI+HTTPX+WEBSOCKETS")
else:
os.environ["DRIVER"] = get_driver_string(ASGI_DRIVER)
nonebot.logger.info("禁用 Satori已设定 Driver 为 ASGI")
return

View File

@ -0,0 +1,17 @@
ASGI_DRIVER = "~fastapi"
HTTPX_DRIVER = "~httpx"
WEBSOCKETS_DRIVER = "~websockets"
def get_driver_string(*argv):
output_string = ""
if ASGI_DRIVER in argv:
output_string += ASGI_DRIVER
for arg in argv:
if arg != ASGI_DRIVER:
output_string = f"{output_string}+{arg}"
return output_string
def get_driver_full_string(*argv):
return f"DRIVER={get_driver_string(argv)}"

View File

@ -0,0 +1,8 @@
from liteyuki.plugin import PluginMetadata, PluginType
__plugin_meta__ = PluginMetadata(
name="进程管理器",
author="snowykami",
description="进程管理器,用于管理子进程",
type=PluginType.SERVICE
)

View File

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/10 下午11:25
@Author : snowykami
@Email : snowykami@outlook.com
@File : register_service.py
@Software: PyCharm
"""
import json
import os.path
import platform
from aiohttp import ClientSession
from git import Repo
from liteyuki.plugin import PluginMetadata, PluginType
from liteyuki import get_bot, logger
__plugin_meta__ = PluginMetadata(
name="注册服务",
type=PluginType.SERVICE
)
liteyuki = get_bot()
commit_hash = Repo(".").head.commit.hexsha
async def register_bot():
url = "https://api.liteyuki.icu/register"
data = {
"name" : "尹灵温|轻雪-睿乐",
"version" : "即时更新",
"hash" : commit_hash,
"version_i": 99,
"python" : f"{platform.python_implementation()} {platform.python_version()}",
"os" : f"{platform.system()} {platform.version()} {platform.machine()}"
}
try:
logger.info("正在等待 Liteyuki 注册服务器…")
async with ClientSession() as session:
async with session.post(url, json=data, timeout=15) as resp:
if resp.status == 200:
data = await resp.json()
if liteyuki_id := data.get("liteyuki_id"):
with open("data/liteyuki/liteyuki.json", "wb") as f:
f.write(json.dumps(data).encode("utf-8"))
logger.success("成功将 {} 注册到 Liteyuki 服务器".format(liteyuki_id))
else:
raise ValueError(f"无法向 Liteyuki 服务器注册:{data}")
else:
raise ValueError(f"无法向 Liteyuki 服务器注册:{resp.status}")
except Exception as e:
logger.warning(f"虽然向 Liteyuki 服务器注册失败,但无关紧要:{e}")
@liteyuki.on_before_start
async def _():
if not os.path.exists("data/liteyuki/liteyuki.json"):
if not os.path.exists("data/liteyuki"):
os.makedirs("data/liteyuki")
await register_bot()

View File

@ -0,0 +1,8 @@
from liteyuki.plugin import PluginMetadata, PluginType
__plugin_meta__ = PluginMetadata(
name="资源加载器",
author="snowykami",
description="进程管理器,用于管理子进程",
type=PluginType.SERVICE
)

View File

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/15 下午11:29
@Author : snowykami
@Email : snowykami@outlook.com
@File : __init__.py.py
@Software: PyCharm
"""
from liteyuki.plugin import PluginMetadata, PluginType
from .divided_by_lifespan import *
__plugin_meta__ = PluginMetadata(
name="计划任务",
description="计划任务插件,一些杂项任务的计划执行。",
type=PluginType.SERVICE
)

View File

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/15 下午11:32
@Author : snowykami
@Email : snowykami@outlook.com
@File : __init__.py
@Software: PyCharm
"""
from .after_start import *

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/15 下午11:32
@Author : snowykami
@Email : snowykami@outlook.com
@File : after_start.py
@Software: PyCharm
"""
import time
from liteyuki import get_bot
from liteyuki.comm.storage import shared_memory
liteyuki = get_bot()
@liteyuki.on_before_start
def save_startup_timestamp():
"""
储存启动的时间戳
"""
startup_timestamp = time.time()
shared_memory.set("startup_timestamp", startup_timestamp)

View File

@ -1,27 +0,0 @@
import multiprocessing
from nonebot.plugin import PluginMetadata
from liteyuki.comm import get_channel
from .rt_guide import *
from .crt_matchers import *
__plugin_meta__ = PluginMetadata(
name="CRT生成工具",
description="一些CRT牌子生成器",
usage="我觉得你应该会用",
type="application",
homepage="https://github.com/snowykami/LiteyukiBot",
extra={
"liteyuki" : True,
"toggleable" : True,
"default_enable": True,
}
)
# chan = get_channel("nonebot-passive")
#
#
# @chan.on_receive()
# async def _(d):
# print("CRT子进程接收到数据", d)
# chan.send("CRT子进程已接收到数据")

View File

@ -1,575 +0,0 @@
import os
import uuid
from typing import Tuple, Union, List
import nonebot
from PIL import Image, ImageFont, ImageDraw
default_color = (255, 255, 255, 255)
default_font = "resources/fonts/MiSans-Semibold.ttf"
def render_canvas_from_json(file: str, background: Image) -> "Canvas":
pass
class BasePanel:
def __init__(self,
uv_size: Tuple[Union[int, float], Union[int, float]] = (1.0, 1.0),
box_size: Tuple[Union[int, float], Union[int, float]] = (1.0, 1.0),
parent_point: Tuple[float, float] = (0.5, 0.5),
point: Tuple[float, float] = (0.5, 0.5)):
"""
:param uv_size: 底面板大小
:param box_size: 子(自身)面板大小
:param parent_point: 底面板锚点
:param point: 子(自身)面板锚点
"""
self.canvas: Canvas | None = None
self.uv_size = uv_size
self.box_size = box_size
self.parent_point = parent_point
self.point = point
self.parent: BasePanel | None = None
self.canvas_box: Tuple[float, float, float, float] = (0.0, 0.0, 1.0, 1.0)
# 此节点在父节点上的盒子
self.box = (
self.parent_point[0] - self.point[0] * self.box_size[0] / self.uv_size[0],
self.parent_point[1] - self.point[1] * self.box_size[1] / self.uv_size[1],
self.parent_point[0] + (1 - self.point[0]) * self.box_size[0] / self.uv_size[0],
self.parent_point[1] + (1 - self.point[1]) * self.box_size[1] / self.uv_size[1]
)
def load(self, only_calculate=False):
"""
将对象写入画布
此处仅作声明
由各子类重写
:return:
"""
self.actual_pos = self.canvas_box
def save_as(self, canvas_box, only_calculate=False):
"""
此函数执行时间较长,建议异步运行
:param only_calculate:
:param canvas_box 此节点在画布上的盒子,并不是在父节点上的盒子
:return:
"""
for name, child in self.__dict__.items():
# 此节点在画布上的盒子
if isinstance(child, BasePanel) and name not in ["canvas", "parent"]:
child.parent = self
if isinstance(self, Canvas):
child.canvas = self
else:
child.canvas = self.canvas
dxc = canvas_box[2] - canvas_box[0]
dyc = canvas_box[3] - canvas_box[1]
child.canvas_box = (
canvas_box[0] + dxc * child.box[0],
canvas_box[1] + dyc * child.box[1],
canvas_box[0] + dxc * child.box[2],
canvas_box[1] + dyc * child.box[3]
)
child.load(only_calculate)
child.save_as(child.canvas_box, only_calculate)
class Canvas(BasePanel):
def __init__(self, base_img: Image.Image):
self.base_img = base_img
self.canvas = self
super(Canvas, self).__init__()
self.draw_line_list = []
def export(self, file, alpha=False):
self.base_img = self.base_img.convert("RGBA")
self.save_as((0, 0, 1, 1))
draw = ImageDraw.Draw(self.base_img)
for line in self.draw_line_list:
draw.line(*line)
if not alpha:
self.base_img = self.base_img.convert("RGB")
self.base_img.save(file)
def delete(self):
os.remove(self.file)
def get_actual_box(self, path: str) -> Union[None, Tuple[float, float, float, float]]:
"""
获取控件实际相对大小
函数执行时间较长
:param path: 控件路径
:return:
"""
sub_obj = self
self.save_as((0, 0, 1, 1), True)
control_path = ""
for i, seq in enumerate(path.split(".")):
if seq not in sub_obj.__dict__:
raise KeyError(f"{control_path}中找不到控件:{seq}")
control_path += f".{seq}"
sub_obj = sub_obj.__dict__[seq]
return sub_obj.actual_pos
def get_actual_pixel_size(self, path: str) -> Union[None, Tuple[int, int]]:
"""
获取控件实际像素长宽
函数执行时间较长
:param path: 控件路径
:return:
"""
sub_obj = self
self.save_as((0, 0, 1, 1), True)
control_path = ""
for i, seq in enumerate(path.split(".")):
if seq not in sub_obj.__dict__:
raise KeyError(f"{control_path}中找不到控件:{seq}")
control_path += f".{seq}"
sub_obj = sub_obj.__dict__[seq]
dx = int(sub_obj.canvas.base_img.size[0] * (sub_obj.actual_pos[2] - sub_obj.actual_pos[0]))
dy = int(sub_obj.canvas.base_img.size[1] * (sub_obj.actual_pos[3] - sub_obj.actual_pos[1]))
return dx, dy
def get_actual_pixel_box(self, path: str) -> Union[None, Tuple[int, int, int, int]]:
"""
获取控件实际像素大小盒子
函数执行时间较长
:param path: 控件路径
:return:
"""
sub_obj = self
self.save_as((0, 0, 1, 1), True)
control_path = ""
for i, seq in enumerate(path.split(".")):
if seq not in sub_obj.__dict__:
raise KeyError(f"{control_path}中找不到控件:{seq}")
control_path += f".{seq}"
sub_obj = sub_obj.__dict__[seq]
x1 = int(sub_obj.canvas.base_img.size[0] * sub_obj.actual_pos[0])
y1 = int(sub_obj.canvas.base_img.size[1] * sub_obj.actual_pos[1])
x2 = int(sub_obj.canvas.base_img.size[2] * sub_obj.actual_pos[2])
y2 = int(sub_obj.canvas.base_img.size[3] * sub_obj.actual_pos[3])
return x1, y1, x2, y2
def get_parent_box(self, path: str) -> Union[None, Tuple[float, float, float, float]]:
"""
获取控件在父节点的大小
函数执行时间较长
:param path: 控件路径
:return:
"""
sub_obj = self.get_control_by_path(path)
on_parent_pos = (
(sub_obj.actual_pos[0] - sub_obj.parent.actual_pos[0]) / (sub_obj.parent.actual_pos[2] - sub_obj.parent.actual_pos[0]),
(sub_obj.actual_pos[1] - sub_obj.parent.actual_pos[1]) / (sub_obj.parent.actual_pos[3] - sub_obj.parent.actual_pos[1]),
(sub_obj.actual_pos[2] - sub_obj.parent.actual_pos[0]) / (sub_obj.parent.actual_pos[2] - sub_obj.parent.actual_pos[0]),
(sub_obj.actual_pos[3] - sub_obj.parent.actual_pos[1]) / (sub_obj.parent.actual_pos[3] - sub_obj.parent.actual_pos[1])
)
return on_parent_pos
def get_control_by_path(self, path: str) -> Union[BasePanel, "Img", "Rectangle", "Text"]:
sub_obj = self
self.save_as((0, 0, 1, 1), True)
control_path = ""
for i, seq in enumerate(path.split(".")):
if seq not in sub_obj.__dict__:
raise KeyError(f"{control_path}中找不到控件:{seq}")
control_path += f".{seq}"
sub_obj = sub_obj.__dict__[seq]
return sub_obj
def draw_line(self, path: str, p1: Tuple[float, float], p2: Tuple[float, float], color, width):
"""
画线
:param color:
:param width:
:param path:
:param p1:
:param p2:
:return:
"""
ac_pos = self.get_actual_box(path)
control = self.get_control_by_path(path)
dx = ac_pos[2] - ac_pos[0]
dy = ac_pos[3] - ac_pos[1]
xy_box = int((ac_pos[0] + dx * p1[0]) * control.canvas.base_img.size[0]), int((ac_pos[1] + dy * p1[1]) * control.canvas.base_img.size[1]), int(
(ac_pos[0] + dx * p2[0]) * control.canvas.base_img.size[0]), int((ac_pos[1] + dy * p2[1]) * control.canvas.base_img.size[1])
self.draw_line_list.append((xy_box, color, width))
class Panel(BasePanel):
def __init__(self, uv_size, box_size, parent_point, point):
super(Panel, self).__init__(uv_size, box_size, parent_point, point)
class TextSegment:
def __init__(self, text, **kwargs):
if not isinstance(text, str):
raise TypeError("请输入字符串")
self.text = text
self.color = kwargs.get("color", None)
self.font = kwargs.get("font", None)
@staticmethod
def text2text_segment_list(text: str):
"""
暂时没写好
:param text: %FFFFFFFF%1123%FFFFFFFF%21323
:return:
"""
pass
class Text(BasePanel):
def __init__(self, uv_size, box_size, parent_point, point, text: Union[str, list], font=default_font, color=(255, 255, 255, 255), vertical=False,
line_feed=False, force_size=False, fill=(0, 0, 0, 0), fillet=0, outline=(0, 0, 0, 0), outline_width=0, rectangle_side=0, font_size=None, dp: int = 5,
anchor: str = "la"):
"""
:param uv_size:
:param box_size:
:param parent_point:
:param point:
:param text: list[TextSegment] | str
:param font:
:param color:
:param vertical: 是否竖直
:param line_feed: 是否换行
:param force_size: 强制大小
:param dp: 字体大小递减精度
:param anchor : https://www.zhihu.com/question/474216280
:param fill: 底部填充颜色
:param fillet: 填充圆角
:param rectangle_side: 边框宽度
:param outline: 填充矩形边框颜色
:param outline_width: 填充矩形边框宽度
"""
self.actual_pos = None
self.outline_width = outline_width
self.outline = outline
self.fill = fill
self.fillet = fillet
self.font = font
self.text = text
self.color = color
self.force_size = force_size
self.vertical = vertical
self.line_feed = line_feed
self.dp = dp
self.font_size = font_size
self.rectangle_side = rectangle_side
self.anchor = anchor
super(Text, self).__init__(uv_size, box_size, parent_point, point)
def load(self, only_calculate=False):
"""限制区域像素大小"""
if isinstance(self.text, str):
self.text = [
TextSegment(text=self.text, color=self.color, font=self.font)
]
all_text = str()
for text in self.text:
all_text += text.text
limited_size = int((self.canvas_box[2] - self.canvas_box[0]) * self.canvas.base_img.size[0]), int((self.canvas_box[3] - self.canvas_box[1]) * self.canvas.base_img.size[1])
font_size = limited_size[1] if self.font_size is None else self.font_size
image_font = ImageFont.truetype(self.font, font_size)
actual_size = image_font.getsize(all_text)
while (actual_size[0] > limited_size[0] or actual_size[1] > limited_size[1]) and not self.force_size:
font_size -= self.dp
image_font = ImageFont.truetype(self.font, font_size)
actual_size = image_font.getsize(all_text)
draw = ImageDraw.Draw(self.canvas.base_img)
if isinstance(self.parent, Img) or isinstance(self.parent, Text):
self.parent.canvas_box = self.parent.actual_pos
dx0 = self.parent.canvas_box[2] - self.parent.canvas_box[0]
dy0 = self.parent.canvas_box[3] - self.parent.canvas_box[1]
dx1 = actual_size[0] / self.canvas.base_img.size[0]
dy1 = actual_size[1] / self.canvas.base_img.size[1]
start_point = [
int((self.parent.canvas_box[0] + dx0 * self.parent_point[0] - dx1 * self.point[0]) * self.canvas.base_img.size[0]),
int((self.parent.canvas_box[1] + dy0 * self.parent_point[1] - dy1 * self.point[1]) * self.canvas.base_img.size[1])
]
self.actual_pos = (
start_point[0] / self.canvas.base_img.size[0],
start_point[1] / self.canvas.base_img.size[1],
(start_point[0] + actual_size[0]) / self.canvas.base_img.size[0],
(start_point[1] + actual_size[1]) / self.canvas.base_img.size[1],
)
self.font_size = font_size
if not only_calculate:
for text_segment in self.text:
if text_segment.color is None:
text_segment.color = self.color
if text_segment.font is None:
text_segment.font = self.font
image_font = ImageFont.truetype(font=text_segment.font, size=font_size)
if self.fill[-1] > 0:
rectangle = Shape.rectangle(size=(actual_size[0] + 2 * self.rectangle_side, actual_size[1] + 2 * self.rectangle_side), fillet=self.fillet, fill=self.fill,
width=self.outline_width, outline=self.outline)
self.canvas.base_img.paste(im=rectangle, box=(start_point[0] - self.rectangle_side,
start_point[1] - self.rectangle_side,
start_point[0] + actual_size[0] + self.rectangle_side,
start_point[1] + actual_size[1] + self.rectangle_side),
mask=rectangle.split()[-1])
draw.text((start_point[0] - self.rectangle_side, start_point[1] - self.rectangle_side),
text_segment.text, text_segment.color, font=image_font, anchor=self.anchor)
text_width = image_font.getsize(text_segment.text)
start_point[0] += text_width[0]
class Img(BasePanel):
def __init__(self, uv_size, box_size, parent_point, point, img: Image.Image, keep_ratio=True):
self.img_base_img = img
self.keep_ratio = keep_ratio
super(Img, self).__init__(uv_size, box_size, parent_point, point)
def load(self, only_calculate=False):
self.preprocess()
self.img_base_img = self.img_base_img.convert("RGBA")
limited_size = int((self.canvas_box[2] - self.canvas_box[0]) * self.canvas.base_img.size[0]), \
int((self.canvas_box[3] - self.canvas_box[1]) * self.canvas.base_img.size[1])
if self.keep_ratio:
"""保持比例"""
actual_ratio = self.img_base_img.size[0] / self.img_base_img.size[1]
limited_ratio = limited_size[0] / limited_size[1]
if actual_ratio >= limited_ratio:
# 图片过长
self.img_base_img = self.img_base_img.resize(
(int(self.img_base_img.size[0] * limited_size[0] / self.img_base_img.size[0]),
int(self.img_base_img.size[1] * limited_size[0] / self.img_base_img.size[0]))
)
else:
self.img_base_img = self.img_base_img.resize(
(int(self.img_base_img.size[0] * limited_size[1] / self.img_base_img.size[1]),
int(self.img_base_img.size[1] * limited_size[1] / self.img_base_img.size[1]))
)
else:
"""不保持比例"""
self.img_base_img = self.img_base_img.resize(limited_size)
# 占比长度
if isinstance(self.parent, Img) or isinstance(self.parent, Text):
self.parent.canvas_box = self.parent.actual_pos
dx0 = self.parent.canvas_box[2] - self.parent.canvas_box[0]
dy0 = self.parent.canvas_box[3] - self.parent.canvas_box[1]
dx1 = self.img_base_img.size[0] / self.canvas.base_img.size[0]
dy1 = self.img_base_img.size[1] / self.canvas.base_img.size[1]
start_point = (
int((self.parent.canvas_box[0] + dx0 * self.parent_point[0] - dx1 * self.point[0]) * self.canvas.base_img.size[0]),
int((self.parent.canvas_box[1] + dy0 * self.parent_point[1] - dy1 * self.point[1]) * self.canvas.base_img.size[1])
)
alpha = self.img_base_img.split()[3]
self.actual_pos = (
start_point[0] / self.canvas.base_img.size[0],
start_point[1] / self.canvas.base_img.size[1],
(start_point[0] + self.img_base_img.size[0]) / self.canvas.base_img.size[0],
(start_point[1] + self.img_base_img.size[1]) / self.canvas.base_img.size[1],
)
if not only_calculate:
self.canvas.base_img.paste(self.img_base_img, start_point, alpha)
def preprocess(self):
pass
class Rectangle(Img):
def __init__(self, uv_size, box_size, parent_point, point, fillet: Union[int, float] = 0, img: Union[Image.Image] = None, keep_ratio=True,
color=default_color, outline_width=0, outline_color=default_color):
"""
圆角图
:param uv_size:
:param box_size:
:param parent_point:
:param point:
:param fillet: 圆角半径浮点或整数
:param img:
:param keep_ratio:
"""
self.fillet = fillet
self.color = color
self.outline_width = outline_width
self.outline_color = outline_color
super(Rectangle, self).__init__(uv_size, box_size, parent_point, point, img, keep_ratio)
def preprocess(self):
limited_size = (int(self.canvas.base_img.size[0] * (self.canvas_box[2] - self.canvas_box[0])),
int(self.canvas.base_img.size[1] * (self.canvas_box[3] - self.canvas_box[1])))
if not self.keep_ratio and self.img_base_img is not None and self.img_base_img.size[0] / self.img_base_img.size[1] != limited_size[0] / limited_size[1]:
self.img_base_img = self.img_base_img.resize(limited_size)
self.img_base_img = Shape.rectangle(size=limited_size, fillet=self.fillet, fill=self.color, width=self.outline_width, outline=self.outline_color)
class Color:
GREY = (128, 128, 128, 255)
RED = (255, 0, 0, 255)
GREEN = (0, 255, 0, 255)
BLUE = (0, 0, 255, 255)
YELLOW = (255, 255, 0, 255)
PURPLE = (255, 0, 255, 255)
CYAN = (0, 255, 255, 255)
WHITE = (255, 255, 255, 255)
BLACK = (0, 0, 0, 255)
@staticmethod
def hex2dec(colorHex: str) -> Tuple[int, int, int, int]:
"""
:param colorHex: FFFFFFFF ARGB-> (R, G, B, A)
:return:
"""
return int(colorHex[2:4], 16), int(colorHex[4:6], 16), int(colorHex[6:8], 16), int(colorHex[0:2], 16)
class Shape:
@staticmethod
def circular(radius: int, fill: tuple, width: int = 0, outline: tuple = Color.BLACK) -> Image.Image:
"""
:param radius: 半径(像素)
:param fill: 填充颜色
:param width: 轮廓粗细(像素)
:param outline: 轮廓颜色
:return: 圆形Image对象
"""
img = Image.new("RGBA", (radius * 2, radius * 2), color=radius)
draw = ImageDraw.Draw(img)
draw.ellipse(xy=(0, 0, radius * 2, radius * 2), fill=fill, outline=outline, width=width)
return img
@staticmethod
def rectangle(size: Tuple[int, int], fill: tuple, width: int = 0, outline: tuple = Color.BLACK, fillet: int = 0) -> Image.Image:
"""
:param fillet: 圆角半径(像素)
:param size: 长宽(像素)
:param fill: 填充颜色
:param width: 轮廓粗细(像素)
:param outline: 轮廓颜色
:return: 矩形Image对象
"""
img = Image.new("RGBA", size, color=fill)
draw = ImageDraw.Draw(img)
draw.rounded_rectangle(xy=(0, 0, size[0], size[1]), fill=fill, outline=outline, width=width, radius=fillet)
return img
@staticmethod
def ellipse(size: Tuple[int, int], fill: tuple, outline: int = 0, outline_color: tuple = Color.BLACK) -> Image.Image:
"""
:param size: 长宽(像素)
:param fill: 填充颜色
:param outline: 轮廓粗细(像素)
:param outline_color: 轮廓颜色
:return: 椭圆Image对象
"""
img = Image.new("RGBA", size, color=fill)
draw = ImageDraw.Draw(img)
draw.ellipse(xy=(0, 0, size[0], size[1]), fill=fill, outline=outline_color, width=outline)
return img
@staticmethod
def polygon(points: List[Tuple[int, int]], fill: tuple, outline: int, outline_color: tuple) -> Image.Image:
"""
:param points: 多边形顶点列表
:param fill: 填充颜色
:param outline: 轮廓粗细(像素)
:param outline_color: 轮廓颜色
:return: 多边形Image对象
"""
img = Image.new("RGBA", (max(points)[0], max(points)[1]), color=fill)
draw = ImageDraw.Draw(img)
draw.polygon(xy=points, fill=fill, outline=outline_color, width=outline)
return img
@staticmethod
def line(points: List[Tuple[int, int]], fill: tuple, width: int) -> Image:
"""
:param points: 线段顶点列表
:param fill: 填充颜色
:param width: 线段粗细(像素)
:return: 线段Image对象
"""
img = Image.new("RGBA", (max(points)[0], max(points)[1]), color=fill)
draw = ImageDraw.Draw(img)
draw.line(xy=points, fill=fill, width=width)
return img
class Utils:
@staticmethod
def central_clip_by_ratio(img: Image.Image, size: Tuple, use_cache=True):
"""
:param use_cache: 是否使用缓存,剪切过一次后默认生成缓存
:param img:
:param size: 仅为比例,满填充裁剪
:return:
"""
cache_file_path = str()
if use_cache:
filename_without_end = ".".join(os.path.basename(img.fp.name).split(".")[0:-1]) + f"_{size[0]}x{size[1]}" + ".png"
cache_file_path = os.path.join(".cache", filename_without_end)
if os.path.exists(cache_file_path):
nonebot.logger.info("本次使用缓存加载图片,不裁剪")
return Image.open(os.path.join(".cache", filename_without_end))
img_ratio = img.size[0] / img.size[1]
limited_ratio = size[0] / size[1]
if limited_ratio > img_ratio:
actual_size = (
img.size[0],
img.size[0] / size[0] * size[1]
)
box = (
0, (img.size[1] - actual_size[1]) // 2,
img.size[0], img.size[1] - (img.size[1] - actual_size[1]) // 2
)
else:
actual_size = (
img.size[1] / size[1] * size[0],
img.size[1],
)
box = (
(img.size[0] - actual_size[0]) // 2, 0,
img.size[0] - (img.size[0] - actual_size[0]) // 2, img.size[1]
)
img = img.crop(box).resize(size)
if use_cache:
img.save(cache_file_path)
return img
@staticmethod
def circular_clip(img: Image.Image):
"""
裁剪为alpha圆形
:param img:
:return:
"""
length = min(img.size)
alpha_cover = Image.new("RGBA", (length, length), color=(0, 0, 0, 0))
if img.size[0] > img.size[1]:
box = (
(img.size[0] - img[1]) // 2, 0,
(img.size[0] - img[1]) // 2 + img.size[1], img.size[1]
)
else:
box = (
0, (img.size[1] - img.size[0]) // 2,
img.size[0], (img.size[1] - img.size[0]) // 2 + img.size[0]
)
img = img.crop(box).resize((length, length))
draw = ImageDraw.Draw(alpha_cover)
draw.ellipse(xy=(0, 0, length, length), fill=(255, 255, 255, 255))
alpha = alpha_cover.split()[-1]
img.putalpha(alpha)
return img
@staticmethod
def open_img(path) -> Image.Image:
return Image.open(path, "RGBA")

View File

@ -1,78 +0,0 @@
from urllib.parse import quote
import aiohttp
from nonebot import require
from src.utils.event import get_user_id
from src.utils.base.language import Language
from src.utils.base.ly_typing import T_MessageEvent
from src.utils.base.resource import get_path
from src.utils.message.html_tool import template2image
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import UniMessage, on_alconna, Alconna, Args, Subcommand, Arparma, Option
crt_cmd = on_alconna(
Alconna(
"crt",
Subcommand(
"route",
Args["start", str, "沙坪坝"]["end", str, "上新街"],
alias=("r",),
help_text="查询两地之间的地铁路线"
),
)
)
@crt_cmd.assign("route")
async def _(result: Arparma, event: T_MessageEvent):
# 获取语言
ulang = Language(get_user_id(event))
# 获取参数
# 你也别问我为什么要quote两次问就是CRT官网的锅只有这样才可以运行
start = quote(quote(result.other_args.get("start")))
end = quote(quote(result.other_args.get("end")))
# 判断参数语言
query_lang_code = ""
if start.isalpha() and end.isalpha():
query_lang_code = "Eng"
# 构造请求 URL
url = f"https://www.cqmetro.cn/Front/html/TakeLine!queryYs{query_lang_code}TakeLine.action?entity.startStaName={start}&entity.endStaName={end}"
# 请求数据
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.json()
# 检查结果/无则终止
if not result.get("result"):
await crt_cmd.send(ulang.get("crt.no_result"))
return
# 模板传参定义
templates = {
"data" : {
"result": result["result"],
},
"localization": ulang.get_many(
"crt.station",
"crt.hour",
"crt.minute",
)
}
# 生成图片
image = await template2image(
template=get_path("templates/crt_route.html"),
templates=templates,
debug=True
)
# 发送图片
await crt_cmd.send(UniMessage.image(raw=image))

View File

@ -1,419 +0,0 @@
import json
from typing import List, Any
from PIL import Image
from arclet.alconna import Alconna
from nb_cli import run_sync
from nonebot import on_command
from nonebot_plugin_alconna import on_alconna, Alconna, Subcommand, Args, MultiVar, Arparma, UniMessage
from pydantic import BaseModel
from .canvas import *
from ...utils.base.resource import get_path
resolution = 256
class Entrance(BaseModel):
identifier: str
size: tuple[int, int]
dest: List[str]
class Station(BaseModel):
identifier: str
chineseName: str
englishName: str
position: tuple[int, int]
class Line(BaseModel):
identifier: str
chineseName: str
englishName: str
color: Any
stations: List["Station"]
font_light = get_path("templates/fonts/MiSans/MiSans-Light.woff2")
font_bold = get_path("templates/fonts/MiSans/MiSans-Bold.woff2")
@run_sync
def generate_entrance_sign(name: str, aliases: List[str], lineInfo: List[Line], entranceIdentifier: str, ratio: tuple[int | float, int | float],
reso: int = resolution):
"""
Generates an entrance sign for the ride.
"""
width, height = ratio[0] * reso, ratio[1] * reso
baseCanvas = Canvas(Image.new("RGBA", (width, height), Color.WHITE))
# 加黑色图框
baseCanvas.outline = Img(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0, 0),
point=(0, 0),
img=Shape.rectangle(
size=(width, height),
fillet=0,
fill=(0, 0, 0, 0),
width=15,
outline=Color.BLACK
)
)
baseCanvas.contentPanel = Panel(
uv_size=(width, height),
box_size=(width - 28, height - 28),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
)
linePanelHeight = 0.7 * ratio[1]
linePanelWidth = linePanelHeight * 1.3
# 画线路面板部分
for i, line in enumerate(lineInfo):
linePanel = baseCanvas.contentPanel.__dict__[f"Line_{i}_Panel"] = Panel(
uv_size=ratio,
box_size=(linePanelWidth, linePanelHeight),
parent_point=(i * linePanelWidth / ratio[0], 1),
point=(0, 1),
)
linePanel.colorCube = Img(
uv_size=(1, 1),
box_size=(0.15, 1),
parent_point=(0.125, 1),
point=(0, 1),
img=Shape.rectangle(
size=(100, 100),
fillet=0,
fill=line.color,
),
keep_ratio=False
)
textPanel = linePanel.TextPanel = Panel(
uv_size=(1, 1),
box_size=(0.625, 1),
parent_point=(1, 1),
point=(1, 1)
)
# 中文线路名
textPanel.namePanel = Panel(
uv_size=(1, 1),
box_size=(1, 2 / 3),
parent_point=(0, 0),
point=(0, 0),
)
nameSize = baseCanvas.get_actual_pixel_size("contentPanel.Line_{}_Panel.TextPanel.namePanel".format(i))
textPanel.namePanel.text = Text(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
text=line.chineseName,
color=Color.BLACK,
font_size=int(nameSize[1] * 0.5),
force_size=True,
font=font_bold
)
# 英文线路名
textPanel.englishNamePanel = Panel(
uv_size=(1, 1),
box_size=(1, 1 / 3),
parent_point=(0, 1),
point=(0, 1),
)
englishNameSize = baseCanvas.get_actual_pixel_size("contentPanel.Line_{}_Panel.TextPanel.englishNamePanel".format(i))
textPanel.englishNamePanel.text = Text(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
text=line.englishName,
color=Color.BLACK,
font_size=int(englishNameSize[1] * 0.6),
force_size=True,
font=font_light
)
# 画名称部分
namePanel = baseCanvas.contentPanel.namePanel = Panel(
uv_size=(1, 1),
box_size=(1, 0.4),
parent_point=(0.5, 0),
point=(0.5, 0),
)
namePanel.text = Text(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
text=name,
color=Color.BLACK,
font_size=int(height * 0.3),
force_size=True,
font=font_bold
)
aliasesPanel = baseCanvas.contentPanel.aliasesPanel = Panel(
uv_size=(1, 1),
box_size=(1, 0.5),
parent_point=(0.5, 1),
point=(0.5, 1),
)
for j, alias in enumerate(aliases):
aliasesPanel.__dict__[alias] = Text(
uv_size=(1, 1),
box_size=(0.35, 0.5),
parent_point=(0.5, 0.5 * j),
point=(0.5, 0),
text=alias,
color=Color.BLACK,
font_size=int(height * 0.15),
font=font_light
)
# 画入口标识
entrancePanel = baseCanvas.contentPanel.entrancePanel = Panel(
uv_size=(1, 1),
box_size=(0.2, 1),
parent_point=(1, 0.5),
point=(1, 0.5),
)
# 中文文本
entrancePanel.namePanel = Panel(
uv_size=(1, 1),
box_size=(1, 0.5),
parent_point=(1, 0),
point=(1, 0),
)
entrancePanel.namePanel.text = Text(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0, 0.5),
point=(0, 0.5),
text=f"{entranceIdentifier}出入口",
color=Color.BLACK,
font_size=int(height * 0.2),
force_size=True,
font=font_bold
)
# 英文文本
entrancePanel.englishNamePanel = Panel(
uv_size=(1, 1),
box_size=(1, 0.5),
parent_point=(1, 1),
point=(1, 1),
)
entrancePanel.englishNamePanel.text = Text(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0, 0.5),
point=(0, 0.5),
text=f"Entrance {entranceIdentifier}",
color=Color.BLACK,
font_size=int(height * 0.15),
force_size=True,
font=font_light
)
return baseCanvas.base_img.tobytes()
crt_alc = on_alconna(
Alconna(
"crt",
Subcommand(
"entrance",
Args["name", str]["lines", str, ""]["entrance", int, 1], # /crt entrance 璧山&Bishan 1号线&Line1&#ff0000,27号线&Line1&#ff0000 1A
)
)
)
@crt_alc.assign("entrance")
async def _(result: Arparma):
args = result.subcommands.get("entrance").args
name = args["name"]
lines = args["lines"]
entrance = args["entrance"]
line_info = []
for line in lines.split(","):
line_args = line.split("&")
line_info.append(Line(
identifier=1,
chineseName=line_args[0],
englishName=line_args[1],
color=line_args[2],
stations=[]
))
img_bytes = await generate_entrance_sign(
name=name,
aliases=name.split("&"),
lineInfo=line_info,
entranceIdentifier=entrance,
ratio=(8, 1),
reso=256,
)
await crt_alc.finish(
UniMessage.image(raw=img_bytes)
)
def generate_platform_line_pic(line: Line, station: Station, ratio=None, reso: int = resolution):
"""
生成站台线路图
:param line: 线路对象
:param station: 本站点对象
:param ratio: 比例
:param reso: 分辨率1reso
:return: 两个方向的站牌
"""
if ratio is None:
ratio = [4, 1]
width, height = ratio[0] * reso, ratio[1] * reso
baseCanvas = Canvas(Image.new("RGBA", (width, height), Color.YELLOW))
# 加黑色图框
baseCanvas.linePanel = Panel(
uv_size=(1, 1),
box_size=(0.8, 0.15),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
)
# 直线块
baseCanvas.linePanel.recLine = Img(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
img=Shape.rectangle(
size=(10, 10),
fill=line.color,
),
keep_ratio=False
)
# 灰色直线块
baseCanvas.linePanel.recLineGrey = Img(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
img=Shape.rectangle(
size=(10, 10),
fill=Color.GREY,
),
keep_ratio=False
)
# 生成各站圆点
outline_width = 40
circleForward = Shape.circular(
radius=200,
fill=Color.WHITE,
width=outline_width,
outline=line.color,
)
circleThisPanel = Canvas(Image.new("RGBA", (200, 200), (0, 0, 0, 0)))
circleThisPanel.circleOuter = Img(
uv_size=(1, 1),
box_size=(1, 1),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
img=Shape.circular(
radius=200,
fill=Color.WHITE,
width=outline_width,
outline=line.color,
),
)
circleThisPanel.circleOuter.circleInner = Img(
uv_size=(1, 1),
box_size=(0.7, 0.7),
parent_point=(0.5, 0.5),
point=(0.5, 0.5),
img=Shape.circular(
radius=200,
fill=line.color,
width=0,
outline=line.color,
),
)
circleThisPanel.export("a.png", alpha=True)
circleThis = circleThisPanel.base_img
circlePassed = Shape.circular(
radius=200,
fill=Color.WHITE,
width=outline_width,
outline=Color.GREY,
)
arrival = False
distance = 1 / (len(line.stations) - 1)
for i, sta in enumerate(line.stations):
box_size = (1.618, 1.618)
if sta.identifier == station.identifier:
arrival = True
baseCanvas.linePanel.recLine.__dict__["station_{}".format(sta.identifier)] = Img(
uv_size=(1, 1),
box_size=(1.8, 1.8),
parent_point=(distance * i, 0.5),
point=(0.5, 0.5),
img=circleThis,
keep_ratio=True
)
continue
if arrival:
# 后方站绘制
baseCanvas.linePanel.recLine.__dict__["station_{}".format(sta.identifier)] = Img(
uv_size=(1, 1),
box_size=box_size,
parent_point=(distance * i, 0.5),
point=(0.5, 0.5),
img=circleForward,
keep_ratio=True
)
else:
# 前方站绘制
baseCanvas.linePanel.recLine.__dict__["station_{}".format(sta.identifier)] = Img(
uv_size=(1, 1),
box_size=box_size,
parent_point=(distance * i, 0.5),
point=(0.5, 0.5),
img=circlePassed,
keep_ratio=True
)
return baseCanvas
def generate_platform_sign(name: str, aliases: List[str], lineInfo: List[Line], entranceIdentifier: str, ratio: tuple[int | float, int | float],
reso: int = resolution
):
pass
# def main():
# generate_entrance_sign(
# "璧山",
# aliases=["Bishan"],
# lineInfo=[
#
# Line(identifier="2", chineseName="1号线", englishName="Line 1", color=Color.RED, stations=[]),
# Line(identifier="3", chineseName="27号线", englishName="Line 27", color="#685bc7", stations=[]),
# Line(identifier="1", chineseName="璧铜线", englishName="BT Line", color="#685BC7", stations=[]),
# ],
# entranceIdentifier="1",
# ratio=(8, 1)
# )
#
#
# main()

View File

@ -1,15 +0,0 @@
from nonebot.plugin import PluginMetadata
from .minesweeper import *
__plugin_meta__ = PluginMetadata(
name="轻雪小游戏",
description="内置了一些小游戏",
usage="",
type="application",
homepage="https://github.com/snowykami/LiteyukiBot",
extra={
"liteyuki": True,
"toggleable" : True,
"default_enable" : True,
}
)

View File

@ -1,168 +0,0 @@
import random
from pydantic import BaseModel
from src.utils.message.message import MarkdownMessage as md
class Dot(BaseModel):
row: int
col: int
mask: bool = True
value: int = 0
flagged: bool = False
class Minesweeper:
# 0-8: number of mines around, 9: mine, -1: undefined
NUMS = "⓪①②③④⑤⑥⑦⑧🅑⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳"
MASK = "🅜"
FLAG = "🅕"
MINE = "🅑"
def __init__(self, rows, cols, num_mines, session_type, session_id):
assert rows > 0 and cols > 0 and 0 < num_mines < rows * cols
self.session_type = session_type
self.session_id = session_id
self.rows = rows
self.cols = cols
self.num_mines = num_mines
self.board: list[list[Dot]] = [[Dot(row=i, col=j) for j in range(cols)] for i in range(rows)]
self.is_first = True
def reveal(self, row, col) -> bool:
"""
展开
Args:
row:
col:
Returns:
游戏是否继续
"""
if self.is_first:
# 第一次展开,生成地雷
self.generate_board(self.board[row][col])
self.is_first = False
if self.board[row][col].value == 9:
self.board[row][col].mask = False
return False
if not self.board[row][col].mask:
return True
self.board[row][col].mask = False
if self.board[row][col].value == 0:
self.reveal_neighbors(row, col)
return True
def is_win(self) -> bool:
"""
是否胜利
Returns:
"""
for row in range(self.rows):
for col in range(self.cols):
if self.board[row][col].mask and self.board[row][col].value != 9:
return False
return True
def generate_board(self, first_dot: Dot):
"""
避开第一个点,生成地雷
Args:
first_dot: 第一个点
Returns:
"""
generate_count = 0
while generate_count < self.num_mines:
row = random.randint(0, self.rows - 1)
col = random.randint(0, self.cols - 1)
if self.board[row][col].value == 9 or (row, col) == (first_dot.row, first_dot.col):
continue
self.board[row][col] = Dot(row=row, col=col, mask=True, value=9)
generate_count += 1
for row in range(self.rows):
for col in range(self.cols):
if self.board[row][col].value != 9:
self.board[row][col].value = self.count_adjacent_mines(row, col)
def count_adjacent_mines(self, row, col):
"""
计算周围地雷数量
Args:
row:
col:
Returns:
"""
count = 0
for r in range(max(0, row - 1), min(self.rows, row + 2)):
for c in range(max(0, col - 1), min(self.cols, col + 2)):
if self.board[r][c].value == 9:
count += 1
return count
def reveal_neighbors(self, row, col):
"""
递归展开,使用深度优先搜索
Args:
row:
col:
Returns:
"""
for r in range(max(0, row - 1), min(self.rows, row + 2)):
for c in range(max(0, col - 1), min(self.cols, col + 2)):
if self.board[r][c].mask:
self.board[r][c].mask = False
if self.board[r][c].value == 0:
self.reveal_neighbors(r, c)
def mark(self, row, col) -> bool:
"""
标记
Args:
row:
col:
Returns:
是否标记成功,如果已经展开则无法标记
"""
if self.board[row][col].mask:
self.board[row][col].flagged = not self.board[row][col].flagged
return self.board[row][col].flagged
def board_markdown(self) -> str:
"""
打印地雷板
Returns:
"""
dis = " "
start = "> " if self.cols >= 10 else ""
text = start + self.NUMS[0] + dis*2
# 横向两个雷之间的间隔字符
# 生成横向索引
for i in range(self.cols):
text += f"{self.NUMS[i]}" + dis
text += "\n\n"
for i, row in enumerate(self.board):
text += start + f"{self.NUMS[i]}" + dis*2
for dot in row:
if dot.mask and not dot.flagged:
text += md.btn_cmd(self.MASK, f"minesweeper reveal {dot.row} {dot.col}")
elif dot.flagged:
text += md.btn_cmd(self.FLAG, f"minesweeper mark {dot.row} {dot.col}")
else:
text += self.NUMS[dot.value]
text += dis
text += "\n"
btn_mark = md.btn_cmd("标记", f"minesweeper mark ", enter=False)
btn_end = md.btn_cmd("结束", "minesweeper end", enter=True)
text += f" {btn_mark} {btn_end}"
return text

View File

@ -1,103 +0,0 @@
from nonebot import require
from src.utils.base.ly_typing import T_Bot, T_MessageEvent
from src.utils.message.message import MarkdownMessage as md
require("nonebot_plugin_alconna")
from .game import Minesweeper
from nonebot_plugin_alconna import Alconna, on_alconna, Subcommand, Args, Arparma
minesweeper = on_alconna(
aliases={"扫雷"},
command=Alconna(
"minesweeper",
Subcommand(
"start",
Args["row", int, 8]["col", int, 8]["mines", int, 10],
alias=["开始"],
),
Subcommand(
"end",
alias=["结束"]
),
Subcommand(
"reveal",
Args["row", int]["col", int],
alias=["展开"]
),
Subcommand(
"mark",
Args["row", int]["col", int],
alias=["标记"]
),
),
)
minesweeper_cache: list[Minesweeper] = []
def get_minesweeper_cache(event: T_MessageEvent) -> Minesweeper | None:
for i in minesweeper_cache:
if i.session_type == event.message_type:
if i.session_id == event.user_id or i.session_id == event.group_id:
return i
return None
@minesweeper.handle()
async def _(event: T_MessageEvent, result: Arparma, bot: T_Bot):
game = get_minesweeper_cache(event)
if result.subcommands.get("start"):
if game:
await minesweeper.finish("当前会话不能同时进行多个扫雷游戏")
else:
try:
new_game = Minesweeper(
rows=result.subcommands["start"].args["row"],
cols=result.subcommands["start"].args["col"],
num_mines=result.subcommands["start"].args["mines"],
session_type=event.message_type,
session_id=event.user_id if event.message_type == "private" else event.group_id,
)
minesweeper_cache.append(new_game)
await minesweeper.send("游戏开始")
await md.send_md(new_game.board_markdown(), bot, event=event)
except AssertionError:
await minesweeper.finish("参数错误")
elif result.subcommands.get("end"):
if game:
minesweeper_cache.remove(game)
await minesweeper.finish("游戏结束")
else:
await minesweeper.finish("当前没有扫雷游戏")
elif result.subcommands.get("reveal"):
if not game:
await minesweeper.finish("当前没有扫雷游戏")
else:
row = result.subcommands["reveal"].args["row"]
col = result.subcommands["reveal"].args["col"]
if not (0 <= row < game.rows and 0 <= col < game.cols):
await minesweeper.finish("参数错误")
if not game.reveal(row, col):
minesweeper_cache.remove(game)
await md.send_md(game.board_markdown(), bot, event=event)
await minesweeper.finish("游戏结束")
await md.send_md(game.board_markdown(), bot, event=event)
if game.is_win():
minesweeper_cache.remove(game)
await minesweeper.finish("游戏胜利")
elif result.subcommands.get("mark"):
if not game:
await minesweeper.finish("当前没有扫雷游戏")
else:
row = result.subcommands["mark"].args["row"]
col = result.subcommands["mark"].args["col"]
if not (0 <= row < game.rows and 0 <= col < game.cols):
await minesweeper.finish("参数错误")
game.board[row][col].flagged = not game.board[row][col].flagged
await md.send_md(game.board_markdown(), bot, event=event)
else:
await minesweeper.finish("参数错误")

View File

@ -14,17 +14,21 @@ from nonebot.permission import SUPERUSER
from nonebot.plugin import Plugin, PluginMetadata
from nonebot.utils import run_sync
from src.utils.base.data_manager import InstalledPlugin
from src.utils.base.language import get_user_lang
from src.utils.base.ly_typing import T_Bot
from src.utils.message.message import MarkdownMessage as md
from src.utils.message.markdown import MarkdownComponent as mdc, compile_md, escape_md
from src.utils.base.permission import GROUP_ADMIN, GROUP_OWNER
from src.utils.message.tools import clamp
from src.utils.message.message import MarkdownMessage as md
from src.utils.message.markdown import MarkdownComponent as mdc, compile_md, escape_md
from src.utils.message.html_tool import md_to_pic
from .common import *
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import (
UniMessage,
on_alconna,
Alconna,
Args,
@ -147,7 +151,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
session_id = event.group_id
new_event = event
else:
raise FinishedException(ulang.get("Permission Denied"))
raise FinishedException(ulang.get("liteyuki.permission_denied"))
session_enable = get_plugin_session_enable(
new_event, plugin_name
@ -292,7 +296,8 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
reply += f"\n{ulang.get('npm.too_many_results', HIDE_NUM=len(rs) - max_show)}"
else:
reply = ulang.get("npm.search_no_result")
await md.send_md(reply, bot, event=event)
img_bytes = await md_to_pic(reply)
await UniMessage.send(UniMessage.image(raw=img_bytes))
elif sc.get("install") and perm_s:
plugin_name: str = result.subcommands["install"].args.get("plugin_name")
@ -320,7 +325,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
info = md.escape(
ulang.get("npm.install_success", NAME=store_plugin.name)
) # markdown转义
await md.send_md(f"{info}\n\n" f"```\n{log}\n```", bot, event=event)
await npm.send(f"{info}\n\n" + f"\n{log}\n")
else:
await npm.finish(
ulang.get(
@ -331,12 +336,12 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
info = ulang.get(
"npm.load_failed", NAME=plugin_name, HOMEPAGE=homepage_btn
).replace("_", r"\\_")
await md.send_md(f"{info}\n\n" f"```\n{log}\n```\n", bot, event=event)
await npm.finish(f"{info}\n\n" f"```\n{log}\n```\n")
else:
info = ulang.get(
"npm.install_failed", NAME=plugin_name, HOMEPAGE=homepage_btn
).replace("_", r"\\_")
await md.send_md(f"{info}\n\n" f"```\n{log}\n```", bot, event=event)
await npm.send(f"{info}\n\n" f"```\n{log}\n```")
elif sc.get("uninstall") and perm_s:
plugin_name: str = result.subcommands["uninstall"].args.get("plugin_name") # type: ignore
@ -464,7 +469,8 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
else ulang.get("npm.next_page")
)
reply += f"\n{btn_prev} {page}/{total} {btn_next}"
await md.send_md(reply, bot, event=event)
img_bytes = await md_to_pic(reply)
await UniMessage.send(UniMessage.image(raw=img_bytes))
else:
if await SUPERUSER(bot, event):
@ -517,7 +523,8 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
f"\n\n>page为页数num为每页显示数量"
f"\n\n>*{md.escape('npm list [page] [num]')}*"
)
await md.send_md(reply, bot, event=event)
img_bytes = await md_to_pic(reply)
await UniMessage.send(UniMessage.image(raw=img_bytes))
else:
btn_list = md.btn_cmd(
@ -539,7 +546,8 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
f"\n\n>page为页数num为每页显示数量"
f"\n\n>*{md.escape('npm list [page] [num]')}*"
)
await md.send_md(reply, bot, event=event)
img_bytes = await md_to_pic(reply)
await UniMessage.send(UniMessage.image(raw=img_bytes))
@on_alconna(
@ -554,7 +562,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
Subcommand(
disable,
Args["group_id", str, None],
alias=["d", "停用", "禁用"],
alias=["d", "停用"],
),
),
permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN,
@ -679,7 +687,7 @@ async def _(result: Arparma, matcher: Matcher, event: T_MessageEvent, bot: T_Bot
else mdc.paragraph(ulang.get("npm.homepage"))
),
]
await md.send_md(compile_md(reply), bot, event=event)
await matcher.finish(compile_md(reply))
else:
await matcher.finish(ulang.get("npm.plugin_not_found", NAME=plugin_name))
else:

View File

@ -198,7 +198,7 @@ async def _(bot: T_Bot, event: T_MessageEvent, result: Arparma, matcher: Matcher
else:
pass
if send_as_md:
await md.send_md(reply, bot, event=event)
await matcher.send(reply)
else:
if reply:
await matcher.finish(reply)

View File

@ -2,7 +2,6 @@ import nonebot
from nonebot.message import event_preprocessor
# from nonebot_plugin_alconna.typings import Event
from src.utils.base.ly_typing import T_MessageEvent
from src.utils import satori_utils
from nonebot.adapters import satori

View File

@ -1,163 +0,0 @@
import datetime
import time
import aiohttp
from nonebot import require
from nonebot.plugin import PluginMetadata
from src.utils.base.config import get_config
from src.utils.base.data import Database, LiteModel
from src.utils.base.resource import get_path
from src.utils.message.html_tool import template2image
require("nonebot_plugin_alconna")
require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_alconna import Alconna, AlconnaResult, CommandResult, Subcommand, UniMessage, on_alconna, Args
__author__ = "snowykami"
__plugin_meta__ = PluginMetadata(
name="签名服务器状态",
description="适用于ntqq的签名状态查看",
usage=(
"sign count 查看当前签名数\n"
"sign data 查看签名数变化\n"
"sign chart [limit] 查看签名数变化图表\n"
),
type="application",
homepage="https://github.com/snowykami/LiteyukiBot",
extra={
"liteyuki" : True,
"toggleable" : True,
"default_enable": True,
}
)
SIGN_COUNT_URLS: dict[str, str] = get_config("sign_count_urls", None)
SIGN_COUNT_DURATION = get_config("sign_count_duration", 10)
class SignCount(LiteModel):
TABLE_NAME: str = "sign_count"
time: float = 0.0
count: int = 0
sid: str = ""
sign_db = Database("data/liteyuki/ntqq_sign.ldb")
sign_db.auto_migrate(SignCount())
sign_status = on_alconna(Alconna(
"sign",
Subcommand(
"chart",
Args["limit", int, 10000]
),
Subcommand(
"count"
),
Subcommand(
"data"
)
))
cache_img: bytes = None
@sign_status.assign("count")
async def _():
reply = "Current sign count:"
for name, count in (await get_now_sign()).items():
reply += f"\n{name}: {count[1]}"
await sign_status.send(reply)
@sign_status.assign("data")
async def _():
query_stamp = [1, 5, 10, 15]
reply = "QPS from last " + ", ".join([str(i) for i in query_stamp]) + "mins"
for name, url in SIGN_COUNT_URLS.items():
count_data = []
for stamp in query_stamp:
count_rows = sign_db.where_all(SignCount(), "sid = ? and time > ?", url, time.time() - 60 * stamp)
if len(count_rows) < 2:
count_data.append(-1)
else:
count_data.append((count_rows[-1].count - count_rows[0].count)/(stamp*60))
reply += f"\n{name}: " + ", ".join([f"{i:.1f}" for i in count_data])
await sign_status.send(reply)
@sign_status.assign("chart")
async def _(arp: CommandResult = AlconnaResult()):
limit = arp.result.subcommands.get("chart").args.get("limit")
if limit == 10000:
if cache_img:
await sign_status.send(UniMessage.image(raw=cache_img))
return
img = await generate_chart(limit)
await sign_status.send(UniMessage.image(raw=img))
@scheduler.scheduled_job("interval", seconds=SIGN_COUNT_DURATION, next_run_time=datetime.datetime.now())
async def update_sign_count():
global cache_img
if not SIGN_COUNT_URLS:
return
data = await get_now_sign()
for name, count in data.items():
await save_sign_count(count[0], count[1], SIGN_COUNT_URLS[name])
cache_img = await generate_chart(10000)
async def get_now_sign() -> dict[str, tuple[float, int]]:
"""
Get the sign count and the time of the latest sign
Returns:
tuple[float, int] | None: (time, count)
"""
data = {}
now = time.time()
async with aiohttp.ClientSession() as client:
for name, url in SIGN_COUNT_URLS.items():
async with client.get(url) as resp:
count = (await resp.json())["count"]
data[name] = (now, count)
return data
async def save_sign_count(timestamp: float, count: int, sid: str):
"""
Save the sign count to the database
Args:
sid: the sign id use url as the id
count:
timestamp (float): the time of the sign count (int): the count of the sign
"""
sign_db.save(SignCount(time=timestamp, count=count, sid=sid))
async def generate_chart(limit):
data = []
for name, url in SIGN_COUNT_URLS.items():
count_rows = sign_db.where_all(SignCount(), "sid = ? ORDER BY id DESC LIMIT ?", url, limit)
count_rows.reverse()
data.append(
{
"name" : name,
# "data": [[row.time, row.count] for row in count_rows]
"times" : [row.time for row in count_rows],
"counts": [row.count for row in count_rows]
}
)
img = await template2image(
template=get_path("templates/sign_status.html"),
templates={
"data": data
},
)
return img

View File

@ -61,6 +61,8 @@ async def get_stat_msg_image(
condition_args.append(user_id)
msg_rows = msg_db.where_all(MessageEventModel(), condition, *condition_args)
if not msg_rows:
msg_rows = []
timestamps = []
msg_count = []
msg_rows.sort(key=lambda x: x.time)
@ -157,8 +159,8 @@ async def get_stat_rank_image(
templates = {
"data": {
"name": ulang.get("stat.rank")
+ f" 类别:{rank_type}"
+ f" 制约:{limit}",
+ f" Type {rank_type}"
+ f" Limit {limit}",
"ranking": ranking,
}
}

View File

@ -96,8 +96,10 @@ async def _(result: Arparma, event: T_MessageEvent, bot: Bot):
bot_id = result.other_args.get("bot_id")
user_id = result.other_args.get("user_id")
if group_id in ["current", "c"]:
if group_id in ["current", "c"] and hasattr(event, "group_id"):
group_id = str(event_utils.get_group_id(event))
else:
group_id = "all"
if group_id in ["all", "a"]:
group_id = "all"

View File

@ -7,7 +7,8 @@ from cpuinfo import cpuinfo
from nonebot import require
from nonebot.adapters import satori
from src.utils import __NAME__, __VERSION__
from src.utils import __NAME__
from liteyuki import __version__
from src.utils.base.config import get_config
from src.utils.base.data_manager import TempConfig, common_db
from src.utils.base.language import Language
@ -227,11 +228,19 @@ async def get_hardware_data() -> dict:
pass
swap = psutil.swap_memory()
cpu_brand_raw = cpuinfo.get_cpu_info().get("brand_raw", "未知处理器")
if "AMD" in cpu_brand_raw:
if "amd" in cpu_brand_raw.lower():
brand = "AMD"
elif "Intel" in cpu_brand_raw:
elif "intel" in cpu_brand_raw:
brand = "英特尔"
elif "Nvidia" in cpu_brand_raw:
elif "apple" in cpu_brand_raw.lower():
brand = "苹果"
elif "qualcomm" in cpu_brand_raw.lower():
brand = "高通"
elif "mediatek" in cpu_brand_raw.lower():
brand = "联发科"
elif "samsung" in cpu_brand_raw.lower():
brand = "三星"
elif "nvidia" in cpu_brand_raw.lower():
brand = "英伟达"
else:
brand = "未知处理器"
@ -262,7 +271,9 @@ async def get_hardware_data() -> dict:
for disk in psutil.disk_partitions(all=True):
try:
disk_usage = psutil.disk_usage(disk.mountpoint)
if disk_usage.total == 0:
if disk_usage.total == 0 or disk.mountpoint.startswith(
("/var", "/boot", "/run", "/proc", "/sys", "/dev", "/tmp", "/snap")
):
continue # 虚拟磁盘
result["disk"].append(
{
@ -283,7 +294,7 @@ async def get_liteyuki_data() -> dict:
temp_data: TempConfig = common_db.where_one(TempConfig(), default=TempConfig())
result = {
"name": list(get_config("nickname", [__NAME__]))[0],
"version": f"{__VERSION__}{'-' + commit_hash[:7] if (commit_hash and len(commit_hash) > 8) else ''}",
"version": f"{__version__}{'-' + commit_hash[:7] if (commit_hash and len(commit_hash) > 8) else ''}",
"plugins": len(nonebot.get_loaded_plugins()),
"resources": len(get_loaded_resource_packs()),
"nonebot": f"{nonebot.__version__}",

View File

@ -1,7 +1,6 @@
import datetime
import aiohttp
import httpx
import nonebot
from nonebot import require
from nonebot.exception import IgnoredException

View File

@ -5,15 +5,24 @@ from nonebot import require
from src.utils.base.data import LiteModel, Database
from src.utils.base.data_manager import User, user_db, group_db
from src.utils.base.language import Language, change_user_lang, get_all_lang, get_user_lang
from src.utils.base.language import (
Language,
change_user_lang,
get_all_lang,
get_user_lang,
)
from src.utils.base.ly_typing import T_Bot, T_MessageEvent
from src.utils.message.message import MarkdownMessage as md
# from src.utils.message.html_tool import md_to_pic
from .const import representative_timezones_list
from src.utils import event as event_utils
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import Alconna, Args, Arparma, Subcommand, on_alconna
profile_alc = on_alconna(
Alconna(
"profile",
@ -28,7 +37,7 @@ profile_alc = on_alconna(
alias=["g", "查询"],
),
),
aliases={"用户信息"}
aliases={"用户信息"},
)
@ -42,13 +51,21 @@ class Profile(LiteModel):
@profile_alc.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
user: User = user_db.where_one(User(), "user_id = ?", event_utils.get_user_id(event),
default=User(user_id=str(event_utils.get_user_id(event))))
user: User = user_db.where_one(
User(),
"user_id = ?",
event_utils.get_user_id(event),
default=User(user_id=str(event_utils.get_user_id(event))),
)
ulang = get_user_lang(str(event_utils.get_user_id(event)))
if result.subcommands.get("set"):
if result.subcommands["set"].args.get("value"):
# 对合法性进行校验后设置
r = set_profile(result.args["key"], result.args["value"], str(event_utils.get_user_id(event)))
r = set_profile(
result.args["key"],
result.args["value"],
str(event_utils.get_user_id(event)),
)
if r:
user.profile[result.args["key"]] = result.args["value"]
user_db.save(user) # 数据库保存
@ -56,18 +73,28 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
ulang.get(
"user.profile.set_success",
ATTR=ulang.get(f"user.profile.{result.args['key']}"),
VALUE=result.args["value"]
VALUE=result.args["value"],
)
)
else:
await profile_alc.finish(ulang.get("user.profile.set_failed", ATTR=ulang.get(f"user.profile.{result.args['key']}")))
await profile_alc.finish(
ulang.get(
"user.profile.set_failed",
ATTR=ulang.get(f"user.profile.{result.args['key']}"),
)
)
else:
# 未输入值,尝试呼出菜单
menu = get_profile_menu(result.args["key"], ulang)
if menu:
await md.send_md(menu, bot, event=event)
else:
await profile_alc.finish(ulang.get("user.profile.input_value", ATTR=ulang.get(f"user.profile.{result.args['key']}")))
await profile_alc.finish(
ulang.get(
"user.profile.input_value",
ATTR=ulang.get(f"user.profile.{result.args['key']}"),
)
)
user.profile[result.args["key"]] = result.args["value"]
@ -92,11 +119,16 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
continue
val = profile.dict()[key]
key_text = ulang.get(f"user.profile.{key}")
btn_set = md.btn_cmd(ulang.get("user.profile.edit"), f"profile set {key}",
enter=True if key in enter_attr else False)
reply += (f"\n**{key_text}** **{val}**\n"
f"\n> {ulang.get(f'user.profile.{key}.desc')}"
f"\n> {btn_set} \n\n***\n")
btn_set = md.btn_cmd(
ulang.get("user.profile.edit"),
f"profile set {key}",
enter=True if key in enter_attr else False,
)
reply += (
f"\n**{key_text}** **{val}**\n"
f"\n> {ulang.get(f'user.profile.{key}.desc')}"
f"\n> {btn_set} \n\n***\n"
)
await md.send_md(reply, bot, event=event)
@ -119,7 +151,9 @@ def get_profile_menu(key: str, ulang: Language) -> Optional[str]:
reply = f"**{setting_name} {ulang.get('user.profile.settings')}**\n***\n"
if key == "lang":
for lang_code, lang_name in get_all_lang().items():
btn_set_lang = md.btn_cmd(f"{lang_name}({lang_code})", f"profile set {key} {lang_code}")
btn_set_lang = md.btn_cmd(
f"{lang_name}({lang_code})", f"profile set {key} {lang_code}"
)
reply += f"\n{btn_set_lang}\n***\n"
elif key == "timezone":
for tz in representative_timezones_list:

View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/20 上午5:10
@Author : snowykami
@Email : snowykami@outlook.com
@File : to_liteyuki.py
@Software: PyCharm
"""
import asyncio
from nonebot import Bot, get_bot, on_message, get_driver
from nonebot.plugin import PluginMetadata
from nonebot.adapters.onebot.v11 import MessageEvent, Bot
from liteyuki import Channel
from liteyuki.comm import get_channel
from liteyuki.comm.storage import shared_memory
from liteyuki.message.event import MessageEvent as LiteyukiMessageEvent
__plugin_meta__ = PluginMetadata(
name="轻雪push",
description="把消息事件传递给轻雪框架进行处理",
usage="用户无需使用",
)
recv_channel = Channel[LiteyukiMessageEvent](name="event_to_nonebot")
# @on_message().handle()
# async def _(bot: Bot, event: MessageEvent):
# liteyuki_event = LiteyukiMessageEvent(
# message_type=event.message_type,
# message=event.dict()["message"],
# raw_message=event.raw_message,
# data=event.dict(),
# bot_id=bot.self_id,
# user_id=str(event.user_id),
# session_id=str(event.user_id if event.message_type == "private" else event.group_id),
# session_type=event.message_type,
# receive_channel=recv_channel,
# )
# shared_memory.publish("event_to_liteyuki", liteyuki_event)
# @get_driver().on_bot_connect
# async def _():
# while True:
# event = await recv_channel.async_receive()
# bot: Bot = get_bot(event.bot_id) # type: ignore
# if event.message_type == "private":
# await bot.send_private_msg(user_id=int(event.session_id), message=event.data["message"])
# elif event.message_type == "group":
# await bot.send_group_msg(group_id=int(event.session_id), message=event.data["message"])

View File

@ -1,3 +0,0 @@
name: Sign Status
description: for Lagrange
version: 2024.4.26

View File

@ -1,4 +0,0 @@
.sign-chart {
height: 400px;
background-color: rgba(255, 255, 255, 0.7);
}

View File

@ -1,75 +0,0 @@
// 数据类型声明
// import * as echarts from 'echarts';
let data = JSON.parse(document.getElementById("data").innerText) // object
const signChartDivTemplate = document.importNode(document.getElementById("sign-chart-template").content, true)
data.forEach((item) => {
let signChartDiv = signChartDivTemplate.cloneNode(true)
let chartID = item["name"]
// 初始化ECharts实例
// 设置id
signChartDiv.querySelector(".sign-chart").id = chartID
document.body.appendChild(signChartDiv)
let signChart = echarts.init(document.getElementById(chartID))
let timeCount = []
item["counts"].forEach((count, index) => {
// 计算平均值index - 1的count + index的count + index + 1的count /3
if (index > 0) {
timeCount.push((item["counts"][index] - item["counts"][index - 1]) / (60*(item["times"][index] - item["times"][index - 1])))
}
})
console.log(timeCount)
signChart.setOption(
{
animation: false,
title: {
text: item["name"],
textStyle: {
color: '#000000' // 设置标题文本颜色为红色
}
},
xAxis: {
type: 'category',
data: item["times"].map(timestampToTime),
},
yAxis: [
{
type: 'value',
min: Math.min(...item["counts"]),
},
{
type: 'value',
min: Math.min(...timeCount),
}
],
series: [
{
data: item["counts"],
type: 'line',
yAxisIndex: 0
},
{
data: timeCount,
type: 'line',
yAxisIndex: 1
}
]
}
)
})
function timestampToTime(timestamp) {
let date = new Date(timestamp * 1000)
let Y = date.getFullYear() + '-'
let M = (date.getMonth() + 1 < 10 ? '0' + (date.getMonth() + 1) : date.getMonth() + 1) + '-'
let D = date.getDate() + ' '
let h = date.getHours() + ':'
let m = date.getMinutes() + ':'
let s = date.getSeconds()
return M + D + h + m + s
}

View File

@ -1,22 +0,0 @@
<!DOCTYPE html>
<html lang="zh" xmlns="http://www.w3.org/1999/html">
<head>
<meta charset="UTF-8">
<title>Liteyuki Status</title>
<link rel="stylesheet" href="./css/card.css">
<link rel="stylesheet" href="./css/fonts.css">
<link rel="stylesheet" href="./css/sign_status.css">
</head>
<body>
<template id="sign-chart-template">
<div class="info-box sign-chart">
</div>
</template>
<div class="data-storage" id="data">{{ data | tojson }}</div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/echarts/5.5.0/echarts.min.js"></script>
<script src="./js/sign_status.js"></script>
<script src="./js/card.js"></script>
</body>

View File

@ -1,10 +1,10 @@
language.name=文言
log.debug=试
log.info=讯
log.warning=警
log.error=
log.success=
log.debug=试
log.info=讯
log.warning=警
log.error=误
log.success=成
liteyuki.restart=复启
liteyuki.restart_now=即复启

View File

@ -113,6 +113,7 @@
z-index: 1;
}
/*
.disk-title {
position: absolute;
color: var(--main-text-color);
@ -126,6 +127,31 @@
max-width: calc(100% - 40px);
z-index: 2;
}
*/
.disk-name {
position: absolute;
color: var(--main-text-color);
font-size: 26px;
margin-left: 20px;
text-align: left;
white-space: normal; /* 允许换行 */
overflow: hidden;
text-overflow: ellipsis;
word-break: break-all; /* 允许在单词内换行 */
z-index: 2;
}
.disk-details {
position: absolute;
right: 20px;
/* 调整位置以确保有足够的空间显示 */
color: var(--main-text-color);
font-size: 24px;
text-align: right;
white-space: nowrap;
z-index: 2;
}
#motto-text {
font-size: 42px;

View File

@ -1,9 +1,9 @@
const data = JSON.parse(document.getElementById('data').innerText);
const bot_data = data['bot']; // 机器人数据
const hardwareData = data['hardware']; // 硬件数据
const liteyukiData = data['liteyuki']; // LiteYuki数据
const localData = data['localization']; // 本地化语言数据
const motto_ = data['motto']; // 言论数据
const data = JSON.parse(document.getElementById("data").innerText);
const bot_data = data["bot"]; // 机器人数据
const hardwareData = data["hardware"]; // 硬件数据
const liteyukiData = data["liteyuki"]; // LiteYuki数据
const localData = data["localization"]; // 本地化语言数据
const motto_ = data["motto"]; // 言论数据
/**
* 创建CPU/内存/交换饼图
@ -16,52 +16,54 @@ function createPieChartOption(title, data) {
animation: false,
title: {
text: title,
left: 'center',
top: 'center',
left: "center",
top: "center",
textStyle: {
color: '#000',
color: "#000",
fontSize: 30,
lineHeight: 36
}
lineHeight: 36,
},
},
tooltip: {
show: true,
trigger: 'item',
backgroundColor: '#000',
trigger: "item",
backgroundColor: "#000",
},
color: data.length === 3 ? ['#053349', '#007ebd', "#00000044"] : ['#007ebd', '#00000044'],
color:
data.length === 3
? ["#053349", "#007ebd", "#00000044"]
: ["#007ebd", "#00000044"],
series: [
{
name: 'info',
type: 'pie',
radius: ['80%', '100%'],
center: ['50%', '50%'],
name: "info",
type: "pie",
radius: ["80%", "100%"],
center: ["50%", "50%"],
itemStyle: {
normal: {
label: {
show: false
show: false,
},
labelLine: {
show: false
}
show: false,
},
},
emphasis: {
label: {
show: true,
textStyle: {
fontSize: '50',
fontWeight: 'bold'
}
}
}
fontSize: "50",
fontWeight: "bold",
},
},
},
},
data: data
}
]
}
data: data,
},
],
};
}
function convertSize(size, precision = 2, addUnit = true, suffix = " X字节") {
let isNegative = size < 0;
size = Math.abs(size);
@ -81,7 +83,7 @@ function convertSize(size, precision = 2, addUnit = true, suffix = " X字节") {
}
if (addUnit) {
return size.toFixed(precision) + suffix.replace('X', unit);
return size.toFixed(precision) + suffix.replace("X", unit);
} else {
return size;
}
@ -92,206 +94,260 @@ function convertSize(size, precision = 2, addUnit = true, suffix = " X字节") {
* @param title
* @param percent 数据
*/
function createBarChart(title, percent) {
function createBarChart(title, percent, name) {
// percent为百分比最大值为100
let diskDiv = document.createElement('div')
diskDiv.setAttribute('class', 'disk-info')
diskDiv.style.marginBottom = '20px'
let diskDiv = document.createElement("div");
diskDiv.setAttribute("class", "disk-info");
diskDiv.style.marginBottom = "20px";
diskDiv.innerHTML = `
<div class="disk-title">${title}</div>
<div class="disk-usage" style="width: ${percent}%"></div>
`
`;
updateDiskNameWidth(diskDiv);
return diskDiv
return diskDiv;
}
// 更新 .disk-name 宽度
function updateDiskNameWidth(diskInfoElement) {
let diskDetails = diskInfoElement.querySelector(".disk-details");
let diskName = diskInfoElement.querySelector(".disk-name");
let detailsWidth = diskDetails.offsetWidth;
let parentWidth = diskInfoElement.offsetWidth;
let nameMaxWidth = parentWidth - detailsWidth - 20 - 40;
diskName.style.maxWidth = `${nameMaxWidth}px`;
}
function secondsToTextTime(seconds) {
let days = Math.floor(seconds / 86400)
let hours = Math.floor((seconds % 86400) / 3600)
let minutes = Math.floor((seconds % 3600) / 60)
let seconds_ = Math.floor(seconds % 60)
return `${days}${localData['days']} ${hours}${localData['hours']} ${minutes}${localData['minutes']} ${seconds_}${localData['seconds']}`
let days = Math.floor(seconds / 86400);
let hours = Math.floor((seconds % 86400) / 3600);
let minutes = Math.floor((seconds % 3600) / 60);
let seconds_ = Math.floor(seconds % 60);
return `${days}${localData["days"]} ${hours}${localData["hours"]} ${minutes}${localData["minutes"]} ${seconds_}${localData["seconds"]}`;
}
// 主函数
function main() {
// 添加机器人信息
bot_data['bots'].forEach(
(bot) => {
let botInfoDiv = document.importNode(document.getElementById('bot-template').content, true) // 复制模板
bot_data["bots"].forEach((bot) => {
let botInfoDiv = document.importNode(
document.getElementById("bot-template").content,
true
); // 复制模板
// 设置机器人信息
botInfoDiv.className = "info-box bot-info";
// 设置机器人信息
botInfoDiv.className = 'info-box bot-info'
botInfoDiv.querySelector('.bot-icon-img').setAttribute('src', bot['icon'])
botInfoDiv.querySelector('.bot-name').innerText = bot['name']
let tagArray = [
bot['protocol_name'],
`${bot['app_name']}`,
`${localData['groups']}${bot['groups']}`,
`${localData['friends']}${bot['friends']}`,
`${localData['message_sent']}${bot['message_sent']}`,
`${localData['message_received']}${bot['message_received']}`,
]
// 添加一些标签
tagArray.forEach(
(tag, index) => {
let tagSpan = document.createElement('span')
tagSpan.className = 'bot-tag'
tagSpan.innerText = tag
// 给最后一个标签不添加后缀
tagSpan.setAttribute('suffix', (index === 0) || (tag[0] == '\n') ? '0' : '1')
botInfoDiv.querySelector('.bot-tags').appendChild(tagSpan)
}
)
document.body.insertBefore(botInfoDiv, document.getElementById('hardware-info')) // 插入对象
}
)
botInfoDiv.querySelector(".bot-icon-img").setAttribute("src", bot["icon"]);
botInfoDiv.querySelector(".bot-name").innerText = bot["name"];
let tagArray = [
bot["protocol_name"],
`${bot["app_name"]}`,
`${localData["groups"]}${bot["groups"]}`,
`${localData["friends"]}${bot["friends"]}`,
`${localData["message_sent"]}${bot["message_sent"]}`,
`${localData["message_received"]}${bot["message_received"]}`,
];
// 添加一些标签
tagArray.forEach((tag, index) => {
let tagSpan = document.createElement("span");
tagSpan.className = "bot-tag";
tagSpan.innerText = tag;
// 给最后一个标签不添加后缀
tagSpan.setAttribute("suffix", index === 0 || tag[0] == "\n" ? "0" : "1");
botInfoDiv.querySelector(".bot-tags").appendChild(tagSpan);
});
document.body.insertBefore(
botInfoDiv,
document.getElementById("hardware-info")
); // 插入对象
});
// 添加轻雪信息
let liteyukiInfoDiv = document.importNode(document.getElementById('bot-template').content, true) // 复制模板
liteyukiInfoDiv.className = 'info-box bot-info'
liteyukiInfoDiv.querySelector('.bot-icon-img').setAttribute('src', './img/litetrimo.png')
liteyukiInfoDiv.querySelector('.bot-name').innerText = `${liteyukiData['name']} - 睿乐`
let liteyukiInfoDiv = document.importNode(
document.getElementById("bot-template").content,
true
); // 复制模板
liteyukiInfoDiv.className = "info-box bot-info";
liteyukiInfoDiv
.querySelector(".bot-icon-img")
.setAttribute("src", "./img/litetrimo.png");
liteyukiInfoDiv.querySelector(
".bot-name"
).innerText = `${liteyukiData["name"]} - 睿乐`;
let tagArray = [
`灵温 ${liteyukiData['version']}`,
`Nonebot ${liteyukiData['nonebot']}`,
`${liteyukiData['python']}`,
liteyukiData['system'],
`${localData['plugins']}${liteyukiData['plugins']}`,
`${localData['resources']}${liteyukiData['resources']}`,
`${localData['bots']}${liteyukiData['bots']}`,
`${localData['runtime']} ${secondsToTextTime(liteyukiData['runtime'])}`,
]
tagArray.forEach(
(tag, index) => {
let tagSpan = document.createElement('span')
tagSpan.className = 'bot-tag'
tagSpan.innerText = tag
// 给最后一个标签不添加后缀
tagSpan.setAttribute('suffix', (index === 0) || (tag[0] == '\n') ? '0' : '1')
liteyukiInfoDiv.querySelector('.bot-tags').appendChild(tagSpan)
}
)
document.body.insertBefore(liteyukiInfoDiv, document.getElementById('hardware-info')) // 插入对象
`灵温 ${liteyukiData["version"]}`,
`Nonebot ${liteyukiData["nonebot"]}`,
`${liteyukiData["python"]}`,
liteyukiData["system"],
`${localData["plugins"]}${liteyukiData["plugins"]}`,
`${localData["resources"]}${liteyukiData["resources"]}`,
`${localData["bots"]}${liteyukiData["bots"]}`,
`${localData["runtime"]} ${secondsToTextTime(liteyukiData["runtime"])}`,
];
tagArray.forEach((tag, index) => {
let tagSpan = document.createElement("span");
tagSpan.className = "bot-tag";
tagSpan.innerText = tag;
// 给最后一个标签不添加后缀
tagSpan.setAttribute("suffix", index === 0 || tag[0] == "\n" ? "0" : "1");
liteyukiInfoDiv.querySelector(".bot-tags").appendChild(tagSpan);
});
document.body.insertBefore(
liteyukiInfoDiv,
document.getElementById("hardware-info")
); // 插入对象
// 添加硬件信息
const cpuData = hardwareData['cpu']
const memData = hardwareData['memory']
const swapData = hardwareData['swap']
const cpuData = hardwareData["cpu"];
const memData = hardwareData["memory"];
const swapData = hardwareData["swap"];
const cpuTagArray = [
cpuData['name'],
`${cpuData['cores']}${localData['cores']} ${cpuData['threads']}${localData['threads']}`,
`${(cpuData['freq'] / 1000).toFixed(2)}吉赫兹`
]
cpuData["name"],
`${cpuData["cores"]}${localData["cores"]} ${cpuData["threads"]}${localData["threads"]}`,
`${(cpuData["freq"] / 1000).toFixed(2)}吉赫兹`,
];
const memTagArray = [
`${localData['process']} ${convertSize(memData['usedProcess'])}`,
`${localData['used']} ${convertSize(memData['used'])}`,
`${localData['free']} ${convertSize(memData['free'])}`,
`${localData['total']} ${convertSize(memData['total'])}`
]
`${localData["process"]} ${convertSize(memData["usedProcess"])}`,
`${localData["used"]} ${convertSize(memData["used"])}`,
`${localData["free"]} ${convertSize(memData["free"])}`,
`${localData["total"]} ${convertSize(memData["total"])}`,
];
const swapTagArray = [
`${localData['used']} ${convertSize(swapData['used'])}`,
`${localData['free']} ${convertSize(swapData['free'])}`,
`${localData['total']} ${convertSize(swapData['total'])}`
]
let cpuDeviceInfoDiv = document.importNode(document.getElementById('device-info').content, true)
let memDeviceInfoDiv = document.importNode(document.getElementById('device-info').content, true)
let swapDeviceInfoDiv = document.importNode(document.getElementById('device-info').content, true)
`${localData["used"]} ${convertSize(swapData["used"])}`,
`${localData["free"]} ${convertSize(swapData["free"])}`,
`${localData["total"]} ${convertSize(swapData["total"])}`,
];
let cpuDeviceInfoDiv = document.importNode(
document.getElementById("device-info").content,
true
);
let memDeviceInfoDiv = document.importNode(
document.getElementById("device-info").content,
true
);
let swapDeviceInfoDiv = document.importNode(
document.getElementById("device-info").content,
true
);
cpuDeviceInfoDiv.querySelector('.device-info').setAttribute('id', 'cpu-info')
memDeviceInfoDiv.querySelector('.device-info').setAttribute('id', 'mem-info')
swapDeviceInfoDiv.querySelector('.device-info').setAttribute('id', 'swap-info')
cpuDeviceInfoDiv.querySelector('.device-chart').setAttribute('id', 'cpu-chart')
memDeviceInfoDiv.querySelector('.device-chart').setAttribute('id', 'mem-chart')
swapDeviceInfoDiv.querySelector('.device-chart').setAttribute('id', 'swap-chart')
cpuDeviceInfoDiv.querySelector(".device-info").setAttribute("id", "cpu-info");
memDeviceInfoDiv.querySelector(".device-info").setAttribute("id", "mem-info");
swapDeviceInfoDiv
.querySelector(".device-info")
.setAttribute("id", "swap-info");
cpuDeviceInfoDiv
.querySelector(".device-chart")
.setAttribute("id", "cpu-chart");
memDeviceInfoDiv
.querySelector(".device-chart")
.setAttribute("id", "mem-chart");
swapDeviceInfoDiv
.querySelector(".device-chart")
.setAttribute("id", "swap-chart");
let devices = {
'cpu': cpuDeviceInfoDiv,
'mem': memDeviceInfoDiv,
'swap': swapDeviceInfoDiv
}
cpu: cpuDeviceInfoDiv,
mem: memDeviceInfoDiv,
swap: swapDeviceInfoDiv,
};
// 遍历添加标签
for (let device in devices) {
let tagArray = []
let tagArray = [];
switch (device) {
case 'cpu':
tagArray = cpuTagArray
break
case 'mem':
tagArray = memTagArray
break
case 'swap':
tagArray = swapTagArray
break
case "cpu":
tagArray = cpuTagArray;
break;
case "mem":
tagArray = memTagArray;
break;
case "swap":
tagArray = swapTagArray;
break;
}
tagArray.forEach(
(tag, index) => {
let tagDiv = document.createElement('div')
tagDiv.className = 'device-tag'
tagDiv.innerText = tag
// 给最后一个标签不添加后缀
tagDiv.setAttribute('suffix', index === tagArray.length - 1 ? '0' : '1')
devices[device].querySelector('.device-tags').appendChild(tagDiv)
}
)
tagArray.forEach((tag, index) => {
let tagDiv = document.createElement("div");
tagDiv.className = "device-tag";
tagDiv.innerText = tag;
// 给最后一个标签不添加后缀
tagDiv.setAttribute("suffix", index === tagArray.length - 1 ? "0" : "1");
devices[device].querySelector(".device-tags").appendChild(tagDiv);
});
}
// 插入
document.getElementById('hardware-info').appendChild(cpuDeviceInfoDiv)
document.getElementById('hardware-info').appendChild(memDeviceInfoDiv)
document.getElementById('hardware-info').appendChild(swapDeviceInfoDiv)
document.getElementById("hardware-info").appendChild(cpuDeviceInfoDiv);
document.getElementById("hardware-info").appendChild(memDeviceInfoDiv);
document.getElementById("hardware-info").appendChild(swapDeviceInfoDiv);
let cpuChart = echarts.init(document.getElementById('cpu-chart'))
let memChart = echarts.init(document.getElementById('mem-chart'))
let swapChart = echarts.init(document.getElementById('swap-chart'))
let cpuChart = echarts.init(document.getElementById("cpu-chart"));
let memChart = echarts.init(document.getElementById("mem-chart"));
let swapChart = echarts.init(document.getElementById("swap-chart"));
cpuChart.setOption(
createPieChartOption(
`${localData["cpu"]}\n${cpuData["percent"].toFixed(1)}%`,
[
{ name: "used", value: cpuData["percent"] },
{ name: "free", value: 100 - cpuData["percent"] },
]
)
);
cpuChart.setOption(createPieChartOption(`${localData['cpu']}\n${cpuData['percent'].toFixed(1)}%`, [
{ name: 'used', value: cpuData['percent'] },
{ name: 'free', value: 100 - cpuData['percent'] }
]))
memChart.setOption(createPieChartOption(`${localData['memory']}\n${memData['percent'].toFixed(1)}%`, [
{ name: 'process', value: memData['usedProcess'] },
{ name: 'used', value: memData['used'] - memData['usedProcess'] },
{ name: 'free', value: memData['free'] }
]))
swapChart.setOption(createPieChartOption(`${localData['swap']}\n${swapData['percent'].toFixed(1)}%`, [
{ name: 'used', value: swapData['used'] },
{ name: 'free', value: swapData['free'] }
]))
memChart.setOption(
createPieChartOption(
`${localData["memory"]}\n${memData["percent"].toFixed(1)}%`,
[
{ name: "process", value: memData["usedProcess"] },
{ name: "used", value: memData["used"] - memData["usedProcess"] },
{ name: "free", value: memData["free"] },
]
)
);
swapChart.setOption(
createPieChartOption(
`${localData["swap"]}\n${swapData["percent"].toFixed(1)}%`,
[
{ name: "used", value: swapData["used"] },
{ name: "free", value: swapData["free"] },
]
)
);
// 磁盘信息
const diskData = hardwareData['disk']
diskData.forEach(
(disk) => {
let diskTitle = `${disk['name']} ${localData['free']} ${convertSize(disk['free'])} ${localData['total']} ${convertSize(disk['total'])}`
// 最后一个把margin-bottom去掉
let diskDiv = createBarChart(diskTitle, disk['percent'])
if (disk === diskData[diskData.length - 1]) {
diskDiv.style.marginBottom = '0'
}
document.getElementById('disk-info').appendChild(createBarChart(diskTitle, disk['percent']))
})
const diskData = hardwareData["disk"];
diskData.forEach((disk) => {
let diskTitle = `${localData['free']} ${convertSize(disk['free'])} ${localData['total']} ${convertSize(disk['total'])}`;
let diskDiv = createBarChart(diskTitle, disk['percent'], disk['name']);
// 最后一个把margin-bottom去掉
if (disk === diskData[diskData.length - 1]) {
diskDiv.style.marginBottom = "0";
}
document.getElementById('disk-info').appendChild(diskDiv);
});
// 随机一言
let mottoText = motto_['text']
let mottoFrom = motto_['source']
document.getElementById('motto-text').innerText = mottoText
document.getElementById('motto-from').innerText = mottoFrom
let mottoText = motto_["text"];
let mottoFrom = motto_["source"];
document.getElementById("motto-text").innerText = mottoText;
document.getElementById("motto-from").innerText = mottoFrom;
// 致谢
document.getElementById('addition-info').innerText = '感谢 锅炉 云裳工作室 提供服务器支持'
document.getElementById("addition-info").innerText =
"感谢 锅炉 云裳工作室 提供服务器支持";
}
main()
main();
/*
// 窗口大小改变监听器 -- Debug
window.addEventListener('resize', () => {
const diskInfos = document.querySelectorAll('.disk-info');
diskInfos.forEach(diskInfo => {
updateDiskNameWidth(diskInfo);
});
});
*/

View File

@ -837,22 +837,6 @@
"time": "2023-06-20T16:04:40.706727Z",
"skip_test": false
},
{
"module_name": "nonebot_plugin_htmlrender",
"project_link": "nonebot-plugin-htmlrender",
"name": "nonebot-plugin-htmlrender",
"desc": "通过浏览器渲染图片",
"author": "kexue-z",
"homepage": "https://github.com/kexue-z/nonebot-plugin-htmlrender",
"tags": [],
"is_official": false,
"type": "library",
"supported_adapters": null,
"valid": true,
"version": "0.3.1",
"time": "2024-03-14T08:47:15.010445Z",
"skip_test": false
},
{
"module_name": "nonebot_plugin_admin",
"project_link": "nonebot-plugin-admin",
@ -8718,31 +8702,6 @@
"time": "2023-07-14T10:32:08.006009Z",
"skip_test": false
},
{
"module_name": "nonebot_plugin_templates",
"project_link": "nonebot-plugin-templates",
"name": "templates_render",
"desc": "使用htmlrender和jinja2渲染,使用构建的menu,card或dict进行模板渲染",
"author": "canxin121",
"homepage": "https://github.com/canxin121/nonebot_plugin_templates",
"tags": [
{
"label": "模板渲染",
"color": "#eacd52"
},
{
"label": "图片生成",
"color": "#adea52"
}
],
"is_official": false,
"type": "library",
"supported_adapters": null,
"valid": true,
"version": "0.1.6",
"time": "2023-08-24T09:56:33.184091Z",
"skip_test": false
},
{
"module_name": "nonebot_plugin_pokesomeone",
"project_link": "nonebot-plugin-pokesomeone",

View File

@ -1,13 +1,9 @@
import json
import os.path
import platform
import sys
import time
import nonebot
__NAME__ = "尹灵温|轻雪-睿乐"
__VERSION__ = "6.3.4" # 60201
__VERSION__ = "6.3.9" # 60201
import requests
@ -15,36 +11,10 @@ from src.utils.base.config import load_from_yaml, config
from src.utils.base.log import init_log
from git import Repo
major, minor, patch = map(int, __VERSION__.split("."))
__VERSION_I__ = 99000000 + major * 10000 + minor * 100 + patch
def register_bot():
url = "https://api.liteyuki.icu/register"
data = {
"name": __NAME__,
"version": __VERSION__,
"version_i": __VERSION_I__,
"python": f"{platform.python_implementation()} {platform.python_version()}",
"os": f"{platform.system()} {platform.version()} {platform.machine()}",
}
try:
nonebot.logger.info("正在等待 Liteyuki 注册服务器…")
resp = requests.post(url, json=data, timeout=(10, 15))
if resp.status_code == 200:
data = resp.json()
if liteyuki_id := data.get("liteyuki_id"):
with open("data/liteyuki/liteyuki.json", "wb") as f:
f.write(json.dumps(data).encode("utf-8"))
nonebot.logger.success(f"成功将 {liteyuki_id} 注册到 Liteyuki 服务器")
else:
raise ValueError(f"无法向 Liteyuki 服务器注册:{data}")
except Exception as e:
nonebot.logger.warning(f"向 Liteyuki 服务器注册失败,但无所谓:{e}")
def init():
"""
初始化
@ -64,25 +34,15 @@ def init():
repo = Repo(".")
except Exception as e:
nonebot.logger.error(
f"无法读取 Git 仓库 {e},你是否是从仓库下载的Zip文件请使用git clone。"
f"无法读取 Git 仓库 `{e}`,你是否是从仓库直接下载的Zip文件请使用git clone。"
)
# temp_data: TempConfig = common_db.where_one(TempConfig(), default=TempConfig())
# temp_data.data["start_time"] = time.time()
# common_db.save(temp_data)
# 在加载完成语言后再初始化日志
nonebot.logger.info("尹灵温 正在初始化…")
if not os.path.exists("data/liteyuki/liteyuki.json"):
register_bot()
if not os.path.exists("pyproject.toml"):
with open("pyproject.toml", "w", encoding="utf-8") as f:
f.write("[tool.nonebot]\n")
nonebot.logger.info(
"正在 {} Python{}.{}.{} 上运行 尹灵温".format(
"正在 {} Python{}.{}.{} 上运行 尹灵温-NoneBot".format(
sys.executable,
sys.version_info.major,
sys.version_info.minor,

View File

@ -1,7 +1,6 @@
import threading
from nonebot import logger
from liteyuki.comm.channel import get_channel
from liteyuki.comm.channel import active_channel
def reload(delay: float = 0.0, receiver: str = "nonebot"):
@ -14,13 +13,9 @@ def reload(delay: float = 0.0, receiver: str = "nonebot"):
Returns:
"""
chan = get_channel(receiver + "-active")
if chan is None:
logger.error(f"未见 Channel {receiver}-active 以至无法重载")
return
if delay > 0:
threading.Timer(delay, chan.send, args=(1,)).start()
threading.Timer(delay, active_channel.send, args=(1,)).start()
return
else:
chan.send(1)
active_channel.send(1)

View File

@ -1,4 +1,5 @@
import os
import platform
from typing import List
import nonebot
@ -7,6 +8,7 @@ from pydantic import BaseModel
from ..message.tools import random_hex_string
config = {} # 全局配置,确保加载后读取
@ -29,23 +31,37 @@ class BasicConfig(BaseModel):
superusers: list[str] = []
command_start: list[str] = ["/", ""]
nickname: list[str] = [f"灵温-{random_hex_string(6)}"]
default_language: str = "zh-WY"
satori: SatoriConfig = SatoriConfig()
data_path: str = "data/liteyuki"
chromium_path: str = (
"/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome" # type: ignore
if platform.system() == "Darwin"
else (
"C:/Program Files (x86)/Microsoft/Edge/Application/msedge.exe"
if platform.system() == "Windows"
else "/usr/bin/chromium-browser"
)
)
def load_from_yaml(file: str) -> dict:
def load_from_yaml(file_: str) -> dict:
global config
nonebot.logger.debug("Loading config from %s" % file)
if not os.path.exists(file):
nonebot.logger.warning(f"未找到配置文件 {file} ,已创建默认配置,请修改后重启。")
with open(file, "w", encoding="utf-8") as f:
nonebot.logger.debug("正在从 {} 中加载配置项".format(file_))
if not os.path.exists(file_):
nonebot.logger.warning(
f"未寻得配置文件 {file_} ,已以默认配置创建,请在重启后更改为你所需的内容。"
)
with open(file_, "w", encoding="utf-8") as f:
yaml.dump(BasicConfig().dict(), f, default_flow_style=False)
with open(file, "r", encoding="utf-8") as f:
with open(file_, "r", encoding="utf-8") as f:
conf = init_conf(yaml.load(f, Loader=yaml.FullLoader))
config = conf
if conf is None:
nonebot.logger.warning(f"配置文件 {file} 为空,已创建默认配置,请修改后重启。")
nonebot.logger.warning(
f"配置文件 {file_} 为空,已以默认配置创建,请在重启后更改为你所需的内容。"
)
conf = BasicConfig().dict()
return conf
@ -75,7 +91,6 @@ def get_config(key: str, default=None):
return default
def init_conf(conf: dict) -> dict:
"""
初始化配置文件,确保配置文件中的必要字段存在,且不会冲突

View File

@ -30,7 +30,7 @@ class Database:
os.makedirs(os.path.dirname(db_name))
self.db_name = db_name
self.conn = sqlite3.connect(db_name)
self.conn = sqlite3.connect(db_name, check_same_thread=False)
self.cursor = self.conn.cursor()
self._on_save_callbacks = []
@ -94,7 +94,7 @@ class Database:
f"数据库 Selecting {model.TABLE_NAME} WHERE {condition.replace('?', '%s') % args}"
)
if not table_name:
raise ValueError(f"数据模型{model_type.__name__}未提供表名")
raise ValueError(f"数据模型 {model_type.__name__} 未提供表名")
# condition = f"WHERE {condition}"
# print(f"SELECT * FROM {table_name} {condition}", args)
@ -118,7 +118,7 @@ class Database:
]
def save(self, *args: LiteModel):
"""增/改操作
self.returns_ = """增/改操作
Args:
*args:
Returns:
@ -126,7 +126,7 @@ class Database:
table_list = [
item[0]
for item in self.cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
"SELECT name FROM sqlite_master WHERE type ='table'"
).fetchall()
]
for model in args:
@ -158,7 +158,7 @@ class Database:
new_obj[field] = value
else:
raise ValueError(
f"数据模型{table_name}包含不支持的数据类型,字段:{field} 值:{value} 值类型:{type(value)}"
f"数据模型 {table_name} 包含不支持的数据类型,字段:{field} 值:{value} 值类型:{type(value)}"
)
if table_name:
fields, values = [], []
@ -273,9 +273,9 @@ class Database:
"""
table_name = model.TABLE_NAME
logger.debug(f"数据库 Deleting {model} WHERE {condition} {args}")
logger.debug(f"Deleting {model} WHERE {condition} {args}")
if not table_name:
raise ValueError(f"数据模型{model.__class__.__name__}未提供表名")
raise ValueError(f"数据模型 {model.__class__.__name__} 未提供表名")
if model.id is not None:
condition = f"id = {model.id}"
if not condition and not allow_empty:
@ -297,7 +297,7 @@ class Database:
"""
for model in args:
if not model.TABLE_NAME:
raise ValueError(f"数据模型{type(model).__name__}未提供表名")
raise ValueError(f"数据模型 {type(model).__name__} 未提供表名")
# 若无则创建表
self.cursor.execute(

View File

@ -2,9 +2,8 @@ import os
from pydantic import Field
from .data import Database, LiteModel, Database
from .data import Database, LiteModel
print("导入数据库模块")
DATA_PATH = "data/liteyuki"
user_db: Database = Database(os.path.join(DATA_PATH, "users.ldb"))
group_db: Database = Database(os.path.join(DATA_PATH, "groups.ldb"))
@ -64,7 +63,7 @@ def auto_migrate():
user_db.auto_migrate(User())
group_db.auto_migrate(Group())
plugin_db.auto_migrate(InstalledPlugin(), GlobalPlugin())
common_db.auto_migrate(GlobalPlugin(), StoredConfig(), TempConfig())
common_db.auto_migrate(GlobalPlugin(), TempConfig())
auto_migrate()

View File

@ -1,92 +0,0 @@
import json
import os.path
import platform
import aiohttp
import nonebot
import psutil
import requests
from .config import load_from_yaml
from .. import __NAME__, __VERSION_I__, __VERSION__
class LiteyukiAPI:
def __init__(self):
self.liteyuki_id = None
if os.path.exists("data/liteyuki/liteyuki.json"):
with open("data/liteyuki/liteyuki.json", "rb") as f:
self.data = json.loads(f.read())
self.liteyuki_id = self.data.get("liteyuki_id")
self.report = load_from_yaml("config.yml").get("auto_report", True)
if self.report:
nonebot.logger.info("已启用自动上报")
@property
def device_info(self) -> dict:
"""
获取设备信息
Returns:
"""
return {
"name": __NAME__,
"version": __VERSION__,
"version_i": __VERSION_I__,
"python": f"{platform.python_implementation()} {platform.python_version()}",
"os": f"{platform.system()} {platform.version()} {platform.machine()}",
"cpu": f"{psutil.cpu_count(logical=False)}c{psutil.cpu_count()}t{psutil.cpu_freq().current}MHz",
"memory_total": f"{psutil.virtual_memory().total / 1024 ** 3:.2f}吉字节",
"memory_used": f"{psutil.virtual_memory().used / 1024 ** 3:.2f}吉字节",
"memory_bot": f"{psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2:.2f}兆字节",
"disk": f"{psutil.disk_usage('/').total / 1024 ** 3:.2f}吉字节",
}
def bug_report(self, content: str):
"""
提交bug报告
Args:
content:
Returns:
"""
if self.report:
nonebot.logger.warning(f"正在上报查误:{content}")
url = "https://api.liteyuki.icu/bug_report"
data = {
"liteyuki_id": self.liteyuki_id,
"content": content,
"device_info": self.device_info,
}
resp = requests.post(url, json=data)
if resp.status_code == 200:
nonebot.logger.success(
f"成功上报差误信息报文ID为{resp.json().get('report_id')}"
)
else:
nonebot.logger.error(f"差误上报错误:{resp.text}")
else:
nonebot.logger.warning(f"已禁用自动上报:{content}")
def register(self):
pass
async def heartbeat_report(self):
"""
提交心跳,预留接口
Returns:
"""
url = "https://api.liteyuki.icu/heartbeat"
data = {
"liteyuki_id": self.liteyuki_id,
"version": __VERSION__,
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as resp:
if resp.status == 200:
nonebot.logger.success("心跳成功送达。")
else:
nonebot.logger.error("休克:{}".format(await resp.text()))

View File

@ -3,7 +3,9 @@ import os
import shutil
import zipfile
from typing import Any
from pathlib import Path
# import aiofiles
import nonebot
import yaml
@ -12,8 +14,8 @@ from .language import Language, get_default_lang_code
from .ly_function import loaded_functions
_loaded_resource_packs: list["ResourceMetadata"] = [] # 按照加载顺序排序
temp_resource_root = "data/liteyuki/resources"
temp_extract_root = "data/liteyuki/temp"
temp_resource_root = Path("data/liteyuki/resources")
temp_extract_root = Path("data/liteyuki/temp")
lang = Language(get_default_lang_code())
@ -50,60 +52,139 @@ def load_resource_from_dir(path: str):
for root, dirs, files in os.walk(path):
for file in files:
relative_path = os.path.relpath(os.path.join(root, file), path)
copy_file(os.path.join(root, file), os.path.join(temp_resource_root, relative_path))
copy_file(
os.path.join(root, file),
os.path.join(temp_resource_root, relative_path),
)
metadata["path"] = path
metadata["folder"] = os.path.basename(path)
if os.path.exists(os.path.join(path, "lang")):
# 加载语言
from src.utils.base.language import load_from_dir
load_from_dir(os.path.join(path, "lang"))
if os.path.exists(os.path.join(path, "functions")):
# 加载功能
from src.utils.base.ly_function import load_from_dir
load_from_dir(os.path.join(path, "functions"))
if os.path.exists(os.path.join(path, "word_bank")):
# 加载词库
from src.utils.base.word_bank import load_from_dir
load_from_dir(os.path.join(path, "word_bank"))
_loaded_resource_packs.insert(0, ResourceMetadata(**metadata))
def get_path(path: str, abs_path: bool = True, default: Any = None, debug: bool = False) -> str | Any:
def get_path(
path: os.PathLike[str,] | Path | str,
abs_path: bool = True,
default: Any = None,
debug: bool = False,
) -> str | Any:
"""
获取资源包中的文件
获取资源包中的路径,且该路径必须存在
Args:
debug: 启用调试,每次都会先重载资源
path: 相对路径
abs_path: 是否返回绝对路径
default: 默认
path: 文件相对路径
Returns: 文件绝对路径
default: 默认解,当该路径不存在时使用
debug: 启用调试,每次都会先重载资源
Returns: 所需求之路径
"""
if debug:
nonebot.logger.debug("Enable resource debug, Reloading resources")
nonebot.logger.debug("由于已启用资源路径调试,正在重载资源")
load_resources()
resource_relative_path = os.path.join(temp_resource_root, path)
if os.path.exists(resource_relative_path):
return os.path.abspath(resource_relative_path) if abs_path else resource_relative_path
resource_relative_path = temp_resource_root / path
if resource_relative_path.exists():
return str(
resource_relative_path.resolve() if abs_path else resource_relative_path
)
else:
return default
def get_files(path: str, abs_path: bool = False) -> list[str]:
def get_resource_path(
path: os.PathLike[str,] | Path | str,
abs_path: bool = True,
only_exist: bool = False,
default: Any = None,
debug: bool = False,
) -> Path:
"""
获取资源包中一个文件夹的所有文件
获取资源包中的路径
Args:
abs_path:
path: 文件夹相对路径
Returns: 文件绝对路径
path: 相对路径
abs_path: 是否返回绝对路径
only_exist: 检查该路径是否存在
default: [当 `only_exist` 为 **真** 时启用]默认解,当该路径不存在时使用
debug: 启用调试,每次都会先重载资源
Returns: 所需求之路径
"""
resource_relative_path = os.path.join(temp_resource_root, path)
if os.path.exists(resource_relative_path):
return [os.path.abspath(os.path.join(resource_relative_path, file)) if abs_path else os.path.join(resource_relative_path, file) for file in
os.listdir(resource_relative_path)]
if debug:
nonebot.logger.debug("由于已启用资源路径调试,正在重载资源")
load_resources()
resource_relative_path = (
(temp_resource_root / path).resolve()
if abs_path
else (temp_resource_root / path)
)
if only_exist:
if resource_relative_path.exists():
return resource_relative_path
else:
return default
else:
return resource_relative_path
def get_files(
path: os.PathLike[str,] | Path | str, abs_path: bool = False
) -> list[str]:
"""
获取资源包中一个目录的所有内容
Args:
path: 该目录的相对路径
abs_path: 是否返回绝对路径
Returns: 目录内容路径所构成之列表
"""
resource_relative_path = temp_resource_root / path
if resource_relative_path.exists():
return [
(
str((resource_relative_path / file_).resolve())
if abs_path
else str((resource_relative_path / file_))
)
for file_ in os.listdir(resource_relative_path)
]
else:
return []
def get_resource_files(
path: os.PathLike[str,] | Path | str, abs_path: bool = False
) -> list[Path]:
"""
获取资源包中一个目录的所有内容
Args:
path: 该目录的相对路径
abs_path: 是否返回绝对路径
Returns: 目录内容路径所构成之列表
"""
resource_relative_path = temp_resource_root / path
if resource_relative_path.exists():
return [
(
(resource_relative_path / file_).resolve()
if abs_path
else (resource_relative_path / file_)
)
for file_ in os.listdir(resource_relative_path)
]
else:
return []
@ -150,7 +231,9 @@ def load_resources():
if not os.path.exists("resources/index.json"):
json.dump([], open("resources/index.json", "w", encoding="utf-8"))
resource_index: list[str] = json.load(open("resources/index.json", "r", encoding="utf-8"))
resource_index: list[str] = json.load(
open("resources/index.json", "r", encoding="utf-8")
)
resource_index.reverse() # 优先级高的后加载,但是排在前面
for resource in resource_index:
load_resource_from_dir(os.path.join("resources", resource))
@ -174,7 +257,9 @@ def check_exist(name: str) -> bool:
Returns: 是否存在
"""
path = os.path.join("resources", name)
return os.path.exists(os.path.join(path, "metadata.yml")) or (os.path.isfile(path) and name.endswith(".zip"))
return os.path.exists(os.path.join(path, "metadata.yml")) or (
os.path.isfile(path) and name.endswith(".zip")
)
def add_resource_pack(name: str) -> bool:
@ -185,17 +270,19 @@ def add_resource_pack(name: str) -> bool:
Returns:
"""
if check_exist(name):
old_index: list[str] = json.load(open("resources/index.json", "r", encoding="utf-8"))
old_index: list[str] = json.load(
open("resources/index.json", "r", encoding="utf-8")
)
if name not in old_index:
old_index.append(name)
json.dump(old_index, open("resources/index.json", "w", encoding="utf-8"))
load_resource_from_dir(os.path.join("resources", name))
return True
else:
nonebot.logger.warning(lang.get("liteyuki.resource_loaded", name=name))
nonebot.logger.warning("资源包 {} 已存在,无需添加".format(name))
return False
else:
nonebot.logger.warning(lang.get("liteyuki.resource_not_exist", name=name))
nonebot.logger.warning("资源包 {} 不存在,无法添加".format(name))
return False
@ -207,16 +294,18 @@ def remove_resource_pack(name: str) -> bool:
Returns:
"""
if check_exist(name):
old_index: list[str] = json.load(open("resources/index.json", "r", encoding="utf-8"))
old_index: list[str] = json.load(
open("resources/index.json", "r", encoding="utf-8")
)
if name in old_index:
old_index.remove(name)
json.dump(old_index, open("resources/index.json", "w", encoding="utf-8"))
return True
else:
nonebot.logger.warning(lang.get("liteyuki.resource_not_loaded", name=name))
nonebot.logger.warning("资源包 {} 不存在,无需移除".format(name))
return False
else:
nonebot.logger.warning(lang.get("liteyuki.resource_not_exist", name=name))
nonebot.logger.warning("资源包 {} 不存在,无法移除".format(name))
return False
@ -229,7 +318,9 @@ def change_priority(name: str, delta: int) -> bool:
Returns:
"""
# 正数表示前移,负数表示后移
old_resource_list: list[str] = json.load(open("resources/index.json", "r", encoding="utf-8"))
old_resource_list: list[str] = json.load(
open("resources/index.json", "r", encoding="utf-8")
)
new_resource_list = old_resource_list.copy()
if name in old_resource_list:
index = old_resource_list.index(name)
@ -237,13 +328,15 @@ def change_priority(name: str, delta: int) -> bool:
new_index = index + delta
new_resource_list.remove(name)
new_resource_list.insert(new_index, name)
json.dump(new_resource_list, open("resources/index.json", "w", encoding="utf-8"))
json.dump(
new_resource_list, open("resources/index.json", "w", encoding="utf-8")
)
return True
else:
nonebot.logger.warning("Priority change failed, out of range")
nonebot.logger.warning("无法更改优先级为 {} ,优先级超出范围".format(delta))
return False
else:
nonebot.logger.debug("Priority change failed, resource not loaded")
nonebot.logger.debug("资源包 {} 未加载,无法更改优先级".format(name))
return False

View File

@ -1,6 +1,6 @@
from nonebot.adapters import satori
from src.utils.base.ly_typing import T_MessageEvent
from nonebot.adapters import onebot
from src.utils.base.ly_typing import T_MessageEvent, T_GroupMessageEvent
def get_user_id(event: T_MessageEvent):
@ -10,11 +10,13 @@ def get_user_id(event: T_MessageEvent):
return event.user_id
def get_group_id(event: T_MessageEvent):
def get_group_id(event: T_GroupMessageEvent):
if isinstance(event, satori.event.Event):
return event.guild.id
else:
elif isinstance(event, onebot.v11.GroupMessageEvent):
return event.group_id
else:
return None
def get_message_type(event: T_MessageEvent) -> str:

View File

@ -1,140 +1,20 @@
import os.path
# import time
from os import getcwd
import os
import aiofiles
import nonebot
from nonebot_plugin_htmlrender import * # type: ignore
from nonebot import require
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import (
template_to_html,
template_to_pic,
md_to_pic,
init,
)
from .tools import random_hex_string
# import imgkit
# from typing import Any, Dict, Literal, Optional, Union
# import uuid
# import jinja2
# from pathlib import Path
# TEMPLATES_PATH = str(Path(__file__).parent / "templates")
# env = jinja2.Environment( # noqa: S701
# extensions=["jinja2.ext.loopcontrols"],
# loader=jinja2.FileSystemLoader(TEMPLATES_PATH),
# enable_async=True,
# )
# async def template_to_html(
# template_path: str,
# template_name: str,
# **kwargs,
# ) -> str:
# """使用jinja2模板引擎通过html生成图片
# Args:
# template_path (str): 模板路径
# template_name (str): 模板名
# **kwargs: 模板内容
# Returns:
# str: html
# """
# template_env = jinja2.Environment( # noqa: S701
# loader=jinja2.FileSystemLoader(template_path),
# enable_async=True,
# )
# template = template_env.get_template(template_name)
# return await template.render_async(**kwargs)
# async def template_to_pic(
# template_path: str,
# template_name: str,
# templates: Dict[Any, Any],
# pages: Optional[Dict[Any, Any]] = None,
# wait: int = 0,
# type: Literal["jpeg", "png"] = "png", # noqa: A002
# quality: Union[int, None] = None,
# device_scale_factor: float = 2,
# ) -> bytes:
# """使用jinja2模板引擎通过html生成图片
# Args:
# template_path (str): 模板路径
# template_name (str): 模板名
# templates (Dict[Any, Any]): 模板内参数 如: {"name": "abc"}
# pages (Optional[Dict[Any, Any]]): 网页参数 Defaults to
# {"base_url": f"file://{getcwd()}", "viewport": {"width": 500, "height": 10}}
# wait (int, optional): 网页载入等待时间. Defaults to 0.
# type (Literal["jpeg", "png"]): 图片类型, 默认 png
# quality (int, optional): 图片质量 0-100 当为`png`时无效
# device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
# Returns:
# bytes: 图片 可直接发送
# """
# if pages is None:
# pages = {
# "viewport": {"width": 500, "height": 10},
# "base_url": f"file://{getcwd()}", # noqa: PTH109
# }
# template_env = jinja2.Environment( # noqa: S701
# loader=jinja2.FileSystemLoader(template_path),
# enable_async=True,
# )
# template = template_env.get_template(template_name)
# open(
# filename := os.path.join(
# template_path,
# str(uuid.uuid4())+".html",
# ),
# "w",
# ).write(await template.render_async(**templates))
# print(pages,filename)
# img = imgkit.from_file(
# filename,
# output_path=False,
# options={
# "format": type,
# "quality": quality if (quality and type == "jpeg") else 94,
# "allow": pages["base_url"],
# # "viewport-size": "{} {}".format(pages["viewport"]["width"],pages["viewport"]["height"]),
# "zoom": device_scale_factor,
# # "load-error-handling": "ignore",
# "enable-local-file-access": None,
# "no-stop-slow-scripts": None,
# "transparent": None,
# },
# ) # type: ignore
# # os.remove(filename)
# return img
# return await html_to_pic(
# template_path=f"file://{template_path}",
# html=await template.render_async(**templates),
# wait=wait,
# type=type,
# quality=quality,
# device_scale_factor=device_scale_factor,
# **pages,
# )
async def html2image(
html: str,
wait: int = 0,
):
pass
async def template2html(
template: str,
templates: dict,
@ -174,8 +54,8 @@ async def template2image(
if pages is None:
pages = {
"viewport": {"width": 1080, "height": 10},
"base_url": f"file://{getcwd()}",
}
template_path = os.path.dirname(template)
template_name = os.path.basename(template)
@ -197,36 +77,9 @@ async def template2image(
template_name=template_name,
template_path=template_path,
templates=templates,
pages=pages,
wait=wait,
###
pages=pages,
device_scale_factor=scale_factor,
###
)
# async def url2image(
# url: str,
# wait: int = 0,
# scale_factor: float = 1,
# type: str = "png",
# quality: int = 100,
# **kwargs
# ) -> bytes:
# """
# Args:
# quality:
# type:
# url: str: URL
# wait: int: 等待时间
# scale_factor: float: 缩放因子
# **kwargs: page 参数
# Returns:
# 图片二进制数据
# """
# async with get_new_page(scale_factor) as page:
# await page.goto(url)
# await page.wait_for_timeout(wait)
# return await page.screenshot(
# full_page=True,
# type=type,
# quality=quality
# )

View File

@ -1,33 +1,20 @@
import base64
import io
from typing import Any
from urllib.parse import quote
import aiofiles
from PIL import Image
import aiohttp
import nonebot
from nonebot import require
from nonebot.adapters import satori
from PIL import Image
from nonebot.adapters.onebot import v11
from typing import Any, Type
from nonebot.internal.adapter import MessageSegment
from nonebot.internal.adapter.message import TM
from .html_tool import md_to_pic
from .. import load_from_yaml
from ..base.ly_typing import T_Bot, T_Message, T_MessageEvent
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import md_to_pic
config = load_from_yaml("config.yml")
can_send_markdown = {} # 用于存储机器人是否支持发送markdown消息id->bool
class TencentBannedMarkdownError(BaseException):
pass
async def broadcast_to_superusers(message: str | T_Message, markdown: bool = False):
"""广播消息给超级用户"""
@ -49,9 +36,6 @@ class MarkdownMessage:
*,
message_type: str = None,
session_id: str | int = None,
event: T_MessageEvent = None,
retry_as_image: bool = True,
**kwargs,
) -> dict[str, Any] | None:
"""
发送Markdown消息支持自动转为图片发送
@ -60,86 +44,20 @@ class MarkdownMessage:
bot:
message_type:
session_id:
event:
retry_as_image: 发送失败后是否尝试以图片形式发送否则失败返回None
**kwargs:
Returns:
"""
formatted_md = v11.unescape(markdown).replace("\n", r"\n").replace('"', r"\\\"")
if event is not None and message_type is None:
if isinstance(event, satori.event.Event):
message_type = "private" if event.guild is None else "group"
group_id = event.guild.id if event.guild is not None else None
else:
assert event is not None
message_type = event.message_type
group_id = event.group_id if message_type == "group" else None
user_id = (
event.user.id
if isinstance(event, satori.event.Event)
else event.user_id
)
session_id = user_id if message_type == "private" else group_id
# try:
# raise TencentBannedMarkdownError("Tencent banned markdown")
# forward_id = await bot.call_api(
# "send_private_forward_msg",
# messages=[
# {
# "type": "node",
# "data": {
# "content": [
# {
# "data": {
# "content": "{\"content\":\"%s\"}" % formatted_md,
# },
# "type": "markdown"
# }
# ],
# "name": "[]",
# "uin": bot.self_id
# }
# }
# ],
# user_id=bot.self_id
# )
# data = await bot.send_msg(
# user_id=session_id,
# group_id=session_id,
# message_type=message_type,
# message=[
# {
# "type": "longmsg",
# "data": {
# "id": forward_id
# }
# },
# ],
# **kwargs
# )
# except BaseException as e:
nonebot.logger.error(f"因未能发送Markdown消息已转为图片发送。")
# 发送失败,渲染为图片发送
# if not retry_as_image:
# return None
# plain_markdown = markdown.replace("[🔗", "[")
md_image_bytes = await md_to_pic(md=markdown, width=540, device_scale_factor=4)
if isinstance(bot, satori.Bot):
msg_seg = satori.MessageSegment.image(raw=md_image_bytes, mime="image/png")
data = await bot.send(event=event, message=msg_seg)
else:
data = await bot.send_msg(
message_type=message_type,
group_id=session_id,
user_id=session_id,
message=v11.MessageSegment.image(md_image_bytes),
)
plain_markdown = markdown.replace("[🔗", "[")
md_image_bytes = await md_to_pic(
md=plain_markdown, width=540, device_scale_factor=4
)
print(md_image_bytes)
data = await bot.send_msg(
message_type=message_type,
group_id=session_id,
user_id=session_id,
message=v11.MessageSegment.image(md_image_bytes),
)
return data
@staticmethod
@ -157,38 +75,25 @@ class MarkdownMessage:
Args:
image: 图片字节流或图片本地路径链接请使用Markdown.image_async方法获取后通过send_md发送
bot: bot instance
message_type: message type
message_type: message message_type
session_id: session id
event: event
kwargs: other arguments
Returns:
dict: response data
"""
if isinstance(image, str):
async with aiofiles.open(image, "rb") as f:
image = await f.read()
method = 2
# 1.轻雪图床方案
# if method == 1:
# image_url = await liteyuki_api.upload_image(image)
# image_size = Image.open(io.BytesIO(image)).size
# image_md = Markdown.image(image_url, image_size)
# data = await Markdown.send_md(image_md, bot, message_type=message_type, session_id=session_id, event=event,
# retry_as_image=False,
# **kwargs)
# Lagrange.OneBot方案
if method == 2:
base64_string = base64.b64encode(image).decode("utf-8")
data = await bot.call_api("upload_image", file=f"base64://{base64_string}")
await MarkdownMessage.send_md(
MarkdownMessage.image(data, Image.open(io.BytesIO(image)).size),
bot,
event=event,
message_type=message_type,
session_id=session_id,
**kwargs,
)
# 其他实现端方案
@ -204,12 +109,7 @@ class MarkdownMessage:
image_size = Image.open(io.BytesIO(image)).size
image_md = MarkdownMessage.image(image_url, image_size)
return await MarkdownMessage.send_md(
image_md,
bot,
message_type=message_type,
session_id=session_id,
event=event,
**kwargs,
image_md, bot, message_type=message_type, session_id=session_id
)
if data is None:
@ -293,7 +193,7 @@ class MarkdownMessage:
image = Image.open(io.BytesIO(await resp.read()))
return MarkdownMessage.image(url, image.size)
except Exception as e:
nonebot.logger.error(f"get image error: {e}")
nonebot.logger.error(f"获取图片错误:{e}")
return "[Image Error]"
@staticmethod