feat: 添加了对markdown的简单封装
This commit is contained in:
@ -4,10 +4,10 @@ import sqlite3
|
||||
import types
|
||||
from abc import ABC
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
|
||||
import nonebot
|
||||
from pydantic import BaseModel
|
||||
from typing_extensions import Any
|
||||
|
||||
BaseIterable = list | tuple | set | dict
|
||||
|
||||
@ -16,7 +16,7 @@ class LiteModel(BaseModel):
|
||||
"""轻量级模型基类
|
||||
类型注解统一使用Python3.9的PEP585标准,如需使用泛型请使用typing模块的泛型类型
|
||||
"""
|
||||
id: Any = None
|
||||
id: int = None
|
||||
|
||||
|
||||
class BaseORMAdapter(ABC):
|
||||
@ -72,7 +72,7 @@ class BaseORMAdapter(ABC):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class SqliteORMDatabase(BaseORMAdapter):
|
||||
class Database(BaseORMAdapter):
|
||||
"""SQLiteORM适配器,严禁使用`FORIEGNID`和`JSON`作为主键前缀,严禁使用`$ID:`作为字符串值前缀
|
||||
|
||||
Attributes:
|
||||
@ -86,6 +86,14 @@ class SqliteORMDatabase(BaseORMAdapter):
|
||||
bool : 'INTEGER',
|
||||
list : 'TEXT'
|
||||
}
|
||||
|
||||
DEFAULT_TYPE = {
|
||||
'TEXT': '',
|
||||
'INTEGER': 0,
|
||||
'REAL': 0.0
|
||||
}
|
||||
|
||||
|
||||
FOREIGNID = 'FOREIGNID'
|
||||
JSON = 'JSON'
|
||||
ID = '$ID'
|
||||
@ -143,6 +151,8 @@ class SqliteORMDatabase(BaseORMAdapter):
|
||||
if field not in table_fields:
|
||||
nonebot.logger.debug(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
|
||||
self.cursor.execute(f'ALTER TABLE {table_name} ADD COLUMN {field} {type_}')
|
||||
# 在原有的行中添加新字段对应类型的默认值,从DEFAULT_TYPE中获取
|
||||
self.cursor.execute(f'UPDATE {table_name} SET {field} = ? WHERE {field} IS NULL', (self.DEFAULT_TYPE.get(type_, ""),))
|
||||
|
||||
# 检测多余字段,除了id字段
|
||||
for field in table_fields:
|
||||
@ -230,11 +240,12 @@ class SqliteORMDatabase(BaseORMAdapter):
|
||||
"""
|
||||
table_name = model.__name__
|
||||
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
|
||||
if data := self.cursor.fetchone():
|
||||
if row_data := self.cursor.fetchone():
|
||||
data = dict(row_data)
|
||||
return model(**self.convert_to_dict(data))
|
||||
return default
|
||||
|
||||
def all(self, model: type(LiteModel), conditions, *args, default: Any = None) -> list[LiteModel] | None:
|
||||
def all(self, model: type(LiteModel), conditions=None, *args, default: Any = None) -> list[LiteModel] | None:
|
||||
"""查询所有数据
|
||||
|
||||
Args:
|
||||
@ -246,9 +257,17 @@ class SqliteORMDatabase(BaseORMAdapter):
|
||||
Returns: 数据
|
||||
"""
|
||||
table_name = model.__name__
|
||||
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
|
||||
data = self.cursor.fetchall()
|
||||
return [model(**self.convert_to_dict(d)) for d in data] if data else default
|
||||
|
||||
# 检测表是否存在,否则返回None
|
||||
|
||||
if conditions:
|
||||
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
|
||||
else:
|
||||
self.cursor.execute(f'SELECT * FROM {table_name}')
|
||||
if row_datas := self.cursor.fetchall():
|
||||
datas = [dict(row_data) for row_data in row_datas]
|
||||
return [model(**self.convert_to_dict(d)) for d in datas] if datas else default
|
||||
return default
|
||||
|
||||
def delete(self, model: type(LiteModel), conditions, *args):
|
||||
"""删除数据
|
||||
@ -299,7 +318,7 @@ class SqliteORMDatabase(BaseORMAdapter):
|
||||
for k, v in d.items():
|
||||
if k.startswith(self.FOREIGNID):
|
||||
new_d[k.replace(self.FOREIGNID, '')] = load(
|
||||
dict(self.cursor.execute(f'SELECT * FROM {v.split(":")[1]} WHERE id = ?', (v.split(":")[2],)).fetchone()))
|
||||
dict(self.cursor.execute(f'SELECT * FROM {v.split(":",2)[1]} WHERE id = ?', (v.split(":",2)[2],)).fetchone()))
|
||||
elif k.startswith(self.JSON):
|
||||
new_d[k.replace(self.JSON, '')] = load(json.loads(v))
|
||||
else:
|
||||
@ -308,7 +327,7 @@ class SqliteORMDatabase(BaseORMAdapter):
|
||||
new_d = []
|
||||
for i, v in enumerate(d):
|
||||
if isinstance(v, str) and v.startswith(self.ID):
|
||||
new_d.append(load(dict(self.cursor.execute(f'SELECT * FROM {v.split(":")[1]} WHERE id = ?', (v.split(":")[2],)).fetchone())))
|
||||
new_d.append(load(dict(self.cursor.execute(f'SELECT * FROM {v.split(":",2)[1]} WHERE id = ?', (v.split(":",2)[2],)).fetchone())))
|
||||
elif isinstance(v, BaseIterable):
|
||||
new_d.append(load(v))
|
||||
else:
|
||||
|
@ -1,6 +1,6 @@
|
||||
import os
|
||||
|
||||
from src.utils.data import LiteModel, SqliteORMDatabase as DB
|
||||
from src.utils.data import LiteModel, Database as DB
|
||||
|
||||
DATA_PATH = "data/liteyuki"
|
||||
|
||||
|
@ -6,7 +6,7 @@ from .tools import de_escape
|
||||
from .typing import T_Bot
|
||||
|
||||
|
||||
async def send_markdown(markdown: str, bot: T_Bot, message_type: str, session_id: str | int) -> dict[str, Any]:
|
||||
async def send_markdown(markdown: str, bot: T_Bot, message_type: str, session_id: str | int) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
formatted_md = de_escape(markdown).replace("\n", r"\n").replace("\"", r'\\\"')
|
||||
try:
|
||||
forward_data = await bot.call_api(
|
||||
@ -67,4 +67,4 @@ async def send_markdown(markdown: str, bot: T_Bot, message_type: str, session_id
|
||||
else:
|
||||
nonebot.logger.error("send_markdown: bot type not supported")
|
||||
data = {}
|
||||
return data
|
||||
return data, forward_data
|
||||
|
Reference in New Issue
Block a user