fix: update for main logic

This commit is contained in:
2026-05-01 12:49:24 +03:00
parent b61739cae8
commit f1a68a7bf8
8 changed files with 314 additions and 177 deletions

View File

@@ -1,6 +1,6 @@
PORT=3000 PORT=3000
ETH_RPC_URL=https://ethereum-rpc.publicnode.com ETH_RPC_URL=https://ethereum-rpc.publicnode.com
HOT_WALLET_PRIVATE_KEY=0xYOUR_PRIVATE_KEY HOT_WALLET_PRIVATE_KEY=
USDT_CONTRACT_ADDRESS=0xdAC17F958D2ee523a2206206994597C13D831ec7 USDT_CONTRACT_ADDRESS=0xdAC17F958D2ee523a2206206994597C13D831ec7
VAULT_MOUNT_POINT=dev-secrets VAULT_MOUNT_POINT=dev-secrets

View File

@@ -15,7 +15,7 @@ RabbitMQ (crypto.transfer.requested)
└─ AmqpPublisher (crypto.transfer.completed) └─ AmqpPublisher (crypto.transfer.completed)
``` ```
Postgres credentials and RabbitMQ credentials are pulled from HashiCorp Vault (KV v2, AppRole auth). The hot-wallet private key, ETH RPC URL and limits stay in `.env`. Postgres credentials and RabbitMQ credentials are pulled from HashiCorp Vault (KV v2, AppRole auth). The hot-wallet private key, ETH RPC URL and USDT contract can also be pulled from Vault via the optional `ethereum` secret; `.env` remains as a fallback for those fields and runtime limits.
## Message contract ## Message contract
@@ -66,7 +66,7 @@ Required `.env` (see `.env.example`):
``` ```
PORT=3000 PORT=3000
ETH_RPC_URL=https://ethereum-rpc.publicnode.com ETH_RPC_URL=https://ethereum-rpc.publicnode.com
HOT_WALLET_PRIVATE_KEY=0x... HOT_WALLET_PRIVATE_KEY=
USDT_CONTRACT_ADDRESS=0xdAC17F958D2ee523a2206206994597C13D831ec7 USDT_CONTRACT_ADDRESS=0xdAC17F958D2ee523a2206206994597C13D831ec7
VAULT_MOUNT_POINT=dev-secrets VAULT_MOUNT_POINT=dev-secrets
@@ -107,11 +107,19 @@ WEBHOOK_TIMEOUT_MS=10000
| user | username | | user | username |
| password | password | | password | password |
Optional `dev-secrets/ethereum`:
| key | value |
|------------------------|--------------------------|
| rpc_url | Ethereum RPC URL |
| hot_wallet_private_key | company wallet private key |
| usdt_contract_address | USDT contract address |
## HTTP ## HTTP
A single endpoint is exposed for liveness/readiness probes: A single endpoint is exposed for liveness/readiness probes:
- `GET /health` — RPC block number + hot-wallet ETH/USDT balances. Returns 503 on RPC errors. - `GET /health` — RPC block number, hot-wallet ETH/USDT balances, Postgres ping and RabbitMQ connection state. Returns 503 if any dependency is degraded.
## Local development ## Local development

View File

