51 lines
1.8 KiB
Python
51 lines
1.8 KiB
Python
from fastapi import Depends
|
|
from faststream.rabbit.fastapi import RabbitMessage,RabbitRouter
|
|
from pydantic import BaseModel
|
|
from src.application.commands import CreateCryptoTransferCompletedCommand
|
|
from src.application.contracts import ILogger
|
|
from src.infrastructure.config import settings
|
|
from src.infrastructure.context_vars import trace_id_var
|
|
from src.presentation.dependencies.commands import get_crypto_transfer_completed_command
|
|
from src.presentation.dependencies.logger import get_logger
|
|
|
|
|
|
# Router on which this module's RabbitMQ subscribers are registered;
# it is constructed from the broker URL held in settings.RABBIT_URL.
crypto_transfer_router = RabbitRouter(settings.RABBIT_URL)
|
|
|
|
|
|
class CryptoTransferCompletedMessage(BaseModel):
    """Schema of the message body consumed from the crypto-transfer-completed queue.

    Four string fields are required. The transaction hash is optional and may
    be supplied under any of three field names, each defaulting to ``None``.
    """

    # Required identifiers carried by every message.
    user_id: str
    order_id: str
    trace_id: str
    message_id: str

    # Alternative names under which the transaction hash may be supplied.
    web3_transaction_hash: str | None = None
    transaction_hash: str | None = None
    tx_hash: str | None = None
|
|
|
|
|
|
@crypto_transfer_router.subscriber(settings.RABBIT_CRYPTO_TRANSFER_COMPLETED_QUEUE)
async def crypto_transfer_completed_handler(
    msg_body: CryptoTransferCompletedMessage,
    message: RabbitMessage,
    command: CreateCryptoTransferCompletedCommand = Depends(get_crypto_transfer_completed_command),
    logger: ILogger = Depends(get_logger),
) -> None:
    """Handle one message from the crypto-transfer-completed queue.

    Binds the message's ``trace_id`` to ``trace_id_var`` for the duration of
    the call, logs the received payload together with the broker message and
    correlation ids, then awaits ``command`` with the order id, user id and
    the resolved transaction hash.

    Args:
        msg_body: Parsed message body.
        message: Broker-level message; only its ``message_id`` and
            ``correlation_id`` are read, for logging.
        command: Injected command that is awaited with the transfer data.
        logger: Injected logger that receives the structured "received" record.

    Any exception raised while logging or running the command propagates to
    the caller; the context variable is restored either way.
    """
    ctx_token = trace_id_var.set(msg_body.trace_id)
    try:
        received_record = {
            'event': 'crypto_transfer_completed_received',
            'payload': msg_body.model_dump(mode='json'),
            'rabbit_message_id': message.message_id,
            'rabbit_correlation_id': message.correlation_id,
        }
        logger.info(received_record)

        # First truthy hash wins; if none is truthy the last candidate
        # (tx_hash) is passed through unchanged.
        resolved_hash = (
            msg_body.web3_transaction_hash
            or msg_body.transaction_hash
            or msg_body.tx_hash
        )

        await command(
            order_id=msg_body.order_id,
            user_id=msg_body.user_id,
            web3_transaction_hash=resolved_hash,
        )
    finally:
        # Always restore the previous trace id, even when the command fails.
        trace_id_var.reset(ctx_token)
|
|
|