from __future__ import annotations from functools import lru_cache from typing import Any, List, Literal, Mapping from urllib.parse import quote from dotenv import find_dotenv, load_dotenv from pydantic import Field, PrivateAttr from pydantic_settings import BaseSettings, SettingsConfigDict from src.infrastructure.vault.client import VaultClient env_file = find_dotenv('.env') if env_file: load_dotenv(env_file) def _as_int(value: object, default: int) -> int: if value is None: return default if isinstance(value, int): return value return int(str(value).strip()) class Settings(BaseSettings): model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8', case_sensitive=True, extra='ignore') _vault_database_secrets: dict[str, Any] = PrivateAttr(default_factory=dict) VAULT_ADDR: str = 'https://corp.vault.elcsa.ru' VAULT_ROLE_ID: str = '' VAULT_SECRET_ID: str = '' VAULT_NAMESPACE: str | None = None VAULT_MOUNT_POINT: str = 'dev-secrets' VAULT_DATABASE_SECRET_PATH: str = 'database' VAULT_RABBIT_SECRET_PATH: str = 'rabbitmq' VAULT_CSRF_SECRET_PATH: str = 'csrf' VAULT_DOCS_SECRET_PATH: str = 'docs' VAULT_JWT_KID_PATH: str = 'jwt/kid' VAULT_JWT_KIDS_PREFIX: str = 'jwt/kids' VAULT_S3_SECRET_PATH: str = 's3/avatars' DATABASE_URL_DIRECT: str | None = Field(default=None, validation_alias='DATABASE_URL') DATABASE_HOST: str = '' DATABASE_PORT: int = Field(default=5432, ge=1, le=65535) DATABASE_NAME: str = '' DATABASE_USER: str = '' DATABASE_PASSWORD: str = '' DATABASE_POOL_SIZE: int = 10 DATABASE_MAX_OVERFLOW: int = 20 DATABASE_POOL_TIMEOUT: int = 30 DATABASE_POOL_RECYCLE: int = 3600 DATABASE_ECHO: bool = False CSRF_SECRET_KEY: str = Field( default='change-me-change-me-change-me-change-me', min_length=32, ) CSRF_COOKIE_SECURE: bool = False CSRF_COOKIE_HTTPONLY: bool = True CSRF_COOKIE_SAMESITE: Literal['Lax', 'Strict', 'None'] = 'Lax' CSRF_COOKIE_PATH: str = '/' CSRF_COOKIE_DOMAIN: str | None = None DOCS_USERNAME: str = 'admin' DOCS_PASSWORD: str = 'admin' JWT_ACCESS_TTL_SECONDS: int = 15 * 60 JWT_REFRESH_TTL_SECONDS: int = 30 * 24 * 60 * 60 JWT_ISSUER: str | None = None JWT_AUDIENCE: str | None = None JWT_ALGORITHM: str = 'RS256' JWT_KEYS_REFRESH_SECONDS: int = 3600 REDIS_HOST: str = 'keydb' REDIS_PORT: int = 6379 REDIS_PASSWORD: str | None = None REDIS_DB: int = 0 RABBIT_HOST: str = 'localhost' RABBIT_PORT: int = 5672 RABBIT_USER: str = 'guest' RABBIT_PASSWORD: str = 'guest' RABBIT_VHOST: str = '/' RABBIT_PUBLISH_PERSIST: bool = True RABBIT_CONNECT_TIMEOUT: int = 5 RABBIT_EMAIL_CODE_QUEUE: str = 'email.verification_code' S3_BUCKET: str = '' S3_REGION: str = 'us-east-1' S3_ACCESS_KEY_ID: str = '' S3_SECRET_ACCESS_KEY: str = '' S3_ENDPOINT_URL: str = '' S3_PUBLIC_BASE_URL: str = '' S3_REGRU_PUBLIC_WEBSITE_HOST: bool = True S3_AVATAR_KEY_PREFIX: str = 'avatars' LOG_LEVEL: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO' LOG_FORMAT: Literal['JSON', 'TEXT'] = 'JSON' def _get_vault_secret(self, secrets: dict[str, Any], *keys: str) -> str: for key in keys: value = secrets.get(key) if value is not None and str(value).strip() != '': return str(value) return '' def _reset_s3_config(self) -> None: object.__setattr__(self, 'S3_BUCKET', '') object.__setattr__(self, 'S3_ACCESS_KEY_ID', '') object.__setattr__(self, 'S3_SECRET_ACCESS_KEY', '') object.__setattr__(self, 'S3_ENDPOINT_URL', '') object.__setattr__(self, 'S3_PUBLIC_BASE_URL', '') object.__setattr__(self, 'S3_REGION', 'us-east-1') object.__setattr__(self, 'S3_REGRU_PUBLIC_WEBSITE_HOST', True) object.__setattr__(self, 'S3_AVATAR_KEY_PREFIX', 'avatars') @staticmethod def _vault_kv(mapping: Mapping[str, Any], *keys: str) -> Any: for k in keys: if k in mapping and mapping[k] is not None: return mapping[k] return None def _apply_s3_from_vault_secret(self, s3: dict[str, Any]) -> None: bucket_name = ( self._vault_kv(s3, 'bucket_name', 'BUCKET_NAME', 'bucket') or self._vault_kv(s3, 'S3_BUCKET', 'bucketName') ) endpoint_url = ( self._vault_kv(s3, 's3_endpoint_url', 'S3_ENDPOINT_URL', 'endpoint_url', 'ENDPOINT_URL') or self._vault_kv(s3, 'endpoint') ) ak = ( self._vault_kv(s3, 's3_access_key_id', 'S3_ACCESS_KEY_ID', 'ACCESS_KEY_ID', 'access_key_id') or self._vault_kv(s3, 'AWS_ACCESS_KEY_ID') ) sk = ( self._vault_kv(s3, 's3_secret_access_key', 'S3_SECRET_ACCESS_KEY', 'SECRET_ACCESS_KEY') or self._vault_kv(s3, 'AWS_SECRET_ACCESS_KEY') ) if bucket_name is None or str(bucket_name).strip() == '': raise ValueError('Vault S3 secret must contain bucket_name') if endpoint_url is None or str(endpoint_url).strip() == '': raise ValueError('Vault S3 secret must contain s3_endpoint_url') if ak is None or str(ak).strip() == '': raise ValueError('Vault S3 secret must contain s3_access_key_id') if sk is None or str(sk).strip() == '': raise ValueError('Vault S3 secret must contain s3_secret_access_key') object.__setattr__(self, 'S3_BUCKET', str(bucket_name).strip()) object.__setattr__(self, 'S3_ENDPOINT_URL', str(endpoint_url).strip()) object.__setattr__(self, 'S3_ACCESS_KEY_ID', str(ak).strip()) object.__setattr__(self, 'S3_SECRET_ACCESS_KEY', str(sk).strip()) region = ( self._vault_kv(s3, 's3_region', 'S3_REGION', 'region') ) if region is not None and str(region).strip() != '': object.__setattr__(self, 'S3_REGION', str(region).strip()) public_base = ( self._vault_kv(s3, 's3_public_base_url', 'S3_PUBLIC_BASE_URL', 'public_base_url') or self._vault_kv(s3, 'public_url') ) if public_base is not None and str(public_base).strip() != '': object.__setattr__(self, 'S3_PUBLIC_BASE_URL', str(public_base).strip()) prefix = self._vault_kv(s3, 'avatar_key_prefix', 'S3_AVATAR_KEY_PREFIX', 's3_avatar_key_prefix') if prefix is not None and str(prefix).strip() != '': object.__setattr__(self, 'S3_AVATAR_KEY_PREFIX', str(prefix).strip()) rf = ( self._vault_kv(s3, 's3_reg_ru_public_website_host', 'S3_REGRU_PUBLIC_WEBSITE_HOST') ) if rf is not None: v = str(rf).strip().lower() if v in {'1', 'true', 'yes', 'on'}: object.__setattr__(self, 'S3_REGRU_PUBLIC_WEBSITE_HOST', True) elif v in {'0', 'false', 'no', 'off'}: object.__setattr__(self, 'S3_REGRU_PUBLIC_WEBSITE_HOST', False) def model_post_init(self, __context: Any) -> None: self._reset_s3_config() if not self.VAULT_ROLE_ID.strip() or not self.VAULT_SECRET_ID.strip(): if not self.DATABASE_URL: raise ValueError( 'Set VAULT_ROLE_ID and VAULT_SECRET_ID for Vault, or set DATABASE_URL ' '(or DATABASE_HOST, DATABASE_USER, DATABASE_PASSWORD, DATABASE_NAME) in the environment', ) return client = VaultClient( addr=self.VAULT_ADDR, role_id=self.VAULT_ROLE_ID, secret_id=self.VAULT_SECRET_ID, namespace=self.VAULT_NAMESPACE, mount_point=self.VAULT_MOUNT_POINT, ) db = client.read_secret(self.VAULT_DATABASE_SECRET_PATH) object.__setattr__(self, '_vault_database_secrets', db) def kv(d: dict[str, Any], *keys: str) -> Any: for k in keys: if k in d and d[k] is not None: return d[k] return None if kv(db, 'HOST', 'host') is not None: object.__setattr__(self, 'DATABASE_HOST', str(kv(db, 'HOST', 'host'))) if kv(db, 'PORT', 'port') is not None: object.__setattr__(self, 'DATABASE_PORT', _as_int(kv(db, 'PORT', 'port'), self.DATABASE_PORT)) if kv(db, 'NAME', 'name') is not None: object.__setattr__(self, 'DATABASE_NAME', str(kv(db, 'NAME', 'name'))) if kv(db, 'USER', 'user') is not None: object.__setattr__(self, 'DATABASE_USER', str(kv(db, 'USER', 'user'))) if kv(db, 'PASSWORD', 'password') is not None: object.__setattr__(self, 'DATABASE_PASSWORD', str(kv(db, 'PASSWORD', 'password'))) rabbit = client.read_secret_optional(self.VAULT_RABBIT_SECRET_PATH) if rabbit: if kv(rabbit, 'HOST', 'host') is not None: object.__setattr__(self, 'RABBIT_HOST', str(kv(rabbit, 'HOST', 'host'))) if kv(rabbit, 'PORT', 'port') is not None: object.__setattr__(self, 'RABBIT_PORT', _as_int(kv(rabbit, 'PORT', 'port'), self.RABBIT_PORT)) if kv(rabbit, 'USER', 'user') is not None: object.__setattr__(self, 'RABBIT_USER', str(kv(rabbit, 'USER', 'user'))) if kv(rabbit, 'PASSWORD', 'password') is not None: object.__setattr__(self, 'RABBIT_PASSWORD', str(kv(rabbit, 'PASSWORD', 'password'))) if kv(rabbit, 'VHOST', 'vhost') is not None: object.__setattr__(self, 'RABBIT_VHOST', str(kv(rabbit, 'VHOST', 'vhost'))) csrf = client.read_secret_optional(self.VAULT_CSRF_SECRET_PATH) if csrf and kv(csrf, 'KEY', 'key') is not None: key = str(kv(csrf, 'KEY', 'key')) if len(key) >= 32: object.__setattr__(self, 'CSRF_SECRET_KEY', key) docs = client.read_secret_optional(self.VAULT_DOCS_SECRET_PATH) if docs: u = docs.get('DOCS_USERNAME') or docs.get('USERNAME') p = docs.get('DOCS_PASSWORD') or docs.get('PASSWORD') if u is not None: object.__setattr__(self, 'DOCS_USERNAME', str(u)) if p is not None: object.__setattr__(self, 'DOCS_PASSWORD', str(p)) s3_rel_path = self.VAULT_S3_SECRET_PATH.strip() if s3_rel_path: try: s3_secret_data = client.read_secret(s3_rel_path) except Exception as exc: raise ValueError( f'Vault S3 secret not readable at mount {self.VAULT_MOUNT_POINT}/{s3_rel_path}: {exc!r}' ) from exc self._apply_s3_from_vault_secret(s3_secret_data) if not self.DATABASE_URL: raise ValueError('Database URL could not be built from Vault database secret') @property def DATABASE_URL(self) -> str: direct = (self.DATABASE_URL_DIRECT or '').strip() if direct: return direct ready_url = self._get_vault_secret( self._vault_database_secrets, 'DATABASE_URL', 'database_url', ) if ready_url: return ready_url host = self._get_vault_secret(self._vault_database_secrets, 'host', 'HOST') port = self._get_vault_secret(self._vault_database_secrets, 'port', 'PORT') or str(self.DATABASE_PORT) user = self._get_vault_secret(self._vault_database_secrets, 'user', 'USER') password = self._get_vault_secret(self._vault_database_secrets, 'password', 'PASSWORD') name = self._get_vault_secret(self._vault_database_secrets, 'name', 'NAME', 'database', 'DATABASE') if not host or not user or not password or not name: h = (self.DATABASE_HOST or '').strip() u = (self.DATABASE_USER or '').strip() p = (self.DATABASE_PASSWORD or '').strip() n = (self.DATABASE_NAME or '').strip() if h and u and p and n: quoted_user = quote(u, safe='') quoted_password = quote(p, safe='') po = str(self.DATABASE_PORT) return f'postgresql+asyncpg://{quoted_user}:{quoted_password}@{h}:{po}/{n}' return '' quoted_user = quote(user, safe='') quoted_password = quote(password, safe='') return f'postgresql+asyncpg://{quoted_user}:{quoted_password}@{host}:{port}/{name}' @property def REDIS_URL(self) -> str: auth = f':{self.REDIS_PASSWORD}@' if self.REDIS_PASSWORD else '' return f'redis://{auth}{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}' @property def RABBIT_URL(self) -> str: vhost = '%2F' if self.RABBIT_VHOST == '/' else self.RABBIT_VHOST.lstrip('/') return f'amqp://{self.RABBIT_USER}:{self.RABBIT_PASSWORD}@{self.RABBIT_HOST}:{self.RABBIT_PORT}/{vhost}' @property def EXCLUDED_PATHS(self) -> List[str]: return ['/docs', '/redoc', '/openapi.json', '/ping', '/health'] @lru_cache(maxsize=1) def get_settings() -> Settings: return Settings() settings = get_settings()