fix: update

This commit is contained in:
2026-05-14 00:57:42 +03:00
parent 83e89f30df
commit 47eec7117b
5 changed files with 79 additions and 12 deletions

View File

@@ -14,4 +14,5 @@ MISMATCH_TIMEOUT_MS=300000
POLL_INTERVAL_MS=15000 POLL_INTERVAL_MS=15000
MAX_TRANSFER_USDT=1000 MAX_TRANSFER_USDT=1000
MIN_ETH_BALANCE_WEI=1 MIN_ETH_BALANCE_WEI=1
STALE_WEB3_PROCESSING_MS=900000
WEBHOOK_TIMEOUT_MS=10000 WEBHOOK_TIMEOUT_MS=10000

View File

@@ -13,6 +13,7 @@ export interface AppConfig {
minEthBalanceWei: bigint; minEthBalanceWei: bigint;
balanceCheckAttempts: number; balanceCheckAttempts: number;
balanceCheckIntervalMs: number; balanceCheckIntervalMs: number;
staleWeb3ProcessingMs: number;
vaultAddr: string; vaultAddr: string;
vaultMountPoint: string; vaultMountPoint: string;
vaultRoleId: string; vaultRoleId: string;
@@ -30,6 +31,7 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): AppConfig {
minEthBalanceWei: BigInt(env.MIN_ETH_BALANCE_WEI ?? '1'), minEthBalanceWei: BigInt(env.MIN_ETH_BALANCE_WEI ?? '1'),
balanceCheckAttempts: 3, balanceCheckAttempts: 3,
balanceCheckIntervalMs: 60_000, balanceCheckIntervalMs: 60_000,
staleWeb3ProcessingMs: readNonNegativeInteger(env.STALE_WEB3_PROCESSING_MS, 900_000),
vaultAddr: readRequired(env.VAULT_ADDR, 'VAULT_ADDR'), vaultAddr: readRequired(env.VAULT_ADDR, 'VAULT_ADDR'),
vaultMountPoint: readRequired(env.VAULT_MOUNT_POINT, 'VAULT_MOUNT_POINT'), vaultMountPoint: readRequired(env.VAULT_MOUNT_POINT, 'VAULT_MOUNT_POINT'),
vaultRoleId: readRequired(env.VAULT_ROLE_ID, 'VAULT_ROLE_ID'), vaultRoleId: readRequired(env.VAULT_ROLE_ID, 'VAULT_ROLE_ID'),
@@ -76,6 +78,19 @@ function readInteger(value: string | undefined, defaultValue: number): number {
} }
function readNonNegativeInteger(value: string | undefined, defaultValue: number): number {
if (!value) {
return defaultValue;
}
const parsed = Number.parseInt(value, 10);
if (!Number.isFinite(parsed) || parsed < 0) {
throw new Error(`Expected non-negative integer, got ${value}`);
}
return parsed;
}
function readOptionalPlaceholder(value: string | undefined, placeholder: string): string | undefined { function readOptionalPlaceholder(value: string | undefined, placeholder: string): string | undefined {
if (!value || value === placeholder) { if (!value || value === placeholder) {
return undefined; return undefined;

View File

@@ -14,6 +14,7 @@ export type PaymentStatus =
export interface PaymentRecord { export interface PaymentRecord {
status: PaymentStatus; status: PaymentStatus;
web3_transaction_hash: string | null; web3_transaction_hash: string | null;
updated_at: Date;
} }
export interface PaymentClaimResult { export interface PaymentClaimResult {
@@ -59,7 +60,7 @@ export class PostgresClient {
async getPaymentByOrderId(orderId: string): Promise<PaymentRecord | null> { async getPaymentByOrderId(orderId: string): Promise<PaymentRecord | null> {
const result = await this.pool.query<PaymentRecord>( const result = await this.pool.query<PaymentRecord>(
'SELECT status, web3_transaction_hash FROM payments WHERE order_id = $1', 'SELECT status, web3_transaction_hash, updated_at FROM payments WHERE order_id = $1',
[orderId] [orderId]
); );
return result.rows[0] ?? null; return result.rows[0] ?? null;
@@ -72,8 +73,8 @@ export class PostgresClient {
updated_at = now() updated_at = now()
WHERE order_id = $1 WHERE order_id = $1
AND web3_transaction_hash IS NULL AND web3_transaction_hash IS NULL
AND status NOT IN ('web3_processing', 'usdt_delivered', 'web3_hash_error', 'web3_balance_problem') AND status = 'money_accepted'
RETURNING status, web3_transaction_hash`, RETURNING status, web3_transaction_hash, updated_at`,
[orderId] [orderId]
); );
@@ -90,8 +91,23 @@ export class PostgresClient {
}; };
} }
async setPaymentTxHash(orderId: string, txHash: string): Promise<void> { async releaseStaleWeb3Processing(orderId: string, staleMs: number): Promise<boolean> {
await this.pool.query( const result = await this.pool.query<{ order_id: string }>(
`UPDATE payments
SET status = 'money_accepted',
updated_at = now()
WHERE order_id = $1
AND status = 'web3_processing'
AND web3_transaction_hash IS NULL
AND updated_at < now() - ($2::bigint * interval '1 millisecond')
RETURNING order_id`,
[orderId, staleMs]
);
return (result.rowCount ?? 0) > 0;
}
async setPaymentTxHash(orderId: string, txHash: string): Promise<boolean> {
const result = await this.pool.query(
`UPDATE payments `UPDATE payments
SET web3_transaction_hash = $2, SET web3_transaction_hash = $2,
updated_at = now() updated_at = now()
@@ -99,6 +115,7 @@ export class PostgresClient {
AND status = 'web3_processing'`, AND status = 'web3_processing'`,
[orderId, txHash] [orderId, txHash]
); );
return (result.rowCount ?? 0) > 0;
} }
async markPaymentDelivered(orderId: string, txHash: string): Promise<void> { async markPaymentDelivered(orderId: string, txHash: string): Promise<void> {

View File

@@ -47,7 +47,8 @@ async function main(): Promise<void> {
maxTransferAmountUnits: config.maxTransferAmountUnits, maxTransferAmountUnits: config.maxTransferAmountUnits,
minEthBalanceWei: config.minEthBalanceWei, minEthBalanceWei: config.minEthBalanceWei,
balanceCheckAttempts: config.balanceCheckAttempts, balanceCheckAttempts: config.balanceCheckAttempts,
balanceCheckIntervalMs: config.balanceCheckIntervalMs balanceCheckIntervalMs: config.balanceCheckIntervalMs,
staleWeb3ProcessingMs: config.staleWeb3ProcessingMs
}); });
await amqp.startConsumer((message, log) => orchestrator.handle(message, log)); await amqp.startConsumer((message, log) => orchestrator.handle(message, log));

View File

@@ -2,7 +2,7 @@ import {isAddress} from 'ethers';
import type {Logger} from 'pino'; import type {Logger} from 'pino';
import {ulid} from 'ulid'; import {ulid} from 'ulid';
import {parseUsdtAmount} from '../domain/amount.js'; import {parseUsdtAmount} from '../domain/amount.js';
import type {PostgresClient} from '../db/PostgresClient.js'; import type {PostgresClient,PaymentRecord} from '../db/PostgresClient.js';
import type {EthereumGateway} from '../ethereum/EthereumGateway.js'; import type {EthereumGateway} from '../ethereum/EthereumGateway.js';
import type {AmqpClient} from '../queue/AmqpClient.js'; import type {AmqpClient} from '../queue/AmqpClient.js';
import type {CryptoTransferRequest} from '../queue/messageSchema.js'; import type {CryptoTransferRequest} from '../queue/messageSchema.js';
@@ -12,6 +12,7 @@ export interface TransferOrchestratorOptions {
minEthBalanceWei: bigint; minEthBalanceWei: bigint;
balanceCheckAttempts: number; balanceCheckAttempts: number;
balanceCheckIntervalMs: number; balanceCheckIntervalMs: number;
staleWeb3ProcessingMs: number;
sleep?: (ms: number) => Promise<void>; sleep?: (ms: number) => Promise<void>;
} }
@@ -28,13 +29,35 @@ export class TransferOrchestrator {
} }
async handle(message: CryptoTransferRequest, log: Logger): Promise<void> { async handle(message: CryptoTransferRequest, log: Logger): Promise<void> {
const claim = await this.db.claimPaymentForTransfer(message.order_id); let claim = await this.db.claimPaymentForTransfer(message.order_id);
const existing = claim.payment; let existing = claim.payment;
if (!existing) { if (!existing) {
log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack'); log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack');
return; return;
} }
if (!claim.claimed && existing.status === 'web3_processing' && !existing.web3_transaction_hash) {
if (this.options.staleWeb3ProcessingMs > 0) {
const released = await this.db.releaseStaleWeb3Processing(
message.order_id,
this.options.staleWeb3ProcessingMs
);
if (released) {
log.warn(
{ event: 'payment.stale_processing_released', stale_after_ms: this.options.staleWeb3ProcessingMs },
'released stale web3_processing to money_accepted'
);
claim = await this.db.claimPaymentForTransfer(message.order_id);
existing = claim.payment;
}
}
}
if (!existing) {
log.error({ event: 'payment.missing_after_release' }, 'no payment row after stale release');
return;
}
if (!claim.claimed) { if (!claim.claimed) {
await this.handleAlreadyClaimed(message, existing, log); await this.handleAlreadyClaimed(message, existing, log);
return; return;
@@ -108,7 +131,13 @@ export class TransferOrchestrator {
} }
log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK'); log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK');
await this.db.setPaymentTxHash(message.order_id, txHash); const persisted = await this.db.setPaymentTxHash(message.order_id, txHash);
if (!persisted) {
log.error(
{ event: 'transfer.tx_hash_persist_failed', tx_hash: txHash },
'could not persist tx hash, check payments row status'
);
}
const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log); const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log);
if (!delivered) { if (!delivered) {
@@ -193,7 +222,7 @@ export class TransferOrchestrator {
private async handleAlreadyClaimed( private async handleAlreadyClaimed(
message: CryptoTransferRequest, message: CryptoTransferRequest,
existing: { status: string; web3_transaction_hash: string | null }, existing: PaymentRecord,
log: Logger log: Logger
): Promise<void> { ): Promise<void> {
if (existing.status === 'usdt_delivered') { if (existing.status === 'usdt_delivered') {
@@ -204,7 +233,11 @@ export class TransferOrchestrator {
if (existing.status === 'web3_processing') { if (existing.status === 'web3_processing') {
log.warn( log.warn(
{ event: 'payment.already_processing', tx_hash: existing.web3_transaction_hash }, {
event: 'payment.already_processing',
tx_hash: existing.web3_transaction_hash,
updated_at: existing.updated_at.toISOString()
},
'skip - payment is already claimed by another worker' 'skip - payment is already claimed by another worker'
); );
return; return;