📝 Docs: 数据库最佳实践 (#2545)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Ju4tCode <42488585+yanyongyu@users.noreply.github.com>
This commit is contained in:
Bryan不可思议
2024-05-01 16:16:02 +08:00
committed by GitHub
parent d469c6f287
commit dfd2096fe5
7 changed files with 1076 additions and 0 deletions

View File

@ -0,0 +1,378 @@
# 开发者指南
开发者指南内容较多,故分为了一个示例以及数个专题。
阅读(并且最好跟随实践)示例后,你将会对使用 `nonebot-plugin-orm` 开发插件有一个基本的认识。
如果想要更深入地学习关于 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [Alembic](https://alembic.sqlalchemy.org/) 的知识,或者在使用过程中遇到了问题,可以查阅专题以及其官方文档。
## 示例
### 模型定义
首先,我们需要设计存储的数据的结构。
例如天气插件,需要存储**什么地方 (`location`)** 的**天气是什么 (`weather`)**。
其中,一个地方只会有一种天气,而不同地方可能有相同的天气。
所以,我们可以设计出如下的模型:
```python title=weather/__init__.py showLineNumbers
from nonebot_plugin_orm import Model
from sqlalchemy.orm import Mapped, mapped_column
class Weather(Model):
location: Mapped[str] = mapped_column(primary_key=True)
weather: Mapped[str]
```
其中,`primary_key=True` 意味着此列 (`location`) 是主键,即内容是唯一的且非空的。
每一个模型必须有至少一个主键。
我们可以用以下代码检查模型生成的数据库模式是否正确:
```python
from sqlalchemy.schema import CreateTable
print(CreateTable(Weather.__table__))
```
```sql
CREATE TABLE weather_weather (
location VARCHAR NOT NULL,
weather VARCHAR NOT NULL,
CONSTRAINT pk_weather_weather PRIMARY KEY (location)
)
```
可以注意到表名是 `weather_weather` 而不是 `Weather` 或者 `weather`。
这是因为 `nonebot-plugin-orm` 会自动为模型生成一个表名,规则是:`<插件模块名>_<类名小写>`。
你也可以通过指定 `__tablename__` 属性来自定义表名:
```python {2}
class Weather(Model):
__tablename__ = "weather"
...
```
```sql {1}
CREATE TABLE weather (
...
)
```
但是,并不推荐你这么做,因为这可能会导致不同插件间的表名重复,引发冲突。
特别是当你会发布插件时,你并不知道其他插件会不会使用相同的表名。
### 首次迁移
我们成功定义了模型,现在启动机器人试试吧:
```shell
$ nb run
01-02 15:04:05 [SUCCESS] nonebot | NoneBot is initializing...
01-02 15:04:05 [ERROR] nonebot_plugin_orm | 启动检查失败
01-02 15:04:05 [ERROR] nonebot | Application startup failed. Exiting.
Traceback (most recent call last):
...
click.exceptions.UsageError: 检测到新的升级操作:
[('add_table',
Table('weather', MetaData(), Column('location', String(), table=<weather>, primary_key=True, nullable=False), Column('weather', String(), table=<weather>, nullable=False), schema=None))]
```
咦,发生了什么?
`nonebot-plugin-orm` 试图阻止我们启动机器人。
原来是我们定义了模型,但是数据库中并没有对应的表,这会导致插件不能正常运行。
所以,我们需要迁移数据库。
首先,我们需要创建一个迁移脚本:
```shell
nb orm revision -m "first revision" --branch-label weather
```
其中,`-m` 参数是迁移脚本的描述,`--branch-label` 参数是迁移脚本的分支,一般为插件模块名。
执行命令过后,出现了一个 `weather/migrations` 目录,其中有一个 `xxxxxxxxxxxx_first_revision.py` 文件:
```shell {4,5}
weather
├── __init__.py
├── config.py
└── migrations
└── xxxxxxxxxxxx_first_revision.py
```
这就是我们创建的迁移脚本,它记录了数据库模式的变化。
我们可以查看一下它的内容:
```python title=weather/migrations/xxxxxxxxxxxx_first_revision.py {25-33,39-41} showLineNumbers
"""first revision
迁移 ID: xxxxxxxxxxxx
父迁移:
创建时间: 2006-01-02 15:04:05.999999
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "xxxxxxxxxxxx"
down_revision: str | Sequence[str] | None = None
branch_labels: str | Sequence[str] | None = ("weather",)
depends_on: str | Sequence[str] | None = None
def upgrade(name: str = "") -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"weather_weather",
sa.Column("location", sa.String(), nullable=False),
sa.Column("weather", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")),
info={"bind_key": "weather"},
)
# ### end Alembic commands ###
def downgrade(name: str = "") -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("weather_weather")
# ### end Alembic commands ###
```
可以注意到脚本的主体部分(其余是模版代码,请勿修改)是:
```python
# ### commands auto generated by Alembic - please adjust! ###
op.create_table( # CREATE TABLE
"weather_weather", # weather_weather
sa.Column("location", sa.String(), nullable=False), # location VARCHAR NOT NULL,
sa.Column("weather", sa.String(), nullable=False), # weather VARCHAR NOT NULL,
sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")), # CONSTRAINT pk_weather_weather PRIMARY KEY (location)
info={"bind_key": "weather"},
)
# ### end Alembic commands ###
```
```python
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("weather_weather") # DROP TABLE weather_weather;
# ### end Alembic commands ###
```
虽然我们不是很懂这些代码的意思,但是可以注意到它们几乎与 SQL 语句 (DDL) 一一对应。
显然,它们是用来创建和删除表的。
我们还可以注意到,`upgrade()` 和 `downgrade()` 函数中的代码是**互逆**的。
也就是说,执行一次 `upgrade()` 函数,再执行一次 `downgrade()` 函数后,数据库的模式就会回到原来的状态。
这就是迁移脚本的作用:记录数据库模式的变化,以便我们在不同的环境中(例如开发环境和生产环境)**可复现地**、**可逆地**同步数据库模式,正如 git 对我们的代码做的事情那样。
对了,不要忘记还有一段注释:`commands auto generated by Alembic - please adjust!`。
它在提醒我们,这些代码是由 Alembic 自动生成的,我们应该检查它们,并且根据需要进行调整。
:::caution 注意
迁移脚本冗长且繁琐,我们一般不会手写它们,而是由 Alembic 自动生成。
一般情况下Alembic 足够智能,可以正确地生成迁移脚本。
但是,在复杂或有歧义的情况下,我们可能需要手动调整迁移脚本。
所以,**永远要检查迁移脚本,并且在开发环境中测试!**
**迁移脚本中任何一处错误都足以使数据付之东流!**
:::
确定迁移脚本正确后,我们就可以执行迁移脚本,将数据库模式同步到数据库中:
```shell
nb orm upgrade
```
现在,我们可以正常启动机器人了。
开发过程中,我们可能会频繁地修改模型,这意味着我们需要频繁地创建并执行迁移脚本,非常繁琐。
实际上,此时我们不在乎数据安全,只需要数据库模式与模型定义一致即可。
所以,我们可以关闭 `nonebot-plugin-orm` 的启动检查:
```shell title=.env.dev
ALEMBIC_STARTUP_CHECK=false
```
现在,每次启动机器人时,数据库模式会自动与模型定义同步,无需手动迁移。
### 会话管理
我们已经成功定义了模型,并且迁移了数据库,现在可以开始使用数据库了……吗?
并不能,因为模型只是数据结构的定义,并不能通过它操作数据(如果你曾经使用过 [Tortoise ORM](https://tortoise.github.io/),可能会知道 `await Weather.get(location="上海")` 这样的面向对象编程。
但是 SQLAlchemy 不同,选择了命令式编程)。
我们需要使用**会话**操作数据:
```python title=weather/__init__.py {10,13} showLineNumbers
from nonebot import on_command
from nonebot.adapters import Message
from nonebot.params import CommandArg
from nonebot_plugin_orm import async_scoped_session
weather = on_command("天气")
@weather.handle()
async def _(session: async_scoped_session, args: Message = CommandArg()):
location = args.extract_plain_text()
if wea := await session.get(Weather, location):
await weather.finish(f"今天{location}的天气是{wea.weather}")
await weather.finish(f"未查询到{location}的天气")
```
我们通过 `session: async_scoped_session` 依赖注入获得了一个会话,然后使用 `await session.get(Weather, location)` 查询数据库。
`async_scoped_session` 是一个有作用域限制的会话,作用域为当前事件、当前事件响应器。
会话产生的模型实例(例如此处的 `wea := await session.get(Weather, location)`)作用域与会话相同。
:::caution 注意
此处提到的“会话”指的是 ORM 会话,而非 [NoneBot 会话](../../../appendices/session-control)两者的生命周期也是不同的NoneBot 会话的生命周期中可能包含多个事件,不同的事件也会有不同的事件响应器)。
具体而言,就是不要将 ORM 会话和模型实例存储在 NoneBot 会话状态中:
```python {12}
from nonebot.params import ArgPlainText
from nonebot.typing import T_State
@weather.got("location", prompt="请输入地名")
async def _(state: T_State, session: async_scoped_session, location: str = ArgPlainText()):
wea = await session.get(Weather, location)
if not wea:
await weather.finish(f"未查询到{location}的天气")
state["weather"] = wea # 不要这么做,除非你知道自己在做什么
```
当然非要这么做也不是不可以:
```python {6}
@weather.handle()
async def _(state: T_State, session: async_scoped_session):
# 通过 await session.merge(state["weather"]) 获得了此 ORM 会话中的相应模型实例,
# 而非直接使用会话状态中的模型实例,
# 因为先前的 ORM 会话已经关闭了。
wea = await session.merge(state["weather"])
await weather.finish(f"今天{state['location']}的天气是{wea.weather}")
```
:::
当有数据更改时,我们需要提交事务,也要注意会话作用域问题:
```python title=weather/__init__.py {12,20} showLineNumbers
from nonebot.params import Depends
async def get_weather(
session: async_scoped_session, args: Message = CommandArg()
) -> Weather:
location = args.extract_plain_text()
if not (wea := await session.get(Weather, location)):
wea = Weather(location=location, weather="未知")
session.add(wea)
# await session.commit() # 不应该在其他地方提交事务
return wea
@weather.handle()
async def _(session: async_scoped_session, wea: Weather = Depends(get_weather)):
await weather.send(f"今天的天气是{wea.weather}")
await session.commit() # 而应该在事件响应器结束前提交事务
```
当然我们也可以获得一个新的会话,不过此时就要手动管理会话了:
```python title=weather/__init__.py {5-6} showLineNumbers
from nonebot_plugin_orm import get_session
async def get_weather(location: str) -> str:
session = get_session()
async with session.begin():
wea = await session.get(Weather, location)
if not wea:
wea = Weather(location=location, weather="未知")
session.add(wea)
return wea.weather
@weather.handle()
async def _(args: Message = CommandArg()):
wea = await get_weather(args.extract_plain_text())
await weather.send(f"今天的天气是{wea}")
```
### 依赖注入
在上面的示例中,我们都是通过会话获得数据的。
不过,我们也可以通过依赖注入获得数据:
```python title=weather/__init__.py {12-14} showLineNumbers
from sqlalchemy import select
from nonebot.params import Depends
from nonebot_plugin_orm import SQLDepends
def extract_arg_plain_text(args: Message = CommandArg()) -> str:
return args.extract_plain_text()
@weather.handle()
async def _(
wea: Weather = SQLDepends(
select(Weather).where(Weather.location == Depends(extract_arg_plain_text))
),
):
await weather.send(f"今天的天气是{wea.weather}")
```
其中,`SQLDepends` 是一个特殊的依赖注入,它会根据类型标注和 SQL 语句提供数据SQL 语句中也可以有子依赖。
不同的类型标注也会获得不同形式的数据:
```python title=weather/__init__.py {5} showLineNumbers
from collections.abc import Sequence
@weather.handle()
async def _(
weas: Sequence[Weather] = SQLDepends(
select(Weather).where(Weather.weather == Depends(extract_arg_plain_text))
),
):
await weather.send(f"今天的天气是{weas[0].weather}的城市有{''.join(wea.location for wea in weas)}")
```
支持的类型标注请参见 [依赖注入](dependency)。
我们也可以像 [类作为依赖](../../../advanced/dependency#类作为依赖) 那样,在类属性中声明子依赖:
```python title=weather/__init__.py {5-6,10} showLineNumbers
from collections.abc import Sequence
class Weather(Model):
location: Mapped[str] = mapped_column(primary_key=True)
weather: Mapped[str] = Depends(extract_arg_plain_text)
# weather: Annotated[Mapped[str], Depends(extract_arg_plain_text)] # Annotated 支持
@weather.handle()
async def _(weas: Sequence[Weather]):
await weather.send(
f"今天的天气是{weas[0].weather}的城市有{''.join(wea.location for wea in weas)}"
)
```

View File

@ -0,0 +1,4 @@
{
"label": "开发者指南",
"position": 3
}

View File

@ -0,0 +1,240 @@
---
sidebar_position: 3
description: 依赖注入
---
# 依赖注入
`nonebot-plugin-orm` 提供了强大且灵活的依赖注入,可以方便地帮助你获取数据库会话和查询数据。
## 数据库会话
### AsyncSession
新数据库会话,常用于有独立的数据库操作逻辑的插件。
```python {13,26}
from nonebot import on_message
from nonebot.params import Depends
from nonebot_plugin_orm import AsyncSession, Model, async_scoped_session
from sqlalchemy.orm import Mapped, mapped_column
message = on_message()
class Message(Model):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
async def get_message(session: AsyncSession) -> Message:
# 等价于 session = get_session()
async with session:
msg = Message()
session.add(msg)
await session.commit()
await session.refresh(msg)
return msg
@message.handle()
async def _(session: async_scoped_session, msg: Message = Depends(get_message)):
await session.rollback() # 无法回退 get_message() 中的更改
await message.send(str(msg.id)) # msg 被存储msg.id 递增
```
### async_scoped_session
数据库作用域会话,常用于事件响应器和有与响应逻辑相关的数据库操作逻辑的插件。
```python {1326}
from nonebot import on_message
from nonebot.params import Depends
from nonebot_plugin_orm import Model, async_scoped_session
from sqlalchemy.orm import Mapped, mapped_column
message = on_message()
class Message(Model):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
async def get_message(session: async_scoped_session) -> Message:
# 等价于 session = get_scoped_session()
msg = Message()
session.add(msg)
await session.flush()
await session.refresh(msg)
return msg
@message.handle()
async def _(session: async_scoped_session, msg: Message = Depends(get_message)):
await session.rollback() # 可以回退 get_message() 中的更改
await message.send(str(msg.id)) # msg 没有被存储msg.id 不变
```
## 查询数据
### Model
支持类作为依赖。
```python
from typing import Annotated
from nonebot.params import Depends
from nonebot_plugin_orm import Model
from sqlalchemy.orm import Mapped, mapped_column
def get_id() -> int: ...
class Message(Model):
id: Annotated[Mapped[int], Depends(get_id)] = mapped_column(
primary_key=True, autoincrement=True
)
async def _(msg: Message):
# 等价于 msg = (
# await (await session.stream(select(Message).where(Message.id == get_id())))
# .scalars()
# .one_or_none()
# )
...
```
### SQLDepends
参数为一个 SQL 语句决定依赖注入的内容SQL 语句中可以使用子依赖。
```python {11-13}
from nonebot.params import Depends
from nonebot_plugin_orm import Model, SQLDepends
from sqlalchemy import select
def get_id() -> int: ...
async def _(
model: Model = SQLDepends(select(Model).where(Model.id == Depends(get_id))),
): ...
```
参数可以是任意 SQL 语句,但不建议使用 `select` 以外的语句,因为语句可能没有返回值(`returning` 除外),而且代码不清晰。
### 类型标注
类型标注决定依赖注入的数据结构,主要影响以下几个层面:
- 迭代器(`session.execute()`)或异步迭代器(`session.stream()`
- 标量(`session.execute().scalars()`)或元组(`session.execute()`
- 一个(`session.execute().one_or_none()`,注意 `None` 时可能触发 [重载](../../../appendices/overload#重载))或全部(`session.execute()` / `session.execute().all()`
- 连续(`session().execute()`)或分块(`session.execute().partitions()`
具体如下(可以使用父类型作为类型标注):
- ```python
async def _(rows_partitions: AsyncIterator[Sequence[Tuple[Model, ...]]]):
# 等价于 rows_partitions = await (await session.stream(sql).partitions())
async for partition in rows_partitions:
for row in partition:
print(row[0], row[1], ...)
```
- ```python
async def _(model_partitions: AsyncIterator[Sequence[Model]]):
# 等价于 model_partitions = await (await session.stream(sql).scalars().partitions())
async for partition in model_partitions:
for model in partition:
print(model)
```
- ```python
async def _(row_partitions: Iterator[Sequence[Tuple[Model, ...]]]):
# 等价于 row_partitions = await session.execute(sql).partitions()
for partition in rows_partitions:
for row in partition:
print(row[0], row[1], ...)
```
- ```python
async def _(model_partitions: Iterator[Sequence[Model]]):
# 等价于 model_partitions = await (await session.execute(sql).scalars().partitions())
for partition in model_partitions:
for model in partition:
print(model)
```
- ```python
async def _(rows: sa_async.AsyncResult[Tuple[Model, ...]]):
# 等价于 rows = await session.stream(sql)
async for row in rows:
print(row[0], row[1], ...)
```
- ```python
async def _(models: sa_async.AsyncScalarResult[Model]):
# 等价于 models = await session.stream(sql).scalars()
async for model in models:
print(model)
```
- ```python
async def _(rows: sa.Result[Tuple[Model, ...]]):
# 等价于 rows = await session.execute(sql)
for row in rows:
print(row[0], row[1], ...)
```
- ```python
async def _(models: sa.ScalarResult[Model]):
# 等价于 models = await session.execute(sql).scalars()
for model in models:
print(model)
```
- ```python
async def _(rows: Sequence[Tuple[Model, ...]]):
# 等价于 rows = await (await session.stream(sql).all())
for row in rows:
print(row[0], row[1], ...)
```
- ```python
async def _(models: Sequence[Model]):
# 等价于 models = await (await session.stream(sql).scalars().all())
for model in models:
print(model)
```
- ```python
async def _(row: Tuple[Model, ...]):
# 等价于 row = await (await session.stream(sql).one_or_none())
print(row[0], row[1], ...)
```
- ```python
async def _(model: Model):
# 等价于 model = await (await session.stream(sql).scalars().one_or_none())
print(model)
```

View File

@ -0,0 +1,147 @@
---
sidebar_position: 2
description: 测试
---
# 测试
百思不如一试,测试是发现问题的最佳方式。
不同的用户会有不同的配置,为了提高项目的兼容性,我们需要在不同数据库后端上测试。
手动进行大量的、重复的测试不可靠,也不现实,因此我们推荐使用 [GitHub Actions](https://github.com/features/actions) 进行自动化测试:
```yaml title=.github/workflows/test.yml {12-42,52-53} showLineNumbers
name: Test
on:
push:
branches:
- main
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
db:
- sqlite+aiosqlite:///db.sqlite3
- postgresql+psycopg://postgres:postgres@localhost:5432/postgres
- mysql+aiomysql://mysql:mysql@localhost:3306/mymysql
fail-fast: false
env:
SQLALCHEMY_DATABASE_URL: ${{ matrix.db }}
services:
postgresql:
image: ${{ startsWith(matrix.db, 'postgresql') && 'postgres' || '' }}
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
ports:
- 5432:5432
mysql:
image: ${{ startsWith(matrix.db, 'mysql') && 'mysql' || '' }}
env:
MYSQL_ROOT_PASSWORD: mysql
MYSQL_USER: mysql
MYSQL_PASSWORD: mysql
MYSQL_DATABASE: mymysql
ports:
- 3306:3306
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run migrations
run: pipx run nb-cli orm upgrade
- name: Run tests
run: pytest
```
如果项目还需要考虑跨平台和跨 Python 版本兼容,测试矩阵中还需要增加这两个维度。
但是,我们没必要在所有平台和 Python 版本上运行所有数据库的测试因为很显然PostgreSQL 和 MySQL 这类独立的数据库后端不会受平台和 Python 影响,而且 Github Actions 的非 Linux 平台不支持运行独立服务:
| | Python 3.9 | Python 3.10 | Python 3.11 | Python 3.12 |
| ----------- | ---------- | ----------- | ----------- | --------------------------- |
| **Linux** | SQLite | SQLite | SQLite | SQLite / PostgreSQL / MySQL |
| **Windows** | SQLite | SQLite | SQLite | SQLite |
| **macOS** | SQLite | SQLite | SQLite | SQLite |
```yaml title=.github/workflows/test.yml {12-24} showLineNumbers
name: Test
on:
push:
branches:
- main
jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.9", "3.10", "3.11", "3.12"]
db: ["sqlite+aiosqlite:///db.sqlite3"]
include:
- os: ubuntu-latest
python-version: "3.12"
db: postgresql+psycopg://postgres:postgres@localhost:5432/postgres
- os: ubuntu-latest
python-version: "3.12"
db: mysql+aiomysql://mysql:mysql@localhost:3306/mymysql
fail-fast: false
env:
SQLALCHEMY_DATABASE_URL: ${{ matrix.db }}
services:
postgresql:
image: ${{ startsWith(matrix.db, 'postgresql') && 'postgres' || '' }}
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
ports:
- 5432:5432
mysql:
image: ${{ startsWith(matrix.db, 'mysql') && 'mysql' || '' }}
env:
MYSQL_ROOT_PASSWORD: mysql
MYSQL_USER: mysql
MYSQL_PASSWORD: mysql
MYSQL_DATABASE: mymysql
ports:
- 3306:3306
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run migrations
run: pipx run nb-cli orm upgrade
- name: Run tests
run: pytest
```