Initial commit
This commit is contained in:
1
src/infrastructure/config/__init__.py
Normal file
1
src/infrastructure/config/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from src.infrastructure.config.settings import settings
|
||||
135
src/infrastructure/config/settings.py
Normal file
135
src/infrastructure/config/settings.py
Normal file
@@ -0,0 +1,135 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from functools import lru_cache
|
||||
from typing import List, Literal
|
||||
import os
|
||||
from dotenv import load_dotenv, find_dotenv
|
||||
from pydantic import Field, model_validator
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
from src.infrastructure.vault import create_hvac_client, read_kv2_secret
|
||||
|
||||
env_file = find_dotenv('.env')
|
||||
if env_file:
|
||||
load_dotenv(env_file)
|
||||
|
||||
|
||||
def _vault_key_get(mapping: dict, key: str, default=None):
|
||||
ku = key.upper()
|
||||
kl = key.lower()
|
||||
if ku in mapping:
|
||||
return mapping[ku]
|
||||
if kl in mapping:
|
||||
return mapping[kl]
|
||||
return default
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
VAULT_ADDR: str = Field(default='http://localhost:8200')
|
||||
VAULT_ROLE_ID: str = Field(..., description='Vault AppRole role_id')
|
||||
VAULT_SECRET_ID: str = Field(..., description='Vault AppRole secret_id')
|
||||
VAULT_AUTH_MOUNT: str = Field(default='approle')
|
||||
VAULT_MOUNT_POINT: str = Field(default='secrets')
|
||||
|
||||
DOCS_USERNAME: str = "admin"
|
||||
DOCS_PASSWORD: str = "admin"
|
||||
|
||||
SMTP_FROM: str = ""
|
||||
SMTP_HOST: str = "localhost"
|
||||
SMTP_PASSWORD: str = ""
|
||||
SMTP_PORT: int = 587
|
||||
SMTP_USER: str = ""
|
||||
|
||||
RABBIT_HOST: str = "localhost"
|
||||
RABBIT_PORT: int = 5672
|
||||
RABBIT_USER: str = "guest"
|
||||
RABBIT_PASSWORD: str = "guest"
|
||||
RABBIT_VHOST: str = "/"
|
||||
|
||||
RABBIT_PUBLISH_PERSIST: bool = True
|
||||
RABBIT_CONNECT_TIMEOUT: int = 5
|
||||
RABBIT_EMAIL_CODE_QUEUE: str = "email.verification_code"
|
||||
|
||||
LOG_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
|
||||
LOG_FORMAT: Literal["JSON", "TEXT"] = "TEXT"
|
||||
|
||||
model_config = SettingsConfigDict(
|
||||
env_file=".env",
|
||||
env_file_encoding="utf-8",
|
||||
case_sensitive=True,
|
||||
extra="ignore",
|
||||
)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def load_from_vault(cls, data: dict):
|
||||
addr = data.get('VAULT_ADDR') or os.getenv('VAULT_ADDR') or 'http://localhost:8200'
|
||||
role_id = data.get('VAULT_ROLE_ID') or os.getenv('VAULT_ROLE_ID')
|
||||
secret_id = data.get('VAULT_SECRET_ID') or os.getenv('VAULT_SECRET_ID')
|
||||
auth_mount = data.get('VAULT_AUTH_MOUNT') or os.getenv('VAULT_AUTH_MOUNT') or 'approle'
|
||||
mount = data.get('VAULT_MOUNT_POINT') or os.getenv('VAULT_MOUNT_POINT') or 'secrets'
|
||||
|
||||
if not role_id or not secret_id:
|
||||
raise RuntimeError('VAULT_ROLE_ID and VAULT_SECRET_ID are required')
|
||||
|
||||
client = create_hvac_client(
|
||||
url=addr,
|
||||
role_id=role_id,
|
||||
secret_id=secret_id,
|
||||
auth_mount_point=auth_mount,
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
def safe_read(path: str) -> dict:
|
||||
try:
|
||||
return read_kv2_secret(client=client, mount_point=mount, path=path)
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
rabbitmq = safe_read("rabbitmq")
|
||||
email = safe_read("email")
|
||||
|
||||
|
||||
if rabbitmq:
|
||||
data['RABBIT_HOST'] = _vault_key_get(rabbitmq, 'HOST', data.get('RABBIT_HOST'))
|
||||
data['RABBIT_PORT'] = _vault_key_get(rabbitmq, 'PORT', data.get('RABBIT_PORT'))
|
||||
data['RABBIT_USER'] = _vault_key_get(rabbitmq, 'USER', data.get('RABBIT_USER'))
|
||||
data['RABBIT_PASSWORD'] = _vault_key_get(rabbitmq, 'PASSWORD', data.get('RABBIT_PASSWORD'))
|
||||
data['RABBIT_VHOST'] = _vault_key_get(rabbitmq, 'VHOST', data.get('RABBIT_VHOST'))
|
||||
|
||||
if email:
|
||||
data['SMTP_FROM'] = _vault_key_get(email, 'FROM', data.get('SMTP_FROM'))
|
||||
data['SMTP_HOST'] = _vault_key_get(email, 'HOST', data.get('SMTP_HOST'))
|
||||
data['SMTP_PASSWORD'] = _vault_key_get(email, 'PASSWORD', data.get('SMTP_PASSWORD'))
|
||||
data['SMTP_PORT'] = _vault_key_get(email, 'PORT', data.get('SMTP_PORT'))
|
||||
data['SMTP_USER'] = _vault_key_get(email, 'USER', data.get('SMTP_USER'))
|
||||
|
||||
return data
|
||||
|
||||
@property
|
||||
def DATABASE_URL(self) -> str:
|
||||
return (
|
||||
f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
|
||||
f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
|
||||
)
|
||||
|
||||
@property
|
||||
def REDIS_URL(self) -> str:
|
||||
auth = f":{self.REDIS_PASSWORD}@" if self.REDIS_PASSWORD else ""
|
||||
return f"redis://{auth}{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
|
||||
|
||||
@property
|
||||
def RABBIT_URL(self) -> str:
|
||||
vhost = "%2F" if self.RABBIT_VHOST == "/" else self.RABBIT_VHOST.lstrip("/")
|
||||
return f"amqp://{self.RABBIT_USER}:{self.RABBIT_PASSWORD}@{self.RABBIT_HOST}:{self.RABBIT_PORT}/{vhost}"
|
||||
|
||||
@property
|
||||
def EXCLUDED_PATHS(self) -> List[str]:
|
||||
return ["/docs", "/redoc", "/openapi.json", "/ping", "/health"]
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_settings() -> Settings:
|
||||
return Settings()
|
||||
|
||||
|
||||
settings = get_settings()
|
||||
Reference in New Issue
Block a user