diff --git a/.dockerignore b/.dockerignore index e37db74..cca5f06 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,3 +5,4 @@ dist .gitignore *.log README.md +swagger.yaml \ No newline at end of file diff --git a/.gitignore b/.gitignore index aa0926a..f83b229 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ node_modules/ dist/ .env *.log +swagger.yaml \ No newline at end of file diff --git a/src/db/PostgresClient.ts b/src/db/PostgresClient.ts index a26b2c3..4f73b21 100644 --- a/src/db/PostgresClient.ts +++ b/src/db/PostgresClient.ts @@ -1,13 +1,14 @@ -import pg from "pg"; -import type { DatabaseSecret } from "../secrets/VaultClient.js"; +import pg from 'pg'; +import type {DatabaseSecret} from '../secrets/VaultClient.js'; const { Pool } = pg; export type PaymentStatus = - | "pending" - | "usdt_delivered" - | "web3_hash_error" - | "web3_balance_problem" + | 'pending' + | 'web3_processing' + | 'usdt_delivered' + | 'web3_hash_error' + | 'web3_balance_problem' | string; export interface PaymentRecord { @@ -15,6 +16,11 @@ export interface PaymentRecord { web3_transaction_hash: string | null; } +export interface PaymentClaimResult { + claimed: boolean; + payment: PaymentRecord | null; +} + export class PostgresClient { private readonly pool: pg.Pool; @@ -32,12 +38,12 @@ export class PostgresClient { } async ping(): Promise { - await this.pool.query("SELECT 1"); + await this.pool.query('SELECT 1'); } async getOrderUsdtAmount(orderId: string): Promise { const result = await this.pool.query<{ usdt_amount: string }>( - "SELECT usdt_amount::text AS usdt_amount FROM orders WHERE id = $1", + 'SELECT usdt_amount::text AS usdt_amount FROM orders WHERE id = $1', [orderId] ); return result.rows[0]?.usdt_amount ?? null; @@ -45,7 +51,7 @@ export class PostgresClient { async getUserEthWalletAddress(userId: string): Promise { const result = await this.pool.query<{ address: string }>( - "SELECT address FROM wallets WHERE user_id = $1 AND chain = 'ETH' LIMIT 1", + `SELECT address FROM wallets WHERE user_id = $1 AND chain = 'ETH' LIMIT 1`, [userId] ); return result.rows[0]?.address ?? null; @@ -53,15 +59,44 @@ export class PostgresClient { async getPaymentByOrderId(orderId: string): Promise { const result = await this.pool.query( - "SELECT status, web3_transaction_hash FROM payments WHERE order_id = $1", + 'SELECT status, web3_transaction_hash FROM payments WHERE order_id = $1', [orderId] ); return result.rows[0] ?? null; } + async claimPaymentForTransfer(orderId: string): Promise { + const claimed = await this.pool.query( + `UPDATE payments + SET status = 'web3_processing', + updated_at = now() + WHERE order_id = $1 + AND web3_transaction_hash IS NULL + AND status NOT IN ('web3_processing', 'usdt_delivered', 'web3_hash_error', 'web3_balance_problem') + RETURNING status, web3_transaction_hash`, + [orderId] + ); + + if (claimed.rows[0]) { + return { + claimed: true, + payment: claimed.rows[0] + }; + } + + return { + claimed: false, + payment: await this.getPaymentByOrderId(orderId) + }; + } + async setPaymentTxHash(orderId: string, txHash: string): Promise { await this.pool.query( - "UPDATE payments SET web3_transaction_hash = $2, updated_at = now() WHERE order_id = $1", + `UPDATE payments + SET web3_transaction_hash = $2, + updated_at = now() + WHERE order_id = $1 + AND status = 'web3_processing'`, [orderId, txHash] ); } @@ -73,14 +108,19 @@ export class PostgresClient { web3_transaction_hash = $2, paid_at = now(), updated_at = now() - WHERE order_id = $1`, + WHERE order_id = $1 + AND status = 'web3_processing'`, [orderId, txHash] ); } async markPaymentHashError(orderId: string): Promise { await this.pool.query( - "UPDATE payments SET status = 'web3_hash_error', updated_at = now() WHERE order_id = $1", + `UPDATE payments + SET status = 'web3_hash_error', + updated_at = now() + WHERE order_id = $1 + AND status = 'web3_processing'`, [orderId] ); } @@ -91,7 +131,8 @@ export class PostgresClient { SET status = 'web3_balance_problem', web3_transaction_hash = $2, updated_at = now() - WHERE order_id = $1`, + WHERE order_id = $1 + AND status = 'web3_processing'`, [orderId, txHash] ); } diff --git a/src/services/TransferOrchestrator.ts b/src/services/TransferOrchestrator.ts index a680bca..0c670ce 100644 --- a/src/services/TransferOrchestrator.ts +++ b/src/services/TransferOrchestrator.ts @@ -28,23 +28,15 @@ export class TransferOrchestrator { } async handle(message: CryptoTransferRequest, log: Logger): Promise { - const existing = await this.db.getPaymentByOrderId(message.order_id); + const claim = await this.db.claimPaymentForTransfer(message.order_id); + const existing = claim.payment; if (!existing) { log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack'); return; } - if (existing.status === 'usdt_delivered') { - log.info({ event: 'payment.already_delivered' }, 'already delivered, publishing completion'); - await this.publishCompleted(message, log); - return; - } - - if (existing.status === 'web3_hash_error' || existing.status === 'web3_balance_problem') { - log.warn( - { event: 'payment.terminal_state', status: existing.status }, - 'skip - payment already in terminal failure state' - ); + if (!claim.claimed) { + await this.handleAlreadyClaimed(message, existing, log); return; } @@ -198,6 +190,39 @@ export class TransferOrchestrator { 'published crypto.transfer.completed' ); } + + private async handleAlreadyClaimed( + message: CryptoTransferRequest, + existing: { status: string; web3_transaction_hash: string | null }, + log: Logger + ): Promise { + if (existing.status === 'usdt_delivered') { + log.info({ event: 'payment.already_delivered' }, 'already delivered, publishing completion'); + await this.publishCompleted(message, log); + return; + } + + if (existing.status === 'web3_processing') { + log.warn( + { event: 'payment.already_processing', tx_hash: existing.web3_transaction_hash }, + 'skip - payment is already claimed by another worker' + ); + return; + } + + if (existing.status === 'web3_hash_error' || existing.status === 'web3_balance_problem') { + log.warn( + { event: 'payment.terminal_state', status: existing.status }, + 'skip - payment already in terminal failure state' + ); + return; + } + + log.warn( + { event: 'payment.not_claimed', status: existing.status, tx_hash: existing.web3_transaction_hash }, + 'skip - payment was not claimed for transfer' + ); + } }