"""Application settings assembled from the environment, ``.env`` and Vault.

Importing this module has side effects: the nearest ``.env`` file (if any) is
loaded into the process environment, and the module-level ``settings`` object
is built, which logs in to Vault via AppRole and reads secrets from it.
"""

from __future__ import annotations

import logging
import os
from functools import lru_cache
from typing import List, Literal
from urllib.parse import quote

from dotenv import find_dotenv, load_dotenv
from pydantic import AliasChoices, Field, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from src.infrastructure.vault import create_hvac_client_from_approle, read_kv2_secret

logger = logging.getLogger(__name__)

# Load .env into os.environ up front so the os.getenv() fallbacks used inside
# Settings.load_from_vault can see values that only exist in the .env file.
env_file = find_dotenv(".env")
if env_file:
    load_dotenv(env_file)


def normalize_vault_base_url(raw: str) -> str:
    """Return *raw* trimmed, without trailing slashes and with a URL scheme.

    Surrounding whitespace and trailing ``/`` characters are removed. When the
    result contains no ``://`` it is prefixed with ``https://``. If nothing is
    left after trimming (e.g. ``""`` or ``"/"``), the whitespace-stripped
    input is returned unchanged.
    """
    u = raw.strip().rstrip('/')
    if not u:
        return raw.strip()
    if '://' not in u:
        return f'https://{u}'
    return u


def _is_blank(value: object) -> bool:
    """Return True when *value* is None or a string with no non-space text."""
    if value is None:
        return True
    return isinstance(value, str) and not value.strip()


def _first_non_blank(*candidates: object) -> str | None:
    """Return the first candidate that is non-empty after stripping, else None."""
    for candidate in candidates:
        if candidate is None:
            continue
        text = str(candidate).strip()
        if text:
            return text
    return None


