1
0
forked from bot/app

feat: 添加了网页监控面板

This commit is contained in:
2024-03-19 00:27:40 +08:00
parent 51cb1a87b8
commit 3adc265876
23 changed files with 316 additions and 42 deletions

View File

@ -5,3 +5,5 @@ Bot = v11.Bot | v12.Bot
GroupMessageEvent = v11.GroupMessageEvent | v12.GroupMessageEvent
PrivateMessageEvent = v11.PrivateMessageEvent | v12.PrivateMessageEvent
MessageEvent = v11.MessageEvent | v12.MessageEvent
Message = v11.Message | v12.Message

View File

@ -1,6 +1,10 @@
from yaml import load, FullLoader
config = None
def load_from_yaml(file: str) -> dict:
global config
with open(file, 'r', encoding='utf-8') as f:
return load(f, Loader=FullLoader)
config = load(f, Loader=FullLoader)
return config

View File

@ -1,9 +1,12 @@
import json
import os
import sqlite3
import types
from abc import ABC
from collections.abc import Iterable
from typing import Any
import nonebot
from pydantic import BaseModel
BaseIterable = list | tuple | set | dict
@ -89,6 +92,8 @@ class SqliteORMDatabase(BaseORMAdapter):
def __init__(self, db_name: str):
super().__init__()
if not os.path.exists(os.path.dirname(db_name)):
os.makedirs(os.path.dirname(db_name))
self.conn = sqlite3.connect(db_name)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
@ -136,12 +141,13 @@ class SqliteORMDatabase(BaseORMAdapter):
# 检测新字段
for field, type_ in zip(model_fields, model_types):
if field not in table_fields:
print(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
nonebot.logger.debug(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
self.cursor.execute(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
# 检测多余字段除了id字段
for field in table_fields:
if field not in model_fields and field != 'id':
nonebot.logger.debug(f'ALTER TABLE {table_name} DROP COLUMN {field}')
self.cursor.execute(f'ALTER TABLE {table_name} DROP COLUMN {field}')
self.conn.commit()
@ -172,6 +178,7 @@ class SqliteORMDatabase(BaseORMAdapter):
key_list.append(field)
value_list.append(value)
# 更新或插入数据,用?占位
nonebot.logger.debug(f'INSERT OR REPLACE INTO {table_name} ({",".join(key_list)}) VALUES ({",".join(["?" for _ in key_list])})')
self.cursor.execute(f'INSERT OR REPLACE INTO {table_name} ({",".join(key_list)}) VALUES ({",".join(["?" for _ in key_list])})', value_list)
ids.append(self.cursor.lastrowid)
@ -223,8 +230,9 @@ class SqliteORMDatabase(BaseORMAdapter):
"""
table_name = model.__name__
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
data = dict(self.cursor.fetchone())
return model(**self.convert_to_dict(data)) if data else default
if data := self.cursor.fetchone():
return model(**self.convert_to_dict(data))
return default
def all(self, model: type(LiteModel), conditions, *args, default: Any = None) -> list[LiteModel] | None:
"""查询所有数据
@ -254,6 +262,7 @@ class SqliteORMDatabase(BaseORMAdapter):
"""
table_name = model.__name__
nonebot.logger.debug(f'DELETE FROM {table_name} WHERE {conditions}')
self.cursor.execute(f'DELETE FROM {table_name} WHERE {conditions}', args)
self.conn.commit()
@ -270,6 +279,7 @@ class SqliteORMDatabase(BaseORMAdapter):
"""
table_name = model.__name__
nonebot.logger.debug(f'UPDATE {table_name} SET {operation} WHERE {conditions}')
self.cursor.execute(f'UPDATE {table_name} SET {operation} WHERE {conditions}', args)
self.conn.commit()

View File

@ -7,10 +7,10 @@ DATA_PATH = "data/liteyuki"
user_db = DB(os.path.join(DATA_PATH, 'users.ldb'))
class UserModel(LiteModel):
id: str
username: str
lang: str
class User(LiteModel):
user_id: str
username: str = ""
lang: str = "en"
user_db.auto_migrate(UserModel)
user_db.auto_migrate(User)

View File

@ -3,13 +3,15 @@
"""
import json
import locale
import os
from typing import Any
import nonebot
from src.utils.data_manager import UserModel, user_db
from src.utils.data_manager import User, user_db
_default_lang_code = "en"
_language_data = {
"en": {
"name": "English",
@ -35,6 +37,7 @@ def load_from_lang(file_path: str, lang_code: str = None):
if not line or line.startswith('#'): # 空行或注释
continue
key, value = line.split('=', 1)
nonebot.logger.debug(f"Loaded language text: {key.strip()} -> {value.strip()}")
data[key.strip()] = value.strip()
if lang_code not in _language_data:
_language_data[lang_code] = {}
@ -59,10 +62,31 @@ def load_from_json(file_path: str, lang_code: str = None):
if lang_code not in _language_data:
_language_data[lang_code] = {}
_language_data[lang_code].update(data)
nonebot.logger.debug(f"Loaded language data from {file_path}")
except Exception as e:
nonebot.logger.error(f"Failed to load language data from {file_path}: {e}")
def load_from_dir(dir_path: str):
"""
从目录中加载语言数据
Args:
dir_path: 目录路径
"""
for file in os.listdir(dir_path):
try:
file_path = os.path.join(dir_path, file)
if os.path.isfile(file_path):
if file.endswith('.lang'):
load_from_lang(file_path)
elif file.endswith('.json'):
load_from_json(file_path)
except Exception as e:
nonebot.logger.error(f"Failed to load language data from {file}: {e}")
continue
def load_from_dict(data: dict, lang_code: str):
"""
从字典中加载语言数据
@ -77,11 +101,13 @@ def load_from_dict(data: dict, lang_code: str):
class Language:
def __init__(self, lang_code: str = "en", fallback_lang_code: str = "en"):
def __init__(self, lang_code: str = None, fallback_lang_code: str = "en"):
if lang_code is None:
lang_code = get_system_lang_code()
self.lang_code = lang_code
self.fallback_lang_code = fallback_lang_code
def get(self, item: str, *args) -> str | Any:
def get(self, item: str, *args, **kwargs) -> str | Any:
"""
获取当前语言文本
Args:
@ -94,9 +120,9 @@ class Language:
"""
try:
if self.lang_code in _language_data and item in _language_data[self.lang_code]:
return _language_data[self.lang_code][item].format(*args)
return _language_data[self.lang_code][item].format(*args, **kwargs)
if self.fallback_lang_code in _language_data and item in _language_data[self.fallback_lang_code]:
return _language_data[self.fallback_lang_code][item].format(*args)
return _language_data[self.fallback_lang_code][item].format(*args, **kwargs)
return item
except Exception as e:
nonebot.logger.error(f"Failed to get language text or format: {e}")
@ -107,5 +133,19 @@ def get_user_lang(user_id: str) -> Language:
"""
获取用户的语言代码
"""
user = user_db.first(UserModel, "id = ?", user_id, default=UserModel(id=user_id, username="Unknown", lang="en"))
user = user_db.first(User, "user_id = ?", user_id, default=User(user_id=user_id, username="Unknown", lang="en"))
return Language(user.lang)
def get_system_lang_code() -> str:
"""
获取系统语言代码
"""
return locale.getdefaultlocale()[0].replace('_', '-')
def get_system_lang() -> Language:
"""
获取系统语言
"""
return Language(get_system_lang_code())

43
src/utils/resource.py Normal file
View File

@ -0,0 +1,43 @@
import os
import nonebot
import yaml
from src.utils.data import LiteModel
_resource_data = {}
_loaded_resource_packs = [] # 按照加载顺序排序
class ResourceMetadata(LiteModel):
name: str = "Unknown"
version: str = "0.0.1"
description: str = "Unknown"
path: str
def load_resource_from_dir(path: str):
"""
把资源包按照文件相对路径加载到资源包中,后加载的优先级更高,顺便加载语言
Args:
path: 资源文件夹
Returns:
"""
for root, dirs, files in os.walk(path):
for file in files:
relative_path = os.path.relpath(os.path.join(root, file), path).replace("\\", "/")
abs_path = os.path.join(root, file).replace("\\", "/")
_resource_data[relative_path] = abs_path
nonebot.logger.debug(f"Loaded {relative_path} -> {abs_path}")
if os.path.exists(os.path.join(path, "metadata.yml")):
with open(os.path.join(path, "metadata.yml"), "r", encoding="utf-8") as f:
metadata = yaml.safe_load(f)
else:
metadata = ResourceMetadata()
metadata["path"] = path
if os.path.exists(os.path.join(path, "lang")):
from src.utils.language import load_from_dir
load_from_dir(os.path.join(path, "lang"))
_loaded_resource_packs.append(ResourceMetadata(**metadata))