Feature: 新增 dotenv 嵌套配置项支持 (#1324)

Co-authored-by: hemengyang <hmy0119@hotmail.com>
This commit is contained in:
Ju4tCode
2022-10-14 09:58:44 +08:00
committed by GitHub
parent 67b96528af
commit db534b8824
8 changed files with 90 additions and 66 deletions

View File

@ -14,14 +14,14 @@ from datetime import timedelta
from ipaddress import IPv4Address
from typing import TYPE_CHECKING, Any, Set, Dict, Tuple, Union, Mapping, Optional
from pydantic import BaseSettings, IPvAnyAddress
from pydantic.utils import deep_update
from pydantic import Extra, BaseSettings, IPvAnyAddress
from pydantic.env_settings import (
DotenvType,
SettingsError,
EnvSettingsSource,
InitSettingsSource,
SettingsSourceCallable,
read_env_file,
env_file_sentinel,
)
from nonebot.log import logger
@ -32,33 +32,15 @@ class CustomEnvSettings(EnvSettingsSource):
"""
Build environment variables suitable for passing to the Model.
"""
d: Dict[str, Optional[str]] = {}
d: Dict[str, Any] = {}
if settings.__config__.case_sensitive:
env_vars: Mapping[str, Optional[str]] = os.environ # pragma: no cover
else:
env_vars = {k.lower(): v for k, v in os.environ.items()}
env_file_vars: Dict[str, Optional[str]] = {}
env_file = (
self.env_file
if self.env_file != env_file_sentinel
else settings.__config__.env_file
)
env_file_encoding = (
self.env_file_encoding
if self.env_file_encoding is not None
else settings.__config__.env_file_encoding
)
if env_file is not None:
env_path = Path(env_file)
if env_path.is_file():
env_file_vars = read_env_file(
env_path,
encoding=env_file_encoding, # type: ignore
case_sensitive=settings.__config__.case_sensitive,
)
env_vars = {**env_file_vars, **env_vars}
env_file_vars = self._read_env_files(settings.__config__.case_sensitive)
env_vars = {**env_file_vars, **env_vars}
for field in settings.__fields__.values():
env_val: Optional[str] = None
@ -69,31 +51,56 @@ class CustomEnvSettings(EnvSettingsSource):
if env_val is not None:
break
if env_val is None:
continue
if field.is_complex():
try:
env_val = settings.__config__.json_loads(env_val)
except ValueError as e: # pragma: no cover
raise SettingsError(
f'error parsing JSON for "{env_name}"' # type: ignore
) from e
d[field.alias] = env_val
if env_file_vars:
for env_name in env_file_vars.keys():
env_val = env_vars[env_name]
if env_val and (val_striped := env_val.strip()):
is_complex, allow_parse_failure = self.field_is_complex(field)
if is_complex:
if env_val is None:
if env_val_built := self.explode_env_vars(field, env_vars):
d[field.alias] = env_val_built
else:
# field is complex and there's a value, decode that as JSON, then add explode_env_vars
try:
env_val = settings.__config__.json_loads(val_striped)
env_val = settings.__config__.parse_env_var(field.name, env_val)
except ValueError as e:
logger.trace(
"Error while parsing JSON for "
f"{env_name!r}={val_striped!r}. "
"Assumed as string."
)
if not allow_parse_failure:
raise SettingsError(
f'error parsing env var "{env_name}"' # type: ignore
) from e
if isinstance(env_val, dict):
d[field.alias] = deep_update(
env_val, self.explode_env_vars(field, env_vars)
)
else:
d[field.alias] = env_val
elif env_val is not None:
# simplest case, field is not complex, we only need to add the value if it was found
d[field.alias] = env_val
# remain user custom config
for env_name in env_file_vars:
env_val = env_vars[env_name]
if env_val and (val_striped := env_val.strip()):
# there's a value, decode that as JSON
try:
env_val = settings.__config__.parse_env_var(env_name, val_striped)
except ValueError as e:
logger.trace(
"Error while parsing JSON for "
f"{env_name!r}={val_striped!r}. "
"Assumed as string."
)
# explode value when it's a nested dict
env_name, *nested_keys = env_name.split(self.env_nested_delimiter)
if nested_keys and (env_name not in d or isinstance(d[env_name], dict)):
result = {}
*keys, last_key = nested_keys
_tmp = result
for key in keys:
_tmp = _tmp.setdefault(key, {})
_tmp[last_key] = env_val
d[env_name] = deep_update(d.get(env_name, {}), result)
elif not nested_keys:
d[env_name] = env_val
return d
@ -106,6 +113,9 @@ class BaseConfig(BaseSettings):
return self.__dict__.get(name)
class Config:
extra = Extra.allow
env_nested_delimiter = "__"
@classmethod
def customise_sources(
cls,
@ -117,7 +127,10 @@ class BaseConfig(BaseSettings):
return (
init_settings,
CustomEnvSettings(
env_settings.env_file, env_settings.env_file_encoding
env_settings.env_file,
env_settings.env_file_encoding,
env_settings.env_nested_delimiter,
env_settings.env_prefix_len,
),
InitSettingsSource(common_config),
file_secret_settings,
@ -137,7 +150,6 @@ class Env(BaseConfig):
"""
class Config:
extra = "allow"
env_file = ".env"
@ -150,8 +162,7 @@ class Config(BaseConfig):
配置方法参考: [配置](https://v2.nonebot.dev/docs/tutorial/configuration)
"""
_env_file: str = ".env"
_common_config: Dict[str, Any] = {}
_env_file: DotenvType = ".env", ".env.prod"
# nonebot configs
driver: str = "~fastapi"
@ -231,8 +242,7 @@ class Config(BaseConfig):
# or from env file using json loads
class Config:
extra = "allow"
env_file = ".env.prod"
env_file = ".env", ".env.prod"
__autodoc__ = {