class Settings(BaseSettings):
    """Typed application configuration.

    Values come from the environment / ``.env`` file, then ``load_from_vault``
    overrides the database and CSRF settings (always) and the RabbitMQ
    connection settings (when the optional ``rabbitmq`` secret is readable)
    with values read from Vault.
    """

    # --- Vault (AppRole login + KV v2 mount) ---
    VAULT_ADDR: str = Field(default='http://localhost:8200')
    VAULT_ROLE_ID: str = Field(..., description='AppRole role_id')
    VAULT_SECRET_ID: str = Field(
        ...,
        description='AppRole secret_id',
        # VAULT_SECRET_TOKEN is accepted as an alternative variable name.
        validation_alias=AliasChoices('VAULT_SECRET_ID', 'VAULT_SECRET_TOKEN'),
    )
    VAULT_NAMESPACE: str | None = Field(default=None)
    VAULT_MOUNT_POINT: str = Field(default='dev-secrets')
    VAULT_JWT_KID_PATH: str = 'jwt/kid'
    VAULT_JWT_KIDS_PREFIX: str = 'jwt/kids'
    JWT_KEYS_REFRESH_SECONDS: int = 3600

    # --- Database (host/port/name/user/password are overwritten from Vault) ---
    DATABASE_HOST: str
    DATABASE_PORT: int = Field(default=5432, ge=1, le=65535)
    DATABASE_NAME: str
    DATABASE_USER: str
    DATABASE_PASSWORD: str
    DATABASE_POOL_SIZE: int = 10
    DATABASE_MAX_OVERFLOW: int = 20
    DATABASE_POOL_TIMEOUT: int = 30
    DATABASE_POOL_RECYCLE: int = 3600
    DATABASE_ECHO: bool = False

    # --- CSRF (the secret key is overwritten from Vault) ---
    CSRF_SECRET_KEY: str = Field(min_length=32)
    CSRF_COOKIE_SECURE: bool = False
    CSRF_COOKIE_HTTPONLY: bool = True
    CSRF_COOKIE_SAMESITE: Literal['Lax', 'Strict', 'None'] = 'Lax'
    CSRF_COOKIE_PATH: str = '/'
    CSRF_COOKIE_DOMAIN: str | None = None

    # --- API docs credentials ---
    # NOTE(review): both defaults are 'admin'; make sure they are overridden
    # in every environment where the docs endpoints are reachable.
    DOCS_USERNAME: str = 'admin'
    DOCS_PASSWORD: str = 'admin'

    # --- JWT ---
    JWT_ACCESS_TTL_SECONDS: int = 15 * 60  # 15 minutes
    JWT_REFRESH_TTL_SECONDS: int = 30 * 24 * 60 * 60  # 30 days
    JWT_ISSUER: str | None = None
    JWT_AUDIENCE: str | None = None
    JWT_ALGORITHM: str = 'RS256'

    # --- Redis ---
    REDIS_HOST: str = 'localhost'
    REDIS_PORT: int = 6379
    REDIS_PASSWORD: str | None = None
    REDIS_DB: int = 0

    # --- RabbitMQ (host/port/user/password/vhost may be overwritten from Vault) ---
    RABBIT_HOST: str = 'localhost'
    RABBIT_PORT: int = 5672
    RABBIT_USER: str = 'guest'
    RABBIT_PASSWORD: str = 'guest'
    RABBIT_VHOST: str = '/'
    RABBIT_PUBLISH_PERSIST: bool = True
    RABBIT_CONNECT_TIMEOUT: int = 5
    RABBIT_EMAIL_CODE_QUEUE: str = 'email.verification_code'

    # --- CORS (comma-separated origins; see cors_origins_list) ---
    CORS_ORIGINS: str = 'http://localhost:3000'
    CORS_ALLOW_CREDENTIALS: bool = True

    # --- Rate limiting ---
    RATE_LIMIT_REQUESTS: int = 60
    RATE_LIMIT_WINDOW: int = 60

    # --- Logging ---
    LOG_LEVEL: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO'
    LOG_FORMAT: Literal['JSON', 'TEXT'] = 'TEXT'

    @field_validator('VAULT_ADDR', mode='before')
    @classmethod
    def vault_addr_scheme(cls, v):
        """Normalize a string VAULT_ADDR; pass any other value through as-is."""
        if v is None or not isinstance(v, str):
            return v
        return normalize_vault_base_url(v)

    @field_validator('CSRF_COOKIE_DOMAIN', mode='before')
    @classmethod
    def empty_csrf_domain_to_none(cls, v):
        """Map None and blank strings to None; return other values unchanged."""
        if v is None or (isinstance(v, str) and not v.strip()):
            return None
        return v

    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        case_sensitive=True,
        extra='ignore',
        populate_by_name=True,
    )

    @model_validator(mode='before')
    @classmethod
    def load_from_vault(cls, data: dict):
        """Fill database, CSRF and RabbitMQ values from Vault before validation.

        Logs in with AppRole using the Vault settings found in *data* or,
        failing that, in the process environment, then reads these secrets
        from the configured mount point:

        * ``database`` (required): ``host``, ``name``, ``user``, ``password``
          and ``port`` must all be present and non-blank.
        * ``csrf`` (required): must contain a non-blank ``key`` field.
        * ``rabbitmq`` (optional): any of ``host``, ``port``, ``user``,
          ``password``, ``vhost`` that are present and non-blank override the
          corresponding ``RABBIT_*`` value.

        Secret field names are matched case-insensitively.

        Args:
            data: Raw input collected for the model. Non-dict input is
                returned untouched.

        Returns:
            A new dict with the Vault-derived values merged in.

        Raises:
            RuntimeError: If the AppRole credentials are missing or blank, if
                the ``database`` secret lacks a required key, or if the
                ``csrf`` secret has no usable ``key`` field.
        """
        if not isinstance(data, dict):
            return data

        # Work on a copy so the caller's mapping is never mutated.
        data = dict(data)

        addr_raw = data.get('VAULT_ADDR') or os.getenv('VAULT_ADDR') or 'http://localhost:8200'
        addr = normalize_vault_base_url(addr_raw)
        data['VAULT_ADDR'] = addr

        # Strip before checking so whitespace-only values count as missing,
        # and so the exact same value is used for the login and for the model.
        role_id = _first_non_blank(data.get('VAULT_ROLE_ID'), os.getenv('VAULT_ROLE_ID'))
        secret_id = _first_non_blank(
            data.get('VAULT_SECRET_ID'),
            data.get('VAULT_SECRET_TOKEN'),
            os.getenv('VAULT_SECRET_ID'),
            os.getenv('VAULT_SECRET_TOKEN'),
        )

        namespace = data.get('VAULT_NAMESPACE')
        if namespace is None:
            namespace = os.getenv('VAULT_NAMESPACE')
        namespace = namespace or None  # empty string means "no namespace"

        mount = data.get('VAULT_MOUNT_POINT') or os.getenv('VAULT_MOUNT_POINT') or 'dev-secrets'

        if not role_id or not secret_id:
            raise RuntimeError(
                'VAULT_ROLE_ID and VAULT_SECRET_ID (or VAULT_SECRET_TOKEN) are required for Vault AppRole'
            )

        data['VAULT_ROLE_ID'] = role_id
        data['VAULT_SECRET_ID'] = secret_id

        client = create_hvac_client_from_approle(
            url=addr,
            role_id=role_id,
            secret_id=secret_id,
            namespace=namespace,
            timeout=5,
        )

        def read_secret(path: str) -> dict:
            """Read one KV v2 secret from the configured mount point."""
            return read_kv2_secret(client=client, mount_point=mount, path=path)

        def read_secret_optional(path: str) -> dict:
            """Like read_secret, but return {} (and log) when the read fails."""
            try:
                return read_secret(path)
            except Exception as exc:
                # Deliberately best-effort: an optional secret must not block
                # startup, but the failure should still be visible in logs.
                logger.warning(
                    'Optional Vault secret %r could not be read (%s); using configured defaults',
                    path,
                    type(exc).__name__,
                )
                return {}

        database = read_secret('database')
        csrf = read_secret('csrf')

        # Case-insensitive view of the database secret.
        db_ci = {str(k).lower(): v for k, v in database.items()}

        required_db = ['host', 'name', 'user', 'password', 'port']
        missing_db = [k for k in required_db if _is_blank(db_ci.get(k))]
        if missing_db:
            raise RuntimeError(f'Vault secret database missing non-empty keys: {missing_db}')

        data['DATABASE_HOST'] = str(db_ci['host']).strip()
        data['DATABASE_PORT'] = int(db_ci['port'])
        data['DATABASE_NAME'] = str(db_ci['name']).strip()
        data['DATABASE_USER'] = str(db_ci['user']).strip()
        data['DATABASE_PASSWORD'] = str(db_ci['password']).strip()

        # Take the first field named "key" (any letter case) with usable text.
        csrf_secret = None
        for entry_key, entry_val in csrf.items():
            if str(entry_key).lower() == 'key' and entry_val is not None and str(entry_val).strip():
                csrf_secret = str(entry_val).strip()
                break
        if not csrf_secret:
            raise RuntimeError(
                'Vault secret at csrf must contain a non-empty field named key (e.g. key=...)'
            )
        data['CSRF_SECRET_KEY'] = csrf_secret

        rabbit = read_secret_optional('rabbitmq')
        if rabbit:
            r_ci = {str(k).lower(): v for k, v in rabbit.items()}

            def rb_set(field: str, env_key: str, *, as_int: bool = False) -> None:
                """Copy a non-blank RabbitMQ secret field into data[env_key]."""
                v = r_ci.get(field)
                if _is_blank(v):
                    return
                data[env_key] = int(v) if as_int else str(v).strip()

            rb_set('host', 'RABBIT_HOST')
            rb_set('port', 'RABBIT_PORT', as_int=True)
            rb_set('user', 'RABBIT_USER')
            rb_set('password', 'RABBIT_PASSWORD')
            rb_set('vhost', 'RABBIT_VHOST')

        return data

    def cors_origins_list(self) -> List[str]:
        """Split CORS_ORIGINS on commas, trimming and dropping empty entries."""
        return [o.strip() for o in self.CORS_ORIGINS.split(',') if o.strip()]

    @property
    def DATABASE_URL(self) -> str:
        """Build the ``postgresql+asyncpg`` URL for the configured database.

        User and password are percent-encoded so characters such as ``@``,
        ``:`` or ``/`` cannot be mistaken for URL delimiters. Values must
        therefore be stored unencoded.
        """
        user = quote(self.DATABASE_USER, safe="")
        password = quote(self.DATABASE_PASSWORD, safe="")
        return (
            f"postgresql+asyncpg://{user}:{password}"
            f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
        )

    @property
    def REDIS_URL(self) -> str:
        """Build the ``redis://`` URL; the auth part is omitted without a password.

        The password is percent-encoded so reserved characters stay inside
        the credential part of the URL.
        """
        auth = f":{quote(self.REDIS_PASSWORD, safe='')}@" if self.REDIS_PASSWORD else ""
        return f"redis://{auth}{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"

    @property
    def RABBIT_URL(self) -> str:
        """Build the ``amqp://`` URL for the configured broker.

        The vhost ``/`` is written as ``%2F``; for any other vhost the leading
        slashes are dropped. User, password and vhost are percent-encoded.
        """
        if self.RABBIT_VHOST == "/":
            vhost = "%2F"
        else:
            vhost = quote(self.RABBIT_VHOST.lstrip("/"), safe="")
        user = quote(self.RABBIT_USER, safe="")
        password = quote(self.RABBIT_PASSWORD, safe="")
        return f"amqp://{user}:{password}@{self.RABBIT_HOST}:{self.RABBIT_PORT}/{vhost}"

    @property
    def EXCLUDED_PATHS(self) -> List[str]:
        """Return a fresh list of the fixed docs and health-check paths."""
        return ['/docs', '/redoc', '/openapi.json', '/ping', '/health']


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Build the Settings object once and return the cached instance afterwards."""
    return Settings()


# Built at import time; this is where the Vault login and secret reads happen.
settings = get_settings()