feat: add new status in db

This commit is contained in:
2026-05-11 14:08:44 +03:00
parent 3b013d3ba0
commit 83e89f30df
4 changed files with 94 additions and 26 deletions

View File

@@ -5,3 +5,4 @@ dist
.gitignore
*.log
README.md
swagger.yaml

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@ node_modules/
dist/
.env
*.log
swagger.yaml

View File

@@ -1,13 +1,14 @@
import pg from "pg";
import type { DatabaseSecret } from "../secrets/VaultClient.js";
import pg from 'pg';
import type {DatabaseSecret} from '../secrets/VaultClient.js';
const { Pool } = pg;
export type PaymentStatus =
| "pending"
| "usdt_delivered"
| "web3_hash_error"
| "web3_balance_problem"
| 'pending'
| 'web3_processing'
| 'usdt_delivered'
| 'web3_hash_error'
| 'web3_balance_problem'
| string;
export interface PaymentRecord {
@@ -15,6 +16,11 @@ export interface PaymentRecord {
web3_transaction_hash: string | null;
}
export interface PaymentClaimResult {
claimed: boolean;
payment: PaymentRecord | null;
}
export class PostgresClient {
private readonly pool: pg.Pool;
@@ -32,12 +38,12 @@ export class PostgresClient {
}
async ping(): Promise<void> {
await this.pool.query("SELECT 1");
await this.pool.query('SELECT 1');
}
async getOrderUsdtAmount(orderId: string): Promise<string | null> {
const result = await this.pool.query<{ usdt_amount: string }>(
"SELECT usdt_amount::text AS usdt_amount FROM orders WHERE id = $1",
'SELECT usdt_amount::text AS usdt_amount FROM orders WHERE id = $1',
[orderId]
);
return result.rows[0]?.usdt_amount ?? null;
@@ -45,7 +51,7 @@ export class PostgresClient {
async getUserEthWalletAddress(userId: string): Promise<string | null> {
const result = await this.pool.query<{ address: string }>(
"SELECT address FROM wallets WHERE user_id = $1 AND chain = 'ETH' LIMIT 1",
`SELECT address FROM wallets WHERE user_id = $1 AND chain = 'ETH' LIMIT 1`,
[userId]
);
return result.rows[0]?.address ?? null;
@@ -53,15 +59,44 @@ export class PostgresClient {
async getPaymentByOrderId(orderId: string): Promise<PaymentRecord | null> {
const result = await this.pool.query<PaymentRecord>(
"SELECT status, web3_transaction_hash FROM payments WHERE order_id = $1",
'SELECT status, web3_transaction_hash FROM payments WHERE order_id = $1',
[orderId]
);
return result.rows[0] ?? null;
}
async claimPaymentForTransfer(orderId: string): Promise<PaymentClaimResult> {
const claimed = await this.pool.query<PaymentRecord>(
`UPDATE payments
SET status = 'web3_processing',
updated_at = now()
WHERE order_id = $1
AND web3_transaction_hash IS NULL
AND status NOT IN ('web3_processing', 'usdt_delivered', 'web3_hash_error', 'web3_balance_problem')
RETURNING status, web3_transaction_hash`,
[orderId]
);
if (claimed.rows[0]) {
return {
claimed: true,
payment: claimed.rows[0]
};
}
return {
claimed: false,
payment: await this.getPaymentByOrderId(orderId)
};
}
async setPaymentTxHash(orderId: string, txHash: string): Promise<void> {
await this.pool.query(
"UPDATE payments SET web3_transaction_hash = $2, updated_at = now() WHERE order_id = $1",
`UPDATE payments
SET web3_transaction_hash = $2,
updated_at = now()
WHERE order_id = $1
AND status = 'web3_processing'`,
[orderId, txHash]
);
}
@@ -73,14 +108,19 @@ export class PostgresClient {
web3_transaction_hash = $2,
paid_at = now(),
updated_at = now()
WHERE order_id = $1`,
WHERE order_id = $1
AND status = 'web3_processing'`,
[orderId, txHash]
);
}
async markPaymentHashError(orderId: string): Promise<void> {
await this.pool.query(
"UPDATE payments SET status = 'web3_hash_error', updated_at = now() WHERE order_id = $1",
`UPDATE payments
SET status = 'web3_hash_error',
updated_at = now()
WHERE order_id = $1
AND status = 'web3_processing'`,
[orderId]
);
}
@@ -91,7 +131,8 @@ export class PostgresClient {
SET status = 'web3_balance_problem',
web3_transaction_hash = $2,
updated_at = now()
WHERE order_id = $1`,
WHERE order_id = $1
AND status = 'web3_processing'`,
[orderId, txHash]
);
}

View File

@@ -28,23 +28,15 @@ export class TransferOrchestrator {
}
async handle(message: CryptoTransferRequest, log: Logger): Promise<void> {
const existing = await this.db.getPaymentByOrderId(message.order_id);
const claim = await this.db.claimPaymentForTransfer(message.order_id);
const existing = claim.payment;
if (!existing) {
log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack');
return;
}
if (existing.status === 'usdt_delivered') {
log.info({ event: 'payment.already_delivered' }, 'already delivered, publishing completion');
await this.publishCompleted(message, log);
return;
}
if (existing.status === 'web3_hash_error' || existing.status === 'web3_balance_problem') {
log.warn(
{ event: 'payment.terminal_state', status: existing.status },
'skip - payment already in terminal failure state'
);
if (!claim.claimed) {
await this.handleAlreadyClaimed(message, existing, log);
return;
}
@@ -198,6 +190,39 @@ export class TransferOrchestrator {
'published crypto.transfer.completed'
);
}
private async handleAlreadyClaimed(
message: CryptoTransferRequest,
existing: { status: string; web3_transaction_hash: string | null },
log: Logger
): Promise<void> {
if (existing.status === 'usdt_delivered') {
log.info({ event: 'payment.already_delivered' }, 'already delivered, publishing completion');
await this.publishCompleted(message, log);
return;
}
if (existing.status === 'web3_processing') {
log.warn(
{ event: 'payment.already_processing', tx_hash: existing.web3_transaction_hash },
'skip - payment is already claimed by another worker'
);
return;
}
if (existing.status === 'web3_hash_error' || existing.status === 'web3_balance_problem') {
log.warn(
{ event: 'payment.terminal_state', status: existing.status },
'skip - payment already in terminal failure state'
);
return;
}
log.warn(
{ event: 'payment.not_claimed', status: existing.status, tx_hash: existing.web3_transaction_hash },
'skip - payment was not claimed for transfer'
);
}
}