fix: delte unused
This commit is contained in:
@@ -1,9 +1,8 @@
|
||||
from __future__ import annotations
|
||||
from datetime import datetime,timedelta,timezone
|
||||
import orjson
|
||||
from ulid import ULID
|
||||
from src.application.abstractions import IUnitOfWork
|
||||
from src.application.contracts import IBeorgService,ICache,ILogger,IQueueMessanger
|
||||
from src.application.contracts import IBeorgService,ICache,ILogger
|
||||
from src.application.domain.dto import BeorgKycCreateResponse,BeorgKycResultResponse,KycSessionResponse
|
||||
from src.application.domain.exceptions import ApplicationException
|
||||
from src.application.services import ensure_adult,extract_personal_data,parse_birth_date
|
||||
@@ -55,15 +54,11 @@ class CompleteKycCommand:
|
||||
logger: ILogger,
|
||||
cache: ICache,
|
||||
beorg_service: IBeorgService,
|
||||
queue_messanger: IQueueMessanger,
|
||||
verified_queue: str,
|
||||
) -> None:
|
||||
self._unit_of_work = unit_of_work
|
||||
self._logger = logger
|
||||
self._cache = cache
|
||||
self._beorg_service = beorg_service
|
||||
self._queue_messanger = queue_messanger
|
||||
self._verified_queue = verified_queue
|
||||
|
||||
|
||||
async def __call__(self,user_id: str) -> BeorgKycResultResponse:
|
||||
@@ -111,21 +106,6 @@ class CompleteKycCommand:
|
||||
)
|
||||
await self._cache.set_user(user_id,user,ttl=KYC_SESSION_TTL)
|
||||
await self._cache.delete(f'kyc:session:{user_id}')
|
||||
await self._queue_messanger.publish_to_queue(
|
||||
self._verified_queue,
|
||||
{
|
||||
'user_id': user_id,
|
||||
'kyc_verified': True,
|
||||
'first_name': user.first_name,
|
||||
'last_name': user.last_name,
|
||||
'middle_name': user.middle_name,
|
||||
'birth_date': str(user.birth_date) if user.birth_date else None,
|
||||
'inn': user.inn,
|
||||
'kyc_verified_at': user.kyc_verified_at.isoformat() if user.kyc_verified_at else None,
|
||||
},
|
||||
message_id=str(ULID()),
|
||||
correlation_id=user_id,
|
||||
)
|
||||
self._logger.info(f'KYC completed for user {user_id}')
|
||||
return result
|
||||
|
||||
|
||||
@@ -3,5 +3,4 @@ from src.application.contracts.i_jwt_service import IJwtService
|
||||
from src.application.contracts.i_csrf_service import ICsrfService
|
||||
from src.application.contracts.i_cache import ICache
|
||||
from src.application.contracts.i_hash_service import IHashService
|
||||
from src.application.contracts.i_queue_messanger import IQueueMessanger
|
||||
from src.application.contracts.i_beorg_service import IBeorgService
|
||||
@@ -1,40 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Mapping, Any
|
||||
|
||||
|
||||
class IQueueMessanger(ABC):
|
||||
|
||||
@abstractmethod
|
||||
async def connect(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
async def close(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
async def publish_to_queue(
|
||||
self,
|
||||
queue: str,
|
||||
message: Any,
|
||||
*,
|
||||
persist: bool = True,
|
||||
headers: Mapping[str, Any] | None = None,
|
||||
correlation_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
async def publish(
|
||||
self,
|
||||
message: Any,
|
||||
*,
|
||||
exchange: str,
|
||||
routing_key: str,
|
||||
persist: bool = True,
|
||||
headers: Mapping[str, Any] | None = None,
|
||||
correlation_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
) -> None:
|
||||
raise NotImplementedError
|
||||
@@ -22,18 +22,15 @@ class Settings(BaseSettings):
|
||||
VAULT_BEORG_SECRET_PATH: str = 'beorg'
|
||||
VAULT_DATABASE_SECRET_PATH: str = 'database'
|
||||
VAULT_JWT_SECRET_PATH: str = 'jwt'
|
||||
VAULT_RABBIT_SECRET_PATH: str = 'rabbitmq'
|
||||
VAULT_DOCS_SECRET_PATH: str = 'docs'
|
||||
VAULT_JWT_KID_PATH: str = 'jwt/kid'
|
||||
VAULT_JWT_KIDS_PREFIX: str = 'jwt/kids'
|
||||
LOG_FORMAT: str = 'json'
|
||||
LOG_LEVEL: str = 'INFO'
|
||||
JWT_KEYS_REFRESH_SECONDS: int = 300
|
||||
JWT_ALGORITHM: str = 'RS256'
|
||||
JWT_AUDIENCE: str | None = None
|
||||
JWT_ISSUER: str | None = None
|
||||
RABBIT_URL: str = 'amqp://guest:guest@localhost:5672/'
|
||||
RABBIT_CRYPTO_TRANSFER_COMPLETED_QUEUE: str = 'crypto_transfer_completed'
|
||||
RABBIT_KYC_VERIFIED_QUEUE: str = 'kyc_verified'
|
||||
RABBIT_PUBLISH_PERSIST: bool = True
|
||||
DATABASE_URL: str = 'postgresql+asyncpg://postgres:postgres@localhost:5432/kyc'
|
||||
DATABASE_POOL_SIZE: int = 5
|
||||
DATABASE_MAX_OVERFLOW: int = 10
|
||||
@@ -110,7 +107,6 @@ class Settings(BaseSettings):
|
||||
self.VAULT_BEORG_SECRET_PATH,
|
||||
self.VAULT_DATABASE_SECRET_PATH,
|
||||
self.VAULT_JWT_SECRET_PATH,
|
||||
self.VAULT_RABBIT_SECRET_PATH,
|
||||
self.VAULT_DOCS_SECRET_PATH,
|
||||
)
|
||||
for field in type(self).model_fields:
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
from src.infrastructure.messanger.rabbit_client import RabbitClient
|
||||
@@ -1,72 +0,0 @@
|
||||
from typing import Any, Mapping
|
||||
from faststream.rabbit import RabbitBroker
|
||||
from src.application.contracts import IQueueMessanger
|
||||
from src.infrastructure.config import settings
|
||||
|
||||
|
||||
class RabbitClient(IQueueMessanger):
|
||||
def __init__(self) -> None:
|
||||
self._broker = RabbitBroker(
|
||||
settings.RABBIT_URL,
|
||||
)
|
||||
self._connected = False
|
||||
|
||||
async def connect(self) -> None:
|
||||
if self._connected:
|
||||
return
|
||||
await self._broker.connect()
|
||||
self._connected = True
|
||||
|
||||
async def close(self) -> None:
|
||||
if not self._connected:
|
||||
return
|
||||
await self._broker.close()
|
||||
self._connected = False
|
||||
|
||||
async def _ensure_connected(self) -> None:
|
||||
if not self._connected:
|
||||
await self.connect()
|
||||
|
||||
async def publish_to_queue(
|
||||
self,
|
||||
queue: str,
|
||||
message: Any,
|
||||
*,
|
||||
persist: bool | None = None,
|
||||
headers: Mapping[str, Any] | None = None,
|
||||
correlation_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
) -> None:
|
||||
await self._ensure_connected()
|
||||
|
||||
await self._broker.publish(
|
||||
message,
|
||||
queue=queue,
|
||||
persist=settings.RABBIT_PUBLISH_PERSIST if persist is None else persist,
|
||||
headers=headers,
|
||||
correlation_id=correlation_id,
|
||||
message_id=message_id,
|
||||
)
|
||||
|
||||
async def publish(
|
||||
self,
|
||||
message: Any,
|
||||
*,
|
||||
exchange: str,
|
||||
routing_key: str,
|
||||
persist: bool | None = None,
|
||||
headers: Mapping[str, Any] | None = None,
|
||||
correlation_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
) -> None:
|
||||
await self._ensure_connected()
|
||||
|
||||
await self._broker.publish(
|
||||
message,
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
persist=settings.RABBIT_PUBLISH_PERSIST if persist is None else persist,
|
||||
headers=headers,
|
||||
correlation_id=correlation_id,
|
||||
message_id=message_id,
|
||||
)
|
||||
@@ -6,6 +6,7 @@ from fastapi import Depends, FastAPI, status
|
||||
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
from src.application.domain.enums import LogFormat,LogLevel
|
||||
from src.application.domain.exceptions import ApplicationException
|
||||
from src.infrastructure.cache import create_redis_client
|
||||
from src.infrastructure.config.settings import get_settings
|
||||
@@ -14,7 +15,6 @@ from src.infrastructure.utils import generate_instance_id
|
||||
from src.infrastructure.logger import logger
|
||||
from src.infrastructure.config import settings
|
||||
from src.presentation.handlers import application_exception_handler, unhandled_exception_handler
|
||||
from src.presentation.messaging import crypto_transfer_router
|
||||
from src.presentation.middleware import TraceIDMiddleware, SecurityHeadersMiddleware
|
||||
from src.presentation.routing import kyc_router
|
||||
|
||||
@@ -37,6 +37,8 @@ async def verify_credentials(credentials: HTTPBasicCredentials = Depends(securit
|
||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
|
||||
instance_id = generate_instance_id()
|
||||
logger.set_format(LogFormat(settings.LOG_FORMAT.lower()))
|
||||
logger.set_min_level(LogLevel[settings.LOG_LEVEL.upper()])
|
||||
logger.set_instance_id(instance_id)
|
||||
logger.info(f'Users service instance started with id {instance_id}')
|
||||
|
||||
@@ -74,10 +76,8 @@ app.add_exception_handler(ApplicationException, application_exception_handler)
|
||||
app.add_exception_handler(Exception, unhandled_exception_handler)
|
||||
|
||||
app.include_router(kyc_router)
|
||||
app.include_router(crypto_transfer_router)
|
||||
|
||||
|
||||
# Added middleware
|
||||
app.add_middleware(TraceIDMiddleware, logger=logger)
|
||||
app.add_middleware(
|
||||
SecurityHeadersMiddleware,
|
||||
|
||||
@@ -2,12 +2,11 @@ from __future__ import annotations
|
||||
from fastapi import Depends
|
||||
from src.application.abstractions import IUnitOfWork
|
||||
from src.application.commands import CompleteKycCommand,GetKycSessionCommand,PassKycCommand
|
||||
from src.application.contracts import IBeorgService,ICache,ILogger,IQueueMessanger
|
||||
from src.application.contracts import IBeorgService,ICache,ILogger
|
||||
from src.infrastructure.config import settings
|
||||
from src.infrastructure.beorg import BeorgService
|
||||
from src.presentation.dependencies.cache import get_cache
|
||||
from src.presentation.dependencies.logger import get_logger
|
||||
from src.presentation.dependencies.queue_messanger import get_rabbit
|
||||
from src.presentation.dependencies.unit_of_work import get_unit_of_work
|
||||
|
||||
|
||||
@@ -48,13 +47,10 @@ def get_complete_kyc_command(
|
||||
unit_of_work: IUnitOfWork = Depends(get_unit_of_work),
|
||||
cache: ICache = Depends(get_cache),
|
||||
beorg_service: IBeorgService = Depends(get_beorg_service),
|
||||
queue_messanger: IQueueMessanger = Depends(get_rabbit),
|
||||
) -> CompleteKycCommand:
|
||||
return CompleteKycCommand(
|
||||
unit_of_work=unit_of_work,
|
||||
logger=logger,
|
||||
cache=cache,
|
||||
beorg_service=beorg_service,
|
||||
queue_messanger=queue_messanger,
|
||||
verified_queue=settings.RABBIT_KYC_VERIFIED_QUEUE,
|
||||
)
|
||||
@@ -1,8 +0,0 @@
|
||||
from functools import lru_cache
|
||||
from src.application.contracts import IQueueMessanger
|
||||
from src.infrastructure.messanger import RabbitClient
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_rabbit() -> IQueueMessanger:
|
||||
return RabbitClient()
|
||||
@@ -1 +0,0 @@
|
||||
from src.presentation.messaging.crypto_transfer import crypto_transfer_router
|
||||
@@ -1,39 +0,0 @@
|
||||
from fastapi import Depends
|
||||
import orjson
|
||||
from faststream.rabbit.fastapi import RabbitMessage,RabbitRouter
|
||||
from pydantic import BaseModel
|
||||
from src.application.contracts import ILogger
|
||||
from src.infrastructure.config import settings
|
||||
from src.infrastructure.context_vars import trace_id_var
|
||||
from src.presentation.dependencies.logger import get_logger
|
||||
|
||||
|
||||
crypto_transfer_router=RabbitRouter(settings.RABBIT_URL)
|
||||
|
||||
|
||||
class CryptoTransferCompletedMessage(BaseModel):
|
||||
user_id: str
|
||||
order_id: str
|
||||
trace_id: str
|
||||
message_id: str
|
||||
|
||||
|
||||
@crypto_transfer_router.subscriber(settings.RABBIT_CRYPTO_TRANSFER_COMPLETED_QUEUE)
|
||||
async def crypto_transfer_completed_handler(
|
||||
msg_body: CryptoTransferCompletedMessage,
|
||||
message: RabbitMessage,
|
||||
logger: ILogger = Depends(get_logger),
|
||||
) -> None:
|
||||
trace_id=msg_body.trace_id
|
||||
token=trace_id_var.set(trace_id)
|
||||
try:
|
||||
payload=msg_body.model_dump(mode='json')
|
||||
logger.info(orjson.dumps({
|
||||
'event':'crypto_transfer_completed_received',
|
||||
'payload':payload,
|
||||
'rabbit_message_id':message.message_id,
|
||||
'rabbit_correlation_id':message.correlation_id,
|
||||
},default=str).decode())
|
||||
finally:
|
||||
trace_id_var.reset(token)
|
||||
|
||||
Reference in New Issue
Block a user