From 47eec7117b6e9fad8ffae1ac3033be87e8163246 Mon Sep 17 00:00:00 2001 From: Noloquideus Date: Thu, 14 May 2026 00:57:42 +0300 Subject: [PATCH] fix: update --- .env.example | 1 + src/config.ts | 15 ++++++++++ src/db/PostgresClient.ts | 27 +++++++++++++---- src/index.ts | 3 +- src/services/TransferOrchestrator.ts | 45 ++++++++++++++++++++++++---- 5 files changed, 79 insertions(+), 12 deletions(-) diff --git a/.env.example b/.env.example index 86cc615..af01dbb 100644 --- a/.env.example +++ b/.env.example @@ -14,4 +14,5 @@ MISMATCH_TIMEOUT_MS=300000 POLL_INTERVAL_MS=15000 MAX_TRANSFER_USDT=1000 MIN_ETH_BALANCE_WEI=1 +STALE_WEB3_PROCESSING_MS=900000 WEBHOOK_TIMEOUT_MS=10000 diff --git a/src/config.ts b/src/config.ts index 8b544ec..4f78ce6 100644 --- a/src/config.ts +++ b/src/config.ts @@ -13,6 +13,7 @@ export interface AppConfig { minEthBalanceWei: bigint; balanceCheckAttempts: number; balanceCheckIntervalMs: number; + staleWeb3ProcessingMs: number; vaultAddr: string; vaultMountPoint: string; vaultRoleId: string; @@ -30,6 +31,7 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): AppConfig { minEthBalanceWei: BigInt(env.MIN_ETH_BALANCE_WEI ?? '1'), balanceCheckAttempts: 3, balanceCheckIntervalMs: 60_000, + staleWeb3ProcessingMs: readNonNegativeInteger(env.STALE_WEB3_PROCESSING_MS, 900_000), vaultAddr: readRequired(env.VAULT_ADDR, 'VAULT_ADDR'), vaultMountPoint: readRequired(env.VAULT_MOUNT_POINT, 'VAULT_MOUNT_POINT'), vaultRoleId: readRequired(env.VAULT_ROLE_ID, 'VAULT_ROLE_ID'), @@ -76,6 +78,19 @@ function readInteger(value: string | undefined, defaultValue: number): number { } +function readNonNegativeInteger(value: string | undefined, defaultValue: number): number { + if (!value) { + return defaultValue; + } + + const parsed = Number.parseInt(value, 10); + if (!Number.isFinite(parsed) || parsed < 0) { + throw new Error(`Expected non-negative integer, got ${value}`); + } + return parsed; +} + + function readOptionalPlaceholder(value: string | undefined, placeholder: string): string | undefined { if (!value || value === placeholder) { return undefined; diff --git a/src/db/PostgresClient.ts b/src/db/PostgresClient.ts index 4f73b21..12f43a7 100644 --- a/src/db/PostgresClient.ts +++ b/src/db/PostgresClient.ts @@ -14,6 +14,7 @@ export type PaymentStatus = export interface PaymentRecord { status: PaymentStatus; web3_transaction_hash: string | null; + updated_at: Date; } export interface PaymentClaimResult { @@ -59,7 +60,7 @@ export class PostgresClient { async getPaymentByOrderId(orderId: string): Promise { const result = await this.pool.query( - 'SELECT status, web3_transaction_hash FROM payments WHERE order_id = $1', + 'SELECT status, web3_transaction_hash, updated_at FROM payments WHERE order_id = $1', [orderId] ); return result.rows[0] ?? null; @@ -72,8 +73,8 @@ export class PostgresClient { updated_at = now() WHERE order_id = $1 AND web3_transaction_hash IS NULL - AND status NOT IN ('web3_processing', 'usdt_delivered', 'web3_hash_error', 'web3_balance_problem') - RETURNING status, web3_transaction_hash`, + AND status = 'money_accepted' + RETURNING status, web3_transaction_hash, updated_at`, [orderId] ); @@ -90,8 +91,23 @@ export class PostgresClient { }; } - async setPaymentTxHash(orderId: string, txHash: string): Promise { - await this.pool.query( + async releaseStaleWeb3Processing(orderId: string, staleMs: number): Promise { + const result = await this.pool.query<{ order_id: string }>( + `UPDATE payments + SET status = 'money_accepted', + updated_at = now() + WHERE order_id = $1 + AND status = 'web3_processing' + AND web3_transaction_hash IS NULL + AND updated_at < now() - ($2::bigint * interval '1 millisecond') + RETURNING order_id`, + [orderId, staleMs] + ); + return (result.rowCount ?? 0) > 0; + } + + async setPaymentTxHash(orderId: string, txHash: string): Promise { + const result = await this.pool.query( `UPDATE payments SET web3_transaction_hash = $2, updated_at = now() @@ -99,6 +115,7 @@ export class PostgresClient { AND status = 'web3_processing'`, [orderId, txHash] ); + return (result.rowCount ?? 0) > 0; } async markPaymentDelivered(orderId: string, txHash: string): Promise { diff --git a/src/index.ts b/src/index.ts index 2554af8..dcd7037 100644 --- a/src/index.ts +++ b/src/index.ts @@ -47,7 +47,8 @@ async function main(): Promise { maxTransferAmountUnits: config.maxTransferAmountUnits, minEthBalanceWei: config.minEthBalanceWei, balanceCheckAttempts: config.balanceCheckAttempts, - balanceCheckIntervalMs: config.balanceCheckIntervalMs + balanceCheckIntervalMs: config.balanceCheckIntervalMs, + staleWeb3ProcessingMs: config.staleWeb3ProcessingMs }); await amqp.startConsumer((message, log) => orchestrator.handle(message, log)); diff --git a/src/services/TransferOrchestrator.ts b/src/services/TransferOrchestrator.ts index 0c670ce..db12989 100644 --- a/src/services/TransferOrchestrator.ts +++ b/src/services/TransferOrchestrator.ts @@ -2,7 +2,7 @@ import {isAddress} from 'ethers'; import type {Logger} from 'pino'; import {ulid} from 'ulid'; import {parseUsdtAmount} from '../domain/amount.js'; -import type {PostgresClient} from '../db/PostgresClient.js'; +import type {PostgresClient,PaymentRecord} from '../db/PostgresClient.js'; import type {EthereumGateway} from '../ethereum/EthereumGateway.js'; import type {AmqpClient} from '../queue/AmqpClient.js'; import type {CryptoTransferRequest} from '../queue/messageSchema.js'; @@ -12,6 +12,7 @@ export interface TransferOrchestratorOptions { minEthBalanceWei: bigint; balanceCheckAttempts: number; balanceCheckIntervalMs: number; + staleWeb3ProcessingMs: number; sleep?: (ms: number) => Promise; } @@ -28,13 +29,35 @@ export class TransferOrchestrator { } async handle(message: CryptoTransferRequest, log: Logger): Promise { - const claim = await this.db.claimPaymentForTransfer(message.order_id); - const existing = claim.payment; + let claim = await this.db.claimPaymentForTransfer(message.order_id); + let existing = claim.payment; if (!existing) { log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack'); return; } + if (!claim.claimed && existing.status === 'web3_processing' && !existing.web3_transaction_hash) { + if (this.options.staleWeb3ProcessingMs > 0) { + const released = await this.db.releaseStaleWeb3Processing( + message.order_id, + this.options.staleWeb3ProcessingMs + ); + if (released) { + log.warn( + { event: 'payment.stale_processing_released', stale_after_ms: this.options.staleWeb3ProcessingMs }, + 'released stale web3_processing to money_accepted' + ); + claim = await this.db.claimPaymentForTransfer(message.order_id); + existing = claim.payment; + } + } + } + + if (!existing) { + log.error({ event: 'payment.missing_after_release' }, 'no payment row after stale release'); + return; + } + if (!claim.claimed) { await this.handleAlreadyClaimed(message, existing, log); return; @@ -108,7 +131,13 @@ export class TransferOrchestrator { } log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK'); - await this.db.setPaymentTxHash(message.order_id, txHash); + const persisted = await this.db.setPaymentTxHash(message.order_id, txHash); + if (!persisted) { + log.error( + { event: 'transfer.tx_hash_persist_failed', tx_hash: txHash }, + 'could not persist tx hash, check payments row status' + ); + } const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log); if (!delivered) { @@ -193,7 +222,7 @@ export class TransferOrchestrator { private async handleAlreadyClaimed( message: CryptoTransferRequest, - existing: { status: string; web3_transaction_hash: string | null }, + existing: PaymentRecord, log: Logger ): Promise { if (existing.status === 'usdt_delivered') { @@ -204,7 +233,11 @@ export class TransferOrchestrator { if (existing.status === 'web3_processing') { log.warn( - { event: 'payment.already_processing', tx_hash: existing.web3_transaction_hash }, + { + event: 'payment.already_processing', + tx_hash: existing.web3_transaction_hash, + updated_at: existing.updated_at.toISOString() + }, 'skip - payment is already claimed by another worker' ); return;