diff --git a/src/db/PostgresClient.ts b/src/db/PostgresClient.ts index 12f43a7..628f06a 100644 --- a/src/db/PostgresClient.ts +++ b/src/db/PostgresClient.ts @@ -1,3 +1,4 @@ +import {AsyncLocalStorage} from 'node:async_hooks'; import pg from 'pg'; import type {DatabaseSecret} from '../secrets/VaultClient.js'; @@ -25,6 +26,8 @@ export interface PaymentClaimResult { export class PostgresClient { private readonly pool: pg.Pool; + private readonly queryClient = new AsyncLocalStorage(); + constructor(secret: DatabaseSecret) { this.pool = new Pool({ host: secret.host, @@ -38,12 +41,34 @@ export class PostgresClient { }); } + + async withOrderProcessingLock(orderId: string, fn: () => Promise): Promise { + const client = await this.pool.connect(); + try { + await client.query('SELECT pg_advisory_lock(hashtext($1::text))', [orderId]); + return await this.queryClient.run(client, fn); + } finally { + await client.query('SELECT pg_advisory_unlock(hashtext($1::text))', [orderId]); + client.release(); + } + } + + + private async query(text: string, params?: unknown[]): Promise> { + const pinned = this.queryClient.getStore(); + if (pinned) { + return pinned.query(text, params); + } + return this.pool.query(text, params); + } + + async ping(): Promise { - await this.pool.query('SELECT 1'); + await this.query('SELECT 1'); } async getOrderUsdtAmount(orderId: string): Promise { - const result = await this.pool.query<{ usdt_amount: string }>( + const result = await this.query<{ usdt_amount: string }>( 'SELECT usdt_amount::text AS usdt_amount FROM orders WHERE id = $1', [orderId] ); @@ -51,7 +76,7 @@ export class PostgresClient { } async getUserEthWalletAddress(userId: string): Promise { - const result = await this.pool.query<{ address: string }>( + const result = await this.query<{ address: string }>( `SELECT address FROM wallets WHERE user_id = $1 AND chain = 'ETH' LIMIT 1`, [userId] ); @@ -59,7 +84,7 @@ export class PostgresClient { } async getPaymentByOrderId(orderId: string): Promise { - const result = await this.pool.query( + const result = await this.query( 'SELECT status, web3_transaction_hash, updated_at FROM payments WHERE order_id = $1', [orderId] ); @@ -67,13 +92,13 @@ export class PostgresClient { } async claimPaymentForTransfer(orderId: string): Promise { - const claimed = await this.pool.query( + const claimed = await this.query( `UPDATE payments SET status = 'web3_processing', updated_at = now() WHERE order_id = $1 AND web3_transaction_hash IS NULL - AND status = 'money_accepted' + AND status IN ('money_accepted', 'web3_processing') RETURNING status, web3_transaction_hash, updated_at`, [orderId] ); @@ -92,7 +117,7 @@ export class PostgresClient { } async releaseStaleWeb3Processing(orderId: string, staleMs: number): Promise { - const result = await this.pool.query<{ order_id: string }>( + const result = await this.query<{ order_id: string }>( `UPDATE payments SET status = 'money_accepted', updated_at = now() @@ -107,7 +132,7 @@ export class PostgresClient { } async setPaymentTxHash(orderId: string, txHash: string): Promise { - const result = await this.pool.query( + const result = await this.query( `UPDATE payments SET web3_transaction_hash = $2, updated_at = now() @@ -119,7 +144,7 @@ export class PostgresClient { } async markPaymentDelivered(orderId: string, txHash: string): Promise { - await this.pool.query( + await this.query( `UPDATE payments SET status = 'usdt_delivered', web3_transaction_hash = $2, @@ -132,7 +157,7 @@ export class PostgresClient { } async markPaymentHashError(orderId: string): Promise { - await this.pool.query( + await this.query( `UPDATE payments SET status = 'web3_hash_error', updated_at = now() @@ -143,7 +168,7 @@ export class PostgresClient { } async markPaymentBalanceProblem(orderId: string, txHash: string): Promise { - await this.pool.query( + await this.query( `UPDATE payments SET status = 'web3_balance_problem', web3_transaction_hash = $2, diff --git a/src/services/TransferOrchestrator.ts b/src/services/TransferOrchestrator.ts index db12989..28b0094 100644 --- a/src/services/TransferOrchestrator.ts +++ b/src/services/TransferOrchestrator.ts @@ -29,126 +29,128 @@ export class TransferOrchestrator { } async handle(message: CryptoTransferRequest, log: Logger): Promise { - let claim = await this.db.claimPaymentForTransfer(message.order_id); - let existing = claim.payment; - if (!existing) { - log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack'); - return; - } + await this.db.withOrderProcessingLock(message.order_id, async () => { + let claim = await this.db.claimPaymentForTransfer(message.order_id); + let existing = claim.payment; + if (!existing) { + log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack'); + return; + } - if (!claim.claimed && existing.status === 'web3_processing' && !existing.web3_transaction_hash) { - if (this.options.staleWeb3ProcessingMs > 0) { - const released = await this.db.releaseStaleWeb3Processing( - message.order_id, - this.options.staleWeb3ProcessingMs - ); - if (released) { - log.warn( - { event: 'payment.stale_processing_released', stale_after_ms: this.options.staleWeb3ProcessingMs }, - 'released stale web3_processing to money_accepted' + if (!claim.claimed && existing.status === 'web3_processing' && !existing.web3_transaction_hash) { + if (this.options.staleWeb3ProcessingMs > 0) { + const released = await this.db.releaseStaleWeb3Processing( + message.order_id, + this.options.staleWeb3ProcessingMs ); - claim = await this.db.claimPaymentForTransfer(message.order_id); - existing = claim.payment; + if (released) { + log.warn( + { event: 'payment.stale_processing_released', stale_after_ms: this.options.staleWeb3ProcessingMs }, + 'released stale web3_processing to money_accepted' + ); + claim = await this.db.claimPaymentForTransfer(message.order_id); + existing = claim.payment; + } } } - } - if (!existing) { - log.error({ event: 'payment.missing_after_release' }, 'no payment row after stale release'); - return; - } + if (!existing) { + log.error({ event: 'payment.missing_after_release' }, 'no payment row after stale release'); + return; + } - if (!claim.claimed) { - await this.handleAlreadyClaimed(message, existing, log); - return; - } + if (!claim.claimed) { + await this.handleAlreadyClaimed(message, existing, log); + return; + } - const usdtAmountString = await this.db.getOrderUsdtAmount(message.order_id); - if (!usdtAmountString) { - log.error({ event: 'order.not_found' }, 'order missing for order_id'); - await this.db.markPaymentHashError(message.order_id); - return; - } + const usdtAmountString = await this.db.getOrderUsdtAmount(message.order_id); + if (!usdtAmountString) { + log.error({ event: 'order.not_found' }, 'order missing for order_id'); + await this.db.markPaymentHashError(message.order_id); + return; + } - let amountUnits: bigint; - try { - amountUnits = parseUsdtAmount(usdtAmountString); - } catch (error) { - log.error( - { event: 'order.invalid_amount', err: error, usdt_amount: usdtAmountString }, - 'could not parse usdt_amount' + let amountUnits: bigint; + try { + amountUnits = parseUsdtAmount(usdtAmountString); + } catch (error) { + log.error( + { event: 'order.invalid_amount', err: error, usdt_amount: usdtAmountString }, + 'could not parse usdt_amount' + ); + await this.db.markPaymentHashError(message.order_id); + return; + } + + if (amountUnits > this.options.maxTransferAmountUnits) { + log.error( + { event: 'transfer.amount_exceeds_limit', amount_units: amountUnits.toString() }, + 'usdt_amount exceeds MAX_TRANSFER_USDT' + ); + await this.db.markPaymentHashError(message.order_id); + return; + } + + const recipient = await this.db.getUserEthWalletAddress(message.user_id); + if (!recipient || !isAddress(recipient)) { + log.error({ event: 'wallet.not_found_or_invalid', recipient }, 'no valid ETH wallet for user'); + await this.db.markPaymentHashError(message.order_id); + return; + } + + try { + await this.assertHotWalletReady(amountUnits); + } catch (error) { + log.error({ event: 'hot_wallet.not_ready', err: error }, 'hot wallet pre-check failed'); + await this.db.markPaymentHashError(message.order_id); + return; + } + + let preBalance: bigint; + try { + preBalance = await this.ethereum.getUsdtBalance(recipient); + } catch (error) { + log.error({ event: 'rpc.pre_balance_failed', err: error }, 'failed to read recipient pre-balance'); + await this.db.markPaymentHashError(message.order_id); + return; + } + + log.info( + { event: 'transfer.pre_balance', pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() }, + 'captured pre-balance' ); - await this.db.markPaymentHashError(message.order_id); - return; - } - if (amountUnits > this.options.maxTransferAmountUnits) { - log.error( - { event: 'transfer.amount_exceeds_limit', amount_units: amountUnits.toString() }, - 'usdt_amount exceeds MAX_TRANSFER_USDT' - ); - await this.db.markPaymentHashError(message.order_id); - return; - } + let txHash: string; + try { + const tx = await this.ethereum.sendUsdtTransfer(recipient, amountUnits); + txHash = tx.hash; + } catch (error) { + log.error({ event: 'transfer.broadcast_failed', err: error }, 'broadcast failed'); + await this.db.markPaymentHashError(message.order_id); + return; + } - const recipient = await this.db.getUserEthWalletAddress(message.user_id); - if (!recipient || !isAddress(recipient)) { - log.error({ event: 'wallet.not_found_or_invalid', recipient }, 'no valid ETH wallet for user'); - await this.db.markPaymentHashError(message.order_id); - return; - } + log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK'); + const persisted = await this.db.setPaymentTxHash(message.order_id, txHash); + if (!persisted) { + log.error( + { event: 'transfer.tx_hash_persist_failed', tx_hash: txHash }, + 'could not persist tx hash, check payments row status' + ); + } - try { - await this.assertHotWalletReady(amountUnits); - } catch (error) { - log.error({ event: 'hot_wallet.not_ready', err: error }, 'hot wallet pre-check failed'); - await this.db.markPaymentHashError(message.order_id); - return; - } + const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log); + if (!delivered) { + log.error({ event: 'transfer.balance_problem', tx_hash: txHash }, 'balance did not increase after 3 checks'); + await this.db.markPaymentBalanceProblem(message.order_id, txHash); + return; + } - let preBalance: bigint; - try { - preBalance = await this.ethereum.getUsdtBalance(recipient); - } catch (error) { - log.error({ event: 'rpc.pre_balance_failed', err: error }, 'failed to read recipient pre-balance'); - await this.db.markPaymentHashError(message.order_id); - return; - } - - log.info( - { event: 'transfer.pre_balance', pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() }, - 'captured pre-balance' - ); - - let txHash: string; - try { - const tx = await this.ethereum.sendUsdtTransfer(recipient, amountUnits); - txHash = tx.hash; - } catch (error) { - log.error({ event: 'transfer.broadcast_failed', err: error }, 'broadcast failed'); - await this.db.markPaymentHashError(message.order_id); - return; - } - - log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK'); - const persisted = await this.db.setPaymentTxHash(message.order_id, txHash); - if (!persisted) { - log.error( - { event: 'transfer.tx_hash_persist_failed', tx_hash: txHash }, - 'could not persist tx hash, check payments row status' - ); - } - - const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log); - if (!delivered) { - log.error({ event: 'transfer.balance_problem', tx_hash: txHash }, 'balance did not increase after 3 checks'); - await this.db.markPaymentBalanceProblem(message.order_id, txHash); - return; - } - - await this.db.markPaymentDelivered(message.order_id, txHash); - log.info({ event: 'transfer.delivered', tx_hash: txHash }, 'marked usdt_delivered'); - await this.publishCompleted(message, log); + await this.db.markPaymentDelivered(message.order_id, txHash); + log.info({ event: 'transfer.delivered', tx_hash: txHash }, 'marked usdt_delivered'); + await this.publishCompleted(message, log); + }); } private async assertHotWalletReady(amountUnits: bigint): Promise {