import amqplib, { type ChannelModel, type Channel, type ConsumeMessage } from "amqplib"; import type { Logger } from "pino"; import type { RabbitMqSecret } from "../secrets/VaultClient.js"; import type { CryptoTransferCompleted, CryptoTransferRequest } from "./messageSchema.js"; import { parseCryptoTransferRequest } from "./messageSchema.js"; export const REQUEST_QUEUE = "crypto.transfer.requested"; export const COMPLETED_QUEUE = "crypto.transfer.completed"; export type TransferRequestHandler = ( message: CryptoTransferRequest, log: Logger ) => Promise; export interface AmqpClientOptions { secret: RabbitMqSecret; logger: Logger; onConnectionLost: () => void; } export class AmqpClient { private readonly logger: Logger; private readonly secret: RabbitMqSecret; private readonly onConnectionLost: () => void; private connection?: ChannelModel; private channel?: Channel; constructor(options: AmqpClientOptions) { this.secret = options.secret; this.logger = options.logger.child({ component: "amqp" }); this.onConnectionLost = options.onConnectionLost; } async connect(): Promise { this.connection = await amqplib.connect({ protocol: "amqp", hostname: this.secret.host, port: this.secret.port, username: this.secret.user, password: this.secret.password, vhost: this.secret.vhost }); this.connection.on("error", (error) => { this.logger.error({ err: error }, "amqp connection error"); }); this.connection.on("close", () => { this.logger.error("amqp connection closed"); this.onConnectionLost(); }); this.channel = await this.connection.createChannel(); await this.channel.prefetch(1); await this.channel.assertQueue(REQUEST_QUEUE, { durable: true }); await this.channel.assertQueue(COMPLETED_QUEUE, { durable: true }); this.logger.info( { host: this.secret.host, port: this.secret.port, vhost: this.secret.vhost }, "amqp connected" ); } async startConsumer(handler: TransferRequestHandler): Promise { if (!this.channel) { throw new Error("AmqpClient not connected"); } await this.channel.consume(REQUEST_QUEUE, (msg) => { if (!msg) { return; } void this.dispatch(msg, handler); }); this.logger.info({ queue: REQUEST_QUEUE }, "amqp consumer started"); } publishCompleted(payload: CryptoTransferCompleted): boolean { if (!this.channel) { throw new Error("AmqpClient not connected"); } return this.channel.sendToQueue( COMPLETED_QUEUE, Buffer.from(JSON.stringify(payload)), { contentType: "application/json", persistent: true } ); } async close(): Promise { try { await this.channel?.close(); } catch { /* ignore */ } try { await this.connection?.close(); } catch { /* ignore */ } } private async dispatch(msg: ConsumeMessage, handler: TransferRequestHandler): Promise { if (!this.channel) { return; } const channel = this.channel; let parsed: CryptoTransferRequest; try { const body = JSON.parse(msg.content.toString("utf8")) as unknown; parsed = parseCryptoTransferRequest(body); } catch (error) { this.logger.error( { err: error, raw: safeRaw(msg) }, "rejected malformed message" ); channel.ack(msg); return; } const log = this.logger.child({ trace_id: parsed.trace_id, order_id: parsed.order_id, user_id: parsed.user_id, message_id: parsed.message_id }); log.info({ event: "transfer.requested.received" }, "received from queue"); try { await handler(parsed, log); channel.ack(msg); } catch (error) { log.error({ err: error }, "infrastructure error during message handling, requeue=false"); channel.nack(msg, false, false); } } } function safeRaw(msg: ConsumeMessage): string { try { return msg.content.toString("utf8").slice(0, 512); } catch { return ""; } }