feat: 添加插件启用和停用功能
This commit is contained in:
@ -87,7 +87,7 @@ class Database(BaseORMAdapter):
|
||||
list : 'TEXT'
|
||||
}
|
||||
|
||||
DEFAULT_TYPE = {
|
||||
DEFAULT_VALUE = {
|
||||
'TEXT' : '',
|
||||
'INTEGER': 0,
|
||||
'REAL' : 0.0
|
||||
@ -95,6 +95,8 @@ class Database(BaseORMAdapter):
|
||||
|
||||
FOREIGNID = 'FOREIGNID'
|
||||
JSON = 'JSON'
|
||||
LIST = 'LIST'
|
||||
DICT = 'DICT'
|
||||
ID = '$ID'
|
||||
|
||||
def __init__(self, db_name: str):
|
||||
@ -131,28 +133,34 @@ class Database(BaseORMAdapter):
|
||||
table_fields = self.cursor.fetchall()
|
||||
table_fields = [field[1] for field in table_fields]
|
||||
|
||||
raw_fields = model.__annotations__.keys()
|
||||
raw_fields, raw_types = zip(*model.__annotations__.items())
|
||||
# 获取模型字段,若有模型则添加FOREIGNID前缀,若为BaseIterable则添加JSON前缀,用多行if判断
|
||||
model_fields = []
|
||||
model_types = []
|
||||
for field in raw_fields:
|
||||
if isinstance(model.__annotations__[field], type(LiteModel)):
|
||||
for field, r_type in zip(raw_fields, raw_types):
|
||||
if isinstance(r_type, type(LiteModel)):
|
||||
model_fields.append(f'{self.FOREIGNID}{field}')
|
||||
model_types.append('TEXT')
|
||||
elif isinstance(model.__annotations__[field], types.GenericAlias):
|
||||
elif isinstance(r_type, list):
|
||||
model_fields.append(f'{self.LIST}{field}')
|
||||
model_types.append('TEXT')
|
||||
elif isinstance(r_type, dict):
|
||||
model_fields.append(f'{self.DICT}{field}')
|
||||
model_types.append('TEXT')
|
||||
elif isinstance(r_type, types.GenericAlias):
|
||||
model_fields.append(f'{self.JSON}{field}')
|
||||
model_types.append('TEXT')
|
||||
else:
|
||||
model_fields.append(field)
|
||||
model_types.append(self.type_map.get(model.__annotations__[field], 'TEXT'))
|
||||
model_types.append(self.type_map.get(r_type, 'TEXT'))
|
||||
|
||||
# 检测新字段
|
||||
for field, type_ in zip(model_fields, model_types):
|
||||
# 检测新字段或字段类型是否有变化,有则增删字段,已经加了前缀类型
|
||||
for field, type_, r_type in zip(model_fields, model_types, raw_types):
|
||||
if field not in table_fields:
|
||||
nonebot.logger.debug(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
|
||||
self.cursor.execute(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
|
||||
# 在原有的行中添加新字段对应类型的默认值,从DEFAULT_TYPE中获取
|
||||
self.cursor.execute(f'UPDATE {table_name} SET {field} = ? WHERE {field} IS NULL', (self.DEFAULT_TYPE.get(type_, ""),))
|
||||
self.cursor.execute(f'UPDATE {table_name} SET {field} = ? WHERE {field} IS NULL', (self.DEFAULT_VALUE.get(type_, ""),))
|
||||
|
||||
# 检测多余字段,除了id字段
|
||||
for field in table_fields:
|
||||
@ -161,7 +169,7 @@ class Database(BaseORMAdapter):
|
||||
self.cursor.execute(f'ALTER TABLE {table_name} DROP COLUMN {field}')
|
||||
|
||||
self.conn.commit()
|
||||
nonebot.logger.success(f'Table {table_name} migrated successfully')
|
||||
nonebot.logger.debug(f'Table {table_name} migrated successfully')
|
||||
|
||||
def save(self, *models: LiteModel) -> int | tuple:
|
||||
"""存储数据,检查id字段,如果有id字段则更新,没有则插入
|
||||
@ -326,7 +334,14 @@ class Database(BaseORMAdapter):
|
||||
new_d[k.replace(self.FOREIGNID, '')] = load(
|
||||
dict(self.cursor.execute(f'SELECT * FROM {v.split(":", 2)[1]} WHERE id = ?', (v.split(":", 2)[2],)).fetchone()))
|
||||
elif k.startswith(self.JSON):
|
||||
if v == '': v = '[]'
|
||||
new_d[k.replace(self.JSON, '')] = load(json.loads(v))
|
||||
elif k.startswith(self.LIST):
|
||||
if v == '': v = '[]'
|
||||
new_d[k.replace(self.LIST, '')] = load(json.loads(v))
|
||||
elif k.startswith(self.DICT):
|
||||
if v == '': v = '{}'
|
||||
new_d[k.replace(self.DICT, '')] = load(json.loads(v))
|
||||
else:
|
||||
new_d[k] = v
|
||||
elif isinstance(d, list | tuple | set):
|
||||
|
@ -5,6 +5,7 @@ from src.utils.data import LiteModel, Database as DB
|
||||
DATA_PATH = "data/liteyuki"
|
||||
|
||||
user_db = DB(os.path.join(DATA_PATH, 'users.ldb'))
|
||||
group_db = DB(os.path.join(DATA_PATH, 'groups.ldb'))
|
||||
plugin_db = DB(os.path.join(DATA_PATH, 'plugins.ldb'))
|
||||
|
||||
|
||||
@ -12,6 +13,16 @@ class User(LiteModel):
|
||||
user_id: str
|
||||
username: str = ""
|
||||
lang: str = "en"
|
||||
enabled_plugins: list[str] = []
|
||||
disabled_plugins: list[str] = []
|
||||
|
||||
|
||||
class GroupChat(LiteModel):
|
||||
# Group是一个关键字,所以这里用GroupChat
|
||||
group_id: str
|
||||
group_name: str = ""
|
||||
enabled_plugins: list[str] = []
|
||||
disabled_plugins: list[str] = []
|
||||
|
||||
|
||||
class InstalledPlugin(LiteModel):
|
||||
@ -20,4 +31,5 @@ class InstalledPlugin(LiteModel):
|
||||
|
||||
def auto_migrate():
|
||||
user_db.auto_migrate(User)
|
||||
group_db.auto_migrate(GroupChat)
|
||||
plugin_db.auto_migrate(InstalledPlugin)
|
||||
|
@ -76,7 +76,7 @@ async def send_markdown(markdown: str, bot: T_Bot, *, message_type: str = None,
|
||||
class Markdown:
|
||||
@staticmethod
|
||||
def button(name: str, cmd: str, reply: bool = False, enter: bool = True) -> str:
|
||||
"""生成点击按钮
|
||||
"""生成点击回调按钮
|
||||
Args:
|
||||
name: 按钮显示内容
|
||||
cmd: 发送的命令,已在函数内url编码,不需要再次编码
|
||||
@ -91,7 +91,7 @@ class Markdown:
|
||||
|
||||
@staticmethod
|
||||
def link(name: str, url: str) -> str:
|
||||
"""生成链接
|
||||
"""生成点击链接按钮
|
||||
Args:
|
||||
name: 链接显示内容
|
||||
url: 链接地址
|
||||
@ -101,3 +101,18 @@ class Markdown:
|
||||
|
||||
"""
|
||||
return f"[🔗{name}]({url})"
|
||||
|
||||
@staticmethod
|
||||
def escape(text: str) -> str:
|
||||
"""转义特殊字符
|
||||
Args:
|
||||
text: 需要转义的文本,请勿直接把整个markdown文本传入,否则会转义掉所有字符
|
||||
|
||||
Returns:
|
||||
转义后的文本
|
||||
|
||||
"""
|
||||
chars = "*[]()~_-`>#+-=|{}.!"
|
||||
for char in chars:
|
||||
text = text.replace(char, f"\\\\{char}")
|
||||
return text
|
||||
|
Reference in New Issue
Block a user