first commit
This commit is contained in:
7
src/utils/adapter.py
Normal file
7
src/utils/adapter.py
Normal file
@ -0,0 +1,7 @@
|
||||
from nonebot.adapters.onebot import v11, v12
|
||||
|
||||
|
||||
Bot = v11.Bot | v12.Bot
|
||||
GroupMessageEvent = v11.GroupMessageEvent | v12.GroupMessageEvent
|
||||
PrivateMessageEvent = v11.PrivateMessageEvent | v12.PrivateMessageEvent
|
||||
MessageEvent = v11.MessageEvent | v12.MessageEvent
|
6
src/utils/config.py
Normal file
6
src/utils/config.py
Normal file
@ -0,0 +1,6 @@
|
||||
from yaml import load, FullLoader
|
||||
|
||||
|
||||
def load_from_yaml(file: str) -> dict:
|
||||
with open(file, 'r', encoding='utf-8') as f:
|
||||
return load(f, Loader=FullLoader)
|
308
src/utils/data.py
Normal file
308
src/utils/data.py
Normal file
@ -0,0 +1,308 @@
|
||||
import json
|
||||
import sqlite3
|
||||
import types
|
||||
from abc import ABC
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
from pydantic import BaseModel
|
||||
|
||||
BaseIterable = list | tuple | set | dict
|
||||
|
||||
|
||||
class LiteModel(BaseModel):
|
||||
"""轻量级模型基类
|
||||
类型注解统一使用Python3.9的PEP585标准,如需使用泛型请使用typing模块的泛型类型
|
||||
"""
|
||||
id: Any = None
|
||||
|
||||
|
||||
class BaseORMAdapter(ABC):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def auto_migrate(self, *args, **kwargs):
|
||||
"""自动迁移
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""存储数据
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def first(self, *args, **kwargs):
|
||||
"""查询第一条数据
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def all(self, *args, **kwargs):
|
||||
"""查询所有数据
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
"""删除数据
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def update(self, *args, **kwargs):
|
||||
"""更新数据
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class SqliteORMDatabase(BaseORMAdapter):
|
||||
"""SQLiteORM适配器,严禁使用`FORIEGNID`和`JSON`作为主键前缀,严禁使用`$ID:`作为字符串值前缀
|
||||
|
||||
Attributes:
|
||||
|
||||
"""
|
||||
type_map = {
|
||||
# default: TEXT
|
||||
str : 'TEXT',
|
||||
int : 'INTEGER',
|
||||
float: 'REAL',
|
||||
bool : 'INTEGER',
|
||||
list : 'TEXT'
|
||||
}
|
||||
FOREIGNID = 'FOREIGNID'
|
||||
JSON = 'JSON'
|
||||
ID = '$ID'
|
||||
|
||||
def __init__(self, db_name: str):
|
||||
super().__init__()
|
||||
self.conn = sqlite3.connect(db_name)
|
||||
self.conn.row_factory = sqlite3.Row
|
||||
self.cursor = self.conn.cursor()
|
||||
|
||||
def auto_migrate(self, *args: type(LiteModel)):
|
||||
"""自动迁移,检测新模型字段和原有表字段的差异,如有差异自动增删新字段
|
||||
|
||||
Args:
|
||||
*args: 模型类
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
for model in args:
|
||||
model: type(LiteModel)
|
||||
# 检测并创建表,若模型未定义id字段则使用自增主键,有定义的话使用id字段,且id有可能为字符串
|
||||
table_name = model.__name__
|
||||
if 'id' in model.__annotations__ and model.__annotations__['id'] is not None:
|
||||
# 如果模型定义了id字段,那么使用模型的id字段
|
||||
id_type = self.type_map.get(model.__annotations__['id'], 'TEXT')
|
||||
self.cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id {id_type} PRIMARY KEY)')
|
||||
else:
|
||||
# 如果模型未定义id字段,那么使用自增主键
|
||||
self.cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)')
|
||||
# 获取表字段
|
||||
self.cursor.execute(f'PRAGMA table_info({table_name})')
|
||||
table_fields = self.cursor.fetchall()
|
||||
table_fields = [field[1] for field in table_fields]
|
||||
|
||||
raw_fields = model.__annotations__.keys()
|
||||
# 获取模型字段,若有模型则添加FOREIGNID前缀,若为BaseIterable则添加JSON前缀,用多行if判断
|
||||
model_fields = []
|
||||
model_types = []
|
||||
for field in raw_fields:
|
||||
if isinstance(model.__annotations__[field], type(LiteModel)):
|
||||
model_fields.append(f'{self.FOREIGNID}{field}')
|
||||
model_types.append('TEXT')
|
||||
elif isinstance(model.__annotations__[field], types.GenericAlias):
|
||||
model_fields.append(f'{self.JSON}{field}')
|
||||
model_types.append('TEXT')
|
||||
else:
|
||||
model_fields.append(field)
|
||||
model_types.append(self.type_map.get(model.__annotations__[field], 'TEXT'))
|
||||
|
||||
# 检测新字段
|
||||
for field, type_ in zip(model_fields, model_types):
|
||||
if field not in table_fields:
|
||||
print(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
|
||||
self.cursor.execute(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
|
||||
|
||||
# 检测多余字段,除了id字段
|
||||
for field in table_fields:
|
||||
if field not in model_fields and field != 'id':
|
||||
self.cursor.execute(f'ALTER TABLE {table_name} DROP COLUMN {field}')
|
||||
|
||||
self.conn.commit()
|
||||
|
||||
def save(self, *models: LiteModel) -> int | tuple:
|
||||
"""存储数据,检查id字段,如果有id字段则更新,没有则插入
|
||||
|
||||
Args:
|
||||
models: 数据
|
||||
|
||||
Returns:
|
||||
id: 数据id,如果有多个数据则返回id元组
|
||||
"""
|
||||
ids = []
|
||||
for model in models:
|
||||
table_name = model.__class__.__name__
|
||||
key_list = []
|
||||
value_list = []
|
||||
# 处理外键,添加前缀'$IDFieldName'
|
||||
for field, value in model.__dict__.items():
|
||||
if isinstance(value, LiteModel):
|
||||
key_list.append(f'{self.FOREIGNID}{field}')
|
||||
value_list.append(f'{self.ID}:{value.__class__.__name__}:{self.save(value)}')
|
||||
elif isinstance(value, BaseIterable):
|
||||
key_list.append(f'{self.JSON}{field}')
|
||||
value_list.append(self._flat(value))
|
||||
else:
|
||||
key_list.append(field)
|
||||
value_list.append(value)
|
||||
# 更新或插入数据,用?占位
|
||||
self.cursor.execute(f'INSERT OR REPLACE INTO {table_name} ({",".join(key_list)}) VALUES ({",".join(["?" for _ in key_list])})', value_list)
|
||||
|
||||
ids.append(self.cursor.lastrowid)
|
||||
self.conn.commit()
|
||||
return ids[0] if len(ids) == 1 else tuple(ids)
|
||||
|
||||
def _flat(self, data: Iterable) -> str:
|
||||
"""扁平化数据,返回扁平化对象
|
||||
|
||||
Args:
|
||||
data: 数据,可迭代对象
|
||||
|
||||
Returns: json字符串
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
return_data = {}
|
||||
for k, v in data.items():
|
||||
if isinstance(v, LiteModel):
|
||||
return_data[f'{self.FOREIGNID}{k}'] = f'{self.ID}:{v.__class__.__name__}:{self.save(v)}'
|
||||
elif isinstance(v, BaseIterable):
|
||||
return_data[f'{self.JSON}{k}'] = self._flat(v)
|
||||
else:
|
||||
return_data[k] = v
|
||||
|
||||
elif isinstance(data, list | tuple | set):
|
||||
return_data = []
|
||||
for v in data:
|
||||
if isinstance(v, LiteModel):
|
||||
return_data.append(f'{self.ID}:{v.__class__.__name__}:{self.save(v)}')
|
||||
elif isinstance(v, BaseIterable):
|
||||
return_data.append(self._flat(v))
|
||||
else:
|
||||
return_data.append(v)
|
||||
else:
|
||||
raise ValueError('数据类型错误')
|
||||
|
||||
return json.dumps(return_data)
|
||||
|
||||
def first(self, model: type(LiteModel), conditions, *args, default: Any = None) -> LiteModel | None:
|
||||
"""查询第一条数据
|
||||
|
||||
Args:
|
||||
model: 模型
|
||||
conditions: 查询条件
|
||||
*args: 参数化查询条件参数
|
||||
default: 未查询到结果默认返回值
|
||||
|
||||
Returns: 数据
|
||||
"""
|
||||
table_name = model.__name__
|
||||
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
|
||||
data = dict(self.cursor.fetchone())
|
||||
return model(**self.convert_to_dict(data)) if data else default
|
||||
|
||||
def all(self, model: type(LiteModel), conditions, *args, default: Any = None) -> list[LiteModel] | None:
|
||||
"""查询所有数据
|
||||
|
||||
Args:
|
||||
model: 模型
|
||||
conditions: 查询条件
|
||||
*args: 参数化查询条件参数
|
||||
default: 未查询到结果默认返回值
|
||||
|
||||
Returns: 数据
|
||||
"""
|
||||
table_name = model.__name__
|
||||
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
|
||||
data = self.cursor.fetchall()
|
||||
return [model(**self.convert_to_dict(d)) for d in data] if data else default
|
||||
|
||||
def delete(self, model: type(LiteModel), conditions, *args):
|
||||
"""删除数据
|
||||
|
||||
Args:
|
||||
model: 模型
|
||||
conditions: 查询条件
|
||||
*args: 参数化查询条件参数
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
table_name = model.__name__
|
||||
self.cursor.execute(f'DELETE FROM {table_name} WHERE {conditions}', args)
|
||||
self.conn.commit()
|
||||
|
||||
def update(self, model: type(LiteModel), conditions: str, *args, operation: str):
|
||||
"""更新数据
|
||||
|
||||
Args:
|
||||
model: 模型
|
||||
conditions: 查询条件
|
||||
*args: 参数化查询条件参数
|
||||
operation: 更新操作
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
table_name = model.__name__
|
||||
self.cursor.execute(f'UPDATE {table_name} SET {operation} WHERE {conditions}', args)
|
||||
self.conn.commit()
|
||||
|
||||
def convert_to_dict(self, data: dict) -> dict:
|
||||
"""将json字符串转换为字典
|
||||
|
||||
Args:
|
||||
data: json字符串
|
||||
|
||||
Returns: 字典
|
||||
"""
|
||||
|
||||
def load(d: BaseIterable) -> BaseIterable:
|
||||
"""递归加载数据,去除前缀"""
|
||||
if isinstance(d, dict):
|
||||
new_d = {}
|
||||
for k, v in d.items():
|
||||
if k.startswith(self.FOREIGNID):
|
||||
new_d[k.replace(self.FOREIGNID, '')] = load(
|
||||
dict(self.cursor.execute(f'SELECT * FROM {v.split(":")[1]} WHERE id = ?', (v.split(":")[2],)).fetchone()))
|
||||
elif k.startswith(self.JSON):
|
||||
new_d[k.replace(self.JSON, '')] = load(json.loads(v))
|
||||
else:
|
||||
new_d[k] = v
|
||||
elif isinstance(d, list | tuple | set):
|
||||
new_d = []
|
||||
for i, v in enumerate(d):
|
||||
if isinstance(v, str) and v.startswith(self.ID):
|
||||
new_d.append(load(dict(self.cursor.execute(f'SELECT * FROM {v.split(":")[1]} WHERE id = ?', (v.split(":")[2],)).fetchone())))
|
||||
elif isinstance(v, BaseIterable):
|
||||
new_d.append(load(v))
|
||||
else:
|
||||
new_d = d
|
||||
return new_d
|
||||
|
||||
return load(data)
|
16
src/utils/data_manager.py
Normal file
16
src/utils/data_manager.py
Normal file
@ -0,0 +1,16 @@
|
||||
import os
|
||||
|
||||
from src.utils.data import LiteModel, SqliteORMDatabase as DB
|
||||
|
||||
DATA_PATH = "data/liteyuki"
|
||||
|
||||
user_db = DB(os.path.join(DATA_PATH, 'users.ldb'))
|
||||
|
||||
|
||||
class UserModel(LiteModel):
|
||||
id: str
|
||||
username: str
|
||||
lang: str
|
||||
|
||||
|
||||
user_db.auto_migrate(UserModel)
|
111
src/utils/language.py
Normal file
111
src/utils/language.py
Normal file
@ -0,0 +1,111 @@
|
||||
"""
|
||||
语言模块,添加对多语言的支持
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import nonebot
|
||||
|
||||
from src.utils.data_manager import UserModel, user_db
|
||||
|
||||
_language_data = {
|
||||
"en": {
|
||||
"name": "English",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def load_from_lang(file_path: str, lang_code: str = None):
|
||||
"""
|
||||
从lang文件中加载语言数据,用于简单的文本键值对
|
||||
|
||||
Args:
|
||||
file_path: lang文件路径
|
||||
lang_code: 语言代码,如果为None则从文件名中获取
|
||||
"""
|
||||
try:
|
||||
if lang_code is None:
|
||||
lang_code = os.path.basename(file_path).split('.')[0]
|
||||
with open(file_path, 'r', encoding='utf-8') as file:
|
||||
data = {}
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
if not line or line.startswith('#'): # 空行或注释
|
||||
continue
|
||||
key, value = line.split('=', 1)
|
||||
data[key.strip()] = value.strip()
|
||||
if lang_code not in _language_data:
|
||||
_language_data[lang_code] = {}
|
||||
_language_data[lang_code].update(data)
|
||||
except Exception as e:
|
||||
nonebot.logger.error(f"Failed to load language data from {file_path}: {e}")
|
||||
|
||||
|
||||
def load_from_json(file_path: str, lang_code: str = None):
|
||||
"""
|
||||
从json文件中加载语言数据,可以定义一些变量
|
||||
|
||||
Args:
|
||||
lang_code: 语言代码,如果为None则从文件名中获取
|
||||
file_path: json文件路径
|
||||
"""
|
||||
try:
|
||||
if lang_code is None:
|
||||
lang_code = os.path.basename(file_path).split('.')[0]
|
||||
with open(file_path, 'r', encoding='utf-8') as file:
|
||||
data = json.load(file)
|
||||
if lang_code not in _language_data:
|
||||
_language_data[lang_code] = {}
|
||||
_language_data[lang_code].update(data)
|
||||
except Exception as e:
|
||||
nonebot.logger.error(f"Failed to load language data from {file_path}: {e}")
|
||||
|
||||
|
||||
def load_from_dict(data: dict, lang_code: str):
|
||||
"""
|
||||
从字典中加载语言数据
|
||||
|
||||
Args:
|
||||
lang_code: 语言代码
|
||||
data: 字典数据
|
||||
"""
|
||||
if lang_code not in _language_data:
|
||||
_language_data[lang_code] = {}
|
||||
_language_data[lang_code].update(data)
|
||||
|
||||
|
||||
class Language:
|
||||
def __init__(self, lang_code: str = "en", fallback_lang_code: str = "en"):
|
||||
self.lang_code = lang_code
|
||||
self.fallback_lang_code = fallback_lang_code
|
||||
|
||||
def get(self, item: str, *args) -> str | Any:
|
||||
"""
|
||||
获取当前语言文本
|
||||
Args:
|
||||
item: 文本键
|
||||
*args: 格式化参数
|
||||
|
||||
Returns:
|
||||
str: 当前语言的文本
|
||||
|
||||
"""
|
||||
try:
|
||||
if self.lang_code in _language_data and item in _language_data[self.lang_code]:
|
||||
return _language_data[self.lang_code][item].format(*args)
|
||||
if self.fallback_lang_code in _language_data and item in _language_data[self.fallback_lang_code]:
|
||||
return _language_data[self.fallback_lang_code][item].format(*args)
|
||||
return item
|
||||
except Exception as e:
|
||||
nonebot.logger.error(f"Failed to get language text or format: {e}")
|
||||
return item
|
||||
|
||||
|
||||
def get_user_lang(user_id: str) -> Language:
|
||||
"""
|
||||
获取用户的语言代码
|
||||
"""
|
||||
user = user_db.first(UserModel, "id = ?", user_id, default=UserModel(id=user_id, username="Unknown", lang="en"))
|
||||
return Language(user.lang)
|
Reference in New Issue
Block a user