Feature: 兼容 Pydantic v2 (#2544)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Ju4tCode
2024-01-26 11:12:57 +08:00
committed by GitHub
parent 82e4ccb227
commit bbd13c04cc
36 changed files with 6535 additions and 414 deletions

View File

@ -13,10 +13,11 @@ FrontMatter:
import os
import abc
import json
from pathlib import Path
from datetime import timedelta
from ipaddress import IPv4Address
from typing_extensions import TypeAlias, get_origin
from typing_extensions import TypeAlias, get_args, get_origin
from typing import (
TYPE_CHECKING,
Any,
@ -27,17 +28,25 @@ from typing import (
Tuple,
Union,
Mapping,
ClassVar,
Optional,
)
from dotenv import dotenv_values
from pydantic.typing import is_union
from pydantic.utils import deep_update
from pydantic.fields import Undefined, ModelField, UndefinedType
from pydantic import Extra, Field, BaseModel, BaseConfig, JsonWrapper, IPvAnyAddress
from pydantic import Field, BaseModel
from pydantic.networks import IPvAnyAddress
from nonebot.log import logger
from nonebot.typing import origin_is_union
from nonebot.utils import deep_update, type_is_complex, lenient_issubclass
from nonebot.compat import (
PYDANTIC_V2,
ConfigDict,
ModelField,
PydanticUndefined,
PydanticUndefinedType,
model_config,
model_fields,
)
DOTENV_TYPE: TypeAlias = Union[
Path, str, List[Union[Path, str]], Tuple[Union[Path, str], ...]
@ -55,8 +64,8 @@ class BaseSettingsSource(abc.ABC):
self.settings_cls = settings_cls
@property
def config(self) -> Type["SettingsConfig"]:
return self.settings_cls.__config__
def config(self) -> "SettingsConfig":
return model_config(self.settings_cls)
@abc.abstractmethod
def __call__(self) -> Dict[str, Any]:
@ -90,40 +99,34 @@ class DotEnvSettingsSource(BaseSettingsSource):
) -> None:
super().__init__(settings_cls)
self.env_file = (
env_file if env_file is not ENV_FILE_SENTINEL else self.config.env_file
env_file
if env_file is not ENV_FILE_SENTINEL
else self.config.get("env_file", (".env",))
)
self.env_file_encoding = (
env_file_encoding
if env_file_encoding is not None
else self.config.env_file_encoding
else self.config.get("env_file_encoding", "utf-8")
)
self.case_sensitive = (
case_sensitive if case_sensitive is not None else self.config.case_sensitive
case_sensitive
if case_sensitive is not None
else self.config.get("case_sensitive", False)
)
self.env_nested_delimiter = (
env_nested_delimiter
if env_nested_delimiter is not None
else self.config.env_nested_delimiter
else self.config.get("env_nested_delimiter", None)
)
def _apply_case_sensitive(self, var_name: str) -> str:
return var_name if self.case_sensitive else var_name.lower()
def _field_is_complex(self, field: ModelField) -> Tuple[bool, bool]:
try:
if isinstance(field.annotation, type) and issubclass(
field.annotation, JsonWrapper
):
return True, False
except TypeError:
pass
if field.is_complex():
if type_is_complex(field.annotation):
return True, False
elif (
is_union(get_origin(field.type_))
and field.sub_fields
and any(f.is_complex() for f in field.sub_fields)
elif origin_is_union(get_origin(field.annotation)) and any(
type_is_complex(arg) for arg in get_args(field.annotation)
):
return True, True
return False, False
@ -157,16 +160,12 @@ class DotEnvSettingsSource(BaseSettingsSource):
def _next_field(
self, field: Optional[ModelField], key: str
) -> Optional[ModelField]:
if not field or is_union(get_origin(field.annotation)):
if not field or origin_is_union(get_origin(field.annotation)):
return None
elif (
field.annotation
and isinstance(
(fields := getattr(field.annotation, "__fields__", None)), dict
)
and (field := fields.get(key))
):
return field
elif field.annotation and lenient_issubclass(field.annotation, BaseModel):
for field in model_fields(field.annotation):
if field.name == key:
return field
return None
def _explode_env_vars(
@ -200,7 +199,7 @@ class DotEnvSettingsSource(BaseSettingsSource):
is_complex, allow_parse_failure = self._field_is_complex(target_field)
if is_complex:
try:
env_val = self.settings_cls.__config__.json_loads(env_val)
env_val = json.loads(env_val)
except ValueError as e:
if not allow_parse_failure:
raise SettingsError(
@ -220,30 +219,31 @@ class DotEnvSettingsSource(BaseSettingsSource):
env_file_vars = self._read_env_files()
env_vars = {**env_file_vars, **env_vars}
for field in self.settings_cls.__fields__.values():
field_key = field.name
env_name = self._apply_case_sensitive(field_key)
for field in model_fields(self.settings_cls):
field_name = field.name
env_name = self._apply_case_sensitive(field_name)
# try get values from env vars
env_val = env_vars.get(env_name, Undefined)
env_val = env_vars.get(env_name, PydanticUndefined)
# delete from file vars when used
if env_name in env_file_vars:
del env_file_vars[env_name]
is_complex, allow_parse_failure = self._field_is_complex(field)
if is_complex:
if isinstance(env_val, UndefinedType):
if isinstance(env_val, PydanticUndefinedType):
# field is complex but no value found so far, try explode_env_vars
if env_val_built := self._explode_env_vars(
field, env_vars, env_file_vars
):
d[field_key] = env_val_built
d[field_name] = env_val_built
elif env_val is None:
d[field_key] = env_val
d[field_name] = env_val
else:
# field is complex and there's a value
# decode that as JSON, then add explode_env_vars
try:
env_val = self.settings_cls.__config__.json_loads(env_val)
env_val = json.loads(env_val)
except ValueError as e:
if not allow_parse_failure:
raise SettingsError(
@ -253,16 +253,16 @@ class DotEnvSettingsSource(BaseSettingsSource):
if isinstance(env_val, dict):
# field value is a dict
# try explode_env_vars to find more sub-values
d[field_key] = deep_update(
d[field_name] = deep_update(
env_val,
self._explode_env_vars(field, env_vars, env_file_vars),
)
else:
d[field_key] = env_val
elif not isinstance(env_val, UndefinedType):
d[field_name] = env_val
elif env_val is not PydanticUndefined:
# simplest case, field is not complex
# we only need to add the value if it was found
d[field_key] = env_val
d[field_name] = env_val
# remain user custom config
for env_name in env_file_vars:
@ -270,7 +270,7 @@ class DotEnvSettingsSource(BaseSettingsSource):
if env_val and (val_striped := env_val.strip()):
# there's a value, decode that as JSON
try:
env_val = self.settings_cls.__config__.json_loads(val_striped)
env_val = json.loads(val_striped)
except ValueError:
logger.trace(
"Error while parsing JSON for "
@ -294,23 +294,45 @@ class DotEnvSettingsSource(BaseSettingsSource):
return d
class SettingsConfig(BaseConfig):
extra = Extra.allow
env_file: Optional[DOTENV_TYPE] = None
env_file_encoding: str = "utf-8"
case_sensitive: bool = False
env_nested_delimiter: Optional[str] = "__"
if PYDANTIC_V2: # pragma: pydantic-v2
class SettingsConfig(ConfigDict, total=False):
env_file: Optional[DOTENV_TYPE]
env_file_encoding: str
case_sensitive: bool
env_nested_delimiter: Optional[str]
else: # pragma: pydantic-v1
class SettingsConfig(ConfigDict):
env_file: Optional[DOTENV_TYPE]
env_file_encoding: str
case_sensitive: bool
env_nested_delimiter: Optional[str]
class BaseSettings(BaseModel):
if TYPE_CHECKING:
__config__: ClassVar[Type[SettingsConfig]]
# dummy getattr for pylance checking, actually not used
def __getattr__(self, name: str) -> Any: # pragma: no cover
return self.__dict__.get(name)
Config = SettingsConfig
if PYDANTIC_V2: # pragma: pydantic-v2
model_config: SettingsConfig = SettingsConfig(
extra="allow",
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
env_nested_delimiter="__",
)
else: # pragma: pydantic-v1
class Config(SettingsConfig):
extra = "allow" # type: ignore
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = False
env_nested_delimiter = "__"
def __init__(
__settings_self__, # pyright: ignore[reportSelfClsParameterName]
@ -357,9 +379,6 @@ class Env(BaseSettings):
NoneBot 将从 `.env.{environment}` 文件中加载配置。
"""
class Config:
env_file = ".env"
class Config(BaseSettings):
"""NoneBot 主要配置。大小写不敏感。
@ -370,7 +389,8 @@ class Config(BaseSettings):
配置方法参考: [配置](https://nonebot.dev/docs/appendices/config)
"""
_env_file: Optional[DOTENV_TYPE] = ".env", ".env.prod"
if TYPE_CHECKING:
_env_file: Optional[DOTENV_TYPE] = ".env", ".env.prod"
# nonebot configs
driver: str = "~fastapi"
@ -455,8 +475,12 @@ class Config(BaseSettings):
# custom configs can be assigned during nonebot.init
# or from env file using json loads
class Config:
env_file = ".env", ".env.prod"
if PYDANTIC_V2: # pragma: pydantic-v2
model_config = SettingsConfig(env_file=(".env", ".env.prod"))
else: # pragma: pydantic-v1
class Config(SettingsConfig):
env_file = ".env", ".env.prod"
__autodoc__ = {