Files
kyc/src/infrastructure/beorg/client.py

84 lines
3.0 KiB
Python

from __future__ import annotations
from typing import Any
import aiohttp
from src.application.contracts import IBeorgService
from src.application.domain.dto import BeorgKycCreateResponse,BeorgKycResultResponse
from src.application.domain.exceptions import BeorgConfigException,BeorgRejectedException,BeorgUnavailableException
class BeorgService(IBeorgService):
BASE_URL = 'https://webapp.beorg.ru'
CREATE_ENDPOINT = '/kyc/create'
GET_RESULT_ENDPOINT = '/kyc/get_result'
def __init__(
self,
*,
project_id: str,
machine_uid: str,
token: str,
process_info: list[dict[str,Any]],
timeout: int = 15,
) -> None:
self._project_id = project_id
self._machine_uid = machine_uid
self._token = token
self._process_info = process_info
self._expires = 3600
self._timeout = timeout
async def create_identification(self,client_user_token: str) -> BeorgKycCreateResponse:
self._ensure_configured()
payload: dict[str,Any] = {
'project_id': self._project_id,
'machine_uid': self._machine_uid,
'token': self._token,
'process_info': self._process_info,
'client_user_token': client_user_token,
'expires': self._expires,
}
timeout = aiohttp.ClientTimeout(total=self._timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
f'{self.BASE_URL}{self.CREATE_ENDPOINT}',
json=payload,
headers={'Content-Type': 'application/json'},
) as response:
data = await response.json(content_type=None)
if response.status >= 500:
raise BeorgUnavailableException()
result = BeorgKycCreateResponse.model_validate(data)
if not result.status:
raise BeorgRejectedException(result.error or 'Beorg rejected kyc request')
return result
async def get_result(self,user_token: str) -> BeorgKycResultResponse:
self._ensure_configured()
timeout = aiohttp.ClientTimeout(total=self._timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(
f'{self.BASE_URL}{self.GET_RESULT_ENDPOINT}',
params={
'token': self._token,
'user_token': user_token,
},
headers={'Content-Type': 'application/json'},
) as response:
data = await response.json(content_type=None)
if response.status >= 500:
raise BeorgUnavailableException()
return BeorgKycResultResponse.model_validate(data)
def _ensure_configured(self) -> None:
if not self._project_id or not self._machine_uid or not self._token or not self._process_info:
raise BeorgConfigException()