diff --git a/src/application/abstractions/repositories/i_kyc_repository.py b/src/application/abstractions/repositories/i_kyc_repository.py index 093f356..7e70ffc 100644 --- a/src/application/abstractions/repositories/i_kyc_repository.py +++ b/src/application/abstractions/repositories/i_kyc_repository.py @@ -42,6 +42,21 @@ class IKycRepository(ABC): raise NotImplementedError + @abstractmethod + async def expire_all_started_sessions(self,*,now: datetime) -> None: + raise NotImplementedError + + + @abstractmethod + async def get_started_sessions(self,*,now: datetime,limit: int) -> list[KycEntity]: + raise NotImplementedError + + @abstractmethod async def get_active_session(self,*,user_id: str,now: datetime) -> KycEntity | None: raise NotImplementedError + + + @abstractmethod + async def get_latest_session(self,*,user_id: str) -> KycEntity | None: + raise NotImplementedError diff --git a/src/application/commands/__init__.py b/src/application/commands/__init__.py index 4bca62a..5262955 100644 --- a/src/application/commands/__init__.py +++ b/src/application/commands/__init__.py @@ -1 +1 @@ -from src.application.commands.create_kyc_command import CompleteKycCommand,GetKycSessionCommand,PassKycCommand \ No newline at end of file +from src.application.commands.create_kyc_command import CompleteKycCommand,GetKycSessionCommand,PassKycCommand,PollKycSessionsCommand \ No newline at end of file diff --git a/src/application/commands/create_kyc_command.py b/src/application/commands/create_kyc_command.py index 12a7dc0..ff8243f 100644 --- a/src/application/commands/create_kyc_command.py +++ b/src/application/commands/create_kyc_command.py @@ -132,7 +132,7 @@ class GetKycSessionCommand: now = _utc_now() async with self._unit_of_work as unit_of_work: await unit_of_work.kyc_repository.expire_started_sessions(user_id=user_id,now=now) - session = await unit_of_work.kyc_repository.get_active_session(user_id=user_id,now=now) + session = await unit_of_work.kyc_repository.get_latest_session(user_id=user_id) if session is None or session.expires_at is None: raise ApplicationException(status_code=404,message='KYC session expired') @@ -142,10 +142,99 @@ class GetKycSessionCommand: link=session.link, qr_code=session.qr_code, user_token=session.user_token, + done_state=session.done_state, + error=session.error, expires_at=session.expires_at, expires_in=expires_in, ) +class PollKycSessionsCommand: + + def __init__( + self, + *, + unit_of_work: IUnitOfWork, + logger: ILogger, + beorg_service: IBeorgService, + batch_size: int, + ) -> None: + self._unit_of_work = unit_of_work + self._logger = logger + self._beorg_service = beorg_service + self._batch_size = batch_size + + + async def __call__(self) -> None: + now = _utc_now() + async with self._unit_of_work as unit_of_work: + await unit_of_work.kyc_repository.expire_all_started_sessions(now=now) + sessions = await unit_of_work.kyc_repository.get_started_sessions(now=now,limit=self._batch_size) + + for session in sessions: + if not session.user_id or not session.user_token: + continue + try: + await self._poll_session(user_id=session.user_id,user_token=session.user_token) + except ApplicationException as exc: + if exc.status_code in (400,403,422): + async with self._unit_of_work as unit_of_work: + await unit_of_work.kyc_repository.update_session_result( + user_id=session.user_id, + user_token=session.user_token, + status='failed', + done_state=True, + set_id=None, + result_data=session.result_data, + error=exc.message, + ) + self._logger.error(f'KYC polling failed for user {session.user_id}: {exc.message}') + except Exception as exc: + self._logger.error(f'KYC polling failed for user {session.user_id}: {str(exc)}') + + + async def _poll_session(self,*,user_id: str,user_token: str) -> None: + result = await self._beorg_service.get_result(user_token=user_token) + if result.done_state is None: + return + if result.done_state is False: + async with self._unit_of_work as unit_of_work: + await unit_of_work.kyc_repository.update_session_result( + user_id=user_id, + user_token=user_token, + status='failed', + done_state=result.done_state, + set_id=result.set_id, + result_data=result.data, + error='KYC failed', + ) + self._logger.info(f'KYC failed for user {user_id}') + return + + personal_data = extract_personal_data(result.data) + birth_date = parse_birth_date(personal_data.birth_date) + ensure_adult(birth_date) + + async with self._unit_of_work as unit_of_work: + await unit_of_work.user_repository.update_kyc_data( + user_id=user_id, + first_name=personal_data.first_name, + last_name=personal_data.last_name, + middle_name=personal_data.middle_name, + birth_date=birth_date, + inn=personal_data.inn, + ) + await unit_of_work.kyc_repository.update_session_result( + user_id=user_id, + user_token=user_token, + status='completed', + done_state=result.done_state, + set_id=result.set_id, + result_data=result.data, + error=None, + ) + self._logger.info(f'KYC completed for user {user_id}') + + def _utc_now() -> datetime: return datetime.now(timezone.utc) diff --git a/src/application/domain/dto/beorg.py b/src/application/domain/dto/beorg.py index c4313fe..b3ad23b 100644 --- a/src/application/domain/dto/beorg.py +++ b/src/application/domain/dto/beorg.py @@ -33,5 +33,7 @@ class KycSessionResponse(BaseModel): link: str | None = None qr_code: str | None = None user_token: str | None = None + done_state: bool | None = None + error: str | None = None expires_at: datetime expires_in: int diff --git a/src/application/services/kyc_personal_data.py b/src/application/services/kyc_personal_data.py index 7c92238..cfaeadc 100644 --- a/src/application/services/kyc_personal_data.py +++ b/src/application/services/kyc_personal_data.py @@ -6,11 +6,12 @@ from src.application.domain.exceptions import ApplicationException FIELD_ALIASES = { - 'first_name': {'first_name','name','given_name','имя'}, - 'last_name': {'last_name','surname','family_name','фамилия'}, - 'middle_name': {'middle_name','patronymic','отчество'}, - 'birth_date': {'birth_date','birthdate','date_birth','birthday','дата рождения'}, - 'inn': {'inn','tax_id','инн'}, + 'full_name': {'full_name','fullname','fio','фио'}, + 'first_name': {'first_name','firstname','name','given_name','givenname','имя'}, + 'last_name': {'last_name','lastname','surname','family_name','familyname','фамилия'}, + 'middle_name': {'middle_name','middlename','patronymic','отчество'}, + 'birth_date': {'birth_date','birthdate','date_birth','datebirth','birthday','дата_рождения'}, + 'inn': {'inn','tax_id','taxid','инн'}, } @@ -22,6 +23,14 @@ def extract_personal_data(data: Any) -> KycPersonalData: if field not in values and normalized in aliases and value not in (None,''): values[field] = str(value).strip() + if values.get('full_name') and (not values.get('first_name') or not values.get('last_name')): + parts = values['full_name'].split() + if len(parts) >= 2: + values.setdefault('last_name',parts[0]) + values.setdefault('first_name',parts[1]) + if len(parts) >= 3: + values.setdefault('middle_name',' '.join(parts[2:])) + missing = [field for field in ('first_name','last_name','birth_date') if not values.get(field)] if missing: raise ApplicationException(status_code=422,message='KYC personal data is incomplete') diff --git a/src/infrastructure/config/settings.py b/src/infrastructure/config/settings.py index dd3fd31..b5278a9 100644 --- a/src/infrastructure/config/settings.py +++ b/src/infrastructure/config/settings.py @@ -36,6 +36,8 @@ class Settings(BaseSettings): DATABASE_POOL_TIMEOUT: int = 30 DATABASE_POOL_RECYCLE: int = 1800 DATABASE_ECHO: bool = False + KYC_POLL_SECONDS: int = 10 + KYC_POLL_BATCH_SIZE: int = 20 EXCLUDED_PATHS: tuple[str,...] = ('/docs','/redoc','/openapi.json','/ping') BEORG_TIMEOUT: int = 15 BEORG_PROCESS_INFO: list[dict[str,Any]] = Field(default_factory=lambda: [ diff --git a/src/infrastructure/database/repositories/kyc_repository.py b/src/infrastructure/database/repositories/kyc_repository.py index 0216cd3..4e30706 100644 --- a/src/infrastructure/database/repositories/kyc_repository.py +++ b/src/infrastructure/database/repositories/kyc_repository.py @@ -83,6 +83,33 @@ class KycRepository(IKycRepository): await self._session.flush() + async def expire_all_started_sessions(self,*,now: datetime) -> None: + result = await self._session.execute( + select(KycModel) + .where( + KycModel.status == 'started', + KycModel.expires_at <= now, + ) + ) + for kyc in result.scalars(): + kyc.status = 'expired' + await self._session.flush() + + + async def get_started_sessions(self,*,now: datetime,limit: int) -> list[KycEntity]: + result = await self._session.execute( + select(KycModel) + .where( + KycModel.status == 'started', + KycModel.expires_at > now, + KycModel.user_token.is_not(None), + ) + .order_by(KycModel.created_at.asc()) + .limit(limit) + ) + return [self._to_entity(kyc) for kyc in result.scalars()] + + async def get_active_session(self,*,user_id: str,now: datetime) -> KycEntity | None: result = await self._session.execute( select(KycModel) @@ -100,6 +127,19 @@ class KycRepository(IKycRepository): return self._to_entity(kyc) + async def get_latest_session(self,*,user_id: str) -> KycEntity | None: + result = await self._session.execute( + select(KycModel) + .where(KycModel.user_id == user_id) + .order_by(KycModel.created_at.desc()) + .limit(1) + ) + kyc = result.scalar_one_or_none() + if kyc is None: + return None + return self._to_entity(kyc) + + def _to_entity(self,kyc: KycModel) -> KycEntity: return KycEntity( id=kyc.id, diff --git a/src/main.py b/src/main.py index a757e8b..e122c35 100644 --- a/src/main.py +++ b/src/main.py @@ -2,13 +2,18 @@ from __future__ import annotations from contextlib import asynccontextmanager import secrets from typing import AsyncGenerator +from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import Depends, FastAPI, status from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html from fastapi.responses import HTMLResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials +from src.application.commands import PollKycSessionsCommand from src.application.domain.enums import LogFormat,LogLevel from src.application.domain.exceptions import ApplicationException +from src.infrastructure.beorg import BeorgService from src.infrastructure.config.settings import get_settings +from src.infrastructure.database.context import async_session_maker +from src.infrastructure.database.unit_of_work import UnitOfWork from src.infrastructure.vault import JwtKeyStore, start_jwt_keys_scheduler from src.infrastructure.utils import generate_instance_id from src.infrastructure.logger import logger @@ -54,10 +59,33 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: await jwt_store.refresh() jwt_scheduler = start_jwt_keys_scheduler(jwt_store, refresh_seconds=settings.JWT_KEYS_REFRESH_SECONDS) + kyc_poll_command = PollKycSessionsCommand( + unit_of_work=UnitOfWork(session_factory=async_session_maker,logger=logger), + logger=logger, + beorg_service=BeorgService( + project_id=settings.BEORG_PROJECT_ID, + machine_uid=settings.BEORG_MACHINE_UID, + token=settings.BEORG_TOKEN, + process_info=settings.BEORG_PROCESS_INFO, + timeout=settings.BEORG_TIMEOUT, + ), + batch_size=settings.KYC_POLL_BATCH_SIZE, + ) + kyc_scheduler = AsyncIOScheduler() + kyc_scheduler.add_job( + kyc_poll_command.__call__, + 'interval', + seconds=settings.KYC_POLL_SECONDS, + max_instances=1, + ) + kyc_scheduler.start() app.state.jwt_key_store = jwt_store app.state.jwt_keys_scheduler = jwt_scheduler + app.state.kyc_scheduler = kyc_scheduler yield + app.state.kyc_scheduler.shutdown(wait=False) + app.state.jwt_keys_scheduler.shutdown(wait=False) logger.info(f'Users service instance ended with id {instance_id}') diff --git a/src/presentation/dependencies/commands.py b/src/presentation/dependencies/commands.py index 8f96aa5..eae0409 100644 --- a/src/presentation/dependencies/commands.py +++ b/src/presentation/dependencies/commands.py @@ -1,7 +1,7 @@ from __future__ import annotations from fastapi import Depends from src.application.abstractions import IUnitOfWork -from src.application.commands import CompleteKycCommand,GetKycSessionCommand,PassKycCommand +from src.application.commands import GetKycSessionCommand,PassKycCommand from src.application.contracts import IBeorgService,ILogger from src.infrastructure.config import settings from src.infrastructure.beorg import BeorgService @@ -37,15 +37,3 @@ def get_kyc_session_command( return GetKycSessionCommand( unit_of_work=unit_of_work, ) - - -def get_complete_kyc_command( - logger: ILogger = Depends(get_logger), - unit_of_work: IUnitOfWork = Depends(get_unit_of_work), - beorg_service: IBeorgService = Depends(get_beorg_service), -) -> CompleteKycCommand: - return CompleteKycCommand( - unit_of_work=unit_of_work, - logger=logger, - beorg_service=beorg_service, - ) \ No newline at end of file diff --git a/src/presentation/routing/kyc.py b/src/presentation/routing/kyc.py index 01fb696..bba2d5e 100644 --- a/src/presentation/routing/kyc.py +++ b/src/presentation/routing/kyc.py @@ -1,9 +1,9 @@ from fastapi import APIRouter,Depends from fastapi.responses import ORJSONResponse -from src.application.commands import CompleteKycCommand,GetKycSessionCommand,PassKycCommand +from src.application.commands import GetKycSessionCommand,PassKycCommand from src.application.domain.dto import AuthContext from src.presentation.decorators.auth import require_access_token -from src.presentation.dependencies.commands import get_complete_kyc_command,get_kyc_session_command,get_pass_kyc_command +from src.presentation.dependencies.commands import get_kyc_session_command,get_pass_kyc_command kyc_router = APIRouter(prefix='/kyc', tags=['Kyc']) @@ -11,29 +11,17 @@ kyc_router = APIRouter(prefix='/kyc', tags=['Kyc']) @kyc_router.post('/create') async def create_kyc( - #auth: AuthContext = Depends(require_access_token), + auth: AuthContext = Depends(require_access_token), command: PassKycCommand = Depends(get_pass_kyc_command), ) -> ORJSONResponse: - #result = await command(user_id=user_id) - result = await command(user_id='01KPKAFN6J1NJBY15DX8JE2QYB') + result = await command(user_id=auth.user_id) return ORJSONResponse(result.model_dump()) @kyc_router.get('/session') async def get_kyc_session( - #auth: AuthContext = Depends(require_access_token), + auth: AuthContext = Depends(require_access_token), command: GetKycSessionCommand = Depends(get_kyc_session_command), ) -> ORJSONResponse: - #result = await command(user_id=user_id) - result = await command(user_id='01KPKAFN6J1NJBY15DX8JE2QYB') - return ORJSONResponse(result.model_dump()) - - -@kyc_router.post('/complete') -async def complete_kyc( - #auth: AuthContext = Depends(require_access_token), - command: CompleteKycCommand = Depends(get_complete_kyc_command), -) -> ORJSONResponse: - #result = await command(user_id=user_id) - result = await command(user_id='01KPKAFN6J1NJBY15DX8JE2QYB') + result = await command(user_id=auth.user_id) return ORJSONResponse(result.model_dump()) \ No newline at end of file