import secrets
from datetime import datetime, timezone

from ulid import ULID

from src.application.abstractions import IUnitOfWork
from src.application.contracts import IHashService, ICache, ILogger, IQueueMessanger
from src.application.domain.exceptions import ApplicationException
from src.infrastructure.config import settings
from src.infrastructure.context_vars import trace_id_var
from src.infrastructure.database.decorators import transactional


class ForgotPasswordStartCommand:
    """Start the forgot-password flow for an e-mail address.

    Generates a six-digit code, stores its hash in the cache under the
    normalized e-mail, reserves the plain code under its own cache key, and
    publishes a ``forgot_password`` message to the e-mail code queue.
    """

    # Lifetime (seconds) of the stored code hash and of the code reservation.
    _TTL = 300
    # Lifetime (seconds) of the per-e-mail lock held while a request is processed.
    _LOCK_TTL = 30
    # How many random codes are tried before giving up on finding a free one.
    _MAX_ATTEMPTS = 20

    _EMAIL_PREFIX = 'forgot_password:email:'
    _CODE_PREFIX = 'forgot_password:code:'
    _LOCK_PREFIX = 'forgot_password:lock:'

    def __init__(
        self,
        hash_service: IHashService,
        cache: ICache,
        unit_of_work: IUnitOfWork,
        logger: ILogger,
        messanger: IQueueMessanger,
    ):
        """Store the collaborators used by the command."""
        self._hash_service = hash_service
        self._unit_of_work = unit_of_work
        self._cache = cache
        self._logger = logger
        self._messanger = messanger

    @staticmethod
    def _normalize_email(email: str) -> str:
        """Return *email* stripped of surrounding whitespace and lower-cased."""
        return email.strip().lower()

    def _build_message(self, email: str, code: str, trace_id, message_id: str) -> dict:
        """Assemble the queue message carrying the plain code for *email*."""
        metadata = {
            'trace_id': trace_id,
            'source': 'user-service',
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'message_id': message_id,
        }
        payload = {
            'email': email,
            'code': code,
            'ttl_seconds': self._TTL,
        }
        return {
            'event': 'forgot_password',
            'payload': payload,
            'metadata': metadata,
        }

    async def _rollback_cache(self, user_id, *keys: str) -> None:
        """Best-effort removal of *keys* from the cache; failures are logged.

        Each key is deleted independently so that a failure on one key does
        not prevent the attempt on the remaining ones.
        """
        for key in keys:
            try:
                await self._cache.delete(key)
            except Exception as rollback_err:
                self._logger.error(
                    f'Publish failed and rollback cache failed for user_id={user_id}: {str(rollback_err)}'
                )

    @transactional
    async def __call__(self, email: str) -> bool:
        """Create a password-reset code for *email* and queue it for delivery.

        Args:
            email: Address supplied by the caller; normalized before use.

        Returns:
            ``True`` when the code was created and published, and also when
            no user exists for the address.

        Raises:
            ApplicationException: with code 429 when the per-e-mail lock is
                already held or a code is already stored for the address;
                with code 503 when the code hash cannot be stored, when
                publishing fails, or when every attempted code was already
                reserved.
        """
        normalized = self._normalize_email(email)

        user = await self._unit_of_work.user_repository.get_user_by_email(normalized)
        if user is None:
            # NOTE(review): unknown addresses always get True, while known ones
            # can receive 429 below, so the two remain distinguishable to a
            # caller - confirm this is acceptable.
            self._logger.info('Forgot password start: no user for email hash lookup')
            return True

        # Treat a missing or placeholder trace id as "no trace id".
        trace_id = trace_id_var.get()
        if not trace_id or trace_id == 'N/A':
            trace_id = None

        lock_key = f'{self._LOCK_PREFIX}{normalized}'
        if not await self._cache.set_nx(lock_key, '1', ttl=self._LOCK_TTL):
            self._logger.info(f'Forgot password throttled by lock (user_id={user.id})')
            raise ApplicationException(429, 'Too many requests. Please wait.')

        try:
            email_key = f'{self._EMAIL_PREFIX}{normalized}'
            if await self._cache.get(email_key):
                self._logger.info(f'Forgot password denied: code already exists for user_id={user.id}')
                raise ApplicationException(429, 'Code already sent. Please wait before retrying.')

            for _ in range(self._MAX_ATTEMPTS):
                code = f'{secrets.randbelow(1_000_000):06d}'
                code_key = f'{self._CODE_PREFIX}{code}'
                # Hashing happens before the reservation, so a hashing failure
                # leaves no reserved code key behind.
                code_hash = await self._hash_service.hash(code)

                if not await self._cache.set_nx(code_key, normalized, ttl=self._TTL):
                    # The code key is already taken; try another code.
                    continue

                if not await self._cache.set(email_key, code_hash, ttl=self._TTL):
                    await self._cache.delete(code_key)
                    self._logger.error(f'Forgot password failed: cannot save code hash for user_id={user.id}')
                    raise ApplicationException(503, 'Temporary error. Please try again.')

                message_id = str(ULID())
                message = self._build_message(normalized, code, trace_id, message_id)
                self._logger.info(f'Forgot password code created for user_id={user.id}')

                try:
                    await self._messanger.publish_to_queue(
                        queue=settings.RABBIT_EMAIL_CODE_QUEUE,
                        message=message,
                        persist=True,
                        correlation_id=trace_id,
                        message_id=message_id,
                        headers={'trace_id': trace_id} if trace_id else None,
                    )
                except Exception as exception:
                    # Nothing was queued: drop both cache entries so the
                    # request can be retried without waiting for the TTL.
                    await self._rollback_cache(user.id, email_key, code_key)
                    self._logger.error(
                        f'Failed to publish forgot password email for user_id={user.id}: {str(exception)}'
                    )
                    raise ApplicationException(503, 'Temporary error. Please try again.') from exception

                return True

            self._logger.error(f'Forgot password failed: code space exhausted for user_id={user.id}')
            raise ApplicationException(503, 'Temporary error. Please try again.')
        finally:
            # Release the lock on every exit path once it has been acquired.
            await self._cache.delete(lock_key)