312 lines
13 KiB
Python
312 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
from functools import lru_cache
|
|
from typing import Any, List, Literal, Mapping
|
|
from urllib.parse import quote
|
|
|
|
from dotenv import find_dotenv, load_dotenv
|
|
from pydantic import Field, PrivateAttr
|
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
|
|
|
from src.infrastructure.vault.client import VaultClient
|
|
|
|
# Locate the nearest ``.env`` file and export its variables into the process
# environment before the settings class below is instantiated.  ``env_file``
# stays bound at module level whether or not a file was found.
if env_file := find_dotenv('.env'):
    load_dotenv(env_file)
|
|
|
|
|
|
def _as_int(value: object, default: int) -> int:
|
|
if value is None:
|
|
return default
|
|
if isinstance(value, int):
|
|
return value
|
|
return int(str(value).strip())
|
|
|
|
|
|
class Settings(BaseSettings):
    """Application settings read from the environment and, optionally, Vault.

    Field values are first populated by ``BaseSettings`` (environment and
    ``.env``).  ``model_post_init`` then:

    * resets every ``S3_*`` field to its default, so S3 configuration only
      ever comes from the Vault S3 secret;
    * if both ``VAULT_ROLE_ID`` and ``VAULT_SECRET_ID`` are non-blank, reads
      the database, RabbitMQ, CSRF, docs and S3 secrets from Vault and
      overrides the matching fields;
    * raises ``ValueError`` when no database URL can be derived.
    """

    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        case_sensitive=True,
        extra='ignore',
    )

    # Raw Vault database secret; consulted by the DATABASE_URL property.
    _vault_database_secrets: dict[str, Any] = PrivateAttr(default_factory=dict)

    # Vault connection and secret paths.
    VAULT_ADDR: str = 'https://corp.vault.elcsa.ru'
    VAULT_ROLE_ID: str = ''
    VAULT_SECRET_ID: str = ''
    VAULT_NAMESPACE: str | None = None
    VAULT_MOUNT_POINT: str = 'dev-secrets'
    VAULT_DATABASE_SECRET_PATH: str = 'database'
    VAULT_RABBIT_SECRET_PATH: str = 'rabbitmq'
    VAULT_CSRF_SECRET_PATH: str = 'csrf'
    VAULT_DOCS_SECRET_PATH: str = 'docs'
    VAULT_JWT_KID_PATH: str = 'jwt/kid'
    VAULT_JWT_KIDS_PREFIX: str = 'jwt/kids'
    VAULT_S3_SECRET_PATH: str = 's3/avatars'

    # Database connection.  The env var DATABASE_URL is stored in
    # DATABASE_URL_DIRECT because DATABASE_URL itself is a computed property.
    DATABASE_URL_DIRECT: str | None = Field(default=None, validation_alias='DATABASE_URL')
    DATABASE_HOST: str = ''
    DATABASE_PORT: int = Field(default=5432, ge=1, le=65535)
    DATABASE_NAME: str = ''
    DATABASE_USER: str = ''
    DATABASE_PASSWORD: str = ''

    # Database pool tuning.
    DATABASE_POOL_SIZE: int = 10
    DATABASE_MAX_OVERFLOW: int = 20
    DATABASE_POOL_TIMEOUT: int = 30
    DATABASE_POOL_RECYCLE: int = 3600
    DATABASE_ECHO: bool = False

    # CSRF protection.
    CSRF_SECRET_KEY: str = Field(
        default='change-me-change-me-change-me-change-me',
        min_length=32,
    )

    CSRF_COOKIE_SECURE: bool = False
    CSRF_COOKIE_HTTPONLY: bool = True
    CSRF_COOKIE_SAMESITE: Literal['Lax', 'Strict', 'None'] = 'Lax'
    CSRF_COOKIE_PATH: str = '/'
    CSRF_COOKIE_DOMAIN: str | None = None

    # Credentials for the API documentation pages.
    DOCS_USERNAME: str = 'admin'
    DOCS_PASSWORD: str = 'admin'

    # JWT parameters.
    JWT_ACCESS_TTL_SECONDS: int = 15 * 60
    JWT_REFRESH_TTL_SECONDS: int = 30 * 24 * 60 * 60
    JWT_ISSUER: str | None = None
    JWT_AUDIENCE: str | None = None
    JWT_ALGORITHM: str = 'RS256'
    JWT_KEYS_REFRESH_SECONDS: int = 3600

    # Redis connection.
    REDIS_HOST: str = 'keydb'
    REDIS_PORT: int = 6379
    REDIS_PASSWORD: str | None = None
    REDIS_DB: int = 0

    # RabbitMQ connection.
    RABBIT_HOST: str = 'localhost'
    RABBIT_PORT: int = 5672
    RABBIT_USER: str = 'guest'
    RABBIT_PASSWORD: str = 'guest'
    RABBIT_VHOST: str = '/'

    RABBIT_PUBLISH_PERSIST: bool = True
    RABBIT_CONNECT_TIMEOUT: int = 5
    RABBIT_EMAIL_CODE_QUEUE: str = 'email.verification_code'

    # S3 storage.  NOTE: these are reset in model_post_init and then filled
    # from Vault only, so values supplied via the environment are discarded.
    S3_BUCKET: str = ''
    S3_REGION: str = 'us-east-1'
    S3_ACCESS_KEY_ID: str = ''
    S3_SECRET_ACCESS_KEY: str = ''
    S3_ENDPOINT_URL: str = ''
    S3_PUBLIC_BASE_URL: str = ''
    S3_REGRU_PUBLIC_WEBSITE_HOST: bool = False
    S3_AVATAR_KEY_PREFIX: str = 'avatars'

    # Logging.
    LOG_LEVEL: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO'
    LOG_FORMAT: Literal['JSON', 'TEXT'] = 'JSON'

    def _get_vault_secret(self, secrets: dict[str, Any], *keys: str) -> str:
        """Return the first non-blank value among ``keys`` as ``str``, else ``''``.

        The value is returned unstripped; only the blank check ignores
        surrounding whitespace.
        """
        for key in keys:
            value = secrets.get(key)
            if value is not None and str(value).strip() != '':
                return str(value)
        return ''

    def _reset_s3_config(self) -> None:
        """Set every ``S3_*`` field back to its declared default."""
        s3_defaults: dict[str, Any] = {
            'S3_BUCKET': '',
            'S3_ACCESS_KEY_ID': '',
            'S3_SECRET_ACCESS_KEY': '',
            'S3_ENDPOINT_URL': '',
            'S3_PUBLIC_BASE_URL': '',
            'S3_REGION': 'us-east-1',
            'S3_REGRU_PUBLIC_WEBSITE_HOST': False,
            'S3_AVATAR_KEY_PREFIX': 'avatars',
        }
        for attr, default in s3_defaults.items():
            object.__setattr__(self, attr, default)

    @staticmethod
    def _vault_kv(mapping: Mapping[str, Any], *keys: str) -> Any:
        """Return the value of the first key in ``keys`` that is present and not ``None``."""
        for k in keys:
            if k in mapping and mapping[k] is not None:
                return mapping[k]
        return None

    @staticmethod
    def _vault_text(mapping: Mapping[str, Any], *keys: str) -> str:
        """Return the first non-blank value among ``keys``, stripped, else ``''``.

        Unlike ``_vault_kv`` a blank alias does not hide a later, populated one.
        """
        for key in keys:
            raw = mapping.get(key)
            if raw is None:
                continue
            text = str(raw).strip()
            if text:
                return text
        return ''

    def _apply_s3_from_vault_secret(self, s3: dict[str, Any]) -> None:
        """Populate the ``S3_*`` fields from a Vault secret.

        Bucket, endpoint URL, access key id and secret access key are
        mandatory; region, public base URL, avatar key prefix and the
        website-host flag are applied only when present and non-blank.

        Raises:
            ValueError: If a mandatory value is missing or blank.  No field
                is modified in that case.
        """
        required = (
            ('S3_BUCKET', 'bucket_name',
             ('bucket_name', 'BUCKET_NAME', 'bucket', 'S3_BUCKET', 'bucketName')),
            ('S3_ENDPOINT_URL', 's3_endpoint_url',
             ('s3_endpoint_url', 'S3_ENDPOINT_URL', 'endpoint_url', 'ENDPOINT_URL', 'endpoint')),
            ('S3_ACCESS_KEY_ID', 's3_access_key_id',
             ('s3_access_key_id', 'S3_ACCESS_KEY_ID', 'ACCESS_KEY_ID', 'access_key_id',
              'AWS_ACCESS_KEY_ID')),
            ('S3_SECRET_ACCESS_KEY', 's3_secret_access_key',
             ('s3_secret_access_key', 'S3_SECRET_ACCESS_KEY', 'SECRET_ACCESS_KEY',
              'AWS_SECRET_ACCESS_KEY')),
        )
        # Resolve and validate everything first so a failure leaves the
        # instance untouched.
        resolved: dict[str, str] = {}
        for attr, label, aliases in required:
            text = self._vault_text(s3, *aliases)
            if not text:
                raise ValueError(f'Vault S3 secret must contain {label}')
            resolved[attr] = text
        for attr, text in resolved.items():
            object.__setattr__(self, attr, text)

        optional = (
            ('S3_REGION', ('s3_region', 'S3_REGION', 'region')),
            ('S3_PUBLIC_BASE_URL',
             ('s3_public_base_url', 'S3_PUBLIC_BASE_URL', 'public_base_url', 'public_url')),
            ('S3_AVATAR_KEY_PREFIX',
             ('avatar_key_prefix', 'S3_AVATAR_KEY_PREFIX', 's3_avatar_key_prefix')),
        )
        for attr, aliases in optional:
            text = self._vault_text(s3, *aliases)
            if text:
                object.__setattr__(self, attr, text)

        # Unrecognised flag spellings leave the current value unchanged.
        flag = self._vault_text(
            s3, 's3_reg_ru_public_website_host', 'S3_REGRU_PUBLIC_WEBSITE_HOST',
        ).lower()
        if flag in {'1', 'true', 'yes', 'on'}:
            object.__setattr__(self, 'S3_REGRU_PUBLIC_WEBSITE_HOST', True)
        elif flag in {'0', 'false', 'no', 'off'}:
            object.__setattr__(self, 'S3_REGRU_PUBLIC_WEBSITE_HOST', False)

    def _override_text_fields(
        self,
        secret: Mapping[str, Any],
        fields: Mapping[str, tuple[str, ...]],
    ) -> None:
        """Set each attribute in ``fields`` to ``str(value)`` of its first non-``None`` alias."""
        for attr, aliases in fields.items():
            raw = self._vault_kv(secret, *aliases)
            if raw is not None:
                object.__setattr__(self, attr, str(raw))

    def _override_port(self, secret: Mapping[str, Any], attr: str) -> None:
        """Set the port attribute ``attr`` from ``PORT``/``port`` when a non-blank value exists."""
        raw = self._vault_kv(secret, 'PORT', 'port')
        # A blank port is treated as "not provided" rather than crashing int().
        if raw is not None and str(raw).strip() != '':
            object.__setattr__(self, attr, _as_int(raw, getattr(self, attr)))

    def model_post_init(self, __context: Any) -> None:
        """Overlay Vault secrets on the loaded settings and validate the DB URL.

        Raises:
            ValueError: If Vault credentials are absent and no database URL
                can be derived from the environment, or if Vault is used and
                the database secret does not yield a URL.  Errors raised by
                the S3 secret loader propagate unchanged.
        """
        self._reset_s3_config()

        vault_configured = bool(self.VAULT_ROLE_ID.strip()) and bool(self.VAULT_SECRET_ID.strip())
        if not vault_configured:
            if not self.DATABASE_URL:
                raise ValueError(
                    'Set VAULT_ROLE_ID and VAULT_SECRET_ID for Vault, or set DATABASE_URL '
                    '(or DATABASE_HOST, DATABASE_USER, DATABASE_PASSWORD, DATABASE_NAME) in the environment',
                )
            return

        client = VaultClient(
            addr=self.VAULT_ADDR,
            role_id=self.VAULT_ROLE_ID,
            secret_id=self.VAULT_SECRET_ID,
            namespace=self.VAULT_NAMESPACE,
            mount_point=self.VAULT_MOUNT_POINT,
        )

        db = client.read_secret(self.VAULT_DATABASE_SECRET_PATH)
        object.__setattr__(self, '_vault_database_secrets', db)
        self._override_text_fields(db, {
            'DATABASE_HOST': ('HOST', 'host'),
            'DATABASE_NAME': ('NAME', 'name'),
            'DATABASE_USER': ('USER', 'user'),
            'DATABASE_PASSWORD': ('PASSWORD', 'password'),
        })
        self._override_port(db, 'DATABASE_PORT')

        rabbit = client.read_secret_optional(self.VAULT_RABBIT_SECRET_PATH)
        if rabbit:
            self._override_text_fields(rabbit, {
                'RABBIT_HOST': ('HOST', 'host'),
                'RABBIT_USER': ('USER', 'user'),
                'RABBIT_PASSWORD': ('PASSWORD', 'password'),
                'RABBIT_VHOST': ('VHOST', 'vhost'),
            })
            self._override_port(rabbit, 'RABBIT_PORT')

        csrf = client.read_secret_optional(self.VAULT_CSRF_SECRET_PATH)
        if csrf:
            raw_key = self._vault_kv(csrf, 'KEY', 'key')
            # NOTE(review): a key shorter than 32 characters is silently
            # ignored and the previously loaded CSRF_SECRET_KEY stays in
            # effect; confirm whether this should be an error instead.
            if raw_key is not None and len(str(raw_key)) >= 32:
                object.__setattr__(self, 'CSRF_SECRET_KEY', str(raw_key))

        docs = client.read_secret_optional(self.VAULT_DOCS_SECRET_PATH)
        if docs:
            docs_user = docs.get('DOCS_USERNAME') or docs.get('USERNAME')
            docs_password = docs.get('DOCS_PASSWORD') or docs.get('PASSWORD')
            if docs_user is not None:
                object.__setattr__(self, 'DOCS_USERNAME', str(docs_user))
            if docs_password is not None:
                object.__setattr__(self, 'DOCS_PASSWORD', str(docs_password))

        s3_rel_path = self.VAULT_S3_SECRET_PATH.strip()
        if s3_rel_path:
            s3_secret_data = client.read_secret_optional(s3_rel_path)
            if s3_secret_data:
                self._apply_s3_from_vault_secret(s3_secret_data)

        if not self.DATABASE_URL:
            raise ValueError('Database URL could not be built from Vault database secret')

    @staticmethod
    def _postgres_url(user: str, password: str, host: str, port: str, name: str) -> str:
        """Build an asyncpg SQLAlchemy URL, percent-encoding the credentials."""
        quoted_user = quote(user, safe='')
        quoted_password = quote(password, safe='')
        return f'postgresql+asyncpg://{quoted_user}:{quoted_password}@{host}:{port}/{name}'

    @property
    def DATABASE_URL(self) -> str:
        """Return the database URL, or ``''`` when it cannot be derived.

        Resolution order:
        1. the ``DATABASE_URL`` environment value (``DATABASE_URL_DIRECT``);
        2. a ready-made URL stored in the Vault database secret;
        3. a URL assembled from the Vault secret's host/user/password/name;
        4. a URL assembled from the ``DATABASE_*`` fields.
        """
        direct = (self.DATABASE_URL_DIRECT or '').strip()
        if direct:
            return direct

        vault = self._vault_database_secrets
        ready_url = self._get_vault_secret(vault, 'DATABASE_URL', 'database_url')
        if ready_url:
            return ready_url

        host = self._get_vault_secret(vault, 'host', 'HOST')
        port = self._get_vault_secret(vault, 'port', 'PORT') or str(self.DATABASE_PORT)
        user = self._get_vault_secret(vault, 'user', 'USER')
        password = self._get_vault_secret(vault, 'password', 'PASSWORD')
        name = self._get_vault_secret(vault, 'name', 'NAME', 'database', 'DATABASE')
        if host and user and password and name:
            return self._postgres_url(user, password, host, port, name)

        # The Vault secret is incomplete: fall back to the plain fields.
        h = (self.DATABASE_HOST or '').strip()
        u = (self.DATABASE_USER or '').strip()
        p = (self.DATABASE_PASSWORD or '').strip()
        n = (self.DATABASE_NAME or '').strip()
        if h and u and p and n:
            return self._postgres_url(u, p, h, str(self.DATABASE_PORT), n)
        return ''

    @property
    def REDIS_URL(self) -> str:
        """Return the ``redis://`` URL; the password, if set, is percent-encoded."""
        auth = ''
        if self.REDIS_PASSWORD:
            # Encode reserved characters ('@', ':', '/', ...) so they cannot
            # be mistaken for URL delimiters.
            auth = ':' + quote(self.REDIS_PASSWORD, safe='') + '@'
        return f'redis://{auth}{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}'

    @property
    def RABBIT_URL(self) -> str:
        """Return the ``amqp://`` URL with percent-encoded user and password.

        The default vhost ``/`` is emitted as ``%2F``; any other vhost has its
        leading slashes removed and is otherwise used verbatim.
        """
        vhost = '%2F' if self.RABBIT_VHOST == '/' else self.RABBIT_VHOST.lstrip('/')
        user = quote(self.RABBIT_USER, safe='')
        password = quote(self.RABBIT_PASSWORD, safe='')
        return f'amqp://{user}:{password}@{self.RABBIT_HOST}:{self.RABBIT_PORT}/{vhost}'

    @property
    def EXCLUDED_PATHS(self) -> List[str]:
        """Return the fixed list of documentation and health-check paths."""
        return ['/docs', '/redoc', '/openapi.json', '/ping', '/health']
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Build the ``Settings`` object once and return the cached instance afterwards."""
    instance = Settings()
    return instance
|
|
|
|
|
|
# Module-level settings instance, created at import time via the cached factory.
settings = get_settings()
|