88 lines
3.2 KiB
Python
88 lines
3.2 KiB
Python
from __future__ import annotations
|
||
from datetime import date,datetime
|
||
from typing import Any
|
||
from src.application.domain.dto import KycPersonalData
|
||
from src.application.domain.exceptions import ApplicationException
|
||
|
||
|
||
FIELD_ALIASES = {
|
||
'full_name': {'full_name','fullname','fio','фио'},
|
||
'first_name': {'first_name','firstname','name','given_name','givenname','имя'},
|
||
'last_name': {'last_name','lastname','surname','family_name','familyname','фамилия'},
|
||
'middle_name': {'middle_name','middlename','patronymic','отчество'},
|
||
'birth_date': {'birth_date','birthdate','date_birth','datebirth','birthday','дата_рождения'},
|
||
'inn': {'inn','tax_id','taxid','инн'},
|
||
}
|
||
|
||
|
||
def extract_personal_data(data: Any) -> KycPersonalData:
|
||
values: dict[str,str] = {}
|
||
for key,value in _walk(data):
|
||
normalized = _normalize_key(key)
|
||
for field,aliases in FIELD_ALIASES.items():
|
||
if field not in values and normalized in aliases and value not in (None,''):
|
||
values[field] = str(value).strip()
|
||
|
||
if values.get('full_name') and (not values.get('first_name') or not values.get('last_name')):
|
||
parts = values['full_name'].split()
|
||
if len(parts) >= 2:
|
||
values.setdefault('last_name',parts[0])
|
||
values.setdefault('first_name',parts[1])
|
||
if len(parts) >= 3:
|
||
values.setdefault('middle_name',' '.join(parts[2:]))
|
||
|
||
missing = [field for field in ('first_name','last_name','birth_date') if not values.get(field)]
|
||
if missing:
|
||
raise ApplicationException(status_code=422,message='KYC personal data is incomplete')
|
||
|
||
return KycPersonalData(
|
||
first_name=values['first_name'],
|
||
last_name=values['last_name'],
|
||
middle_name=values.get('middle_name'),
|
||
birth_date=str(_parse_date(values['birth_date'])),
|
||
inn=values.get('inn'),
|
||
)
|
||
|
||
|
||
def ensure_adult(birth_date: date) -> None:
|
||
today = date.today()
|
||
try:
|
||
adult_from = date(today.year - 18,today.month,today.day)
|
||
except ValueError:
|
||
adult_from = date(today.year - 18,2,28)
|
||
if birth_date > adult_from:
|
||
raise ApplicationException(status_code=403,message='KYC is unavailable for users under 18')
|
||
|
||
|
||
def parse_birth_date(value: str) -> date:
|
||
return _parse_date(value)
|
||
|
||
|
||
def _walk(data: Any) -> list[tuple[str,Any]]:
|
||
items: list[tuple[str,Any]] = []
|
||
if isinstance(data,dict):
|
||
for key,value in data.items():
|
||
if isinstance(value,dict | list):
|
||
items.extend(_walk(value))
|
||
else:
|
||
items.append((str(key),value))
|
||
elif isinstance(data,list):
|
||
for item in data:
|
||
items.extend(_walk(item))
|
||
return items
|
||
|
||
|
||
def _normalize_key(key: str) -> str:
|
||
return key.strip().lower().replace('-','_').replace(' ','_')
|
||
|
||
|
||
def _parse_date(value: str) -> date:
|
||
clean = value.strip()
|
||
formats = ('%Y-%m-%d','%d.%m.%Y','%d-%m-%Y','%d/%m/%Y','%Y.%m.%d')
|
||
for date_format in formats:
|
||
try:
|
||
return datetime.strptime(clean,date_format).date()
|
||
except ValueError:
|
||
continue
|
||
raise ApplicationException(status_code=422,message='KYC birth date has invalid format')
|