update to python fastpi
This commit is contained in:
@@ -0,0 +1,22 @@
|
||||
from .main import BaseSettings, SettingsConfigDict
|
||||
from .sources import (
|
||||
DotEnvSettingsSource,
|
||||
EnvSettingsSource,
|
||||
InitSettingsSource,
|
||||
PydanticBaseSettingsSource,
|
||||
SecretsSettingsSource,
|
||||
)
|
||||
from .version import VERSION
|
||||
|
||||
__all__ = (
|
||||
'BaseSettings',
|
||||
'DotEnvSettingsSource',
|
||||
'EnvSettingsSource',
|
||||
'InitSettingsSource',
|
||||
'PydanticBaseSettingsSource',
|
||||
'SecretsSettingsSource',
|
||||
'SettingsConfigDict',
|
||||
'__version__',
|
||||
)
|
||||
|
||||
__version__ = VERSION
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,177 @@
|
||||
from __future__ import annotations as _annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from pydantic import ConfigDict
|
||||
from pydantic._internal._config import config_keys
|
||||
from pydantic._internal._utils import deep_update
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from .sources import (
|
||||
ENV_FILE_SENTINEL,
|
||||
DotEnvSettingsSource,
|
||||
DotenvType,
|
||||
EnvSettingsSource,
|
||||
InitSettingsSource,
|
||||
PydanticBaseSettingsSource,
|
||||
SecretsSettingsSource,
|
||||
)
|
||||
|
||||
|
||||
class SettingsConfigDict(ConfigDict, total=False):
|
||||
case_sensitive: bool
|
||||
env_prefix: str
|
||||
env_file: DotenvType | None
|
||||
env_file_encoding: str | None
|
||||
env_nested_delimiter: str | None
|
||||
secrets_dir: str | Path | None
|
||||
|
||||
|
||||
# Extend `config_keys` by pydantic settings config keys to
|
||||
# support setting config through class kwargs.
|
||||
# Pydantic uses `config_keys` in `pydantic._internal._config.ConfigWrapper.for_model`
|
||||
# to extract config keys from model kwargs, So, by adding pydantic settings keys to
|
||||
# `config_keys`, they will be considered as valid config keys and will be collected
|
||||
# by Pydantic.
|
||||
config_keys |= set(SettingsConfigDict.__annotations__.keys())
|
||||
|
||||
|
||||
class BaseSettings(BaseModel):
|
||||
"""
|
||||
Base class for settings, allowing values to be overridden by environment variables.
|
||||
|
||||
This is useful in production for secrets you do not wish to save in code, it plays nicely with docker(-compose),
|
||||
Heroku and any 12 factor app design.
|
||||
|
||||
All the below attributes can be set via `model_config`.
|
||||
|
||||
Args:
|
||||
_case_sensitive: Whether environment variables names should be read with case-sensitivity. Defaults to `None`.
|
||||
_env_prefix: Prefix for all environment variables. Defaults to `None`.
|
||||
_env_file: The env file(s) to load settings values from. Defaults to `Path('')`, which
|
||||
means that the value from `model_config['env_file']` should be used. You can also pass
|
||||
`None` to indicate that environment variables should not be loaded from an env file.
|
||||
_env_file_encoding: The env file encoding, e.g. `'latin-1'`. Defaults to `None`.
|
||||
_env_nested_delimiter: The nested env values delimiter. Defaults to `None`.
|
||||
_secrets_dir: The secret files directory. Defaults to `None`.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
__pydantic_self__,
|
||||
_case_sensitive: bool | None = None,
|
||||
_env_prefix: str | None = None,
|
||||
_env_file: DotenvType | None = ENV_FILE_SENTINEL,
|
||||
_env_file_encoding: str | None = None,
|
||||
_env_nested_delimiter: str | None = None,
|
||||
_secrets_dir: str | Path | None = None,
|
||||
**values: Any,
|
||||
) -> None:
|
||||
# Uses something other than `self` the first arg to allow "self" as a settable attribute
|
||||
super().__init__(
|
||||
**__pydantic_self__._settings_build_values(
|
||||
values,
|
||||
_case_sensitive=_case_sensitive,
|
||||
_env_prefix=_env_prefix,
|
||||
_env_file=_env_file,
|
||||
_env_file_encoding=_env_file_encoding,
|
||||
_env_nested_delimiter=_env_nested_delimiter,
|
||||
_secrets_dir=_secrets_dir,
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def settings_customise_sources(
|
||||
cls,
|
||||
settings_cls: type[BaseSettings],
|
||||
init_settings: PydanticBaseSettingsSource,
|
||||
env_settings: PydanticBaseSettingsSource,
|
||||
dotenv_settings: PydanticBaseSettingsSource,
|
||||
file_secret_settings: PydanticBaseSettingsSource,
|
||||
) -> tuple[PydanticBaseSettingsSource, ...]:
|
||||
"""
|
||||
Define the sources and their order for loading the settings values.
|
||||
|
||||
Args:
|
||||
settings_cls: The Settings class.
|
||||
init_settings: The `InitSettingsSource` instance.
|
||||
env_settings: The `EnvSettingsSource` instance.
|
||||
dotenv_settings: The `DotEnvSettingsSource` instance.
|
||||
file_secret_settings: The `SecretsSettingsSource` instance.
|
||||
|
||||
Returns:
|
||||
A tuple containing the sources and their order for loading the settings values.
|
||||
"""
|
||||
return init_settings, env_settings, dotenv_settings, file_secret_settings
|
||||
|
||||
def _settings_build_values(
|
||||
self,
|
||||
init_kwargs: dict[str, Any],
|
||||
_case_sensitive: bool | None = None,
|
||||
_env_prefix: str | None = None,
|
||||
_env_file: DotenvType | None = None,
|
||||
_env_file_encoding: str | None = None,
|
||||
_env_nested_delimiter: str | None = None,
|
||||
_secrets_dir: str | Path | None = None,
|
||||
) -> dict[str, Any]:
|
||||
# Determine settings config values
|
||||
case_sensitive = _case_sensitive if _case_sensitive is not None else self.model_config.get('case_sensitive')
|
||||
env_prefix = _env_prefix if _env_prefix is not None else self.model_config.get('env_prefix')
|
||||
env_file = _env_file if _env_file != ENV_FILE_SENTINEL else self.model_config.get('env_file')
|
||||
env_file_encoding = (
|
||||
_env_file_encoding if _env_file_encoding is not None else self.model_config.get('env_file_encoding')
|
||||
)
|
||||
env_nested_delimiter = (
|
||||
_env_nested_delimiter
|
||||
if _env_nested_delimiter is not None
|
||||
else self.model_config.get('env_nested_delimiter')
|
||||
)
|
||||
secrets_dir = _secrets_dir if _secrets_dir is not None else self.model_config.get('secrets_dir')
|
||||
|
||||
# Configure built-in sources
|
||||
init_settings = InitSettingsSource(self.__class__, init_kwargs=init_kwargs)
|
||||
env_settings = EnvSettingsSource(
|
||||
self.__class__,
|
||||
case_sensitive=case_sensitive,
|
||||
env_prefix=env_prefix,
|
||||
env_nested_delimiter=env_nested_delimiter,
|
||||
)
|
||||
dotenv_settings = DotEnvSettingsSource(
|
||||
self.__class__,
|
||||
env_file=env_file,
|
||||
env_file_encoding=env_file_encoding,
|
||||
case_sensitive=case_sensitive,
|
||||
env_prefix=env_prefix,
|
||||
env_nested_delimiter=env_nested_delimiter,
|
||||
)
|
||||
|
||||
file_secret_settings = SecretsSettingsSource(
|
||||
self.__class__, secrets_dir=secrets_dir, case_sensitive=case_sensitive, env_prefix=env_prefix
|
||||
)
|
||||
# Provide a hook to set built-in sources priority and add / remove sources
|
||||
sources = self.settings_customise_sources(
|
||||
self.__class__,
|
||||
init_settings=init_settings,
|
||||
env_settings=env_settings,
|
||||
dotenv_settings=dotenv_settings,
|
||||
file_secret_settings=file_secret_settings,
|
||||
)
|
||||
if sources:
|
||||
return deep_update(*reversed([source() for source in sources]))
|
||||
else:
|
||||
# no one should mean to do this, but I think returning an empty dict is marginally preferable
|
||||
# to an informative error and much better than a confusing error
|
||||
return {}
|
||||
|
||||
model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
|
||||
extra='forbid',
|
||||
arbitrary_types_allowed=True,
|
||||
validate_default=True,
|
||||
case_sensitive=False,
|
||||
env_prefix='',
|
||||
env_file=None,
|
||||
env_file_encoding=None,
|
||||
env_nested_delimiter=None,
|
||||
secrets_dir=None,
|
||||
protected_namespaces=('model_', 'settings_'),
|
||||
)
|
||||
@@ -0,0 +1,649 @@
|
||||
from __future__ import annotations as _annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
from collections import deque
|
||||
from dataclasses import is_dataclass
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, List, Mapping, Sequence, Tuple, Union, cast
|
||||
|
||||
from dotenv import dotenv_values
|
||||
from pydantic import AliasChoices, AliasPath, BaseModel, Json
|
||||
from pydantic._internal._typing_extra import origin_is_union
|
||||
from pydantic._internal._utils import deep_update, lenient_issubclass
|
||||
from pydantic.fields import FieldInfo
|
||||
from typing_extensions import get_args, get_origin
|
||||
|
||||
from pydantic_settings.utils import path_type_label
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pydantic_settings.main import BaseSettings
|
||||
|
||||
|
||||
DotenvType = Union[Path, str, List[Union[Path, str]], Tuple[Union[Path, str], ...]]
|
||||
|
||||
# This is used as default value for `_env_file` in the `BaseSettings` class and
|
||||
# `env_file` in `DotEnvSettingsSource` so the default can be distinguished from `None`.
|
||||
# See the docstring of `BaseSettings` for more details.
|
||||
ENV_FILE_SENTINEL: DotenvType = Path('')
|
||||
|
||||
|
||||
class SettingsError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class PydanticBaseSettingsSource(ABC):
|
||||
"""
|
||||
Abstract base class for settings sources, every settings source classes should inherit from it.
|
||||
"""
|
||||
|
||||
def __init__(self, settings_cls: type[BaseSettings]):
|
||||
self.settings_cls = settings_cls
|
||||
self.config = settings_cls.model_config
|
||||
|
||||
@abstractmethod
|
||||
def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
|
||||
"""
|
||||
Gets the value, the key for model creation, and a flag to determine whether value is complex.
|
||||
|
||||
This is an abstract method that should be overridden in every settings source classes.
|
||||
|
||||
Args:
|
||||
field: The field.
|
||||
field_name: The field name.
|
||||
|
||||
Returns:
|
||||
A tuple contains the key, value and a flag to determine whether value is complex.
|
||||
"""
|
||||
pass
|
||||
|
||||
def field_is_complex(self, field: FieldInfo) -> bool:
|
||||
"""
|
||||
Checks whether a field is complex, in which case it will attempt to be parsed as JSON.
|
||||
|
||||
Args:
|
||||
field: The field.
|
||||
|
||||
Returns:
|
||||
Whether the field is complex.
|
||||
"""
|
||||
return _annotation_is_complex(field.annotation, field.metadata)
|
||||
|
||||
def prepare_field_value(self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool) -> Any:
|
||||
"""
|
||||
Prepares the value of a field.
|
||||
|
||||
Args:
|
||||
field_name: The field name.
|
||||
field: The field.
|
||||
value: The value of the field that has to be prepared.
|
||||
value_is_complex: A flag to determine whether value is complex.
|
||||
|
||||
Returns:
|
||||
The prepared value.
|
||||
"""
|
||||
if value is not None and (self.field_is_complex(field) or value_is_complex):
|
||||
return self.decode_complex_value(field_name, field, value)
|
||||
return value
|
||||
|
||||
def decode_complex_value(self, field_name: str, field: FieldInfo, value: Any) -> Any:
|
||||
"""
|
||||
Decode the value for a complex field
|
||||
|
||||
Args:
|
||||
field_name: The field name.
|
||||
field: The field.
|
||||
value: The value of the field that has to be prepared.
|
||||
|
||||
Returns:
|
||||
The decoded value for further preparation
|
||||
"""
|
||||
return json.loads(value)
|
||||
|
||||
@abstractmethod
|
||||
def __call__(self) -> dict[str, Any]:
|
||||
pass
|
||||
|
||||
|
||||
class InitSettingsSource(PydanticBaseSettingsSource):
|
||||
"""
|
||||
Source class for loading values provided during settings class initialization.
|
||||
"""
|
||||
|
||||
def __init__(self, settings_cls: type[BaseSettings], init_kwargs: dict[str, Any]):
|
||||
self.init_kwargs = init_kwargs
|
||||
super().__init__(settings_cls)
|
||||
|
||||
def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
|
||||
# Nothing to do here. Only implement the return statement to make mypy happy
|
||||
return None, '', False
|
||||
|
||||
def __call__(self) -> dict[str, Any]:
|
||||
return self.init_kwargs
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'InitSettingsSource(init_kwargs={self.init_kwargs!r})'
|
||||
|
||||
|
||||
class PydanticBaseEnvSettingsSource(PydanticBaseSettingsSource):
|
||||
def __init__(
|
||||
self, settings_cls: type[BaseSettings], case_sensitive: bool | None = None, env_prefix: str | None = None
|
||||
) -> None:
|
||||
super().__init__(settings_cls)
|
||||
self.case_sensitive = case_sensitive if case_sensitive is not None else self.config.get('case_sensitive', False)
|
||||
self.env_prefix = env_prefix if env_prefix is not None else self.config.get('env_prefix', '')
|
||||
|
||||
def _apply_case_sensitive(self, value: str) -> str:
|
||||
return value.lower() if not self.case_sensitive else value
|
||||
|
||||
def _extract_field_info(self, field: FieldInfo, field_name: str) -> list[tuple[str, str, bool]]:
|
||||
"""
|
||||
Extracts field info. This info is used to get the value of field from environment variables.
|
||||
|
||||
It returns a list of tuples, each tuple contains:
|
||||
* field_key: The key of field that has to be used in model creation.
|
||||
* env_name: The environment variable name of the field.
|
||||
* value_is_complex: A flag to determine whether the value from environment variable
|
||||
is complex and has to be parsed.
|
||||
|
||||
Args:
|
||||
field (FieldInfo): The field.
|
||||
field_name (str): The field name.
|
||||
|
||||
Returns:
|
||||
list[tuple[str, str, bool]]: List of tuples, each tuple contains field_key, env_name, and value_is_complex.
|
||||
"""
|
||||
field_info: list[tuple[str, str, bool]] = []
|
||||
if isinstance(field.validation_alias, (AliasChoices, AliasPath)):
|
||||
v_alias: str | list[str | int] | list[list[str | int]] | None = field.validation_alias.convert_to_aliases()
|
||||
else:
|
||||
v_alias = field.validation_alias
|
||||
|
||||
if v_alias:
|
||||
if isinstance(v_alias, list): # AliasChoices, AliasPath
|
||||
for alias in v_alias:
|
||||
if isinstance(alias, str): # AliasPath
|
||||
field_info.append((alias, self._apply_case_sensitive(alias), True if len(alias) > 1 else False))
|
||||
elif isinstance(alias, list): # AliasChoices
|
||||
first_arg = cast(str, alias[0]) # first item of an AliasChoices must be a str
|
||||
field_info.append(
|
||||
(first_arg, self._apply_case_sensitive(first_arg), True if len(alias) > 1 else False)
|
||||
)
|
||||
else: # string validation alias
|
||||
field_info.append((v_alias, self._apply_case_sensitive(v_alias), False))
|
||||
else:
|
||||
field_info.append((field_name, self._apply_case_sensitive(self.env_prefix + field_name), False))
|
||||
|
||||
return field_info
|
||||
|
||||
def _replace_field_names_case_insensitively(self, field: FieldInfo, field_values: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
Replace field names in values dict by looking in models fields insensitively.
|
||||
|
||||
By having the following models:
|
||||
|
||||
```py
|
||||
class SubSubSub(BaseModel):
|
||||
VaL3: str
|
||||
|
||||
class SubSub(BaseModel):
|
||||
Val2: str
|
||||
SUB_sub_SuB: SubSubSub
|
||||
|
||||
class Sub(BaseModel):
|
||||
VAL1: str
|
||||
SUB_sub: SubSub
|
||||
|
||||
class Settings(BaseSettings):
|
||||
nested: Sub
|
||||
|
||||
model_config = SettingsConfigDict(env_nested_delimiter='__')
|
||||
```
|
||||
|
||||
Then:
|
||||
_replace_field_names_case_insensitively(
|
||||
field,
|
||||
{"val1": "v1", "sub_SUB": {"VAL2": "v2", "sub_SUB_sUb": {"vAl3": "v3"}}}
|
||||
)
|
||||
Returns {'VAL1': 'v1', 'SUB_sub': {'Val2': 'v2', 'SUB_sub_SuB': {'VaL3': 'v3'}}}
|
||||
"""
|
||||
values: dict[str, Any] = {}
|
||||
|
||||
for name, value in field_values.items():
|
||||
sub_model_field: FieldInfo | None = None
|
||||
|
||||
# This is here to make mypy happy
|
||||
# Item "None" of "Optional[Type[Any]]" has no attribute "model_fields"
|
||||
if not field.annotation or not hasattr(field.annotation, 'model_fields'):
|
||||
values[name] = value
|
||||
continue
|
||||
|
||||
# Find field in sub model by looking in fields case insensitively
|
||||
for sub_model_field_name, f in field.annotation.model_fields.items():
|
||||
if not f.validation_alias and sub_model_field_name.lower() == name.lower():
|
||||
sub_model_field = f
|
||||
break
|
||||
|
||||
if not sub_model_field:
|
||||
values[name] = value
|
||||
continue
|
||||
|
||||
if lenient_issubclass(sub_model_field.annotation, BaseModel) and isinstance(value, dict):
|
||||
values[sub_model_field_name] = self._replace_field_names_case_insensitively(sub_model_field, value)
|
||||
else:
|
||||
values[sub_model_field_name] = value
|
||||
|
||||
return values
|
||||
|
||||
def __call__(self) -> dict[str, Any]:
|
||||
data: dict[str, Any] = {}
|
||||
|
||||
for field_name, field in self.settings_cls.model_fields.items():
|
||||
try:
|
||||
field_value, field_key, value_is_complex = self.get_field_value(field, field_name)
|
||||
except Exception as e:
|
||||
raise SettingsError(
|
||||
f'error getting value for field "{field_name}" from source "{self.__class__.__name__}"'
|
||||
) from e
|
||||
|
||||
try:
|
||||
field_value = self.prepare_field_value(field_name, field, field_value, value_is_complex)
|
||||
except ValueError as e:
|
||||
raise SettingsError(
|
||||
f'error parsing value for field "{field_name}" from source "{self.__class__.__name__}"'
|
||||
) from e
|
||||
|
||||
if field_value is not None:
|
||||
if (
|
||||
not self.case_sensitive
|
||||
and lenient_issubclass(field.annotation, BaseModel)
|
||||
and isinstance(field_value, dict)
|
||||
):
|
||||
data[field_key] = self._replace_field_names_case_insensitively(field, field_value)
|
||||
else:
|
||||
data[field_key] = field_value
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class SecretsSettingsSource(PydanticBaseEnvSettingsSource):
|
||||
"""
|
||||
Source class for loading settings values from secret files.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
settings_cls: type[BaseSettings],
|
||||
secrets_dir: str | Path | None = None,
|
||||
case_sensitive: bool | None = None,
|
||||
env_prefix: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(settings_cls, case_sensitive, env_prefix)
|
||||
self.secrets_dir = secrets_dir if secrets_dir is not None else self.config.get('secrets_dir')
|
||||
|
||||
def __call__(self) -> dict[str, Any]:
|
||||
"""
|
||||
Build fields from "secrets" files.
|
||||
"""
|
||||
secrets: dict[str, str | None] = {}
|
||||
|
||||
if self.secrets_dir is None:
|
||||
return secrets
|
||||
|
||||
self.secrets_path = Path(self.secrets_dir).expanduser()
|
||||
|
||||
if not self.secrets_path.exists():
|
||||
warnings.warn(f'directory "{self.secrets_path}" does not exist')
|
||||
return secrets
|
||||
|
||||
if not self.secrets_path.is_dir():
|
||||
raise SettingsError(f'secrets_dir must reference a directory, not a {path_type_label(self.secrets_path)}')
|
||||
|
||||
return super().__call__()
|
||||
|
||||
@classmethod
|
||||
def find_case_path(cls, dir_path: Path, file_name: str, case_sensitive: bool) -> Path | None:
|
||||
"""
|
||||
Find a file within path's directory matching filename, optionally ignoring case.
|
||||
|
||||
Args:
|
||||
dir_path: Directory path.
|
||||
file_name: File name.
|
||||
case_sensitive: Whether to search for file name case sensitively.
|
||||
|
||||
Returns:
|
||||
Whether file path or `None` if file does not exist in directory.
|
||||
"""
|
||||
for f in dir_path.iterdir():
|
||||
if f.name == file_name:
|
||||
return f
|
||||
elif not case_sensitive and f.name.lower() == file_name.lower():
|
||||
return f
|
||||
return None
|
||||
|
||||
def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
|
||||
"""
|
||||
Gets the value for field from secret file and a flag to determine whether value is complex.
|
||||
|
||||
Args:
|
||||
field: The field.
|
||||
field_name: The field name.
|
||||
|
||||
Returns:
|
||||
A tuple contains the key, value if the file exists otherwise `None`, and
|
||||
a flag to determine whether value is complex.
|
||||
"""
|
||||
|
||||
for field_key, env_name, value_is_complex in self._extract_field_info(field, field_name):
|
||||
path = self.find_case_path(self.secrets_path, env_name, self.case_sensitive)
|
||||
if not path:
|
||||
# path does not exist, we currently don't return a warning for this
|
||||
continue
|
||||
|
||||
if path.is_file():
|
||||
return path.read_text().strip(), field_key, value_is_complex
|
||||
else:
|
||||
warnings.warn(
|
||||
f'attempted to load secret file "{path}" but found a {path_type_label(path)} instead.',
|
||||
stacklevel=4,
|
||||
)
|
||||
|
||||
return None, field_key, value_is_complex
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'SecretsSettingsSource(secrets_dir={self.secrets_dir!r})'
|
||||
|
||||
|
||||
class EnvSettingsSource(PydanticBaseEnvSettingsSource):
|
||||
"""
|
||||
Source class for loading settings values from environment variables.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
settings_cls: type[BaseSettings],
|
||||
case_sensitive: bool | None = None,
|
||||
env_prefix: str | None = None,
|
||||
env_nested_delimiter: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(settings_cls, case_sensitive, env_prefix)
|
||||
self.env_nested_delimiter = (
|
||||
env_nested_delimiter if env_nested_delimiter is not None else self.config.get('env_nested_delimiter')
|
||||
)
|
||||
self.env_prefix_len = len(self.env_prefix)
|
||||
|
||||
self.env_vars = self._load_env_vars()
|
||||
|
||||
def _load_env_vars(self) -> Mapping[str, str | None]:
|
||||
if self.case_sensitive:
|
||||
return os.environ
|
||||
return {k.lower(): v for k, v in os.environ.items()}
|
||||
|
||||
def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
|
||||
"""
|
||||
Gets the value for field from environment variables and a flag to determine whether value is complex.
|
||||
|
||||
Args:
|
||||
field: The field.
|
||||
field_name: The field name.
|
||||
|
||||
Returns:
|
||||
A tuple contains the key, value if the file exists otherwise `None`, and
|
||||
a flag to determine whether value is complex.
|
||||
"""
|
||||
|
||||
env_val: str | None = None
|
||||
for field_key, env_name, value_is_complex in self._extract_field_info(field, field_name):
|
||||
env_val = self.env_vars.get(env_name)
|
||||
if env_val is not None:
|
||||
break
|
||||
|
||||
return env_val, field_key, value_is_complex
|
||||
|
||||
def prepare_field_value(self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool) -> Any:
|
||||
"""
|
||||
Prepare value for the field.
|
||||
|
||||
* Extract value for nested field.
|
||||
* Deserialize value to python object for complex field.
|
||||
|
||||
Args:
|
||||
field: The field.
|
||||
field_name: The field name.
|
||||
|
||||
Returns:
|
||||
A tuple contains prepared value for the field.
|
||||
|
||||
Raises:
|
||||
ValuesError: When There is an error in deserializing value for complex field.
|
||||
"""
|
||||
is_complex, allow_parse_failure = self._field_is_complex(field)
|
||||
if is_complex or value_is_complex:
|
||||
if value is None:
|
||||
# field is complex but no value found so far, try explode_env_vars
|
||||
env_val_built = self.explode_env_vars(field_name, field, self.env_vars)
|
||||
if env_val_built:
|
||||
return env_val_built
|
||||
else:
|
||||
# field is complex and there's a value, decode that as JSON, then add explode_env_vars
|
||||
try:
|
||||
value = self.decode_complex_value(field_name, field, value)
|
||||
except ValueError as e:
|
||||
if not allow_parse_failure:
|
||||
raise e
|
||||
|
||||
if isinstance(value, dict):
|
||||
return deep_update(value, self.explode_env_vars(field_name, field, self.env_vars))
|
||||
else:
|
||||
return value
|
||||
elif value is not None:
|
||||
# simplest case, field is not complex, we only need to add the value if it was found
|
||||
return value
|
||||
|
||||
def _union_is_complex(self, annotation: type[Any] | None, metadata: list[Any]) -> bool:
|
||||
return any(_annotation_is_complex(arg, metadata) for arg in get_args(annotation))
|
||||
|
||||
def _field_is_complex(self, field: FieldInfo) -> tuple[bool, bool]:
|
||||
"""
|
||||
Find out if a field is complex, and if so whether JSON errors should be ignored
|
||||
"""
|
||||
if self.field_is_complex(field):
|
||||
allow_parse_failure = False
|
||||
elif origin_is_union(get_origin(field.annotation)) and self._union_is_complex(field.annotation, field.metadata):
|
||||
allow_parse_failure = True
|
||||
else:
|
||||
return False, False
|
||||
|
||||
return True, allow_parse_failure
|
||||
|
||||
@staticmethod
|
||||
def next_field(field: FieldInfo | None, key: str) -> FieldInfo | None:
|
||||
"""
|
||||
Find the field in a sub model by key(env name)
|
||||
|
||||
By having the following models:
|
||||
|
||||
```py
|
||||
class SubSubModel(BaseSettings):
|
||||
dvals: Dict
|
||||
|
||||
class SubModel(BaseSettings):
|
||||
vals: list[str]
|
||||
sub_sub_model: SubSubModel
|
||||
|
||||
class Cfg(BaseSettings):
|
||||
sub_model: SubModel
|
||||
```
|
||||
|
||||
Then:
|
||||
next_field(sub_model, 'vals') Returns the `vals` field of `SubModel` class
|
||||
next_field(sub_model, 'sub_sub_model') Returns `sub_sub_model` field of `SubModel` class
|
||||
|
||||
Args:
|
||||
field: The field.
|
||||
key: The key (env name).
|
||||
|
||||
Returns:
|
||||
Field if it finds the next field otherwise `None`.
|
||||
"""
|
||||
if not field or origin_is_union(get_origin(field.annotation)):
|
||||
# no support for Unions of complex BaseSettings fields
|
||||
return None
|
||||
elif field.annotation and hasattr(field.annotation, 'model_fields') and field.annotation.model_fields.get(key):
|
||||
return field.annotation.model_fields[key]
|
||||
|
||||
return None
|
||||
|
||||
def explode_env_vars(self, field_name: str, field: FieldInfo, env_vars: Mapping[str, str | None]) -> dict[str, Any]:
|
||||
"""
|
||||
Process env_vars and extract the values of keys containing env_nested_delimiter into nested dictionaries.
|
||||
|
||||
This is applied to a single field, hence filtering by env_var prefix.
|
||||
|
||||
Args:
|
||||
field_name: The field name.
|
||||
field: The field.
|
||||
env_vars: Environment variables.
|
||||
|
||||
Returns:
|
||||
A dictionaty contains extracted values from nested env values.
|
||||
"""
|
||||
prefixes = [
|
||||
f'{env_name}{self.env_nested_delimiter}' for _, env_name, _ in self._extract_field_info(field, field_name)
|
||||
]
|
||||
result: dict[str, Any] = {}
|
||||
for env_name, env_val in env_vars.items():
|
||||
if not any(env_name.startswith(prefix) for prefix in prefixes):
|
||||
continue
|
||||
# we remove the prefix before splitting in case the prefix has characters in common with the delimiter
|
||||
env_name_without_prefix = env_name[self.env_prefix_len :]
|
||||
_, *keys, last_key = env_name_without_prefix.split(self.env_nested_delimiter)
|
||||
env_var = result
|
||||
target_field: FieldInfo | None = field
|
||||
for key in keys:
|
||||
target_field = self.next_field(target_field, key)
|
||||
env_var = env_var.setdefault(key, {})
|
||||
|
||||
# get proper field with last_key
|
||||
target_field = self.next_field(target_field, last_key)
|
||||
|
||||
# check if env_val maps to a complex field and if so, parse the env_val
|
||||
if target_field and env_val:
|
||||
is_complex, allow_json_failure = self._field_is_complex(target_field)
|
||||
if is_complex:
|
||||
try:
|
||||
env_val = self.decode_complex_value(last_key, target_field, env_val)
|
||||
except ValueError as e:
|
||||
if not allow_json_failure:
|
||||
raise e
|
||||
env_var[last_key] = env_val
|
||||
|
||||
return result
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'EnvSettingsSource(env_nested_delimiter={self.env_nested_delimiter!r}, '
|
||||
f'env_prefix_len={self.env_prefix_len!r})'
|
||||
)
|
||||
|
||||
|
||||
class DotEnvSettingsSource(EnvSettingsSource):
|
||||
"""
|
||||
Source class for loading settings values from env files.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
settings_cls: type[BaseSettings],
|
||||
env_file: DotenvType | None = ENV_FILE_SENTINEL,
|
||||
env_file_encoding: str | None = None,
|
||||
case_sensitive: bool | None = None,
|
||||
env_prefix: str | None = None,
|
||||
env_nested_delimiter: str | None = None,
|
||||
) -> None:
|
||||
self.env_file = env_file if env_file != ENV_FILE_SENTINEL else settings_cls.model_config.get('env_file')
|
||||
self.env_file_encoding = (
|
||||
env_file_encoding if env_file_encoding is not None else settings_cls.model_config.get('env_file_encoding')
|
||||
)
|
||||
super().__init__(settings_cls, case_sensitive, env_prefix, env_nested_delimiter)
|
||||
|
||||
def _load_env_vars(self) -> Mapping[str, str | None]:
|
||||
return self._read_env_files(self.case_sensitive)
|
||||
|
||||
def _read_env_files(self, case_sensitive: bool) -> Mapping[str, str | None]:
|
||||
env_files = self.env_file
|
||||
if env_files is None:
|
||||
return {}
|
||||
|
||||
if isinstance(env_files, (str, os.PathLike)):
|
||||
env_files = [env_files]
|
||||
|
||||
dotenv_vars: dict[str, str | None] = {}
|
||||
for env_file in env_files:
|
||||
env_path = Path(env_file).expanduser()
|
||||
if env_path.is_file():
|
||||
dotenv_vars.update(
|
||||
read_env_file(env_path, encoding=self.env_file_encoding, case_sensitive=case_sensitive)
|
||||
)
|
||||
|
||||
return dotenv_vars
|
||||
|
||||
def __call__(self) -> dict[str, Any]:
|
||||
data: dict[str, Any] = super().__call__()
|
||||
|
||||
data_lower_keys: list[str] = []
|
||||
if not self.case_sensitive:
|
||||
data_lower_keys = [x.lower() for x in data.keys()]
|
||||
|
||||
# As `extra` config is allowed in dotenv settings source, We have to
|
||||
# update data with extra env variabels from dotenv file.
|
||||
for env_name, env_value in self.env_vars.items():
|
||||
if env_name.startswith(self.env_prefix) and env_value is not None:
|
||||
env_name_without_prefix = env_name[self.env_prefix_len :]
|
||||
first_key, *_ = env_name_without_prefix.split(self.env_nested_delimiter)
|
||||
|
||||
if (data_lower_keys and first_key not in data_lower_keys) or (
|
||||
not data_lower_keys and first_key not in data
|
||||
):
|
||||
data[first_key] = env_value
|
||||
|
||||
return data
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'DotEnvSettingsSource(env_file={self.env_file!r}, env_file_encoding={self.env_file_encoding!r}, '
|
||||
f'env_nested_delimiter={self.env_nested_delimiter!r}, env_prefix_len={self.env_prefix_len!r})'
|
||||
)
|
||||
|
||||
|
||||
def read_env_file(
|
||||
file_path: Path, *, encoding: str | None = None, case_sensitive: bool = False
|
||||
) -> Mapping[str, str | None]:
|
||||
file_vars: dict[str, str | None] = dotenv_values(file_path, encoding=encoding or 'utf8')
|
||||
if not case_sensitive:
|
||||
return {k.lower(): v for k, v in file_vars.items()}
|
||||
else:
|
||||
return file_vars
|
||||
|
||||
|
||||
def _annotation_is_complex(annotation: type[Any] | None, metadata: list[Any]) -> bool:
|
||||
if any(isinstance(md, Json) for md in metadata): # type: ignore[misc]
|
||||
return False
|
||||
origin = get_origin(annotation)
|
||||
return (
|
||||
_annotation_is_complex_inner(annotation)
|
||||
or _annotation_is_complex_inner(origin)
|
||||
or hasattr(origin, '__pydantic_core_schema__')
|
||||
or hasattr(origin, '__get_pydantic_core_schema__')
|
||||
)
|
||||
|
||||
|
||||
def _annotation_is_complex_inner(annotation: type[Any] | None) -> bool:
|
||||
if lenient_issubclass(annotation, (str, bytes)):
|
||||
return False
|
||||
|
||||
return lenient_issubclass(annotation, (BaseModel, Mapping, Sequence, tuple, set, frozenset, deque)) or is_dataclass(
|
||||
annotation
|
||||
)
|
||||
@@ -0,0 +1,24 @@
|
||||
from pathlib import Path
|
||||
|
||||
path_type_labels = {
|
||||
'is_dir': 'directory',
|
||||
'is_file': 'file',
|
||||
'is_mount': 'mount point',
|
||||
'is_symlink': 'symlink',
|
||||
'is_block_device': 'block device',
|
||||
'is_char_device': 'char device',
|
||||
'is_fifo': 'FIFO',
|
||||
'is_socket': 'socket',
|
||||
}
|
||||
|
||||
|
||||
def path_type_label(p: Path) -> str:
|
||||
"""
|
||||
Find out what sort of thing a path is.
|
||||
"""
|
||||
assert p.exists(), 'path does not exist'
|
||||
for method, name in path_type_labels.items():
|
||||
if getattr(p, method)():
|
||||
return name
|
||||
|
||||
return 'unknown'
|
||||
@@ -0,0 +1 @@
|
||||
VERSION = '2.1.0'
|
||||
Reference in New Issue
Block a user