From bf68aca4fa447934fd0ff485cd433c6f318d9e2a Mon Sep 17 00:00:00 2001 From: Noloquideus Date: Fri, 1 May 2026 13:10:13 +0300 Subject: [PATCH] feat: add full pay path --- docker-compose.yml | 62 +++--- .../abstractions/i_unit_of_work.py | 6 +- .../abstractions/repositories/__init__.py | 20 +- .../repositories/i_order_repository.py | 24 +++ .../repositories/i_payment_repository.py | 10 + .../command/create_order_command.py | 20 -- src/application/commands/__init__.py | 2 + .../commands/create_order_command.py | 81 ++++++++ .../commands/create_payment_command.py | 49 +++++ src/application/contracts/__init__.py | 4 +- src/application/contracts/i_cache.py | 4 + src/application/contracts/i_itpay_service.py | 11 ++ src/application/contracts/i_receipt.py | 24 +++ src/application/domain/entities/__init__.py | 6 +- src/application/domain/entities/order.py | 36 ++++ src/application/domain/entities/payment.py | 21 ++ src/application/domain/enums/__init__.py | 5 +- .../domain/enums/itpay_payment_status.py | 7 + src/application/domain/enums/order_status.py | 10 + .../domain/enums/payment_status.py | 12 ++ .../exceptions/application_exceptions.py | 2 +- src/infrastructure/cache/__init__.py | 5 +- src/infrastructure/cache/client.py | 5 +- src/infrastructure/cache/keydb_client.py | 3 + src/infrastructure/cache/remote_cache.py | 68 +++++++ src/infrastructure/cloud_kassir/__init__.py | 5 + src/infrastructure/cloud_kassir/client.py | 130 ++++++++++++ src/infrastructure/config/settings.py | 119 ++++++++--- .../database/models/__init__.py | 5 +- src/infrastructure/database/models/order.py | 54 +++++ src/infrastructure/database/models/payment.py | 46 +++++ .../database/models/sessions.py | 50 ----- src/infrastructure/database/models/user.py | 40 ++-- .../database/repositories/order_repository.py | 134 +++++++++++++ .../repositories/payment_repository.py | 41 ++++ src/infrastructure/database/unit_of_work.py | 21 +- .../itpay}/__init__.py | 0 src/infrastructure/itpay/client.py | 109 ++++++++++ src/infrastructure/logger/logger.py | 4 +- src/main.py | 9 +- src/presentation/decorators/cache.py | 13 +- src/presentation/dependencies/__init__.py | 23 +-- src/presentation/dependencies/cache.py | 32 ++- src/presentation/dependencies/commands.py | 56 ++++-- .../handlers/application_handler.py | 2 +- src/presentation/messaging/__init__.py | 5 + src/presentation/messaging/crypto_transfer.py | 39 ++++ src/presentation/routing/order.py | 186 +++++++----------- .../schemas/itpay_payment_base.py | 15 ++ .../schemas/itpay_payment_completed_event.py | 17 ++ .../schemas/itpay_payment_models.py | 93 +++++++++ .../schemas/itpay_payment_pay_event.py | 17 ++ src/presentation/schemas/order.py | 8 +- 53 files changed, 1436 insertions(+), 334 deletions(-) create mode 100644 src/application/abstractions/repositories/i_order_repository.py create mode 100644 src/application/abstractions/repositories/i_payment_repository.py delete mode 100644 src/application/command/create_order_command.py create mode 100644 src/application/commands/__init__.py create mode 100644 src/application/commands/create_order_command.py create mode 100644 src/application/commands/create_payment_command.py create mode 100644 src/application/contracts/i_itpay_service.py create mode 100644 src/application/contracts/i_receipt.py create mode 100644 src/application/domain/entities/order.py create mode 100644 src/application/domain/entities/payment.py create mode 100644 src/application/domain/enums/itpay_payment_status.py create mode 100644 src/application/domain/enums/order_status.py create mode 100644 src/application/domain/enums/payment_status.py create mode 100644 src/infrastructure/cache/remote_cache.py create mode 100644 src/infrastructure/cloud_kassir/__init__.py create mode 100644 src/infrastructure/cloud_kassir/client.py create mode 100644 src/infrastructure/database/models/order.py create mode 100644 src/infrastructure/database/models/payment.py delete mode 100644 src/infrastructure/database/models/sessions.py create mode 100644 src/infrastructure/database/repositories/order_repository.py create mode 100644 src/infrastructure/database/repositories/payment_repository.py rename src/{application/command => infrastructure/itpay}/__init__.py (100%) create mode 100644 src/infrastructure/itpay/client.py create mode 100644 src/presentation/messaging/__init__.py create mode 100644 src/presentation/messaging/crypto_transfer.py create mode 100644 src/presentation/schemas/itpay_payment_base.py create mode 100644 src/presentation/schemas/itpay_payment_completed_event.py create mode 100644 src/presentation/schemas/itpay_payment_models.py create mode 100644 src/presentation/schemas/itpay_payment_pay_event.py diff --git a/docker-compose.yml b/docker-compose.yml index 9f6ecb4..26bd360 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,37 +20,37 @@ services: restart: no keydb: - image: eqalpha/keydb - container_name: keydb - restart: no - expose: - - "6379" - volumes: - - keydb_data:/data - command: - - keydb-server - - --requirepass - - keydb - - --dir - - /data - - --appendonly - - "yes" - - --appendfsync - - everysec - - --save - - "900" - - "1" - - --save - - "300" - - "10" - - --save - - "60" - - "10000" - healthcheck: - test: [ "CMD", "redis-cli", "-a", "keydb", "ping" ] - interval: 5s - timeout: 2s - retries: 20 + image: eqalpha/keydb + container_name: keydb + restart: no + expose: + - "6379" + volumes: + - keydb_data:/data + command: + - keydb-server + - --requirepass + - ${KEYDB_PASSWORD} + - --dir + - /data + - --appendonly + - "yes" + - --appendfsync + - everysec + - --save + - "900" + - "1" + - --save + - "300" + - "10" + - --save + - "60" + - "10000" + healthcheck: + test: ["CMD", "redis-cli", "-a", "${KEYDB_PASSWORD}", "ping"] + interval: 5s + timeout: 2s + retries: 20 volumes: keydb_data: \ No newline at end of file diff --git a/src/application/abstractions/i_unit_of_work.py b/src/application/abstractions/i_unit_of_work.py index 6a706bc..7db4a2d 100644 --- a/src/application/abstractions/i_unit_of_work.py +++ b/src/application/abstractions/i_unit_of_work.py @@ -1,6 +1,6 @@ from __future__ import annotations from typing import Protocol, runtime_checkable -from src.application.abstractions.repositories import IUserRepository, ISessionRepository +from src.application.abstractions.repositories import IOrderRepository,IPaymentRepository @runtime_checkable @@ -12,8 +12,8 @@ class IUnitOfWork(Protocol): async def rollback(self) -> None: ... @property - def user_repository(self) -> IUserRepository: ... + def order_repository(self) -> IOrderRepository: ... @property - def session_repository(self) -> ISessionRepository: ... + def payment_repository(self) -> IPaymentRepository: ... diff --git a/src/application/abstractions/repositories/__init__.py b/src/application/abstractions/repositories/__init__.py index 5322e20..a7a94c4 100644 --- a/src/application/abstractions/repositories/__init__.py +++ b/src/application/abstractions/repositories/__init__.py @@ -1,18 +1,2 @@ -from __future__ import annotations - -from typing import Protocol,runtime_checkable - -from src.application.domain.entities import SessionEntity,UserEntity - - -@runtime_checkable -class IUserRepository(Protocol): - ... - - -@runtime_checkable -class ISessionRepository(Protocol): - ... - - -__all__=['IUserRepository','ISessionRepository','UserEntity','SessionEntity'] +from src.application.abstractions.repositories.i_order_repository import IOrderRepository +from src.application.abstractions.repositories.i_payment_repository import IPaymentRepository \ No newline at end of file diff --git a/src/application/abstractions/repositories/i_order_repository.py b/src/application/abstractions/repositories/i_order_repository.py new file mode 100644 index 0000000..38527de --- /dev/null +++ b/src/application/abstractions/repositories/i_order_repository.py @@ -0,0 +1,24 @@ +from abc import ABC,abstractmethod + +from src.application.domain.entities.order import OrderEntity + + +class IOrderRepository(ABC): + @abstractmethod + async def create(self,order: OrderEntity) -> OrderEntity: + raise NotImplementedError + + + @abstractmethod + async def get_by_client_payment_id(self,client_payment_id: str) -> OrderEntity | None: + raise NotImplementedError + + + @abstractmethod + async def update_after_itpay_payment_created(self,order: OrderEntity) -> OrderEntity: + raise NotImplementedError + + + @abstractmethod + async def update_after_itpay_failure(self,order: OrderEntity) -> OrderEntity: + raise NotImplementedError \ No newline at end of file diff --git a/src/application/abstractions/repositories/i_payment_repository.py b/src/application/abstractions/repositories/i_payment_repository.py new file mode 100644 index 0000000..1fd1948 --- /dev/null +++ b/src/application/abstractions/repositories/i_payment_repository.py @@ -0,0 +1,10 @@ +from abc import ABC,abstractmethod + +from src.infrastructure.database.models.payment import Payment + + +class IPaymentRepository(ABC): + @abstractmethod + async def create_completed(self,*,user_id:str,order_id:str,itpay_payment_id:str,itpay_paid_amount:str|None,transaction_id:str|None,paid_at:str|None,expired_date:str|None) -> Payment: + raise NotImplementedError + diff --git a/src/application/command/create_order_command.py b/src/application/command/create_order_command.py deleted file mode 100644 index f227fe6..0000000 --- a/src/application/command/create_order_command.py +++ /dev/null @@ -1,20 +0,0 @@ -from src.infrastructure.database.decorators import transactional -from src.presentation.schemas.order import CreateOrder - - -class UserLoginStartCommand: - def __init__( - self, - ): - pass - - - @transactional - async def __call__(self, payment_data: CreateOrder) -> bool: - - - metadata: dict = { - 'user_id': str(payment_data.user_id), - } - - diff --git a/src/application/commands/__init__.py b/src/application/commands/__init__.py new file mode 100644 index 0000000..9fcf0d9 --- /dev/null +++ b/src/application/commands/__init__.py @@ -0,0 +1,2 @@ +from src.application.commands.create_order_command import CreateOrderCommand +from src.application.commands.create_payment_command import CreatePaymentCommand \ No newline at end of file diff --git a/src/application/commands/create_order_command.py b/src/application/commands/create_order_command.py new file mode 100644 index 0000000..f3abf25 --- /dev/null +++ b/src/application/commands/create_order_command.py @@ -0,0 +1,81 @@ +from __future__ import annotations +from datetime import datetime, timezone +from decimal import Decimal, ROUND_UP +from ulid import ULID +from src.application.abstractions import IUnitOfWork +from src.application.contracts import ICache, ILogger +from src.application.contracts import IItPayService +from src.application.domain.entities.order import OrderEntity +from src.application.domain.enums import OrderStatus +from src.application.domain.exceptions import ApplicationException +from src.infrastructure.database.decorators import transactional +from src.presentation.schemas.order import CreateOrder + + +class CreateOrderCommand: + + def __init__( + self, + *, + unit_of_work: IUnitOfWork, + logger: ILogger, + cache_local: ICache, + remote_cache: ICache, + itpay_service: IItPayService, + ) -> None: + self._unit_of_work = unit_of_work + self._logger = logger + self._cache_local = cache_local + self._remote_cache = remote_cache + self._itpay_service = itpay_service + + + @transactional + async def __call__(self, payment_data: CreateOrder, user_id: str) -> OrderEntity: + client_payment_id = str(ULID()) + + rate_raw = await self._remote_cache.hget('tradex:rub:rate','value') + gas_raw = await self._remote_cache.hget('gwei:eth:last','normal_rub') + + if rate_raw is None: + self._logger.error('Exchange rate unavailable') + rate_raw = '2.00' + #raise ApplicationException(status_code=503, message='Exchange rate unavailable') + + if gas_raw is None: + self._logger.error('Exchange gas unavailable') + gas_raw = '1.00' + #raise ApplicationException(status_code=503, message='Exchange gas unavailable') + + actual_gas_fee = Decimal(gas_raw).quantize(Decimal('0.00'), rounding=ROUND_UP) + actual_usdt_exchange_rate = Decimal(rate_raw).quantize(Decimal('0.00'), rounding=ROUND_UP) + actual_service_fee = (payment_data.usdt_amount * actual_usdt_exchange_rate * Decimal('0.04')).quantize(Decimal('0.01')) + actual_total_price = (payment_data.usdt_amount * actual_usdt_exchange_rate + actual_service_fee + actual_gas_fee).quantize(Decimal('0.01')) + if actual_total_price > payment_data.total_price * Decimal('1.01'): + self._logger.error('Price has changed, please refresh and try again') + raise ApplicationException(status_code=409, message='Price has changed, please refresh and try again') + + order = OrderEntity( + user_id=user_id, + usdt_amount=payment_data.usdt_amount, + usdt_exchange_rate=actual_usdt_exchange_rate, + gas_fee=actual_gas_fee, + service_fee=actual_service_fee, + total_price=actual_total_price, + status=OrderStatus.PENDING, + created_at=datetime.now(timezone.utc), + client_payment_id=client_payment_id, + ) + + saved = await self._unit_of_work.order_repository.create(order) + with_itpay = await self._itpay_service.create_payment(saved) + if with_itpay.status in ( + OrderStatus.CANCELLED, + OrderStatus.REJECTED, + OrderStatus.ERROR, + ): + await self._unit_of_work.order_repository.update_after_itpay_failure(with_itpay) + else: + await self._unit_of_work.order_repository.update_after_itpay_payment_created(with_itpay) + return with_itpay + diff --git a/src/application/commands/create_payment_command.py b/src/application/commands/create_payment_command.py new file mode 100644 index 0000000..14f634a --- /dev/null +++ b/src/application/commands/create_payment_command.py @@ -0,0 +1,49 @@ +from __future__ import annotations +from ulid import ULID +from src.application.abstractions import IUnitOfWork +from src.application.contracts import ILogger,IQueueMessanger +from src.application.domain.exceptions import ApplicationException +from src.infrastructure.config import settings +from src.infrastructure.database.decorators import transactional +from src.presentation.schemas.itpay_payment_models import ItpayPaymentData + + +class CreatePaymentCommand: + def __init__(self, *, unit_of_work: IUnitOfWork, logger: ILogger, queue_messanger: IQueueMessanger): + self._unit_of_work = unit_of_work + self._logger = logger + self._queue_messanger = queue_messanger + + @transactional + async def __call__(self, payment: ItpayPaymentData) -> None: + if str(payment.status).strip().lower() != 'completed': + return + metadata = payment.metadata or {} + order_id = str(metadata.get('order_id') or '') + user_id = str(metadata.get('user_id') or '') + if not order_id: + raise ApplicationException(status_code=400, message='Itpay webhook metadata missing order_id') + if not user_id: + raise ApplicationException(status_code=400, message='Itpay webhook metadata missing user_id') + await self._unit_of_work.payment_repository.create_completed( + user_id=user_id, + order_id=order_id, + itpay_payment_id=str(payment.id), + itpay_paid_amount=str(payment.amount) if payment.amount is not None else None, + transaction_id=str(payment.transaction_id) if payment.transaction_id is not None else None, + paid_at=str(payment.paid) if payment.paid is not None else None, + expired_date=str(payment.expired_date) if payment.expired_date is not None else None, + ) + message_id = str(ULID()) + message: dict[str,str] = { + 'order_id': order_id, + 'user_id': user_id, + 'trace_id': self._logger.get_trace_id(), + 'message_id': message_id, + } + await self._queue_messanger.publish_to_queue( + queue=settings.RABBIT_CRYPTO_TRANSFER_QUEUE, + message=message, + message_id=message_id, + correlation_id=message['trace_id'], + ) diff --git a/src/application/contracts/__init__.py b/src/application/contracts/__init__.py index 055d045..8f9d6a6 100644 --- a/src/application/contracts/__init__.py +++ b/src/application/contracts/__init__.py @@ -3,4 +3,6 @@ from src.application.contracts.i_jwt_service import IJwtService from src.application.contracts.i_csrf_service import ICsrfService from src.application.contracts.i_cache import ICache from src.application.contracts.i_hash_service import IHashService -from src.application.contracts.i_queue_messanger import IQueueMessanger \ No newline at end of file +from src.application.contracts.i_queue_messanger import IQueueMessanger +from src.application.contracts.i_itpay_service import IItPayService +from src.application.contracts.i_receipt import IReceipt \ No newline at end of file diff --git a/src/application/contracts/i_cache.py b/src/application/contracts/i_cache.py index 9627ad8..7d59d8f 100644 --- a/src/application/contracts/i_cache.py +++ b/src/application/contracts/i_cache.py @@ -17,6 +17,10 @@ class ICache(ABC): async def get(self, key: str) -> str | None: raise NotImplementedError + @abstractmethod + async def hget(self, key: str, field: str) -> str | None: + raise NotImplementedError + @abstractmethod async def delete(self, key: str) -> bool: raise NotImplementedError diff --git a/src/application/contracts/i_itpay_service.py b/src/application/contracts/i_itpay_service.py new file mode 100644 index 0000000..fbc5c71 --- /dev/null +++ b/src/application/contracts/i_itpay_service.py @@ -0,0 +1,11 @@ +from abc import ABC,abstractmethod + +from src.application.domain.entities.order import OrderEntity + + +class IItPayService(ABC): + + + @abstractmethod + async def create_payment(self,order: OrderEntity) -> OrderEntity: + pass diff --git a/src/application/contracts/i_receipt.py b/src/application/contracts/i_receipt.py new file mode 100644 index 0000000..c6ece64 --- /dev/null +++ b/src/application/contracts/i_receipt.py @@ -0,0 +1,24 @@ +from abc import ABC,abstractmethod +from decimal import Decimal +from typing import Any + + +class IReceipt(ABC): + @abstractmethod + async def create_receipt( + self, + *, + order_id: str, + user_id: str, + email: str, + total_amount: Decimal, + principal_amount: Decimal, + service_fee: Decimal, + phone: str | None = None, + customer_inn: str = '', + success_url: str | None = None, + fail_url: str | None = None, + request_id: str | None = None, + ) -> dict[str,Any]: + raise NotImplementedError + diff --git a/src/application/domain/entities/__init__.py b/src/application/domain/entities/__init__.py index 7b2df0e..88fc910 100644 --- a/src/application/domain/entities/__init__.py +++ b/src/application/domain/entities/__init__.py @@ -1,5 +1,5 @@ -from src.application.domain.entities.user import UserEntity -from src.application.domain.entities.session import SessionEntity +from src.application.domain.entities.order import OrderEntity +from src.application.domain.entities.payment import PaymentEntity -__all__ = ['UserEntity', 'SessionEntity'] \ No newline at end of file +__all__ = ['PaymentEntity', 'OrderEntity'] \ No newline at end of file diff --git a/src/application/domain/entities/order.py b/src/application/domain/entities/order.py new file mode 100644 index 0000000..d5e563e --- /dev/null +++ b/src/application/domain/entities/order.py @@ -0,0 +1,36 @@ +from __future__ import annotations +from dataclasses import dataclass +from datetime import datetime +from decimal import Decimal +from src.application.domain.enums import OrderStatus + + +@dataclass(slots=True) +class OrderEntity: + id: str | None = None + created_at: datetime | None = None + updated_at: datetime | None = None + + user_id: str | None = None + usdt_amount: Decimal | None = None + usdt_exchange_rate: Decimal | None = None + gas_fee: Decimal | None = None + total_price: Decimal | None = None + service_fee: Decimal | None = None + status: OrderStatus | None = None + + client_payment_id: str | None = None + + itpay_payment_qr_url_desktop: str | None = None + itpay_payment_qr_url_android: str | None = None + itpay_payment_qr_url_ios: str | None = None + + itpay_payment_qr_image_desktop: str | None = None + itpay_payment_qr_image_android: str | None = None + itpay_payment_qr_image_ios: str | None = None + + itpay_id: str | None = None + itpay_qr_id: str | None = None + itpay_amount: Decimal | None = None + itpay_created_at: datetime | None = None + diff --git a/src/application/domain/entities/payment.py b/src/application/domain/entities/payment.py new file mode 100644 index 0000000..6c9461b --- /dev/null +++ b/src/application/domain/entities/payment.py @@ -0,0 +1,21 @@ +from __future__ import annotations +from dataclasses import dataclass +from datetime import datetime +from decimal import Decimal +from src.application.domain.enums import PaymentStatus + + +@dataclass(slots=True) +class PaymentEntity: + user_id: str | None = None + + order_id: str | None = None + status: PaymentStatus | None = None + + receipt_cloudekassir_link: str | None = None + + itpay_payment_id: str | None = None + transaction_id: str | None = None + web3_transaction_hash: str | None = None + paid_at: str | None = None + expired_date: str | None = None diff --git a/src/application/domain/enums/__init__.py b/src/application/domain/enums/__init__.py index f2785a9..0dbe6df 100644 --- a/src/application/domain/enums/__init__.py +++ b/src/application/domain/enums/__init__.py @@ -1,2 +1,5 @@ from src.application.domain.enums.log_level import LogLevel -from src.application.domain.enums.log_format import LogFormat \ No newline at end of file +from src.application.domain.enums.log_format import LogFormat +from src.application.domain.enums.itpay_payment_status import ItPayPaymentStatus +from src.application.domain.enums.order_status import OrderStatus +from src.application.domain.enums.payment_status import PaymentStatus \ No newline at end of file diff --git a/src/application/domain/enums/itpay_payment_status.py b/src/application/domain/enums/itpay_payment_status.py new file mode 100644 index 0000000..bd1506f --- /dev/null +++ b/src/application/domain/enums/itpay_payment_status.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class ItPayPaymentStatus(Enum): + COMPLETED = "payment.completed" + REJECTED = "payment.rejected" + CANCELLED = "payment.canceled" \ No newline at end of file diff --git a/src/application/domain/enums/order_status.py b/src/application/domain/enums/order_status.py new file mode 100644 index 0000000..3486e2e --- /dev/null +++ b/src/application/domain/enums/order_status.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class OrderStatus(str, Enum): + PENDING = 'pending' + REJECTED = 'rejected' + COMPLETED = 'completed' + CANCELLED = 'cancelled' + ERROR = 'error' + CANCELED = 'canceled' \ No newline at end of file diff --git a/src/application/domain/enums/payment_status.py b/src/application/domain/enums/payment_status.py new file mode 100644 index 0000000..96faf26 --- /dev/null +++ b/src/application/domain/enums/payment_status.py @@ -0,0 +1,12 @@ +from enum import Enum + + +class PaymentStatus(str,Enum): + PENDING='pending' + MONEY_ACCEPTED='money_accepted' + WEB3_HASH_ERROR='web3_hash_error' + WEB3_BALANCE_PROBLEM='web3_balance_problem' + USDT_DELIVERED='usdt_delivered' + RECEIPT_ERROR='receipt_error' + COMPLETED='completed' + diff --git a/src/application/domain/exceptions/application_exceptions.py b/src/application/domain/exceptions/application_exceptions.py index 5006dee..7396ceb 100644 --- a/src/application/domain/exceptions/application_exceptions.py +++ b/src/application/domain/exceptions/application_exceptions.py @@ -15,4 +15,4 @@ class ApplicationException(Exception): self.headers = headers def __str__(self): - return f"{self.status_code}: {self.message}" + return f'{self.status_code}: {self.message}' diff --git a/src/infrastructure/cache/__init__.py b/src/infrastructure/cache/__init__.py index 552c4f5..12bf9af 100644 --- a/src/infrastructure/cache/__init__.py +++ b/src/infrastructure/cache/__init__.py @@ -1,2 +1,5 @@ from src.infrastructure.cache.client import create_redis_client -from src.infrastructure.cache.keydb_client import KeydbCache \ No newline at end of file +from src.infrastructure.cache.keydb_client import KeydbCache +from src.infrastructure.cache.remote_cache import RemoteCache + +__all__ = ['create_redis_client', 'KeydbCache', 'RemoteCache'] diff --git a/src/infrastructure/cache/client.py b/src/infrastructure/cache/client.py index 4c8b59a..b977f24 100644 --- a/src/infrastructure/cache/client.py +++ b/src/infrastructure/cache/client.py @@ -3,9 +3,10 @@ from redis.asyncio.client import Redis from src.infrastructure.config import settings -def create_redis_client() -> Redis: +def create_redis_client(url:str|None=None) -> Redis: + redis_url = url or settings.KEYDB_REMOTE_URL return redis.from_url( - settings.REDIS_URL, + redis_url, max_connections=50, decode_responses=True, socket_timeout=5, diff --git a/src/infrastructure/cache/keydb_client.py b/src/infrastructure/cache/keydb_client.py index 17d98be..494c3e9 100644 --- a/src/infrastructure/cache/keydb_client.py +++ b/src/infrastructure/cache/keydb_client.py @@ -20,6 +20,9 @@ class KeydbCache(ICache): async def get(self, key: str) -> str | None: return await self._r.get(key) + async def hget(self, key: str, field: str) -> str | None: + return await self._r.hget(key, field) + async def delete(self, key: str) -> bool: return (await self._r.delete(key)) > 0 diff --git a/src/infrastructure/cache/remote_cache.py b/src/infrastructure/cache/remote_cache.py new file mode 100644 index 0000000..892ad3a --- /dev/null +++ b/src/infrastructure/cache/remote_cache.py @@ -0,0 +1,68 @@ +from __future__ import annotations +import orjson +from redis.asyncio.client import Redis +from src.application.contracts import ICache +from src.application.domain.entities.user import UserEntity + + +class RemoteCache(ICache): + + + USER_PREFIX = 'user:me' + + + def __init__(self,redis_client: Redis) -> None: + self._r = redis_client + + + async def set(self,key: str,value: str,ttl: int) -> bool: + return bool(await self._r.set(key,value,ex=ttl)) + + + async def set_nx(self,key: str,value: str,ttl: int) -> bool: + return bool(await self._r.set(key,value,ex=ttl,nx=True)) + + + async def get(self,key: str) -> str | None: + mapping = await self._r.hgetall(key) + if not mapping: + return None + return mapping.get('usdt_rub') + + + async def hget(self,key: str,field: str) -> str | None: + return await self._r.hget(key,field) + + + async def delete(self,key: str) -> bool: + return (await self._r.delete(key)) > 0 + + + async def get_user(self,user_id: str) -> dict | None: + raw = await self._r.get(f'{self.USER_PREFIX}:{user_id}') + if raw is None: + return None + return orjson.loads(raw) + + + async def set_user(self,user_id: str,user: UserEntity,ttl: int = 300) -> None: + data = orjson.dumps({ + 'id': user.id, + 'email': user.email, + 'first_name': user.first_name, + 'middle_name': user.middle_name, + 'last_name': user.last_name, + 'birth_date': str(user.birth_date) if user.birth_date else None, + 'crypto_wallet': user.crypto_wallet, + 'phone': user.phone, + 'bik': user.bik, + 'account_number': user.account_number, + 'card_number': user.card_number, + 'inn': user.inn, + 'kyc_verified': user.kyc_verified, + 'is_deleted': user.is_deleted, + 'created_at': user.created_at.isoformat() if user.created_at else None, + 'updated_at': user.updated_at.isoformat() if user.updated_at else None, + 'kyc_verified_at': user.kyc_verified_at.isoformat() if user.kyc_verified_at else None, + }) + await self._r.set(f'{self.USER_PREFIX}:{user_id}',data,ex=ttl) diff --git a/src/infrastructure/cloud_kassir/__init__.py b/src/infrastructure/cloud_kassir/__init__.py new file mode 100644 index 0000000..c43ad11 --- /dev/null +++ b/src/infrastructure/cloud_kassir/__init__.py @@ -0,0 +1,5 @@ +from src.infrastructure.cloud_kassir.client import ClaudeKassirClient + + +__all__=['ClaudeKassirClient'] + diff --git a/src/infrastructure/cloud_kassir/client.py b/src/infrastructure/cloud_kassir/client.py new file mode 100644 index 0000000..004cb29 --- /dev/null +++ b/src/infrastructure/cloud_kassir/client.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from decimal import Decimal +from typing import Any +from ulid import ULID +import aiohttp +from aiohttp import BasicAuth, ClientTimeout +from src.application.contracts import IReceipt +from src.application.domain.exceptions import ApplicationException + + +class ClaudeKassirClient(IReceipt): + def __init__( + self, + *, + public_id: str, + api_secret: str, + inn: str, + api_base_url: str = 'https://api.cloudpayments.ru', + success_url: str | None = None, + fail_url: str | None = None, + timeout_seconds: float = 30, + ) -> None: + self._public_id = public_id + self._api_secret = api_secret + self._inn = inn + self._api_base_url = api_base_url.rstrip('/') + self._success_url = success_url + self._fail_url = fail_url + self._timeout = ClientTimeout(total=timeout_seconds) + + async def create_receipt( + self, + *, + order_id: str, + user_id: str, + email: str, + total_amount: Decimal, + principal_amount: Decimal, + service_fee: Decimal, + phone: str | None = None, + customer_inn: str = '', + success_url: str | None = None, + fail_url: str | None = None, + request_id: str | None = None, + ) -> dict[str, Any]: + total = total_amount.quantize(Decimal('0.01')) + principal = principal_amount.quantize(Decimal('0.01')) + fee = service_fee.quantize(Decimal('0.01')) + description = f'Исполнение поручения принципала по заявке №{order_id}' + fee_description = f'Агентское вознаграждение за исполнение поручения по заявке №{order_id}' + payload: dict[str, Any] = { + 'Inn': self._inn, + 'Type': 'Income', + 'InvoiceId': order_id, + 'AccountId': user_id, + 'Description': description, + 'CustomerReceipt': { + 'Items': [ + { + 'label': description, + 'price': float(principal), + 'quantity': 1.00, + 'amount': float(principal), + 'vat': 0, + 'method': 4, + 'object': 4, + 'measurement_unit': 'шт', + 'agent_info': { + 'type': 2, + }, + 'supplier_info': { + 'name': 'Принципал (физическое лицо)', + 'inn': '', + 'phones': [], + }, + }, + { + 'label': fee_description, + 'price': float(fee), + 'quantity': 1.00, + 'amount': float(fee), + 'vat': 0, + 'method': 4, + 'object': 1, + 'measurement_unit': 'шт', + }, + ], + 'taxationSystem': 2, + 'email': email, + 'phone': phone, + 'customerInn': customer_inn, + 'agentSign': 2, + 'amounts': { + 'electronic': float(total), + 'advancePayment': 0.00, + 'credit': 0.00, + 'provision': 0.00, + }, + }, + 'Email': email, + 'SuccessUrl': success_url or self._success_url, + 'FailUrl': fail_url or self._fail_url, + } + if phone is None: + payload['CustomerReceipt'].pop('phone') + if payload['SuccessUrl'] is None: + payload.pop('SuccessUrl') + if payload['FailUrl'] is None: + payload.pop('FailUrl') + url = f'{self._api_base_url}/kkt/receipt' + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'X-Request-ID': request_id or str(ULID()), + } + try: + async with aiohttp.ClientSession(timeout=self._timeout) as session: + auth = BasicAuth(self._public_id, self._api_secret) + async with session.post(url, json=payload, headers=headers, auth=auth) as resp: + body = await resp.json(content_type=None) + if resp.status >= 400: + raise ApplicationException(status_code=502, message='Receipt provider error') + if body.get('Success') is False: + raise ApplicationException(status_code=409, message=str(body.get('Message') or 'Receipt provider rejected receipt')) + return body + except ApplicationException: + raise + except aiohttp.ClientError: + raise ApplicationException(status_code=502, message='Receipt provider unreachable') diff --git a/src/infrastructure/config/settings.py b/src/infrastructure/config/settings.py index e17a4cd..978a8e4 100644 --- a/src/infrastructure/config/settings.py +++ b/src/infrastructure/config/settings.py @@ -4,9 +4,9 @@ from functools import lru_cache from typing import List, Literal import os from dotenv import load_dotenv, find_dotenv -from pydantic import AliasChoices,Field,field_validator,model_validator +from pydantic import AliasChoices, Field, field_validator, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict -from src.infrastructure.vault import create_hvac_client_from_approle,read_kv2_secret +from src.infrastructure.vault import create_hvac_client_from_approle, read_kv2_secret env_file = find_dotenv(".env") if env_file: @@ -24,11 +24,11 @@ def normalize_vault_base_url(raw: str) -> str: class Settings(BaseSettings): VAULT_ADDR: str = Field(default='http://localhost:8200') - VAULT_ROLE_ID: str = Field(...,description='AppRole role_id') + VAULT_ROLE_ID: str = Field(..., description='AppRole role_id') VAULT_SECRET_ID: str = Field( ..., description='AppRole secret_id', - validation_alias=AliasChoices('VAULT_SECRET_ID','VAULT_SECRET_TOKEN'), + validation_alias=AliasChoices('VAULT_SECRET_ID', 'VAULT_SECRET_TOKEN'), ) VAULT_NAMESPACE: str | None = Field(default=None) VAULT_MOUNT_POINT: str = Field(default='secrets') @@ -74,6 +74,16 @@ class Settings(BaseSettings): REDIS_PASSWORD: str | None = None REDIS_DB: int = 0 + KEYDB_LOCAL_HOST: str | None = None + KEYDB_LOCAL_PORT: int | None = None + KEYDB_LOCAL_PASSWORD: str | None = None + KEYDB_LOCAL_DB: int | None = None + + KEYDB_REMOTE_HOST: str | None = None + KEYDB_REMOTE_PORT: int | None = None + KEYDB_REMOTE_PASSWORD: str | None = None + KEYDB_REMOTE_DB: int | None = None + RABBIT_HOST: str = "localhost" RABBIT_PORT: int = 5672 RABBIT_USER: str = "guest" @@ -83,10 +93,19 @@ class Settings(BaseSettings): RABBIT_PUBLISH_PERSIST: bool = True RABBIT_CONNECT_TIMEOUT: int = 5 RABBIT_EMAIL_CODE_QUEUE: str = "email.verification_code" + RABBIT_CRYPTO_TRANSFER_QUEUE: str = "crypto.transfer.requested" + RABBIT_CRYPTO_TRANSFER_COMPLETED_QUEUE: str = "crypto.transfer.completed" ITPAY_PUBLIC_ID: str ITPAY_API_SECRET: str + CLOUD_KASSIR_PUBLIC_ID: str = '' + CLOUD_KASSIR_API_SECRET: str = '' + CLOUD_KASSIR_INN: str = '' + CLOUD_KASSIR_API_BASE_URL: str = 'https://api.cloudpayments.ru' + CLOUD_KASSIR_SUCCESS_URL: str | None = None + CLOUD_KASSIR_FAIL_URL: str | None = None + LOG_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" LOG_FORMAT: Literal["JSON", "TEXT"] = "TEXT" @@ -98,27 +117,27 @@ class Settings(BaseSettings): populate_by_name=True, ) - @field_validator('VAULT_ADDR',mode='before') + @field_validator('VAULT_ADDR', mode='before') @classmethod def vault_addr_scheme(cls, v): - if v is None or not isinstance(v,str): + if v is None or not isinstance(v, str): return v return normalize_vault_base_url(v) @model_validator(mode="before") @classmethod def load_from_vault(cls, data: dict): - if not isinstance(data,dict): + if not isinstance(data, dict): return data addr_raw = data.get('VAULT_ADDR') or os.getenv('VAULT_ADDR') or 'http://localhost:8200' addr = normalize_vault_base_url(addr_raw) data['VAULT_ADDR'] = addr role_id = data.get('VAULT_ROLE_ID') or os.getenv('VAULT_ROLE_ID') secret_id = ( - data.get('VAULT_SECRET_ID') - or data.get('VAULT_SECRET_TOKEN') - or os.getenv('VAULT_SECRET_ID') - or os.getenv('VAULT_SECRET_TOKEN') + data.get('VAULT_SECRET_ID') + or data.get('VAULT_SECRET_TOKEN') + or os.getenv('VAULT_SECRET_ID') + or os.getenv('VAULT_SECRET_TOKEN') ) namespace = data.get('VAULT_NAMESPACE') if namespace is None: @@ -141,7 +160,7 @@ class Settings(BaseSettings): ) def read_secret(path: str) -> dict: - return read_kv2_secret(client=client,mount_point=mount,path=path) + return read_kv2_secret(client=client, mount_point=mount, path=path) def read_secret_optional(path: str) -> dict: try: @@ -159,11 +178,11 @@ class Settings(BaseSettings): v = db_ci.get(key) if v is None: return False - if isinstance(v,str) and not v.strip(): + if isinstance(v, str) and not v.strip(): return False return True - required_db = ['host','name','user','password','port'] + required_db = ['host', 'name', 'user', 'password', 'port'] missing_db = [k for k in required_db if not db_nonempty(k)] if missing_db: raise RuntimeError(f'Vault secret database missing non-empty keys: {missing_db}') @@ -190,15 +209,49 @@ class Settings(BaseSettings): v = r_ci.get(field) if v is None: return - if isinstance(v,str) and not v.strip(): + if isinstance(v, str) and not v.strip(): return data[env_key] = int(v) if as_int else str(v).strip() - rb_set('host','RABBIT_HOST') - rb_set('port','RABBIT_PORT',as_int=True) - rb_set('user','RABBIT_USER') - rb_set('password','RABBIT_PASSWORD') - rb_set('vhost','RABBIT_VHOST') + rb_set('host', 'RABBIT_HOST') + rb_set('port', 'RABBIT_PORT', as_int=True) + rb_set('user', 'RABBIT_USER') + rb_set('password', 'RABBIT_PASSWORD') + rb_set('vhost', 'RABBIT_VHOST') + + keydb = read_secret('keydb') + k_ci = {str(k).lower(): v for k, v in keydb.items()} + + def keydb_nonempty(key: str) -> bool: + v = k_ci.get(key) + if v is None: + return False + if isinstance(v, str) and not v.strip(): + return False + return True + + missing_keydb = [] + for req in ('host', 'port'): + if not keydb_nonempty(req): + missing_keydb.append(req) + db_raw = k_ci.get('database') + if db_raw is None: + db_raw = k_ci.get('db') + if db_raw is None or (isinstance(db_raw, str) and not str(db_raw).strip()): + missing_keydb.append('database') + if missing_keydb: + raise RuntimeError( + f'Vault secret keydb missing non-empty keys: {missing_keydb} (mount={mount},path=keydb)' + ) + + data['KEYDB_REMOTE_HOST'] = str(k_ci['host']).strip() + data['KEYDB_REMOTE_PORT'] = int(k_ci['port']) + data['KEYDB_REMOTE_DB'] = int(db_raw) + pw_raw = k_ci.get('password') + if pw_raw is not None and str(pw_raw).strip(): + data['KEYDB_REMOTE_PASSWORD'] = str(pw_raw).strip() + else: + data['KEYDB_REMOTE_PASSWORD'] = None itpay_public_id = data.get('ITPAY_PUBLIC_ID') or os.getenv('ITPAY_PUBLIC_ID') itpay_api_secret = data.get('ITPAY_API_SECRET') or os.getenv('ITPAY_API_SECRET') @@ -233,8 +286,28 @@ class Settings(BaseSettings): @property def REDIS_URL(self) -> str: - auth = f":{self.REDIS_PASSWORD}@" if self.REDIS_PASSWORD else "" - return f"redis://{auth}{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}" + return self.KEYDB_REMOTE_URL + + @staticmethod + def _redis_url(*, host: str, port: int, password: str | None, db: int) -> str: + auth = f':{password}@' if password else '' + return f'redis://{auth}{host}:{port}/{db}' + + @property + def KEYDB_LOCAL_URL(self) -> str: + host = self.KEYDB_LOCAL_HOST or self.REDIS_HOST + port = int(self.KEYDB_LOCAL_PORT) if self.KEYDB_LOCAL_PORT is not None else int(self.REDIS_PORT) + password = self.KEYDB_LOCAL_PASSWORD if self.KEYDB_LOCAL_PASSWORD is not None else self.REDIS_PASSWORD + db = int(self.KEYDB_LOCAL_DB) if self.KEYDB_LOCAL_DB is not None else int(self.REDIS_DB) + return self._redis_url(host=host, port=port, password=password, db=db) + + @property + def KEYDB_REMOTE_URL(self) -> str: + host = self.KEYDB_REMOTE_HOST or self.REDIS_HOST + port = int(self.KEYDB_REMOTE_PORT) if self.KEYDB_REMOTE_PORT is not None else int(self.REDIS_PORT) + password = self.KEYDB_REMOTE_PASSWORD if self.KEYDB_REMOTE_PASSWORD is not None else self.REDIS_PASSWORD + db = int(self.KEYDB_REMOTE_DB) if self.KEYDB_REMOTE_DB is not None else int(self.REDIS_DB) + return self._redis_url(host=host, port=port, password=password, db=db) @property def RABBIT_URL(self) -> str: @@ -251,4 +324,4 @@ def get_settings() -> Settings: return Settings() -settings = get_settings() \ No newline at end of file +settings = get_settings() diff --git a/src/infrastructure/database/models/__init__.py b/src/infrastructure/database/models/__init__.py index cf032ec..16813d6 100644 --- a/src/infrastructure/database/models/__init__.py +++ b/src/infrastructure/database/models/__init__.py @@ -1,6 +1,7 @@ from src.infrastructure.database.models.base import Base +from src.infrastructure.database.models.order import Order +from src.infrastructure.database.models.payment import Payment from src.infrastructure.database.models.user import UserModel -from src.infrastructure.database.models.sessions import Session -__all__ = ['Base', 'UserModel', 'Session'] +__all__ = ['Base','Order','Payment','UserModel'] diff --git a/src/infrastructure/database/models/order.py b/src/infrastructure/database/models/order.py new file mode 100644 index 0000000..0f26e5d --- /dev/null +++ b/src/infrastructure/database/models/order.py @@ -0,0 +1,54 @@ +from __future__ import annotations +from datetime import datetime +from decimal import Decimal +from sqlalchemy import DateTime, Enum as SAEnum, ForeignKey, Numeric, String, Text +from sqlalchemy.orm import Mapped, mapped_column +from src.application.domain.enums import OrderStatus +from src.infrastructure.database.models.base import Base +from src.infrastructure.database.models.mixins import AuditTimestampsMixin, UlidPrimaryKeyMixin + + +class Order(Base, UlidPrimaryKeyMixin, AuditTimestampsMixin): + __tablename__ = 'orders' + + user_id: Mapped[str] = mapped_column( + String(26), + ForeignKey('users.id', ondelete='RESTRICT'), + nullable=False, + index=True, + ) + usdt_amount: Mapped[Decimal] = mapped_column(Numeric(38, 2), nullable=False) + usdt_exchange_rate: Mapped[Decimal] = mapped_column(Numeric(38, 2), nullable=False) + gas_fee: Mapped[Decimal] = mapped_column(Numeric(38, 2), nullable=False) + total_price: Mapped[Decimal] = mapped_column(Numeric(38, 2), nullable=False) + service_fee: Mapped[Decimal] = mapped_column(Numeric(38, 2), nullable=False) + status: Mapped[OrderStatus] = mapped_column( + SAEnum(OrderStatus,name='order_status_enum',values_callable=lambda x:[e.value for e in x]), + nullable=False, + index=True, + default=OrderStatus.PENDING, + ) + + client_payment_id: Mapped[str] = mapped_column( + String(26), + nullable=False, + unique=True, + index=True + ) + + itpay_payment_qr_url_desktop: Mapped[str | None] = mapped_column(Text, nullable=True) + itpay_payment_qr_url_android: Mapped[str | None] = mapped_column(Text, nullable=True) + itpay_payment_qr_url_ios: Mapped[str | None] = mapped_column(Text, nullable=True) + + itpay_payment_qr_image_desktop: Mapped[str | None] = mapped_column(Text, nullable=True) + itpay_payment_qr_image_android: Mapped[str | None] = mapped_column(Text, nullable=True) + itpay_payment_qr_image_ios: Mapped[str | None] = mapped_column(Text, nullable=True) + + itpay_id: Mapped[str | None] = mapped_column(String(64), nullable=True) + itpay_qr_id: Mapped[str | None] = mapped_column(String(128), nullable=True) + itpay_amount: Mapped[Decimal] = mapped_column(Numeric(38, 2), nullable=False) + itpay_created_at: Mapped[DateTime] = mapped_column( + DateTime(timezone=True), + nullable=False + ) + diff --git a/src/infrastructure/database/models/payment.py b/src/infrastructure/database/models/payment.py new file mode 100644 index 0000000..5db92b5 --- /dev/null +++ b/src/infrastructure/database/models/payment.py @@ -0,0 +1,46 @@ +from __future__ import annotations +from datetime import datetime +from decimal import Decimal +from sqlalchemy import DateTime,Enum as SAEnum,ForeignKey,Numeric,String,UniqueConstraint,Text +from sqlalchemy.orm import Mapped, mapped_column +from src.application.domain.enums import PaymentStatus +from src.infrastructure.database.models.base import Base +from src.infrastructure.database.models.mixins import AuditTimestampsMixin, UlidPrimaryKeyMixin + + +class Payment(Base, UlidPrimaryKeyMixin, AuditTimestampsMixin): + __tablename__ = 'payments' + __table_args__ = ( + UniqueConstraint('order_id', name='uq_payments_order_id'), + ) + + user_id: Mapped[str] = mapped_column( + String(26), + ForeignKey('users.id', ondelete='RESTRICT'), + nullable=False, + index=True, + ) + + order_id: Mapped[str] = mapped_column( + String(26), + ForeignKey('orders.id', ondelete='RESTRICT'), + nullable=False, + index=True, + ) + + status: Mapped[PaymentStatus] = mapped_column( + SAEnum(PaymentStatus,name='payment_status_enum',values_callable=lambda x:[e.value for e in x]), + nullable=False, + index=True, + default=PaymentStatus.PENDING, + ) + + receipt_cloudekassir_link: Mapped[str] = mapped_column(nullable=True) + + itpay_payment_id: Mapped[str | None] = mapped_column(String(64), nullable=True) + itpay_paid_amount: Mapped[Decimal | None] = mapped_column(Numeric(38, 2), nullable=True) + transaction_id: Mapped[str | None] = mapped_column(String(200), nullable=True) + web3_transaction_hash: Mapped[str | None] = mapped_column(String(128), nullable=True) + paid_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + expired_date: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + diff --git a/src/infrastructure/database/models/sessions.py b/src/infrastructure/database/models/sessions.py deleted file mode 100644 index b482d74..0000000 --- a/src/infrastructure/database/models/sessions.py +++ /dev/null @@ -1,50 +0,0 @@ -from datetime import datetime, timezone -from sqlalchemy import String, DateTime, ForeignKey, Index -from sqlalchemy.orm import Mapped, mapped_column -from ulid import ULID -from src.infrastructure.database.models import Base -from src.infrastructure.database.models.mixins import UlidPrimaryKeyMixin, AuditTimestampsMixin - - -class Session(Base, UlidPrimaryKeyMixin, AuditTimestampsMixin): - __tablename__ = "sessions" - - sid: Mapped[str] = mapped_column( - String(26), - unique=True, - index=True, - nullable=False, - default=lambda: str(ULID()), - ) - - user_id: Mapped[str] = mapped_column( - String(26), - ForeignKey("users.id", ondelete="CASCADE"), - index=True, - nullable=False, - ) - - device_id: Mapped[str] = mapped_column( - String(26), - nullable=False, - index=True, - ) - - user_agent: Mapped[str | None] = mapped_column(String(500)) - first_ip: Mapped[str | None] = mapped_column(String(64)) - last_ip: Mapped[str | None] = mapped_column(String(64)) - - last_seen_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), - nullable=False, - default=lambda: datetime.now(timezone.utc), - ) - - revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) - - refresh_jti_hash: Mapped[str | None] = mapped_column(String(255)) - refresh_expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) - - -Index("ux_sessions_user_device", Session.user_id, Session.device_id, unique=True) -Index("ix_sessions_user_active", Session.user_id, Session.revoked_at) diff --git a/src/infrastructure/database/models/user.py b/src/infrastructure/database/models/user.py index e1fa316..b0ea948 100644 --- a/src/infrastructure/database/models/user.py +++ b/src/infrastructure/database/models/user.py @@ -1,28 +1,30 @@ from __future__ import annotations -from sqlalchemy import Boolean, Date, String, DateTime -from sqlalchemy.orm import Mapped, mapped_column + +from sqlalchemy import Boolean,Date,DateTime,String +from sqlalchemy.orm import Mapped,mapped_column from src.infrastructure.database.models.base import Base -from src.infrastructure.database.models.mixins import UlidPrimaryKeyMixin, AuditTimestampsMixin, SoftDeleteMixin +from src.infrastructure.database.models.mixins import AuditTimestampsMixin,SoftDeleteMixin,UlidPrimaryKeyMixin -class UserModel(Base, UlidPrimaryKeyMixin, AuditTimestampsMixin, SoftDeleteMixin): - __tablename__ = 'users' +class UserModel(Base,UlidPrimaryKeyMixin,AuditTimestampsMixin,SoftDeleteMixin): + __tablename__='users' - email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True, index=True) - password_hash: Mapped[str] = mapped_column(String(255), nullable=False) + email: Mapped[str]=mapped_column(String(255),nullable=False,unique=True,index=True) + password_hash: Mapped[str]=mapped_column(String(255),nullable=False) - last_name: Mapped[str | None] = mapped_column(String(128), nullable=True) - first_name: Mapped[str | None] = mapped_column(String(128), nullable=True) - middle_name: Mapped[str | None] = mapped_column(String(128), nullable=True) - birth_date: Mapped[Date | None] = mapped_column(Date, nullable=True) + last_name: Mapped[str|None]=mapped_column(String(128),nullable=True) + first_name: Mapped[str|None]=mapped_column(String(128),nullable=True) + middle_name: Mapped[str|None]=mapped_column(String(128),nullable=True) + birth_date: Mapped[Date|None]=mapped_column(Date,nullable=True) - crypto_wallet: Mapped[str | None] = mapped_column(String(255), nullable=True) - phone: Mapped[str | None] = mapped_column(String(16), nullable=True) + crypto_wallet: Mapped[str|None]=mapped_column(String(255),nullable=True) + phone: Mapped[str|None]=mapped_column(String(16),nullable=True) - bik: Mapped[str | None] = mapped_column(String(9), nullable=True) - account_number: Mapped[str | None] = mapped_column(String(20), nullable=True) - card_number: Mapped[str | None] = mapped_column(String(19), nullable=True) - inn: Mapped[str | None] = mapped_column(String(12), nullable=True) + bik: Mapped[str|None]=mapped_column(String(9),nullable=True) + account_number: Mapped[str|None]=mapped_column(String(20),nullable=True) + card_number: Mapped[str|None]=mapped_column(String(19),nullable=True) + inn: Mapped[str|None]=mapped_column(String(12),nullable=True) + + kyc_verified: Mapped[bool]=mapped_column(Boolean,nullable=False,server_default='false',default=False) + kyc_verified_at: Mapped[DateTime|None]=mapped_column(DateTime(timezone=True),nullable=True) - kyc_verified: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default='false', default=False) - kyc_verified_at: Mapped[DateTime | None] = mapped_column(DateTime(timezone=True), nullable=True) diff --git a/src/infrastructure/database/repositories/order_repository.py b/src/infrastructure/database/repositories/order_repository.py new file mode 100644 index 0000000..fb135ce --- /dev/null +++ b/src/infrastructure/database/repositories/order_repository.py @@ -0,0 +1,134 @@ +from __future__ import annotations +from dataclasses import replace +from datetime import datetime,timezone +from decimal import Decimal +from sqlalchemy import select,update +from sqlalchemy.ext.asyncio import AsyncSession +from src.application.abstractions.repositories.i_order_repository import IOrderRepository +from src.application.contracts import ILogger +from src.application.domain.entities.order import OrderEntity +from src.infrastructure.database.models.order import Order + + +class OrderRepository(IOrderRepository): + def __init__(self, session: AsyncSession, logger: ILogger): + self._session = session + self._logger = logger + + + @staticmethod + def _to_entity(model: Order) -> OrderEntity: + return OrderEntity( + id=model.id, + created_at=model.created_at, + updated_at=model.updated_at, + user_id=model.user_id, + usdt_amount=model.usdt_amount, + usdt_exchange_rate=model.usdt_exchange_rate, + gas_fee=model.gas_fee, + total_price=model.total_price, + service_fee=model.service_fee, + status=model.status, + client_payment_id=model.client_payment_id, + itpay_payment_qr_url_desktop=model.itpay_payment_qr_url_desktop, + itpay_payment_qr_url_android=model.itpay_payment_qr_url_android, + itpay_payment_qr_url_ios=model.itpay_payment_qr_url_ios, + itpay_payment_qr_image_desktop=model.itpay_payment_qr_image_desktop, + itpay_payment_qr_image_android=model.itpay_payment_qr_image_android, + itpay_payment_qr_image_ios=model.itpay_payment_qr_image_ios, + itpay_id=model.itpay_id, + itpay_qr_id=model.itpay_qr_id, + itpay_amount=model.itpay_amount, + itpay_created_at=model.itpay_created_at, + ) + + + async def create(self,order: OrderEntity) -> OrderEntity: + model = Order( + user_id=order.user_id, + usdt_amount=order.usdt_amount, + usdt_exchange_rate=order.usdt_exchange_rate, + gas_fee=order.gas_fee, + total_price=order.total_price, + service_fee=order.service_fee, + status=order.status, + client_payment_id=order.client_payment_id, + itpay_payment_qr_url_desktop=None, + itpay_payment_qr_url_android=None, + itpay_payment_qr_url_ios=None, + itpay_payment_qr_image_desktop=None, + itpay_payment_qr_image_android=None, + itpay_payment_qr_image_ios=None, + itpay_id=None, + itpay_qr_id=None, + itpay_amount=Decimal('0.00'), + itpay_created_at=datetime.now(timezone.utc), + ) + self._session.add(model) + await self._session.flush() + return replace(order,id=model.id) + + + async def get_by_client_payment_id(self,client_payment_id: str) -> OrderEntity | None: + stmt=select(Order).where(Order.client_payment_id==client_payment_id) + model=await self._session.scalar(stmt) + if model is None: + return None + return self._to_entity(model) + + + async def update_after_itpay_payment_created(self,order: OrderEntity) -> OrderEntity: + if not order.id: + raise ValueError('OrderEntity.id is required') + itpay_amount: Decimal + if order.itpay_amount is None: + itpay_amount = Decimal('0.00') + else: + itpay_amount = Decimal(str(order.itpay_amount)) + itpay_created_at = order.itpay_created_at or datetime.now(timezone.utc) + stmt = ( + update(Order) + .where(Order.id == order.id) + .values( + itpay_payment_qr_url_desktop=order.itpay_payment_qr_url_desktop, + itpay_payment_qr_url_android=order.itpay_payment_qr_url_android, + itpay_payment_qr_url_ios=order.itpay_payment_qr_url_ios, + itpay_payment_qr_image_desktop=order.itpay_payment_qr_image_desktop, + itpay_payment_qr_image_android=order.itpay_payment_qr_image_android, + itpay_payment_qr_image_ios=order.itpay_payment_qr_image_ios, + itpay_id=order.itpay_id, + itpay_qr_id=order.itpay_qr_id, + itpay_amount=itpay_amount, + itpay_created_at=itpay_created_at, + ) + ) + await self._session.execute(stmt) + await self._session.flush() + return order + + + async def update_after_itpay_failure(self,order: OrderEntity) -> OrderEntity: + if not order.id: + raise ValueError('OrderEntity.id is required') + if order.status is None: + raise ValueError('OrderEntity.status is required') + itpay_amount: Decimal + if order.itpay_amount is None: + itpay_amount = Decimal('0.00') + else: + itpay_amount = Decimal(str(order.itpay_amount)) + itpay_created_at = order.itpay_created_at or datetime.now(timezone.utc) + stmt = ( + update(Order) + .where(Order.id == order.id) + .values( + status=order.status, + itpay_id=order.itpay_id, + itpay_qr_id=order.itpay_qr_id, + itpay_amount=itpay_amount, + itpay_created_at=itpay_created_at, + ) + ) + await self._session.execute(stmt) + await self._session.flush() + return order \ No newline at end of file diff --git a/src/infrastructure/database/repositories/payment_repository.py b/src/infrastructure/database/repositories/payment_repository.py new file mode 100644 index 0000000..04711f2 --- /dev/null +++ b/src/infrastructure/database/repositories/payment_repository.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from datetime import datetime +from decimal import Decimal +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from src.application.abstractions.repositories.i_payment_repository import IPaymentRepository +from src.application.contracts import ILogger +from src.application.domain.enums import PaymentStatus +from src.infrastructure.database.models.payment import Payment + + +class PaymentRepository(IPaymentRepository): + def __init__(self,session: AsyncSession,logger: ILogger): + self._session=session + self._logger=logger + + + async def create_completed(self,*,user_id:str,order_id:str,itpay_payment_id:str,itpay_paid_amount:str|None,transaction_id:str|None,paid_at:str|None,expired_date:str|None) -> Payment: + stmt=select(Payment).where(Payment.order_id==order_id) + existing=await self._session.scalar(stmt) + if existing is not None: + return existing + paid_at_dt=datetime.fromisoformat(paid_at.replace('Z','+00:00')) if paid_at else None + expired_dt=datetime.fromisoformat(expired_date.replace('Z','+00:00')) if expired_date else None + paid_amount_dec=Decimal(str(itpay_paid_amount)) if itpay_paid_amount is not None else None + model=Payment( + user_id=user_id, + order_id=order_id, + status=PaymentStatus.PENDING, + receipt_cloudekassir_link=None, + itpay_payment_id=itpay_payment_id, + itpay_paid_amount=paid_amount_dec, + transaction_id=transaction_id, + paid_at=paid_at_dt, + expired_date=expired_dt, + ) + self._session.add(model) + await self._session.flush() + return model + diff --git a/src/infrastructure/database/unit_of_work.py b/src/infrastructure/database/unit_of_work.py index c3429fc..b6ddfad 100644 --- a/src/infrastructure/database/unit_of_work.py +++ b/src/infrastructure/database/unit_of_work.py @@ -1,7 +1,9 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from src.application.abstractions import IUnitOfWork +from src.application.abstractions.repositories import IOrderRepository,IPaymentRepository from src.application.contracts import ILogger -# from src.application.abstractions.repositories import IUserRepository, ISessionRepository +from src.infrastructure.database.repositories.order_repository import OrderRepository +from src.infrastructure.database.repositories.payment_repository import PaymentRepository # from src.infrastructure.database.repositories import UserRepository, SessionRepository @@ -10,12 +12,16 @@ class UnitOfWork(IUnitOfWork): def __init__(self, session_factory: async_sessionmaker[AsyncSession], logger: ILogger): self.session_factory = session_factory self._session: AsyncSession = None + self._order_repository: IOrderRepository | None = None + self._payment_repository: IPaymentRepository | None = None # self._user_repository: IUserRepository = None # self._session_repository: ISessionRepository = None self._logger: ILogger = logger async def __aenter__(self): self._session = self.session_factory() + self._order_repository = None + self._payment_repository = None return self async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -29,6 +35,19 @@ class UnitOfWork(IUnitOfWork): self._logger.debug('Commit') await self._session.close() + @property + def order_repository(self) -> IOrderRepository: + if self._order_repository is None: + self._order_repository = OrderRepository(session=self._session, logger=self._logger) + return self._order_repository + + + @property + def payment_repository(self) -> IPaymentRepository: + if self._payment_repository is None: + self._payment_repository = PaymentRepository(session=self._session, logger=self._logger) + return self._payment_repository + # @property # def user_repository(self) -> IUserRepository: # if self._user_repository is None: diff --git a/src/application/command/__init__.py b/src/infrastructure/itpay/__init__.py similarity index 100% rename from src/application/command/__init__.py rename to src/infrastructure/itpay/__init__.py diff --git a/src/infrastructure/itpay/client.py b/src/infrastructure/itpay/client.py new file mode 100644 index 0000000..f17ccdb --- /dev/null +++ b/src/infrastructure/itpay/client.py @@ -0,0 +1,109 @@ +import orjson +from dataclasses import replace +from datetime import datetime, timezone +from decimal import Decimal +from typing import Any +import aiohttp +from aiohttp import BasicAuth, ClientTimeout +from src.application.contracts.i_itpay_service import IItPayService +from src.application.domain.entities.order import OrderEntity +from src.application.domain.enums import OrderStatus +from src.application.domain.exceptions import ApplicationException + + +class ItPayClient(IItPayService): + + def __init__( + self, + *, + public_id: str, + api_secret: str, + api_base_url: str = 'https://api.gw.itpay.ru', + timeout_seconds: float = 30, + ) -> None: + self._api_base_url = api_base_url.rstrip('/') + self._public_id = public_id + self._api_secret = api_secret + self._timeout = ClientTimeout(total=timeout_seconds) + + async def create_payment(self, order: OrderEntity) -> OrderEntity: + total = order.total_price if order.total_price is not None else Decimal('0') + amount = total if isinstance(total, Decimal) else Decimal(str(total)) + amount_str = str(amount.quantize(Decimal('0.01'))) + metadata: dict[str,Any] = { + 'order_id': order.id, + 'user_id': order.user_id, + 'usdt_amount': str(order.usdt_amount) if order.usdt_amount is not None else None, + 'usdt_exchange_rate': str(order.usdt_exchange_rate) if order.usdt_exchange_rate is not None else None, + 'gas_fee': str(order.gas_fee) if order.gas_fee is not None else None, + 'service_fee': str(order.service_fee) if order.service_fee is not None else None, + 'amount': amount_str, + } + metadata = {k:v for k,v in metadata.items() if v is not None and v != ''} + payload: dict[str, Any] = { + 'amount': amount_str, + 'client_payment_id': order.client_payment_id or '', + 'method': 'sbp', + 'description': 'CFU', + 'metadata': metadata, + } + url = f'{self._api_base_url}/v1/payments' + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + try: + async with aiohttp.ClientSession(timeout=self._timeout) as session: + auth = BasicAuth(self._public_id, self._api_secret) + async with session.post(url, json=payload, headers=headers, auth=auth) as resp: + response_text = await resp.text() + try: + response_json: dict[str, Any] = orjson.loads(response_text) + except orjson.JSONDecodeError: + response_json = {'raw': response_text} + if resp.status >= 400: + raise ApplicationException(status_code=502, message='Payment provider error') + body_raw = response_json.get('data') + body = body_raw if isinstance(body_raw, dict) else response_json + + status = str(body['status']).strip().lower() + itpay_id = str(body['id']) + + if status == 'cancelled': + return replace(order, status=OrderStatus.CANCELLED, itpay_id=itpay_id) + if status == 'rejected': + return replace(order, status=OrderStatus.REJECTED, itpay_id=itpay_id) + if status == 'error': + return replace(order, status=OrderStatus.ERROR, itpay_id=itpay_id) + + qrc_id = str(body['qrc_id']) + itpay_amount = Decimal(str(body['amount'])) + + created_norm = str(body['created']).replace('Z', '+00:00') + itpay_created_at = datetime.fromisoformat(created_norm) + + payment_qr_urls = body['payment_qr_urls'] + if isinstance(payment_qr_urls, str): + payment_qr_urls = orjson.loads(payment_qr_urls) + + payment_qr_images = body['payment_qr_images'] + if isinstance(payment_qr_images, str): + payment_qr_images = orjson.loads(payment_qr_images) + + return replace( + order, + itpay_id=itpay_id, + itpay_qr_id=qrc_id, + itpay_created_at=itpay_created_at, + itpay_amount=itpay_amount, + itpay_payment_qr_url_android=str(payment_qr_urls['android']), + itpay_payment_qr_url_ios=str(payment_qr_urls['ios']), + itpay_payment_qr_url_desktop=str(payment_qr_urls['desktop']), + itpay_payment_qr_image_android=str(payment_qr_images['android']), + itpay_payment_qr_image_ios=str(payment_qr_images['ios']), + itpay_payment_qr_image_desktop=str(payment_qr_images['desktop']), + ) + except ApplicationException: + raise + except aiohttp.ClientError: + raise ApplicationException(status_code=502, message='Payment provider unreachable') diff --git a/src/infrastructure/logger/logger.py b/src/infrastructure/logger/logger.py index 0fc2c8d..6891fdc 100644 --- a/src/infrastructure/logger/logger.py +++ b/src/infrastructure/logger/logger.py @@ -1,7 +1,7 @@ import traceback import inspect import sys -import json +import orjson from datetime import datetime from typing import Callable, Optional, Any from ulid import ULID @@ -94,7 +94,7 @@ class Logger(ILogger): log_data = self._prepare_log_data(level, message) if self.log_format == LogFormat.JSON: - log_message = json.dumps(log_data, ensure_ascii=False) + log_message = orjson.dumps(log_data).decode() else: log_message = ( f"{log_data['timestamp']} - {log_data['level']} - " diff --git a/src/main.py b/src/main.py index bbf186a..b1c8d27 100644 --- a/src/main.py +++ b/src/main.py @@ -14,6 +14,7 @@ from src.infrastructure.utils import generate_instance_id from src.infrastructure.logger import logger from src.infrastructure.config import settings from src.presentation.handlers import application_exception_handler, unhandled_exception_handler +from src.presentation.messaging import crypto_transfer_router from src.presentation.middleware import TraceIDMiddleware, SecurityHeadersMiddleware from src.presentation.routing import order_router @@ -39,7 +40,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: logger.set_instance_id(instance_id) logger.info(f'Users service instance started with id {instance_id}') - app.state.redis = create_redis_client() + app.state.redis_local = create_redis_client(settings.KEYDB_LOCAL_URL) + app.state.redis_remote = create_redis_client(settings.KEYDB_REMOTE_URL) + app.state.redis = app.state.redis_remote jwt_store = JwtKeyStore( vault_addr=settings.VAULT_ADDR, @@ -58,7 +61,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: app.state.jwt_key_store = jwt_store app.state.jwt_keys_scheduler = jwt_scheduler yield - await app.state.redis.aclose() + await app.state.redis_local.aclose() + await app.state.redis_remote.aclose() logger.info(f'Users service instance ended with id {instance_id}') @@ -73,6 +77,7 @@ app.add_exception_handler(ApplicationException, application_exception_handler) app.add_exception_handler(Exception, unhandled_exception_handler) app.include_router(order_router) +app.include_router(crypto_transfer_router) # Added middleware diff --git a/src/presentation/decorators/cache.py b/src/presentation/decorators/cache.py index a7cbdaf..ed36590 100644 --- a/src/presentation/decorators/cache.py +++ b/src/presentation/decorators/cache.py @@ -1,15 +1,19 @@ from __future__ import annotations + + import functools -from typing import Any, Awaitable, Callable +from typing import Any,Awaitable,Callable + from fastapi import Request from fastapi.responses import ORJSONResponse -from src.infrastructure.cache import KeydbCache + from src.infrastructure.logger import get_logger -from src.presentation.dependencies.cache import get_redis +from src.presentation.dependencies.cache import get_cache_remote def cached(*, prefix: str) -> Callable: + def decorator(func: Callable[..., Awaitable[Any]]): @functools.wraps(func) async def wrapper(*args: Any, **kwargs: Any) -> Any: @@ -31,8 +35,7 @@ def cached(*, prefix: str) -> Callable: cache_key = f'{prefix}:{user_id}' try: - redis = get_redis(request) - cache = KeydbCache(redis) + cache = get_cache_remote(request) hit = await cache.get_user(user_id) if hit is not None: logger.debug(f'Cache hit key={cache_key}') diff --git a/src/presentation/dependencies/__init__.py b/src/presentation/dependencies/__init__.py index af0fa45..bdd654b 100644 --- a/src/presentation/dependencies/__init__.py +++ b/src/presentation/dependencies/__init__.py @@ -1,16 +1,9 @@ -# from src.presentation.dependencies.commands import ( -# get_get_me_command, -# get_set_phone_command, -# get_set_crypto_wallet_start_command, -# get_set_crypto_wallet_complete_command, -# get_update_bank_details_start_command, -# get_update_bank_details_complete_command, -# get_change_password_start_command, -# get_change_password_complete_command, -# get_change_email_start_command, -# get_change_email_confirm_old_command, -# get_change_email_complete_command, -# ) -from src.presentation.dependencies.security import get_jwt_service -from src.presentation.dependencies.cache import get_redis, get_cache +from __future__ import annotations + + +from src.presentation.dependencies.cache import get_cache,get_cache_local,get_cache_remote,get_redis,get_redis_local,get_redis_remote from src.presentation.dependencies.queue_messanger import get_rabbit +from src.presentation.dependencies.security import get_jwt_service + + +__all__=['get_jwt_service','get_redis','get_redis_local','get_redis_remote','get_cache','get_cache_local','get_cache_remote','get_rabbit'] diff --git a/src/presentation/dependencies/cache.py b/src/presentation/dependencies/cache.py index fb4fc7a..de410bc 100644 --- a/src/presentation/dependencies/cache.py +++ b/src/presentation/dependencies/cache.py @@ -1,12 +1,36 @@ -from fastapi import Depends, Request +from __future__ import annotations + + +from fastapi import Depends,Request from redis.asyncio.client import Redis + from src.application.contracts import ICache -from src.infrastructure.cache import KeydbCache +from src.infrastructure.cache import KeydbCache,RemoteCache + + +def get_redis_local(request: Request) -> Redis: + return request.app.state.redis_local + + +def get_redis_remote(request: Request) -> Redis: + return request.app.state.redis_remote def get_redis(request: Request) -> Redis: - return request.app.state.redis + return request.app.state.redis_remote -def get_cache(redis_client: Redis = Depends(get_redis)) -> ICache: +def get_cache_local(redis_client: Redis = Depends(get_redis_local)) -> ICache: return KeydbCache(redis_client) + + +def get_cache_remote(redis_client: Redis = Depends(get_redis_remote)) -> ICache: + return KeydbCache(redis_client) + + +def get_remote_cache(redis_client: Redis = Depends(get_redis_remote)) -> ICache: + return RemoteCache(redis_client) + + +def get_cache(cache: ICache = Depends(get_cache_remote)) -> ICache: + return cache diff --git a/src/presentation/dependencies/commands.py b/src/presentation/dependencies/commands.py index 25311fc..b7fcb4b 100644 --- a/src/presentation/dependencies/commands.py +++ b/src/presentation/dependencies/commands.py @@ -1,17 +1,43 @@ -# from fastapi import Depends -# from src.application.abstractions import IUnitOfWork -# from src.application.commands import GetMeCommand, SetPhoneCommand, SetCryptoWalletStartCommand, SetCryptoWalletCompleteCommand, UpdateBankDetailsStartCommand, UpdateBankDetailsCompleteCommand, ChangePasswordStartCommand, ChangePasswordCompleteCommand, ChangeEmailStartCommand, ChangeEmailConfirmOldCommand, ChangeEmailCompleteCommand -# from src.application.contracts import ILogger, ICache, IQueueMessanger, IHashService -# from src.presentation.dependencies.cache import get_cache -# from src.presentation.dependencies.logger import get_logger -# from src.presentation.dependencies.queue_messanger import get_rabbit -# from src.presentation.dependencies.security import get_hash_service -# from src.presentation.dependencies.unit_of_work import get_unit_of_work +from __future__ import annotations +from fastapi import Depends +from src.application.abstractions import IUnitOfWork +from src.application.commands import CreateOrderCommand,CreatePaymentCommand +from src.application.contracts import ICache,ILogger,IQueueMessanger +from src.application.contracts.i_itpay_service import IItPayService +from src.infrastructure.config import settings +from src.infrastructure.itpay.client import ItPayClient +from src.presentation.dependencies.cache import get_cache_local,get_remote_cache +from src.presentation.dependencies.logger import get_logger +from src.presentation.dependencies.queue_messanger import get_rabbit +from src.presentation.dependencies.unit_of_work import get_unit_of_work -# def get_get_me_command( -# logger: ILogger = Depends(get_logger), -# unit_of_work: IUnitOfWork = Depends(get_unit_of_work), -# cache: ICache = Depends(get_cache), -# ) -> GetMeCommand: -# return GetMeCommand(logger=logger, unit_of_work=unit_of_work, cache=cache) +def get_itpay_service() -> IItPayService: + return ItPayClient( + public_id=settings.ITPAY_PUBLIC_ID, + api_secret=settings.ITPAY_API_SECRET, + ) + + +def get_create_order_command( + logger: ILogger = Depends(get_logger), + unit_of_work: IUnitOfWork = Depends(get_unit_of_work), + cache_local: ICache = Depends(get_cache_local), + remote_cache: ICache = Depends(get_remote_cache), + itpay_service: IItPayService = Depends(get_itpay_service), +) -> CreateOrderCommand: + return CreateOrderCommand( + unit_of_work=unit_of_work, + logger=logger, + cache_local=cache_local, + remote_cache=remote_cache, + itpay_service=itpay_service, + ) + + +def get_create_payment_command( + logger: ILogger = Depends(get_logger), + unit_of_work: IUnitOfWork = Depends(get_unit_of_work), + queue_messanger: IQueueMessanger = Depends(get_rabbit), +) -> CreatePaymentCommand: + return CreatePaymentCommand(unit_of_work=unit_of_work,logger=logger,queue_messanger=queue_messanger) \ No newline at end of file diff --git a/src/presentation/handlers/application_handler.py b/src/presentation/handlers/application_handler.py index aa68716..2835ac5 100644 --- a/src/presentation/handlers/application_handler.py +++ b/src/presentation/handlers/application_handler.py @@ -10,7 +10,7 @@ async def application_exception_handler(_request: Request, exc: ApplicationExcep return ORJSONResponse( status_code=exc.status_code, - content={"detail": detail}, + content={'detail': detail, 'status_code': exc.status_code}, headers=dict(exc.headers) if exc.headers else None, ) diff --git a/src/presentation/messaging/__init__.py b/src/presentation/messaging/__init__.py new file mode 100644 index 0000000..d0147d3 --- /dev/null +++ b/src/presentation/messaging/__init__.py @@ -0,0 +1,5 @@ +from src.presentation.messaging.crypto_transfer import crypto_transfer_router + + +__all__=['crypto_transfer_router'] + diff --git a/src/presentation/messaging/crypto_transfer.py b/src/presentation/messaging/crypto_transfer.py new file mode 100644 index 0000000..e55093b --- /dev/null +++ b/src/presentation/messaging/crypto_transfer.py @@ -0,0 +1,39 @@ +from fastapi import Depends +import orjson +from faststream.rabbit.fastapi import RabbitMessage,RabbitRouter +from pydantic import BaseModel +from src.application.contracts import ILogger +from src.infrastructure.config import settings +from src.infrastructure.context_vars import trace_id_var +from src.presentation.dependencies.logger import get_logger + + +crypto_transfer_router=RabbitRouter(settings.RABBIT_URL) + + +class CryptoTransferCompletedMessage(BaseModel): + user_id: str + order_id: str + trace_id: str + message_id: str + + +@crypto_transfer_router.subscriber(settings.RABBIT_CRYPTO_TRANSFER_COMPLETED_QUEUE) +async def crypto_transfer_completed_handler( + msg_body: CryptoTransferCompletedMessage, + message: RabbitMessage, + logger: ILogger = Depends(get_logger), +) -> None: + trace_id=msg_body.trace_id + token=trace_id_var.set(trace_id) + try: + payload=msg_body.model_dump(mode='json') + logger.info(orjson.dumps({ + 'event':'crypto_transfer_completed_received', + 'payload':payload, + 'rabbit_message_id':message.message_id, + 'rabbit_correlation_id':message.correlation_id, + },default=str).decode()) + finally: + trace_id_var.reset(token) + diff --git a/src/presentation/routing/order.py b/src/presentation/routing/order.py index 2b4d29a..6b88653 100644 --- a/src/presentation/routing/order.py +++ b/src/presentation/routing/order.py @@ -1,141 +1,103 @@ -import json -import os -from decimal import Decimal from urllib.parse import parse_qs -import aiohttp +import orjson from fastapi import APIRouter, Depends, Request from fastapi.responses import ORJSONResponse -from ulid import ULID +from src.application.commands import CreateOrderCommand +from src.application.commands import CreatePaymentCommand from src.application.contracts import ILogger from src.application.domain.dto import AuthContext -from src.application.domain.exceptions import ApplicationException -from src.presentation.decorators import csrf_protect, require_access_token +from src.application.domain.enums import OrderStatus +from src.presentation.decorators import require_access_token, csrf_protect +from src.presentation.dependencies.commands import get_create_order_command, get_create_payment_command from src.presentation.dependencies.logger import get_logger from src.presentation.schemas.order import CreateOrder -from src.infrastructure.config import settings - +from src.presentation.schemas.itpay_payment_models import ItpayPaymentData order_router = APIRouter(prefix='/order', tags=['orders']) -ITPAY_API_BASE = 'https://api.gw.itpay.ru' -HARDCODED_USDT_TO_RUB = Decimal('10') -HARDCODED_GAS_RUB = Decimal('5') -HARDCODED_OUR_COMMISSION_RUB = Decimal('5') -HARDCODED_ITPAY_TEST_AMOUNT_RUB = Decimal('20.00') - - -def _amount_rub_for_itpay(amount_usdt: Decimal) -> Decimal: - return (amount_usdt * HARDCODED_USDT_TO_RUB + HARDCODED_GAS_RUB + HARDCODED_OUR_COMMISSION_RUB).quantize(Decimal('0.01')) - - @order_router.post('/create') #@csrf_protect() async def create_order( - request: Request, - body: CreateOrder, - #auth: AuthContext = Depends(require_access_token), - logger: ILogger = Depends(get_logger), + payment_data: CreateOrder, + #auth: AuthContext = Depends(require_access_token), + command: CreateOrderCommand = Depends(get_create_order_command), + logger: ILogger = Depends(get_logger), ) -> ORJSONResponse: - amount_rub = _amount_rub_for_itpay(body.amount_usdt) - if (os.getenv('ITPAY_TEST_FORCE_20_RUB') or '').strip() == '1': - amount_rub = HARDCODED_ITPAY_TEST_AMOUNT_RUB - amount_str = str(amount_rub) - client_payment_id = str(ULID()) - payload = { - 'amount': amount_str, - 'client_payment_id': client_payment_id, - 'description': f'USDT {body.amount_usdt}', - 'metadata': { - 'user_id': '01KPSYW27JZ26HBDR3QS5J6VMS', - 'amount_usdt': str(body.amount_usdt), - 'rate': str(HARDCODED_USDT_TO_RUB), - 'gas_rub': str(HARDCODED_GAS_RUB), - 'commission_rub': str(HARDCODED_OUR_COMMISSION_RUB), - }, - } - url = f'{ITPAY_API_BASE}/v1/payments' - headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - } - logger.info(json.dumps({ - 'event': 'itpay_payment_create_request', - 'client_payment_id': client_payment_id, - 'amount_usdt': str(body.amount_usdt), - 'amount_rub': amount_str, - 'url': url, - 'payload': payload, - }, ensure_ascii=False, default=str)) - try: - timeout = aiohttp.ClientTimeout(total=30) - async with aiohttp.ClientSession(timeout=timeout) as session: - auth = aiohttp.BasicAuth(settings.ITPAY_PUBLIC_ID, settings.ITPAY_API_SECRET) - async with session.post(url, json=payload, headers=headers, auth=auth) as resp: - response_text = await resp.text() - try: - response_json = json.loads(response_text) - except json.JSONDecodeError: - response_json = {'raw': response_text} - logger.info(json.dumps({ - 'event': 'itpay_payment_create_response', - 'client_payment_id': client_payment_id, - 'status': resp.status, - 'response': response_json, - }, ensure_ascii=False, default=str)) - if resp.status >= 400: - logger.warning(f'itpay payments POST {resp.status} {response_text}') - raise ApplicationException(status_code=502, message='Payment provider error') - except ApplicationException: - raise - except aiohttp.ClientError as e: - logger.error(str(e)) - raise ApplicationException(status_code=502, message='Payment provider unreachable') - return ORJSONResponse( - content={ - 'itpay': response_json, - 'client_payment_id': client_payment_id, - 'amount_usdt': str(body.amount_usdt), - 'amount_rub': amount_str, - 'hardcoded': { - 'usdt_to_rub': str(HARDCODED_USDT_TO_RUB), - 'gas_rub': str(HARDCODED_GAS_RUB), - 'commission_rub': str(HARDCODED_OUR_COMMISSION_RUB), - }, - } + #o = await command(payment_data, auth.user_id) + o = await command(payment_data, '01KPKAFN6J1NJBY15DX8JE2QYB') + itpay_error = o.status in ( + OrderStatus.CANCELLED, + OrderStatus.REJECTED, + OrderStatus.ERROR, ) - + http_code = 409 if itpay_error else 201 + content: dict = { + 'status_code': http_code, + 'order': { + 'id': o.id, + 'created_at': o.created_at.isoformat() if o.created_at is not None else None, + 'updated_at': o.updated_at.isoformat() if o.updated_at is not None else None, + 'user_id': o.user_id, + 'usdt_amount': str(o.usdt_amount) if o.usdt_amount is not None else None, + 'usdt_exchange_rate': str(o.usdt_exchange_rate) if o.usdt_exchange_rate is not None else None, + 'gas_fee': str(o.gas_fee) if o.gas_fee is not None else None, + 'total_price': str(o.total_price) if o.total_price is not None else None, + 'service_fee': str(o.service_fee) if o.service_fee is not None else None, + 'status': o.status.value if o.status is not None else None, + 'client_payment_id': o.client_payment_id, + 'itpay_payment_qr_url_desktop': o.itpay_payment_qr_url_desktop, + 'itpay_payment_qr_url_android': o.itpay_payment_qr_url_android, + 'itpay_payment_qr_url_ios': o.itpay_payment_qr_url_ios, + 'itpay_payment_qr_image_desktop': o.itpay_payment_qr_image_desktop, + 'itpay_payment_qr_image_android': o.itpay_payment_qr_image_android, + 'itpay_payment_qr_image_ios': o.itpay_payment_qr_image_ios, + 'itpay_id': o.itpay_id, + 'itpay_qr_id': o.itpay_qr_id, + 'itpay_amount': str(o.itpay_amount) if o.itpay_amount is not None else None, + 'itpay_created_at': o.itpay_created_at.isoformat() if o.itpay_created_at is not None else None, + } + } + log_ids = { + 'event': 'order_create_itpay_failed' if itpay_error else 'order_created', + 'order_id': o.id, + 'user_id': o.user_id, + 'client_payment_id': o.client_payment_id, + 'itpay_id': o.itpay_id, + 'order_status': o.status.value if o.status is not None else None, + } + logger.info(orjson.dumps(log_ids, default=str).decode()) + return ORJSONResponse(content=content, status_code=http_code) @order_router.post('/webhook/itpay') -async def itpay_webhook(request: Request, logger: ILogger = Depends(get_logger)) -> ORJSONResponse: +async def itpay_webhook( + request: Request, + payment_command: CreatePaymentCommand = Depends(get_create_payment_command), + logger: ILogger = Depends(get_logger) +) -> ORJSONResponse: raw = await request.body() ct = (request.headers.get('content-type') or '').lower() - logger.info(json.dumps({ - 'event': 'itpay_webhook_received', - 'method': request.method, - 'url': str(request.url), - 'content_type': ct, - 'body_size': len(raw), - }, ensure_ascii=False, default=str)) if 'application/json' in ct: - try: - parsed = json.loads(raw.decode('utf-8')) - except (json.JSONDecodeError, UnicodeDecodeError): - parsed = raw.decode('utf-8', errors='replace') + payload = orjson.loads(raw) elif 'application/x-www-form-urlencoded' in ct: decoded = raw.decode('utf-8', errors='replace') qs = parse_qs(decoded, keep_blank_values=True) - parsed = {k: (vals[0] if len(vals) == 1 else vals) for k, vals in qs.items()} + payload = {k: (vals[0] if len(vals) == 1 else vals) for k, vals in qs.items()} else: - parsed = raw.decode('utf-8', errors='replace') - headers = {k: v for k, v in request.headers.items() if k.lower() not in {'authorization', 'cookie'}} + payload = orjson.loads(raw) + data = payload.get('data') if isinstance(payload.get('data'), dict) else {} + status = str(data.get('status') or '').strip().lower() log_payload = { - 'event': 'itpay_webhook_payload', - 'method': request.method, - 'url': str(request.url), - 'headers': headers, - 'body': parsed, + 'event': 'itpay_webhook_received', + 'webhook_id': payload.get('id'), + 'webhook_type': payload.get('type'), + 'payment_id': data.get('id'), + 'client_payment_id': data.get('client_payment_id'), + 'payment_status': status, } - logger.info(json.dumps(log_payload, ensure_ascii=False, default=str)) + logger.info(orjson.dumps(log_payload, default=str).decode()) + if status == 'completed': + payment = ItpayPaymentData.model_validate(data) + await payment_command(payment) return ORJSONResponse(content={'status': 0}) diff --git a/src/presentation/schemas/itpay_payment_base.py b/src/presentation/schemas/itpay_payment_base.py new file mode 100644 index 0000000..d1c8049 --- /dev/null +++ b/src/presentation/schemas/itpay_payment_base.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any,Literal + +from pydantic import BaseModel + + +class ItpayEventBase(BaseModel): + id:str + object:Literal['event'] + type:str + created:datetime + data:Any + diff --git a/src/presentation/schemas/itpay_payment_completed_event.py b/src/presentation/schemas/itpay_payment_completed_event.py new file mode 100644 index 0000000..bbcda83 --- /dev/null +++ b/src/presentation/schemas/itpay_payment_completed_event.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel + +from src.presentation.schemas.itpay_payment_models import ItpayPaymentData + + +class ItpayPaymentCompletedEvent(BaseModel): + id:str + object:Literal['event'] + type:Literal['payment.completed'] + created:datetime + data:ItpayPaymentData + diff --git a/src/presentation/schemas/itpay_payment_models.py b/src/presentation/schemas/itpay_payment_models.py new file mode 100644 index 0000000..5e7cf8f --- /dev/null +++ b/src/presentation/schemas/itpay_payment_models.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel + + +class ItpayPaymentShopLegalEntity(BaseModel): + id:int|None=None + name:str|None=None + theme:str|None=None + opf_short:str|None=None + short_name:str|None=None + taxation_system:int|None=None + + +class ItpayPaymentShopPaymentMethod(BaseModel): + id:int|None=None + method:str|None=None + status:str|None=None + order:int|None=None + shop_uuid:str|None=None + is_savable:bool|None=None + payment_agent_percent:str|None=None + + +class ItpayPaymentShop(BaseModel): + id:int|None=None + name:str|None=None + description:str|None=None + legal_entity:ItpayPaymentShopLegalEntity|None=None + type:str|None=None + address:str|None=None + timezone:str|None=None + payout_currency:str|None=None + payment_methods:list[ItpayPaymentShopPaymentMethod]|None=None + + +class ItpayPaymentOrderPlace(BaseModel): + name:str|None=None + number:int|None=None + + +class ItpayPaymentOrder(BaseModel): + state:Any=None + external_id:str|None=None + number:str|None=None + place:ItpayPaymentOrderPlace|None=None + total_amount:Any=None + paid_amount:Any=None + external_updated:Any=None + external_created:Any=None + + +class ItpayPaymentQrUrls(BaseModel): + desktop:str|None=None + android:str|None=None + ios:str|None=None + + +class ItpayPaymentQrImages(BaseModel): + desktop:str|None=None + android:str|None=None + ios:str|None=None + + +class ItpayPaymentData(BaseModel): + id:str + amount:float|int|str|None=None + currency:str|None=None + payout_amount:float|int|str|None=None + payout_currency:str|None=None + created:datetime|str|None=None + updated:datetime|str|None=None + paid:datetime|str|None=None + status:str|None=None + status_code_error:Any=None + metadata:dict[str,Any]|None=None + success_url:str|None=None + success_url_description:str|None=None + qrc_id:str|None=None + transaction_id:str|None=None + description:str|None=None + shop:ItpayPaymentShop|None=None + order:ItpayPaymentOrder|None=None + method:str|None=None + payment_qr_urls:ItpayPaymentQrUrls|None=None + payment_qr_images:ItpayPaymentQrImages|None=None + client_payment_id:str|None=None + expired_date:datetime|str|None=None + payer:Any=None + diff --git a/src/presentation/schemas/itpay_payment_pay_event.py b/src/presentation/schemas/itpay_payment_pay_event.py new file mode 100644 index 0000000..6a5750c --- /dev/null +++ b/src/presentation/schemas/itpay_payment_pay_event.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel + +from src.presentation.schemas.itpay_payment_models import ItpayPaymentData + + +class ItpayPaymentPayEvent(BaseModel): + id:str + object:Literal['event'] + type:Literal['payment.pay'] + created:datetime + data:ItpayPaymentData + diff --git a/src/presentation/schemas/order.py b/src/presentation/schemas/order.py index 10f06d1..26e0e3c 100644 --- a/src/presentation/schemas/order.py +++ b/src/presentation/schemas/order.py @@ -1,6 +1,10 @@ from decimal import Decimal -from pydantic import BaseModel, Field +from pydantic import BaseModel,Field class CreateOrder(BaseModel): - amount_usdt: Decimal = Field(gt=0) + usdt_amount: Decimal = Field(gt=0, decimal_places=2, max_digits=20) + usdt_exchange_rate: Decimal = Field(gt=0, decimal_places=2, max_digits=20) + gas_fee: Decimal = Field(gt=0, decimal_places=2, max_digits=20) + total_price: Decimal = Field(gt=0, decimal_places=2, max_digits=20) +