from __future__ import annotations from functools import lru_cache from typing import List, Literal import os from dotenv import load_dotenv, find_dotenv from pydantic import AliasChoices, Field, field_validator, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict from src.infrastructure.vault import create_hvac_client_from_approle, read_kv2_secret env_file = find_dotenv(".env") if env_file: load_dotenv(env_file) def normalize_vault_base_url(raw: str) -> str: u = raw.strip().rstrip('/') if not u: return raw.strip() if '://' not in u: return f'https://{u}' return u class Settings(BaseSettings): VAULT_ADDR: str = Field(default='http://localhost:8200') VAULT_ROLE_ID: str = Field(..., description='AppRole role_id') VAULT_SECRET_ID: str = Field( ..., description='AppRole secret_id', validation_alias=AliasChoices('VAULT_SECRET_ID', 'VAULT_SECRET_TOKEN'), ) VAULT_NAMESPACE: str | None = Field(default=None) VAULT_MOUNT_POINT: str = Field(default='dev-secrets') VAULT_JWT_KID_PATH: str = 'jwt/kid' VAULT_JWT_KIDS_PREFIX: str = 'jwt/kids' VAULT_CRYPTO_MASTER_KEY_PATH: str = 'crypto' VAULT_LEGAL_DOCS_S3_SECRET_PATH: str = 's3/b2b_docs' JWT_KEYS_REFRESH_SECONDS: int = 3600 DATABASE_HOST: str DATABASE_PORT: int = Field(default=5432, ge=1, le=65535) DATABASE_NAME: str DATABASE_USER: str DATABASE_PASSWORD: str DATABASE_POOL_SIZE: int = 10 DATABASE_MAX_OVERFLOW: int = 20 DATABASE_POOL_TIMEOUT: int = 30 DATABASE_POOL_RECYCLE: int = 3600 DATABASE_ECHO: bool = False ADMIN_COOKIE_SECURE: bool = False ADMIN_COOKIE_DOMAIN: str | None = '.elcsa.ru' CORS_ALLOW_ORIGIN_REGEX: str = r'https?://([a-z0-9-]+\.)*elcsa\.ru(:\d+)?$' DOCS_USERNAME: str = 'admin' DOCS_PASSWORD: str = 'admin' JWT_ACCESS_TTL_SECONDS: int = 8 * 60 * 60 JWT_REFRESH_TTL_SECONDS: int = 30 * 24 * 60 * 60 ADMIN_JWT_ISSUER: str | None = 'admin-service' JWT_AUDIENCE: str | None = None JWT_ALGORITHM: str = 'RS256' REDIS_HOST: str = 'localhost' REDIS_PORT: int = 6379 REDIS_PASSWORD: str | None = None REDIS_DB: int = 0 LEGAL_DOCS_S3_BUCKET: str = '' LEGAL_DOCS_S3_REGION: str = 'us-east-1' LEGAL_DOCS_S3_ACCESS_KEY_ID: str = '' LEGAL_DOCS_S3_SECRET_ACCESS_KEY: str = '' LEGAL_DOCS_S3_ENDPOINT_URL: str = '' LEGAL_DOCS_S3_KEY_PREFIX: str = 'legal-docs' LEGAL_DOCS_S3_PRESIGNED_TTL_SECONDS: int = 3600 RATE_LIMIT_REQUESTS: int = 60 RATE_LIMIT_WINDOW: int = 60 LOG_LEVEL: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO' LOG_FORMAT: Literal['JSON', 'TEXT'] = 'TEXT' @field_validator('VAULT_ADDR', mode='before') @classmethod def vault_addr_scheme(cls, v): if v is None or not isinstance(v, str): return v return normalize_vault_base_url(v) @field_validator('ADMIN_COOKIE_DOMAIN', mode='before') @classmethod def normalize_admin_cookie_domain(cls, v): if v is None or (isinstance(v, str) and not v.strip()): return '.elcsa.ru' s = str(v).strip() sl = s.lower() if sl in ('.elcsa.ru', 'elcsa.ru'): return '.elcsa.ru' if sl.endswith('.elcsa.ru') and not sl.startswith('.'): return '.elcsa.ru' return s @field_validator('REDIS_PASSWORD', mode='before') @classmethod def empty_redis_password_to_none(cls, v): if v is None or (isinstance(v, str) and not v.strip()): return None return v model_config = SettingsConfigDict( env_file='.env', env_file_encoding='utf-8', case_sensitive=True, extra='ignore', populate_by_name=True, ) @staticmethod def _vault_kv(mapping: dict, *keys: str): for k in keys: if k in mapping and mapping[k] is not None: return mapping[k] return None @classmethod def _apply_s3_from_vault(cls, data: dict, s3: dict) -> None: bucket = cls._vault_kv(s3, 'bucket_name', 'BUCKET_NAME', 'bucket', 'LEGAL_DOCS_S3_BUCKET') endpoint = cls._vault_kv(s3, 's3_endpoint_url', 'S3_ENDPOINT_URL', 'endpoint_url', 'LEGAL_DOCS_S3_ENDPOINT_URL') ak = cls._vault_kv(s3, 's3_access_key_id', 'S3_ACCESS_KEY_ID', 'ACCESS_KEY_ID', 'LEGAL_DOCS_S3_ACCESS_KEY_ID') sk = cls._vault_kv( s3, 's3_secret_access_key', 'S3_SECRET_ACCESS_KEY', 'SECRET_ACCESS_KEY', 'LEGAL_DOCS_S3_SECRET_ACCESS_KEY' ) if bucket: data['LEGAL_DOCS_S3_BUCKET'] = str(bucket).strip() if endpoint: data['LEGAL_DOCS_S3_ENDPOINT_URL'] = str(endpoint).strip() if ak: data['LEGAL_DOCS_S3_ACCESS_KEY_ID'] = str(ak).strip() if sk: data['LEGAL_DOCS_S3_SECRET_ACCESS_KEY'] = str(sk).strip() region = cls._vault_kv(s3, 's3_region', 'S3_REGION', 'region', 'LEGAL_DOCS_S3_REGION') if region: data['LEGAL_DOCS_S3_REGION'] = str(region).strip() prefix = cls._vault_kv(s3, 'key_prefix', 'LEGAL_DOCS_S3_KEY_PREFIX', 's3_key_prefix') if prefix: data['LEGAL_DOCS_S3_KEY_PREFIX'] = str(prefix).strip() @model_validator(mode='before') @classmethod def load_from_vault(cls, data: dict): if not isinstance(data, dict): return data addr_raw = data.get('VAULT_ADDR') or os.getenv('VAULT_ADDR') or 'http://localhost:8200' addr = normalize_vault_base_url(addr_raw) data['VAULT_ADDR'] = addr role_id = data.get('VAULT_ROLE_ID') or os.getenv('VAULT_ROLE_ID') secret_id = ( data.get('VAULT_SECRET_ID') or data.get('VAULT_SECRET_TOKEN') or os.getenv('VAULT_SECRET_ID') or os.getenv('VAULT_SECRET_TOKEN') ) namespace = data.get('VAULT_NAMESPACE') if namespace is None: namespace = os.getenv('VAULT_NAMESPACE') namespace = namespace if namespace else None mount = data.get('VAULT_MOUNT_POINT') or os.getenv('VAULT_MOUNT_POINT') or 'dev-secrets' if not role_id or not secret_id: raise RuntimeError( 'VAULT_ROLE_ID and VAULT_SECRET_ID (or VAULT_SECRET_TOKEN) are required for Vault AppRole' ) data['VAULT_ROLE_ID'] = str(role_id).strip() data['VAULT_SECRET_ID'] = str(secret_id).strip() client = create_hvac_client_from_approle( url=addr, role_id=role_id, secret_id=secret_id, namespace=namespace, timeout=5, ) def read_secret(path: str) -> dict: return read_kv2_secret(client=client, mount_point=mount, path=path) def read_secret_optional(path: str) -> dict: try: return read_secret(path) except Exception: return {} database = read_secret('database') db_ci = {str(k).lower(): v for k, v in database.items()} def db_nonempty(key: str) -> bool: v = db_ci.get(key) if v is None: return False if isinstance(v, str) and not v.strip(): return False return True required_db = ['host', 'name', 'user', 'password', 'port'] missing_db = [k for k in required_db if not db_nonempty(k)] if missing_db: raise RuntimeError(f'Vault secret database missing non-empty keys: {missing_db}') data['DATABASE_HOST'] = str(db_ci['host']).strip() data['DATABASE_PORT'] = int(db_ci['port']) data['DATABASE_NAME'] = str(db_ci['name']).strip() data['DATABASE_USER'] = str(db_ci['user']).strip() data['DATABASE_PASSWORD'] = str(db_ci['password']).strip() redis_secret = read_secret_optional('redis') if redis_secret: rd_ci = {str(k).lower(): v for k, v in redis_secret.items()} def rd_set(field: str, env_key: str, *, as_int: bool = False) -> None: v = rd_ci.get(field) if v is None: return if isinstance(v, str) and not v.strip(): return if as_int: data[env_key] = int(v) else: data[env_key] = str(v).strip() rd_set('host', 'REDIS_HOST') rd_set('port', 'REDIS_PORT', as_int=True) rd_set('password', 'REDIS_PASSWORD') rd_set('db', 'REDIS_DB', as_int=True) s3_path = ( data.get('VAULT_LEGAL_DOCS_S3_SECRET_PATH') or os.getenv('VAULT_LEGAL_DOCS_S3_SECRET_PATH') or 's3/b2b_docs' ) s3_secret = read_secret_optional(str(s3_path)) if s3_secret: cls._apply_s3_from_vault(data, s3_secret) return data @property def DATABASE_URL(self) -> str: return ( f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}" f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}" ) @property def REDIS_URL(self) -> str: return f'redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}' @property def EXCLUDED_PATHS(self) -> List[str]: return ['/docs', '/redoc', '/openapi.json', '/ping', '/health'] @lru_cache(maxsize=1) def get_settings() -> Settings: return Settings() settings = get_settings()