feat: update
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
import {AsyncLocalStorage} from 'node:async_hooks';
|
||||||
import pg from 'pg';
|
import pg from 'pg';
|
||||||
import type {DatabaseSecret} from '../secrets/VaultClient.js';
|
import type {DatabaseSecret} from '../secrets/VaultClient.js';
|
||||||
|
|
||||||
@@ -25,6 +26,8 @@ export interface PaymentClaimResult {
|
|||||||
export class PostgresClient {
|
export class PostgresClient {
|
||||||
private readonly pool: pg.Pool;
|
private readonly pool: pg.Pool;
|
||||||
|
|
||||||
|
private readonly queryClient = new AsyncLocalStorage<pg.PoolClient>();
|
||||||
|
|
||||||
constructor(secret: DatabaseSecret) {
|
constructor(secret: DatabaseSecret) {
|
||||||
this.pool = new Pool({
|
this.pool = new Pool({
|
||||||
host: secret.host,
|
host: secret.host,
|
||||||
@@ -38,12 +41,34 @@ export class PostgresClient {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async withOrderProcessingLock<T>(orderId: string, fn: () => Promise<T>): Promise<T> {
|
||||||
|
const client = await this.pool.connect();
|
||||||
|
try {
|
||||||
|
await client.query('SELECT pg_advisory_lock(hashtext($1::text))', [orderId]);
|
||||||
|
return await this.queryClient.run(client, fn);
|
||||||
|
} finally {
|
||||||
|
await client.query('SELECT pg_advisory_unlock(hashtext($1::text))', [orderId]);
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private async query<T extends pg.QueryResultRow>(text: string, params?: unknown[]): Promise<pg.QueryResult<T>> {
|
||||||
|
const pinned = this.queryClient.getStore();
|
||||||
|
if (pinned) {
|
||||||
|
return pinned.query<T>(text, params);
|
||||||
|
}
|
||||||
|
return this.pool.query<T>(text, params);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async ping(): Promise<void> {
|
async ping(): Promise<void> {
|
||||||
await this.pool.query('SELECT 1');
|
await this.query('SELECT 1');
|
||||||
}
|
}
|
||||||
|
|
||||||
async getOrderUsdtAmount(orderId: string): Promise<string | null> {
|
async getOrderUsdtAmount(orderId: string): Promise<string | null> {
|
||||||
const result = await this.pool.query<{ usdt_amount: string }>(
|
const result = await this.query<{ usdt_amount: string }>(
|
||||||
'SELECT usdt_amount::text AS usdt_amount FROM orders WHERE id = $1',
|
'SELECT usdt_amount::text AS usdt_amount FROM orders WHERE id = $1',
|
||||||
[orderId]
|
[orderId]
|
||||||
);
|
);
|
||||||
@@ -51,7 +76,7 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getUserEthWalletAddress(userId: string): Promise<string | null> {
|
async getUserEthWalletAddress(userId: string): Promise<string | null> {
|
||||||
const result = await this.pool.query<{ address: string }>(
|
const result = await this.query<{ address: string }>(
|
||||||
`SELECT address FROM wallets WHERE user_id = $1 AND chain = 'ETH' LIMIT 1`,
|
`SELECT address FROM wallets WHERE user_id = $1 AND chain = 'ETH' LIMIT 1`,
|
||||||
[userId]
|
[userId]
|
||||||
);
|
);
|
||||||
@@ -59,7 +84,7 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getPaymentByOrderId(orderId: string): Promise<PaymentRecord | null> {
|
async getPaymentByOrderId(orderId: string): Promise<PaymentRecord | null> {
|
||||||
const result = await this.pool.query<PaymentRecord>(
|
const result = await this.query<PaymentRecord>(
|
||||||
'SELECT status, web3_transaction_hash, updated_at FROM payments WHERE order_id = $1',
|
'SELECT status, web3_transaction_hash, updated_at FROM payments WHERE order_id = $1',
|
||||||
[orderId]
|
[orderId]
|
||||||
);
|
);
|
||||||
@@ -67,13 +92,13 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async claimPaymentForTransfer(orderId: string): Promise<PaymentClaimResult> {
|
async claimPaymentForTransfer(orderId: string): Promise<PaymentClaimResult> {
|
||||||
const claimed = await this.pool.query<PaymentRecord>(
|
const claimed = await this.query<PaymentRecord>(
|
||||||
`UPDATE payments
|
`UPDATE payments
|
||||||
SET status = 'web3_processing',
|
SET status = 'web3_processing',
|
||||||
updated_at = now()
|
updated_at = now()
|
||||||
WHERE order_id = $1
|
WHERE order_id = $1
|
||||||
AND web3_transaction_hash IS NULL
|
AND web3_transaction_hash IS NULL
|
||||||
AND status = 'money_accepted'
|
AND status IN ('money_accepted', 'web3_processing')
|
||||||
RETURNING status, web3_transaction_hash, updated_at`,
|
RETURNING status, web3_transaction_hash, updated_at`,
|
||||||
[orderId]
|
[orderId]
|
||||||
);
|
);
|
||||||
@@ -92,7 +117,7 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async releaseStaleWeb3Processing(orderId: string, staleMs: number): Promise<boolean> {
|
async releaseStaleWeb3Processing(orderId: string, staleMs: number): Promise<boolean> {
|
||||||
const result = await this.pool.query<{ order_id: string }>(
|
const result = await this.query<{ order_id: string }>(
|
||||||
`UPDATE payments
|
`UPDATE payments
|
||||||
SET status = 'money_accepted',
|
SET status = 'money_accepted',
|
||||||
updated_at = now()
|
updated_at = now()
|
||||||
@@ -107,7 +132,7 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async setPaymentTxHash(orderId: string, txHash: string): Promise<boolean> {
|
async setPaymentTxHash(orderId: string, txHash: string): Promise<boolean> {
|
||||||
const result = await this.pool.query(
|
const result = await this.query(
|
||||||
`UPDATE payments
|
`UPDATE payments
|
||||||
SET web3_transaction_hash = $2,
|
SET web3_transaction_hash = $2,
|
||||||
updated_at = now()
|
updated_at = now()
|
||||||
@@ -119,7 +144,7 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async markPaymentDelivered(orderId: string, txHash: string): Promise<void> {
|
async markPaymentDelivered(orderId: string, txHash: string): Promise<void> {
|
||||||
await this.pool.query(
|
await this.query(
|
||||||
`UPDATE payments
|
`UPDATE payments
|
||||||
SET status = 'usdt_delivered',
|
SET status = 'usdt_delivered',
|
||||||
web3_transaction_hash = $2,
|
web3_transaction_hash = $2,
|
||||||
@@ -132,7 +157,7 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async markPaymentHashError(orderId: string): Promise<void> {
|
async markPaymentHashError(orderId: string): Promise<void> {
|
||||||
await this.pool.query(
|
await this.query(
|
||||||
`UPDATE payments
|
`UPDATE payments
|
||||||
SET status = 'web3_hash_error',
|
SET status = 'web3_hash_error',
|
||||||
updated_at = now()
|
updated_at = now()
|
||||||
@@ -143,7 +168,7 @@ export class PostgresClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async markPaymentBalanceProblem(orderId: string, txHash: string): Promise<void> {
|
async markPaymentBalanceProblem(orderId: string, txHash: string): Promise<void> {
|
||||||
await this.pool.query(
|
await this.query(
|
||||||
`UPDATE payments
|
`UPDATE payments
|
||||||
SET status = 'web3_balance_problem',
|
SET status = 'web3_balance_problem',
|
||||||
web3_transaction_hash = $2,
|
web3_transaction_hash = $2,
|
||||||
|
|||||||
@@ -29,126 +29,128 @@ export class TransferOrchestrator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async handle(message: CryptoTransferRequest, log: Logger): Promise<void> {
|
async handle(message: CryptoTransferRequest, log: Logger): Promise<void> {
|
||||||
let claim = await this.db.claimPaymentForTransfer(message.order_id);
|
await this.db.withOrderProcessingLock(message.order_id, async () => {
|
||||||
let existing = claim.payment;
|
let claim = await this.db.claimPaymentForTransfer(message.order_id);
|
||||||
if (!existing) {
|
let existing = claim.payment;
|
||||||
log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack');
|
if (!existing) {
|
||||||
return;
|
log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack');
|
||||||
}
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!claim.claimed && existing.status === 'web3_processing' && !existing.web3_transaction_hash) {
|
if (!claim.claimed && existing.status === 'web3_processing' && !existing.web3_transaction_hash) {
|
||||||
if (this.options.staleWeb3ProcessingMs > 0) {
|
if (this.options.staleWeb3ProcessingMs > 0) {
|
||||||
const released = await this.db.releaseStaleWeb3Processing(
|
const released = await this.db.releaseStaleWeb3Processing(
|
||||||
message.order_id,
|
message.order_id,
|
||||||
this.options.staleWeb3ProcessingMs
|
this.options.staleWeb3ProcessingMs
|
||||||
);
|
|
||||||
if (released) {
|
|
||||||
log.warn(
|
|
||||||
{ event: 'payment.stale_processing_released', stale_after_ms: this.options.staleWeb3ProcessingMs },
|
|
||||||
'released stale web3_processing to money_accepted'
|
|
||||||
);
|
);
|
||||||
claim = await this.db.claimPaymentForTransfer(message.order_id);
|
if (released) {
|
||||||
existing = claim.payment;
|
log.warn(
|
||||||
|
{ event: 'payment.stale_processing_released', stale_after_ms: this.options.staleWeb3ProcessingMs },
|
||||||
|
'released stale web3_processing to money_accepted'
|
||||||
|
);
|
||||||
|
claim = await this.db.claimPaymentForTransfer(message.order_id);
|
||||||
|
existing = claim.payment;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (!existing) {
|
if (!existing) {
|
||||||
log.error({ event: 'payment.missing_after_release' }, 'no payment row after stale release');
|
log.error({ event: 'payment.missing_after_release' }, 'no payment row after stale release');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!claim.claimed) {
|
if (!claim.claimed) {
|
||||||
await this.handleAlreadyClaimed(message, existing, log);
|
await this.handleAlreadyClaimed(message, existing, log);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const usdtAmountString = await this.db.getOrderUsdtAmount(message.order_id);
|
const usdtAmountString = await this.db.getOrderUsdtAmount(message.order_id);
|
||||||
if (!usdtAmountString) {
|
if (!usdtAmountString) {
|
||||||
log.error({ event: 'order.not_found' }, 'order missing for order_id');
|
log.error({ event: 'order.not_found' }, 'order missing for order_id');
|
||||||
await this.db.markPaymentHashError(message.order_id);
|
await this.db.markPaymentHashError(message.order_id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let amountUnits: bigint;
|
let amountUnits: bigint;
|
||||||
try {
|
try {
|
||||||
amountUnits = parseUsdtAmount(usdtAmountString);
|
amountUnits = parseUsdtAmount(usdtAmountString);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
log.error(
|
log.error(
|
||||||
{ event: 'order.invalid_amount', err: error, usdt_amount: usdtAmountString },
|
{ event: 'order.invalid_amount', err: error, usdt_amount: usdtAmountString },
|
||||||
'could not parse usdt_amount'
|
'could not parse usdt_amount'
|
||||||
|
);
|
||||||
|
await this.db.markPaymentHashError(message.order_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amountUnits > this.options.maxTransferAmountUnits) {
|
||||||
|
log.error(
|
||||||
|
{ event: 'transfer.amount_exceeds_limit', amount_units: amountUnits.toString() },
|
||||||
|
'usdt_amount exceeds MAX_TRANSFER_USDT'
|
||||||
|
);
|
||||||
|
await this.db.markPaymentHashError(message.order_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const recipient = await this.db.getUserEthWalletAddress(message.user_id);
|
||||||
|
if (!recipient || !isAddress(recipient)) {
|
||||||
|
log.error({ event: 'wallet.not_found_or_invalid', recipient }, 'no valid ETH wallet for user');
|
||||||
|
await this.db.markPaymentHashError(message.order_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.assertHotWalletReady(amountUnits);
|
||||||
|
} catch (error) {
|
||||||
|
log.error({ event: 'hot_wallet.not_ready', err: error }, 'hot wallet pre-check failed');
|
||||||
|
await this.db.markPaymentHashError(message.order_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let preBalance: bigint;
|
||||||
|
try {
|
||||||
|
preBalance = await this.ethereum.getUsdtBalance(recipient);
|
||||||
|
} catch (error) {
|
||||||
|
log.error({ event: 'rpc.pre_balance_failed', err: error }, 'failed to read recipient pre-balance');
|
||||||
|
await this.db.markPaymentHashError(message.order_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
{ event: 'transfer.pre_balance', pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() },
|
||||||
|
'captured pre-balance'
|
||||||
);
|
);
|
||||||
await this.db.markPaymentHashError(message.order_id);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (amountUnits > this.options.maxTransferAmountUnits) {
|
let txHash: string;
|
||||||
log.error(
|
try {
|
||||||
{ event: 'transfer.amount_exceeds_limit', amount_units: amountUnits.toString() },
|
const tx = await this.ethereum.sendUsdtTransfer(recipient, amountUnits);
|
||||||
'usdt_amount exceeds MAX_TRANSFER_USDT'
|
txHash = tx.hash;
|
||||||
);
|
} catch (error) {
|
||||||
await this.db.markPaymentHashError(message.order_id);
|
log.error({ event: 'transfer.broadcast_failed', err: error }, 'broadcast failed');
|
||||||
return;
|
await this.db.markPaymentHashError(message.order_id);
|
||||||
}
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const recipient = await this.db.getUserEthWalletAddress(message.user_id);
|
log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK');
|
||||||
if (!recipient || !isAddress(recipient)) {
|
const persisted = await this.db.setPaymentTxHash(message.order_id, txHash);
|
||||||
log.error({ event: 'wallet.not_found_or_invalid', recipient }, 'no valid ETH wallet for user');
|
if (!persisted) {
|
||||||
await this.db.markPaymentHashError(message.order_id);
|
log.error(
|
||||||
return;
|
{ event: 'transfer.tx_hash_persist_failed', tx_hash: txHash },
|
||||||
}
|
'could not persist tx hash, check payments row status'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log);
|
||||||
await this.assertHotWalletReady(amountUnits);
|
if (!delivered) {
|
||||||
} catch (error) {
|
log.error({ event: 'transfer.balance_problem', tx_hash: txHash }, 'balance did not increase after 3 checks');
|
||||||
log.error({ event: 'hot_wallet.not_ready', err: error }, 'hot wallet pre-check failed');
|
await this.db.markPaymentBalanceProblem(message.order_id, txHash);
|
||||||
await this.db.markPaymentHashError(message.order_id);
|
return;
|
||||||
return;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let preBalance: bigint;
|
await this.db.markPaymentDelivered(message.order_id, txHash);
|
||||||
try {
|
log.info({ event: 'transfer.delivered', tx_hash: txHash }, 'marked usdt_delivered');
|
||||||
preBalance = await this.ethereum.getUsdtBalance(recipient);
|
await this.publishCompleted(message, log);
|
||||||
} catch (error) {
|
});
|
||||||
log.error({ event: 'rpc.pre_balance_failed', err: error }, 'failed to read recipient pre-balance');
|
|
||||||
await this.db.markPaymentHashError(message.order_id);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info(
|
|
||||||
{ event: 'transfer.pre_balance', pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() },
|
|
||||||
'captured pre-balance'
|
|
||||||
);
|
|
||||||
|
|
||||||
let txHash: string;
|
|
||||||
try {
|
|
||||||
const tx = await this.ethereum.sendUsdtTransfer(recipient, amountUnits);
|
|
||||||
txHash = tx.hash;
|
|
||||||
} catch (error) {
|
|
||||||
log.error({ event: 'transfer.broadcast_failed', err: error }, 'broadcast failed');
|
|
||||||
await this.db.markPaymentHashError(message.order_id);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK');
|
|
||||||
const persisted = await this.db.setPaymentTxHash(message.order_id, txHash);
|
|
||||||
if (!persisted) {
|
|
||||||
log.error(
|
|
||||||
{ event: 'transfer.tx_hash_persist_failed', tx_hash: txHash },
|
|
||||||
'could not persist tx hash, check payments row status'
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log);
|
|
||||||
if (!delivered) {
|
|
||||||
log.error({ event: 'transfer.balance_problem', tx_hash: txHash }, 'balance did not increase after 3 checks');
|
|
||||||
await this.db.markPaymentBalanceProblem(message.order_id, txHash);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.db.markPaymentDelivered(message.order_id, txHash);
|
|
||||||
log.info({ event: 'transfer.delivered', tx_hash: txHash }, 'marked usdt_delivered');
|
|
||||||
await this.publishCompleted(message, log);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async assertHotWalletReady(amountUnits: bigint): Promise<void> {
|
private async assertHotWalletReady(amountUnits: bigint): Promise<void> {
|
||||||
|
|||||||
Reference in New Issue
Block a user