Files
adminka/src/application/commands/admin_login.py
2026-06-05 12:41:25 +03:00

95 lines
3.1 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone, timedelta
from ulid import ULID
from src.application.abstractions import IUnitOfWork
from src.application.contracts import IHashService, IJwtService, ILogger
from src.application.domain.dto.admin_auth import AdminLoginDto
from src.application.domain.exceptions import ApplicationException
from src.infrastructure.config import settings
from src.infrastructure.database.decorators import transactional
class AdminLoginCommand:
def __init__(
self,
unit_of_work: IUnitOfWork,
hash_service: IHashService,
jwt_service: IJwtService,
logger: ILogger,
):
self._unit_of_work = unit_of_work
self._hash_service = hash_service
self._jwt_service = jwt_service
self._logger = logger
@transactional
async def __call__(
self,
*,
login: str,
password: str,
device_id: str | None,
ip: str | None,
user_agent: str | None,
) -> AdminLoginDto:
login = (login or '').strip()
if not login:
raise ApplicationException(status_code=400, message='Login is required')
admin = await self._unit_of_work.admin_user_repository.get_by_login(login)
if not admin.is_active:
raise ApplicationException(status_code=403, message='Admin account is inactive')
ok = await self._hash_service.verify(plain_value=password, hashed_value=admin.password_hash)
if not ok:
self._logger.warning(f'Admin login failed for {login}')
raise ApplicationException(status_code=401, message='Invalid credentials')
now = datetime.now(timezone.utc)
await self._unit_of_work.admin_user_repository.update_last_login(admin.id, last_login_at=now)
resolved_device_id = device_id or str(ULID())
sid = str(ULID())
jti = str(ULID())
jti_hash = await self._hash_service.hash(value=jti)
refresh_expires_at = now + timedelta(seconds=int(settings.JWT_REFRESH_TTL_SECONDS))
await self._unit_of_work.admin_session_repository.upsert_by_device(
admin_user_id=admin.id,
device_id=resolved_device_id,
sid=sid,
refresh_jti_hash=jti_hash,
refresh_expires_at=refresh_expires_at,
user_agent=user_agent,
ip=ip,
now=now,
)
access_token = await self._jwt_service.create_access_token(
user_id=admin.id,
role=admin.role,
sid=sid,
)
refresh_token = await self._jwt_service.create_refresh_token(
user_id=admin.id,
sid=sid,
refresh_jti=jti,
)
self._logger.info(f'Admin logged in admin_user_id={admin.id}')
return AdminLoginDto(
id=admin.id,
login=admin.login,
first_name=admin.first_name,
last_name=admin.last_name,
role=admin.role,
access_token=access_token,
refresh_token=refresh_token,
device_id=resolved_device_id,
last_login_at=now,
)