@@ -1,12 +1,13 @@
import { parseUsdtAmount } from "./domain/amount.js"; import {parseUsdtAmount} from './domain/amount.js';
import type {EthereumSecret} from './secrets/VaultClient.js';
export const MAINNET_USDT_CONTRACT = "0xdAC17F958D2ee523a2206206994597C13D831ec7"; export const MAINNET_USDT_CONTRACT = '0xdAC17F958D2ee523a2206206994597C13D831ec7';
export interface AppConfig { export interface AppConfig {
port: number; port: number;
ethRpcUrl: string; ethRpcUrl?: string;
hotWalletPrivateKey: string; hotWalletPrivateKey?: string;
usdtContractAddress: string; usdtContractAddress?: string;
maxTransferAmountUnits: bigint; maxTransferAmountUnits: bigint;
minEthBalanceWei: bigint; minEthBalanceWei: bigint;
balanceCheckAttempts: number; balanceCheckAttempts: number;
@@ -21,21 +22,38 @@ export interface AppConfig {
export function loadConfig(env: NodeJS.ProcessEnv = process.env): AppConfig { export function loadConfig(env: NodeJS.ProcessEnv = process.env): AppConfig {
return { return {
port: readInteger(env.PORT, 3000), port: readInteger(env.PORT, 3000),
ethRpcUrl: readRequired(env.ETH_RPC_URL, "ETH_RPC_URL"), ethRpcUrl: env.ETH_RPC_URL,
hotWalletPrivateKey: readRequired(env.HOT_WALLET_PRIVATE_KEY, "HOT_WALLET_PRIVATE_KEY"), hotWalletPrivateKey: readOptionalPlaceholder(env.HOT_WALLET_PRIVATE_KEY, '0xYOUR_PRIVATE_KEY'),
usdtContractAddress: env.USDT_CONTRACT_ADDRESS ?? MAINNET_USDT_CONTRACT, usdtContractAddress: env.USDT_CONTRACT_ADDRESS ?? MAINNET_USDT_CONTRACT,
maxTransferAmountUnits: parseUsdtAmount(env.MAX_TRANSFER_USDT ?? "1000"), maxTransferAmountUnits: parseUsdtAmount(env.MAX_TRANSFER_USDT ?? '1000'),
minEthBalanceWei: BigInt(env.MIN_ETH_BALANCE_WEI ?? "1"), minEthBalanceWei: BigInt(env.MIN_ETH_BALANCE_WEI ?? '1'),
balanceCheckAttempts: 3, balanceCheckAttempts: 3,
balanceCheckIntervalMs: 60_000, balanceCheckIntervalMs: 60_000,
vaultAddr: readRequired(env.VAULT_ADDR, "VAULT_ADDR"), vaultAddr: readRequired(env.VAULT_ADDR, 'VAULT_ADDR'),
vaultMountPoint: readRequired(env.VAULT_MOUNT_POINT, "VAULT_MOUNT_POINT"), vaultMountPoint: readRequired(env.VAULT_MOUNT_POINT, 'VAULT_MOUNT_POINT'),
vaultRoleId: readRequired(env.VAULT_ROLE_ID, "VAULT_ROLE_ID"), vaultRoleId: readRequired(env.VAULT_ROLE_ID, 'VAULT_ROLE_ID'),
vaultSecretId: readRequired(env.VAULT_SECRET_ID, "VAULT_SECRET_ID"), vaultSecretId: readRequired(env.VAULT_SECRET_ID, 'VAULT_SECRET_ID'),
logLevel: env.LOG_LEVEL ?? "INFO" logLevel: env.LOG_LEVEL ?? 'INFO'
}; };
} }
export interface EthereumRuntimeConfig {
ethRpcUrl: string;
hotWalletPrivateKey: string;
usdtContractAddress: string;
}
export function resolveEthereumConfig(config: AppConfig, secret?: EthereumSecret): EthereumRuntimeConfig {
return {
ethRpcUrl: readRequired(secret?.rpcUrl ?? config.ethRpcUrl, 'ETH_RPC_URL or Vault ethereum.rpc_url'),
hotWalletPrivateKey: readPrivateKey(secret?.hotWalletPrivateKey ?? config.hotWalletPrivateKey),
usdtContractAddress: secret?.usdtContractAddress ?? config.usdtContractAddress ?? MAINNET_USDT_CONTRACT
};
}
function readRequired(value: string | undefined, name: string): string { function readRequired(value: string | undefined, name: string): string {
if (!value) { if (!value) {
throw new Error(`${name} is required`); throw new Error(`${name} is required`);
@@ -43,6 +61,7 @@ function readRequired(value: string | undefined, name: string): string {
return value; return value;
} }
function readInteger(value: string | undefined, defaultValue: number): number { function readInteger(value: string | undefined, defaultValue: number): number {
if (!value) { if (!value) {
return defaultValue; return defaultValue;
@@ -54,3 +73,20 @@ function readInteger(value: string | undefined, defaultValue: number): number {
} }
return parsed; return parsed;
} }
function readOptionalPlaceholder(value: string | undefined, placeholder: string): string | undefined {
if (!value || value === placeholder) {
return undefined;
}
return value;
}
function readPrivateKey(value: string | undefined): string {
const privateKey = readRequired(value, 'HOT_WALLET_PRIVATE_KEY or Vault ethereum.hot_wallet_private_key');
if (privateKey === '0xYOUR_PRIVATE_KEY' || !/^0x[0-9a-fA-F]{64}$/.test(privateKey)) {
throw new Error('HOT_WALLET_PRIVATE_KEY must be a real 32-byte hex private key');
}
return privateKey;
}

View File

@@ -1,45 +1,68 @@
import express, { type NextFunction, type Request, type Response } from "express"; import express,{type NextFunction,type Request,type Response} from 'express';
import type { Logger } from "pino"; import type {Logger} from 'pino';
import { formatUsdtAmount } from "../domain/amount.js"; import type {PostgresClient} from '../db/PostgresClient.js';
import type { EthereumGateway } from "../ethereum/EthereumGateway.js"; import {formatUsdtAmount} from '../domain/amount.js';
import type {EthereumGateway} from '../ethereum/EthereumGateway.js';
import type {AmqpClient} from '../queue/AmqpClient.js';
interface AppOptions { interface AppOptions {
ethereum: EthereumGateway; ethereum: EthereumGateway;
db: PostgresClient;
amqp: AmqpClient;
logger: Logger; logger: Logger;
} }
export function createApp(options: AppOptions) { export function createApp(options: AppOptions) {
const app = express(); const app = express();
const { ethereum, logger } = options; const { ethereum, db, amqp, logger } = options;
app.get("/health", async (_req, res) => { app.get('/health', async (_req, res) => {
try { const hotWalletAddress = ethereum.getHotWalletAddress();
const hotWalletAddress = ethereum.getHotWalletAddress(); const [blockNumber, ethBalanceWei, usdtBalanceUnits, postgres] = await Promise.allSettled([
const [blockNumber, ethBalanceWei, usdtBalanceUnits] = await Promise.all([ ethereum.getCurrentBlockNumber(),
ethereum.getCurrentBlockNumber(), ethereum.getEthBalance(hotWalletAddress),
ethereum.getEthBalance(hotWalletAddress), ethereum.getUsdtBalance(hotWalletAddress),
ethereum.getUsdtBalance(hotWalletAddress) db.ping()
]); ]);
const rabbitmqConnected = amqp.isConnected();
if (
blockNumber.status === 'fulfilled'
&& ethBalanceWei.status === 'fulfilled'
&& usdtBalanceUnits.status === 'fulfilled'
&& postgres.status === 'fulfilled'
&& rabbitmqConnected
) {
res.json({ res.json({
status: "ok", status: 'ok',
blockNumber, blockNumber: blockNumber.value,
hotWalletAddress, hotWalletAddress,
hotWalletEthBalanceWei: ethBalanceWei.toString(), hotWalletEthBalanceWei: ethBalanceWei.value.toString(),
hotWalletUsdtBalanceUnits: usdtBalanceUnits.toString(), hotWalletUsdtBalanceUnits: usdtBalanceUnits.value.toString(),
hotWalletUsdtBalance: formatUsdtAmount(usdtBalanceUnits) hotWalletUsdtBalance: formatUsdtAmount(usdtBalanceUnits.value),
}); postgres: 'ok',
} catch (error) { rabbitmq: 'ok'
logger.error({ err: error }, "/health degraded");
res.status(503).json({
status: "degraded",
error: error instanceof Error ? error.message : String(error)
}); });
return;
} }
const error = {
ethereum: firstRejectedReason([blockNumber, ethBalanceWei, usdtBalanceUnits]),
postgres: postgres.status === 'rejected' ? getErrorMessage(postgres.reason) : undefined,
rabbitmq: rabbitmqConnected ? undefined : 'not connected'
};
logger.error({ err: error }, '/health degraded');
res.status(503).json({
status: 'degraded',
hotWalletAddress,
postgres: postgres.status === 'fulfilled' ? 'ok' : 'degraded',
rabbitmq: rabbitmqConnected ? 'ok' : 'degraded',
error
});
}); });
app.use((error: unknown, _req: Request, res: Response, _next: NextFunction) => { app.use((error: unknown, _req: Request, res: Response, _next: NextFunction) => {
logger.error({ err: error }, "unhandled http error"); logger.error({ err: error }, 'unhandled http error');
res.status(500).json({ res.status(500).json({
error: error instanceof Error ? error.message : String(error) error: error instanceof Error ? error.message : String(error)
}); });
@@ -47,3 +70,14 @@ export function createApp(options: AppOptions) {
return app; return app;
} }
function firstRejectedReason(results: PromiseSettledResult<unknown>[]): string | undefined {
const failed = results.find((result) => result.status === 'rejected');
return failed?.status === 'rejected' ? getErrorMessage(failed.reason) : undefined;
}
function getErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

View File

@@ -1,18 +1,18 @@
import "dotenv/config"; import 'dotenv/config';
import type { Server } from "node:http"; import type {Server} from 'node:http';
import { loadConfig } from "./config.js"; import {loadConfig,resolveEthereumConfig} from './config.js';
import { PostgresClient } from "./db/PostgresClient.js"; import {PostgresClient} from './db/PostgresClient.js';
import { EthersEthereumGateway } from "./ethereum/EthersEthereumGateway.js"; import {EthersEthereumGateway} from './ethereum/EthersEthereumGateway.js';
import { createApp } from "./http/app.js"; import {createApp} from './http/app.js';
import { createLogger } from "./logger.js"; import {createLogger} from './logger.js';
import { AmqpClient } from "./queue/AmqpClient.js"; import {AmqpClient} from './queue/AmqpClient.js';
import { VaultClient } from "./secrets/VaultClient.js"; import {VaultClient} from './secrets/VaultClient.js';
import { TransferOrchestrator } from "./services/TransferOrchestrator.js"; import {TransferOrchestrator} from './services/TransferOrchestrator.js';
async function main(): Promise<void> { async function main(): Promise<void> {
const config = loadConfig(); const config = loadConfig();
const logger = createLogger(config.logLevel); const logger = createLogger(config.logLevel);
logger.info({ event: "bootstrap.start" }, "starting usdt-transfer service"); logger.info({ event: 'bootstrap.start' }, 'starting usdt-transfer service');
const vault = new VaultClient({ const vault = new VaultClient({
addr: config.vaultAddr, addr: config.vaultAddr,
@@ -21,16 +21,17 @@ async function main(): Promise<void> {
secretId: config.vaultSecretId secretId: config.vaultSecretId
}); });
const secrets = await vault.bootstrap(); const secrets = await vault.bootstrap();
logger.info({ event: "bootstrap.vault_loaded" }, "vault secrets loaded"); logger.info({ event: 'bootstrap.vault_loaded' }, 'vault secrets loaded');
const ethereumConfig = resolveEthereumConfig(config, secrets.ethereum);
const db = new PostgresClient(secrets.database); const db = new PostgresClient(secrets.database);
await db.ping(); await db.ping();
logger.info({ event: "bootstrap.pg_ready" }, "postgres pool ready"); logger.info({ event: 'bootstrap.pg_ready' }, 'postgres pool ready');
const ethereum = new EthersEthereumGateway({ const ethereum = new EthersEthereumGateway({
rpcUrl: config.ethRpcUrl, rpcUrl: ethereumConfig.ethRpcUrl,
privateKey: config.hotWalletPrivateKey, privateKey: ethereumConfig.hotWalletPrivateKey,
usdtContractAddress: config.usdtContractAddress usdtContractAddress: ethereumConfig.usdtContractAddress
}); });
let triggerShutdown: (reason: string) => void = () => {}; let triggerShutdown: (reason: string) => void = () => {};
@@ -38,7 +39,7 @@ async function main(): Promise<void> {
const amqp = new AmqpClient({ const amqp = new AmqpClient({
secret: secrets.rabbitmq, secret: secrets.rabbitmq,
logger, logger,
onConnectionLost: () => triggerShutdown("amqp_connection_lost") onConnectionLost: () => triggerShutdown('amqp_connection_lost')
}); });
await amqp.connect(); await amqp.connect();
@@ -51,11 +52,11 @@ async function main(): Promise<void> {
await amqp.startConsumer((message, log) => orchestrator.handle(message, log)); await amqp.startConsumer((message, log) => orchestrator.handle(message, log));
const app = createApp({ ethereum, logger }); const app = createApp({ ethereum, db, amqp, logger });
const server: Server = await new Promise((resolve) => { const server: Server = await new Promise((resolve) => {
const s = app.listen(config.port, () => resolve(s)); const s = app.listen(config.port, () => resolve(s));
}); });
logger.info({ event: "bootstrap.http_listening", port: config.port }, "http listening"); logger.info({ event: 'bootstrap.http_listening', port: config.port }, 'http listening');
let shuttingDown = false; let shuttingDown = false;
triggerShutdown = (reason: string) => { triggerShutdown = (reason: string) => {
@@ -63,29 +64,27 @@ async function main(): Promise<void> {
return; return;
} }
shuttingDown = true; shuttingDown = true;
logger.warn({ event: "shutdown.start", reason }, "shutting down"); logger.warn({ event: 'shutdown.start', reason }, 'shutting down');
void (async () => { void (async () => {
try { try {
await new Promise<void>((resolve) => server.close(() => resolve())); await new Promise<void>((resolve) => server.close(() => resolve()));
await amqp.close(); await amqp.close();
await db.close(); await db.close();
logger.info({ event: "shutdown.done" }, "shutdown complete"); logger.info({ event: 'shutdown.done' }, 'shutdown complete');
process.exit(reason === "amqp_connection_lost" ? 1 : 0); process.exit(reason === 'amqp_connection_lost' ? 1 : 0);
} catch (error) { } catch (error) {
logger.error({ event: "shutdown.error", err: error }, "shutdown error"); logger.error({ event: 'shutdown.error', err: error }, 'shutdown error');
process.exit(1); process.exit(1);
} }
})(); })();
}; };
process.on("SIGINT", () => triggerShutdown("SIGINT")); process.on('SIGINT', () => triggerShutdown('SIGINT'));
process.on("SIGTERM", () => triggerShutdown("SIGTERM")); process.on('SIGTERM', () => triggerShutdown('SIGTERM'));
} }
main().catch((error) => { main().catch((error) => {
// Logger may not be ready yet; fall back to stderr. console.error('fatal bootstrap error:', error);
// eslint-disable-next-line no-console
console.error("fatal bootstrap error:", error);
process.exit(1); process.exit(1);
}); });

View File

@@ -1,11 +1,12 @@
import amqplib, { type ChannelModel, type Channel, type ConsumeMessage } from "amqplib"; import {once} from 'node:events';
import type { Logger } from "pino"; import amqplib,{type ChannelModel,type ConfirmChannel,type ConsumeMessage} from 'amqplib';
import type { RabbitMqSecret } from "../secrets/VaultClient.js"; import type {Logger} from 'pino';
import type { CryptoTransferCompleted, CryptoTransferRequest } from "./messageSchema.js"; import type {RabbitMqSecret} from '../secrets/VaultClient.js';
import { parseCryptoTransferRequest } from "./messageSchema.js"; import type {CryptoTransferCompleted,CryptoTransferRequest} from './messageSchema.js';
import {parseCryptoTransferRequest} from './messageSchema.js';
export const REQUEST_QUEUE = "crypto.transfer.requested"; export const REQUEST_QUEUE = 'crypto.transfer.requested';
export const COMPLETED_QUEUE = "crypto.transfer.completed"; export const COMPLETED_QUEUE = 'crypto.transfer.completed';
export type TransferRequestHandler = ( export type TransferRequestHandler = (
message: CryptoTransferRequest, message: CryptoTransferRequest,
@@ -23,17 +24,18 @@ export class AmqpClient {
private readonly secret: RabbitMqSecret; private readonly secret: RabbitMqSecret;
private readonly onConnectionLost: () => void; private readonly onConnectionLost: () => void;
private connection?: ChannelModel; private connection?: ChannelModel;
private channel?: Channel; private channel?: ConfirmChannel;
private connected = false;
constructor(options: AmqpClientOptions) { constructor(options: AmqpClientOptions) {
this.secret = options.secret; this.secret = options.secret;
this.logger = options.logger.child({ component: "amqp" }); this.logger = options.logger.child({ component: 'amqp' });
this.onConnectionLost = options.onConnectionLost; this.onConnectionLost = options.onConnectionLost;
} }
async connect(): Promise<void> { async connect(): Promise<void> {
this.connection = await amqplib.connect({ this.connection = await amqplib.connect({
protocol: "amqp", protocol: 'amqp',
hostname: this.secret.host, hostname: this.secret.host,
port: this.secret.port, port: this.secret.port,
username: this.secret.user, username: this.secret.user,
@@ -41,28 +43,30 @@ export class AmqpClient {
vhost: this.secret.vhost vhost: this.secret.vhost
}); });
this.connection.on("error", (error) => { this.connection.on('error', (error) => {
this.logger.error({ err: error }, "amqp connection error"); this.logger.error({ err: error }, 'amqp connection error');
}); });
this.connection.on("close", () => { this.connection.on('close', () => {
this.logger.error("amqp connection closed"); this.connected = false;
this.logger.error('amqp connection closed');
this.onConnectionLost(); this.onConnectionLost();
}); });
this.channel = await this.connection.createChannel(); this.channel = await this.connection.createConfirmChannel();
await this.channel.prefetch(1); await this.channel.prefetch(1);
await this.channel.assertQueue(REQUEST_QUEUE, { durable: true }); await this.channel.assertQueue(REQUEST_QUEUE, { durable: true });
await this.channel.assertQueue(COMPLETED_QUEUE, { durable: true }); await this.channel.assertQueue(COMPLETED_QUEUE, { durable: true });
this.connected = true;
this.logger.info( this.logger.info(
{ host: this.secret.host, port: this.secret.port, vhost: this.secret.vhost }, { host: this.secret.host, port: this.secret.port, vhost: this.secret.vhost },
"amqp connected" 'amqp connected'
); );
} }
async startConsumer(handler: TransferRequestHandler): Promise<void> { async startConsumer(handler: TransferRequestHandler): Promise<void> {
if (!this.channel) { if (!this.channel) {
throw new Error("AmqpClient not connected"); throw new Error('AmqpClient not connected');
} }
await this.channel.consume(REQUEST_QUEUE, (msg) => { await this.channel.consume(REQUEST_QUEUE, (msg) => {
@@ -72,32 +76,37 @@ export class AmqpClient {
void this.dispatch(msg, handler); void this.dispatch(msg, handler);
}); });
this.logger.info({ queue: REQUEST_QUEUE }, "amqp consumer started"); this.logger.info({ queue: REQUEST_QUEUE }, 'amqp consumer started');
} }
publishCompleted(payload: CryptoTransferCompleted): boolean { async publishCompleted(payload: CryptoTransferCompleted): Promise<void> {
if (!this.channel) { if (!this.channel) {
throw new Error("AmqpClient not connected"); throw new Error('AmqpClient not connected');
} }
return this.channel.sendToQueue( const accepted = this.channel.sendToQueue(
COMPLETED_QUEUE, COMPLETED_QUEUE,
Buffer.from(JSON.stringify(payload)), Buffer.from(JSON.stringify(payload)),
{ contentType: "application/json", persistent: true } { contentType: 'application/json', persistent: true }
); );
if (!accepted) {
await once(this.channel, 'drain');
}
await this.channel.waitForConfirms();
}
isConnected(): boolean {
return this.connected;
} }
async close(): Promise<void> { async close(): Promise<void> {
this.connected = false;
try { try {
await this.channel?.close(); await this.channel?.close();
} catch { } catch {}
/* ignore */
}
try { try {
await this.connection?.close(); await this.connection?.close();
} catch { } catch {}
/* ignore */
}
} }
private async dispatch(msg: ConsumeMessage, handler: TransferRequestHandler): Promise<void> { private async dispatch(msg: ConsumeMessage, handler: TransferRequestHandler): Promise<void> {
@@ -108,12 +117,12 @@ export class AmqpClient {
let parsed: CryptoTransferRequest; let parsed: CryptoTransferRequest;
try { try {
const body = JSON.parse(msg.content.toString("utf8")) as unknown; const body = JSON.parse(msg.content.toString('utf8')) as unknown;
parsed = parseCryptoTransferRequest(body); parsed = parseCryptoTransferRequest(body);
} catch (error) { } catch (error) {
this.logger.error( this.logger.error(
{ err: error, raw: safeRaw(msg) }, { err: error, raw: safeRaw(msg) },
"rejected malformed message" 'rejected malformed message'
); );
channel.ack(msg); channel.ack(msg);
return; return;
@@ -126,22 +135,23 @@ export class AmqpClient {
message_id: parsed.message_id message_id: parsed.message_id
}); });
log.info({ event: "transfer.requested.received" }, "received from queue"); log.info({ event: 'transfer.requested.received' }, 'received from queue');
try { try {
await handler(parsed, log); await handler(parsed, log);
channel.ack(msg); channel.ack(msg);
} catch (error) { } catch (error) {
log.error({ err: error }, "infrastructure error during message handling, requeue=false"); log.error({ err: error }, 'infrastructure error during message handling, requeue=true');
channel.nack(msg, false, false); channel.nack(msg, false, true);
} }
} }
} }
function safeRaw(msg: ConsumeMessage): string { function safeRaw(msg: ConsumeMessage): string {
try { try {
return msg.content.toString("utf8").slice(0, 512); return msg.content.toString('utf8').slice(0, 512);
} catch { } catch {
return "<unreadable>"; return '<unreadable>';
} }
} }

View File

@@ -22,9 +22,16 @@ export interface RabbitMqSecret {
password: string; password: string;
} }
export interface EthereumSecret {
rpcUrl?: string;
hotWalletPrivateKey?: string;
usdtContractAddress?: string;
}
export interface BootstrappedSecrets { export interface BootstrappedSecrets {
database: DatabaseSecret; database: DatabaseSecret;
rabbitmq: RabbitMqSecret; rabbitmq: RabbitMqSecret;
ethereum?: EthereumSecret;
} }
export class VaultClient { export class VaultClient {
@@ -36,7 +43,7 @@ export class VaultClient {
private token?: string; private token?: string;
constructor(options: VaultClientOptions) { constructor(options: VaultClientOptions) {
this.baseUrl = options.addr.replace(/\/+$/, ""); this.baseUrl = options.addr.replace(/\/+$/, '');
this.mountPoint = options.mountPoint; this.mountPoint = options.mountPoint;
this.roleId = options.roleId; this.roleId = options.roleId;
this.secretId = options.secretId; this.secretId = options.secretId;
@@ -45,58 +52,71 @@ export class VaultClient {
async bootstrap(): Promise<BootstrappedSecrets> { async bootstrap(): Promise<BootstrappedSecrets> {
await this.login(); await this.login();
const [database, rabbitmq] = await Promise.all([ const [database, rabbitmq, ethereum] = await Promise.all([
this.readSecret("database"), this.readSecret('database'),
this.readSecret("rabbitmq") this.readSecret('rabbitmq'),
this.readOptionalSecret('ethereum')
]); ]);
return { return {
database: parseDatabaseSecret(database), database: parseDatabaseSecret(database),
rabbitmq: parseRabbitMqSecret(rabbitmq) rabbitmq: parseRabbitMqSecret(rabbitmq),
ethereum: ethereum ? parseEthereumSecret(ethereum) : undefined
}; };
} }
private async login(): Promise<void> { private async login(): Promise<void> {
const response = await this.fetch(`${this.baseUrl}/v1/auth/approle/login`, { const response = await this.fetch(`${this.baseUrl}/v1/auth/approle/login`, {
method: "POST", method: 'POST',
headers: { "content-type": "application/json" }, headers: { 'content-type': 'application/json' },
body: JSON.stringify({ role_id: this.roleId, secret_id: this.secretId }) body: JSON.stringify({ role_id: this.roleId, secret_id: this.secretId })
}); });
const json = (await response.json()) as { auth?: { client_token?: string } }; const json = (await response.json()) as { auth?: { client_token?: string } };
const token = json.auth?.client_token; const token = json.auth?.client_token;
if (!token) { if (!token) {
throw new Error("Vault AppRole login did not return a client_token"); throw new Error('Vault AppRole login did not return a client_token');
} }
this.token = token; this.token = token;
} }
private async readSecret(path: string): Promise<Record<string, unknown>> { private async readSecret(path: string): Promise<Record<string, unknown>> {
if (!this.token) { if (!this.token) {
throw new Error("Vault token is missing; call bootstrap() first"); throw new Error('Vault token is missing; call bootstrap() first');
} }
const url = `${this.baseUrl}/v1/${this.mountPoint}/data/${path}`; const url = `${this.baseUrl}/v1/${this.mountPoint}/data/${path}`;
const response = await this.fetch(url, { const response = await this.fetch(url, {
method: "GET", method: 'GET',
headers: { "X-Vault-Token": this.token } headers: { 'X-Vault-Token': this.token }
}); });
const json = (await response.json()) as { data?: { data?: Record<string, unknown> } }; const json = (await response.json()) as { data?: { data?: Record<string, unknown> } };
const data = json.data?.data; const data = json.data?.data;
if (!data || typeof data !== "object") { if (!data || typeof data !== 'object') {
throw new Error(`Vault secret ${this.mountPoint}/${path} is empty or malformed`); throw new Error(`Vault secret ${this.mountPoint}/${path} is empty or malformed`);
} }
return data; return data;
} }
private async readOptionalSecret(path: string): Promise<Record<string, unknown> | null> {
try {
return await this.readSecret(path);
} catch (error) {
if (error instanceof Error && error.message.includes('HTTP 404')) {
return null;
}
throw error;
}
}
private async fetch(url: string, init: RequestInit): Promise<Response> { private async fetch(url: string, init: RequestInit): Promise<Response> {
const controller = new AbortController(); const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), this.timeoutMs); const timeout = setTimeout(() => controller.abort(), this.timeoutMs);
try { try {
const response = await fetch(url, { ...init, signal: controller.signal }); const response = await fetch(url, { ...init, signal: controller.signal });
if (!response.ok) { if (!response.ok) {
const text = await response.text().catch(() => ""); const text = await response.text().catch(() => '');
throw new Error(`Vault request to ${url} failed with HTTP ${response.status}: ${text}`); throw new Error(`Vault request to ${url} failed with HTTP ${response.status}: ${text}`);
} }
return response; return response;
@@ -106,37 +126,62 @@ export class VaultClient {
} }
} }
function parseDatabaseSecret(raw: Record<string, unknown>): DatabaseSecret { function parseDatabaseSecret(raw: Record<string, unknown>): DatabaseSecret {
return { return {
host: requireString(raw, "host", "database"), host: requireString(raw, 'host', 'database'),
port: requirePort(raw, "port", "database"), port: requirePort(raw, 'port', 'database'),
name: requireString(raw, "name", "database"), name: requireString(raw, 'name', 'database'),
user: requireString(raw, "user", "database"), user: requireString(raw, 'user', 'database'),
password: requireString(raw, "password", "database") password: requireString(raw, 'password', 'database')
}; };
} }
function parseRabbitMqSecret(raw: Record<string, unknown>): RabbitMqSecret { function parseRabbitMqSecret(raw: Record<string, unknown>): RabbitMqSecret {
return { return {
host: requireString(raw, "host", "rabbitmq"), host: requireString(raw, 'host', 'rabbitmq'),
port: requirePort(raw, "port", "rabbitmq"), port: requirePort(raw, 'port', 'rabbitmq'),
vhost: requireString(raw, "vhost", "rabbitmq"), vhost: requireString(raw, 'vhost', 'rabbitmq'),
user: requireString(raw, "user", "rabbitmq"), user: requireString(raw, 'user', 'rabbitmq'),
password: requireString(raw, "password", "rabbitmq") password: requireString(raw, 'password', 'rabbitmq')
}; };
} }
function parseEthereumSecret(raw: Record<string, unknown>): EthereumSecret {
return {
rpcUrl: optionalString(raw, 'rpc_url', 'ethereum'),
hotWalletPrivateKey: optionalString(raw, 'hot_wallet_private_key', 'ethereum'),
usdtContractAddress: optionalString(raw, 'usdt_contract_address', 'ethereum')
};
}
function requireString(raw: Record<string, unknown>, key: string, secretName: string): string { function requireString(raw: Record<string, unknown>, key: string, secretName: string): string {
const value = raw[key]; const value = raw[key];
if (typeof value !== "string" || !value) { if (typeof value !== 'string' || !value) {
throw new Error(`Vault secret '${secretName}' is missing string field '${key}'`); throw new Error(`Vault secret '${secretName}' is missing string field '${key}'`);
} }
return value; return value;
} }
function optionalString(raw: Record<string, unknown>, key: string, secretName: string): string | undefined {
const value = raw[key];
if (value === undefined || value === null || value === '') {
return undefined;
}
if (typeof value !== 'string') {
throw new Error(`Vault secret '${secretName}' field '${key}' must be a string`);
}
return value;
}
function requirePort(raw: Record<string, unknown>, key: string, secretName: string): number { function requirePort(raw: Record<string, unknown>, key: string, secretName: string): number {
const value = raw[key]; const value = raw[key];
const numeric = typeof value === "number" ? value : Number(value); const numeric = typeof value === 'number' ? value : Number(value);
if (!Number.isFinite(numeric) || numeric <= 0 || numeric > 65535) { if (!Number.isFinite(numeric) || numeric <= 0 || numeric > 65535) {
throw new Error(`Vault secret '${secretName}' has invalid port '${String(value)}'`); throw new Error(`Vault secret '${secretName}' has invalid port '${String(value)}'`);
} }

View File

@@ -1,11 +1,11 @@
import { isAddress } from "ethers"; import {isAddress} from 'ethers';
import type { Logger } from "pino"; import type {Logger} from 'pino';
import { ulid } from "ulid"; import {ulid} from 'ulid';
import { parseUsdtAmount } from "../domain/amount.js"; import {parseUsdtAmount} from '../domain/amount.js';
import type { PostgresClient } from "../db/PostgresClient.js"; import type {PostgresClient} from '../db/PostgresClient.js';
import type { EthereumGateway } from "../ethereum/EthereumGateway.js"; import type {EthereumGateway} from '../ethereum/EthereumGateway.js';
import type { AmqpClient } from "../queue/AmqpClient.js"; import type {AmqpClient} from '../queue/AmqpClient.js';
import type { CryptoTransferRequest } from "../queue/messageSchema.js"; import type {CryptoTransferRequest} from '../queue/messageSchema.js';
export interface TransferOrchestratorOptions { export interface TransferOrchestratorOptions {
maxTransferAmountUnits: bigint; maxTransferAmountUnits: bigint;
@@ -30,26 +30,27 @@ export class TransferOrchestrator {
async handle(message: CryptoTransferRequest, log: Logger): Promise<void> { async handle(message: CryptoTransferRequest, log: Logger): Promise<void> {
const existing = await this.db.getPaymentByOrderId(message.order_id); const existing = await this.db.getPaymentByOrderId(message.order_id);
if (!existing) { if (!existing) {
log.error({ event: "payment.missing" }, "no payment row for order_id, ack"); log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack');
return; return;
} }
if (existing.status === "usdt_delivered") { if (existing.status === 'usdt_delivered') {
log.info({ event: "payment.already_delivered" }, "skip — already delivered"); log.info({ event: 'payment.already_delivered' }, 'already delivered, publishing completion');
await this.publishCompleted(message, log);
return; return;
} }
if (existing.status === "web3_hash_error" || existing.status === "web3_balance_problem") { if (existing.status === 'web3_hash_error' || existing.status === 'web3_balance_problem') {
log.warn( log.warn(
{ event: "payment.terminal_state", status: existing.status }, { event: 'payment.terminal_state', status: existing.status },
"skip payment already in terminal failure state" 'skip - payment already in terminal failure state'
); );
return; return;
} }
const usdtAmountString = await this.db.getOrderUsdtAmount(message.order_id); const usdtAmountString = await this.db.getOrderUsdtAmount(message.order_id);
if (!usdtAmountString) { if (!usdtAmountString) {
log.error({ event: "order.not_found" }, "order missing for order_id"); log.error({ event: 'order.not_found' }, 'order missing for order_id');
await this.db.markPaymentHashError(message.order_id); await this.db.markPaymentHashError(message.order_id);
return; return;
} }
@@ -59,8 +60,8 @@ export class TransferOrchestrator {
amountUnits = parseUsdtAmount(usdtAmountString); amountUnits = parseUsdtAmount(usdtAmountString);
} catch (error) { } catch (error) {
log.error( log.error(
{ event: "order.invalid_amount", err: error, usdt_amount: usdtAmountString }, { event: 'order.invalid_amount', err: error, usdt_amount: usdtAmountString },
"could not parse usdt_amount" 'could not parse usdt_amount'
); );
await this.db.markPaymentHashError(message.order_id); await this.db.markPaymentHashError(message.order_id);
return; return;
@@ -68,8 +69,8 @@ export class TransferOrchestrator {
if (amountUnits > this.options.maxTransferAmountUnits) { if (amountUnits > this.options.maxTransferAmountUnits) {
log.error( log.error(
{ event: "transfer.amount_exceeds_limit", amount_units: amountUnits.toString() }, { event: 'transfer.amount_exceeds_limit', amount_units: amountUnits.toString() },
"usdt_amount exceeds MAX_TRANSFER_USDT" 'usdt_amount exceeds MAX_TRANSFER_USDT'
); );
await this.db.markPaymentHashError(message.order_id); await this.db.markPaymentHashError(message.order_id);
return; return;
@@ -77,7 +78,7 @@ export class TransferOrchestrator {
const recipient = await this.db.getUserEthWalletAddress(message.user_id); const recipient = await this.db.getUserEthWalletAddress(message.user_id);
if (!recipient || !isAddress(recipient)) { if (!recipient || !isAddress(recipient)) {
log.error({ event: "wallet.not_found_or_invalid", recipient }, "no valid ETH wallet for user"); log.error({ event: 'wallet.not_found_or_invalid', recipient }, 'no valid ETH wallet for user');
await this.db.markPaymentHashError(message.order_id); await this.db.markPaymentHashError(message.order_id);
return; return;
} }
@@ -85,7 +86,7 @@ export class TransferOrchestrator {
try { try {
await this.assertHotWalletReady(amountUnits); await this.assertHotWalletReady(amountUnits);
} catch (error) { } catch (error) {
log.error({ event: "hot_wallet.not_ready", err: error }, "hot wallet pre-check failed"); log.error({ event: 'hot_wallet.not_ready', err: error }, 'hot wallet pre-check failed');
await this.db.markPaymentHashError(message.order_id); await this.db.markPaymentHashError(message.order_id);
return; return;
} }
@@ -94,14 +95,14 @@ export class TransferOrchestrator {
try { try {
preBalance = await this.ethereum.getUsdtBalance(recipient); preBalance = await this.ethereum.getUsdtBalance(recipient);
} catch (error) { } catch (error) {
log.error({ event: "rpc.pre_balance_failed", err: error }, "failed to read recipient pre-balance"); log.error({ event: 'rpc.pre_balance_failed', err: error }, 'failed to read recipient pre-balance');
await this.db.markPaymentHashError(message.order_id); await this.db.markPaymentHashError(message.order_id);
return; return;
} }
log.info( log.info(
{ event: "transfer.pre_balance", pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() }, { event: 'transfer.pre_balance', pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() },
"captured pre-balance" 'captured pre-balance'
); );
let txHash: string; let txHash: string;
@@ -109,35 +110,24 @@ export class TransferOrchestrator {
const tx = await this.ethereum.sendUsdtTransfer(recipient, amountUnits); const tx = await this.ethereum.sendUsdtTransfer(recipient, amountUnits);
txHash = tx.hash; txHash = tx.hash;
} catch (error) { } catch (error) {
log.error({ event: "transfer.broadcast_failed", err: error }, "broadcast failed"); log.error({ event: 'transfer.broadcast_failed', err: error }, 'broadcast failed');
await this.db.markPaymentHashError(message.order_id); await this.db.markPaymentHashError(message.order_id);
return; return;
} }
log.info({ event: "transfer.broadcasted", tx_hash: txHash }, "broadcast OK"); log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK');
await this.db.setPaymentTxHash(message.order_id, txHash); await this.db.setPaymentTxHash(message.order_id, txHash);
const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log); const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log);
if (!delivered) { if (!delivered) {
log.error({ event: "transfer.balance_problem", tx_hash: txHash }, "balance did not increase after 3 checks"); log.error({ event: 'transfer.balance_problem', tx_hash: txHash }, 'balance did not increase after 3 checks');
await this.db.markPaymentBalanceProblem(message.order_id, txHash); await this.db.markPaymentBalanceProblem(message.order_id, txHash);
return; return;
} }
await this.db.markPaymentDelivered(message.order_id, txHash); await this.db.markPaymentDelivered(message.order_id, txHash);
log.info({ event: "transfer.delivered", tx_hash: txHash }, "marked usdt_delivered"); log.info({ event: 'transfer.delivered', tx_hash: txHash }, 'marked usdt_delivered');
await this.publishCompleted(message, log);
const completedMessageId = ulid();
this.amqp.publishCompleted({
user_id: message.user_id,
order_id: message.order_id,
trace_id: message.trace_id,
message_id: completedMessageId
});
log.info(
{ event: "transfer.completed.published", message_id: completedMessageId },
"published crypto.transfer.completed"
);
} }
private async assertHotWalletReady(amountUnits: bigint): Promise<void> { private async assertHotWalletReady(amountUnits: bigint): Promise<void> {
@@ -148,10 +138,10 @@ export class TransferOrchestrator {
]); ]);
if (ethBalanceWei < this.options.minEthBalanceWei) { if (ethBalanceWei < this.options.minEthBalanceWei) {
throw new Error("Hot wallet ETH balance is below gas threshold"); throw new Error('Hot wallet ETH balance is below gas threshold');
} }
if (usdtBalanceUnits < amountUnits) { if (usdtBalanceUnits < amountUnits) {
throw new Error("Hot wallet USDT balance is insufficient"); throw new Error('Hot wallet USDT balance is insufficient');
} }
} }
@@ -169,8 +159,8 @@ export class TransferOrchestrator {
postBalance = await this.ethereum.getUsdtBalance(recipient); postBalance = await this.ethereum.getUsdtBalance(recipient);
} catch (error) { } catch (error) {
log.warn( log.warn(
{ event: "balance_check.rpc_error", attempt, err: error }, { event: 'balance_check.rpc_error', attempt, err: error },
"balance check RPC error" 'balance check RPC error'
); );
continue; continue;
} }
@@ -178,13 +168,13 @@ export class TransferOrchestrator {
const delta = postBalance - preBalance; const delta = postBalance - preBalance;
log.info( log.info(
{ {
event: "balance_check.result", event: 'balance_check.result',
attempt, attempt,
attempts_total: this.options.balanceCheckAttempts, attempts_total: this.options.balanceCheckAttempts,
post_balance_units: postBalance.toString(), post_balance_units: postBalance.toString(),
delta_units: delta.toString() delta_units: delta.toString()
}, },
"balance check" 'balance check'
); );
if (delta >= amountUnits) { if (delta >= amountUnits) {
@@ -194,8 +184,23 @@ export class TransferOrchestrator {
return false; return false;
} }
private async publishCompleted(message: CryptoTransferRequest, log: Logger): Promise<void> {
const completedMessageId = ulid();
await this.amqp.publishCompleted({
user_id: message.user_id,
order_id: message.order_id,
trace_id: message.trace_id,
message_id: completedMessageId
});
log.info(
{ event: 'transfer.completed.published', message_id: completedMessageId },
'published crypto.transfer.completed'
);
}
} }
function defaultSleep(ms: number): Promise<void> { function defaultSleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }