diff --git a/.env.example b/.env.example index 631378f..86cc615 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,6 @@ PORT=3000 ETH_RPC_URL=https://ethereum-rpc.publicnode.com -HOT_WALLET_PRIVATE_KEY=0xYOUR_PRIVATE_KEY +HOT_WALLET_PRIVATE_KEY= USDT_CONTRACT_ADDRESS=0xdAC17F958D2ee523a2206206994597C13D831ec7 VAULT_MOUNT_POINT=dev-secrets diff --git a/README.md b/README.md index c7d3d10..25cbd67 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ RabbitMQ (crypto.transfer.requested) └─ AmqpPublisher (crypto.transfer.completed) ``` -Postgres credentials and RabbitMQ credentials are pulled from HashiCorp Vault (KV v2, AppRole auth). The hot-wallet private key, ETH RPC URL and limits stay in `.env`. +Postgres credentials and RabbitMQ credentials are pulled from HashiCorp Vault (KV v2, AppRole auth). The hot-wallet private key, ETH RPC URL and USDT contract can also be pulled from Vault via the optional `ethereum` secret; `.env` remains as a fallback for those fields and runtime limits. ## Message contract @@ -66,7 +66,7 @@ Required `.env` (see `.env.example`): ``` PORT=3000 ETH_RPC_URL=https://ethereum-rpc.publicnode.com -HOT_WALLET_PRIVATE_KEY=0x... +HOT_WALLET_PRIVATE_KEY= USDT_CONTRACT_ADDRESS=0xdAC17F958D2ee523a2206206994597C13D831ec7 VAULT_MOUNT_POINT=dev-secrets @@ -107,11 +107,19 @@ WEBHOOK_TIMEOUT_MS=10000 | user | username | | password | password | +Optional `dev-secrets/ethereum`: + +| key | value | +|------------------------|--------------------------| +| rpc_url | Ethereum RPC URL | +| hot_wallet_private_key | company wallet private key | +| usdt_contract_address | USDT contract address | + ## HTTP A single endpoint is exposed for liveness/readiness probes: -- `GET /health` — RPC block number + hot-wallet ETH/USDT balances. Returns 503 on RPC errors. +- `GET /health` — RPC block number, hot-wallet ETH/USDT balances, Postgres ping and RabbitMQ connection state. Returns 503 if any dependency is degraded. ## Local development diff --git a/src/config.ts b/src/config.ts index 7a3c29c..c5e56b7 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,12 +1,13 @@ -import { parseUsdtAmount } from "./domain/amount.js"; +import {parseUsdtAmount} from './domain/amount.js'; +import type {EthereumSecret} from './secrets/VaultClient.js'; -export const MAINNET_USDT_CONTRACT = "0xdAC17F958D2ee523a2206206994597C13D831ec7"; +export const MAINNET_USDT_CONTRACT = '0xdAC17F958D2ee523a2206206994597C13D831ec7'; export interface AppConfig { port: number; - ethRpcUrl: string; - hotWalletPrivateKey: string; - usdtContractAddress: string; + ethRpcUrl?: string; + hotWalletPrivateKey?: string; + usdtContractAddress?: string; maxTransferAmountUnits: bigint; minEthBalanceWei: bigint; balanceCheckAttempts: number; @@ -21,21 +22,38 @@ export interface AppConfig { export function loadConfig(env: NodeJS.ProcessEnv = process.env): AppConfig { return { port: readInteger(env.PORT, 3000), - ethRpcUrl: readRequired(env.ETH_RPC_URL, "ETH_RPC_URL"), - hotWalletPrivateKey: readRequired(env.HOT_WALLET_PRIVATE_KEY, "HOT_WALLET_PRIVATE_KEY"), + ethRpcUrl: env.ETH_RPC_URL, + hotWalletPrivateKey: readOptionalPlaceholder(env.HOT_WALLET_PRIVATE_KEY, '0xYOUR_PRIVATE_KEY'), usdtContractAddress: env.USDT_CONTRACT_ADDRESS ?? MAINNET_USDT_CONTRACT, - maxTransferAmountUnits: parseUsdtAmount(env.MAX_TRANSFER_USDT ?? "1000"), - minEthBalanceWei: BigInt(env.MIN_ETH_BALANCE_WEI ?? "1"), + maxTransferAmountUnits: parseUsdtAmount(env.MAX_TRANSFER_USDT ?? '1000'), + minEthBalanceWei: BigInt(env.MIN_ETH_BALANCE_WEI ?? '1'), balanceCheckAttempts: 3, balanceCheckIntervalMs: 60_000, - vaultAddr: readRequired(env.VAULT_ADDR, "VAULT_ADDR"), - vaultMountPoint: readRequired(env.VAULT_MOUNT_POINT, "VAULT_MOUNT_POINT"), - vaultRoleId: readRequired(env.VAULT_ROLE_ID, "VAULT_ROLE_ID"), - vaultSecretId: readRequired(env.VAULT_SECRET_ID, "VAULT_SECRET_ID"), - logLevel: env.LOG_LEVEL ?? "INFO" + vaultAddr: readRequired(env.VAULT_ADDR, 'VAULT_ADDR'), + vaultMountPoint: readRequired(env.VAULT_MOUNT_POINT, 'VAULT_MOUNT_POINT'), + vaultRoleId: readRequired(env.VAULT_ROLE_ID, 'VAULT_ROLE_ID'), + vaultSecretId: readRequired(env.VAULT_SECRET_ID, 'VAULT_SECRET_ID'), + logLevel: env.LOG_LEVEL ?? 'INFO' }; } + +export interface EthereumRuntimeConfig { + ethRpcUrl: string; + hotWalletPrivateKey: string; + usdtContractAddress: string; +} + + +export function resolveEthereumConfig(config: AppConfig, secret?: EthereumSecret): EthereumRuntimeConfig { + return { + ethRpcUrl: readRequired(secret?.rpcUrl ?? config.ethRpcUrl, 'ETH_RPC_URL or Vault ethereum.rpc_url'), + hotWalletPrivateKey: readPrivateKey(secret?.hotWalletPrivateKey ?? config.hotWalletPrivateKey), + usdtContractAddress: secret?.usdtContractAddress ?? config.usdtContractAddress ?? MAINNET_USDT_CONTRACT + }; +} + + function readRequired(value: string | undefined, name: string): string { if (!value) { throw new Error(`${name} is required`); @@ -43,6 +61,7 @@ function readRequired(value: string | undefined, name: string): string { return value; } + function readInteger(value: string | undefined, defaultValue: number): number { if (!value) { return defaultValue; @@ -54,3 +73,20 @@ function readInteger(value: string | undefined, defaultValue: number): number { } return parsed; } + + +function readOptionalPlaceholder(value: string | undefined, placeholder: string): string | undefined { + if (!value || value === placeholder) { + return undefined; + } + return value; +} + + +function readPrivateKey(value: string | undefined): string { + const privateKey = readRequired(value, 'HOT_WALLET_PRIVATE_KEY or Vault ethereum.hot_wallet_private_key'); + if (privateKey === '0xYOUR_PRIVATE_KEY' || !/^0x[0-9a-fA-F]{64}$/.test(privateKey)) { + throw new Error('HOT_WALLET_PRIVATE_KEY must be a real 32-byte hex private key'); + } + return privateKey; +} diff --git a/src/http/app.ts b/src/http/app.ts index 35286f7..5a7adb7 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -1,45 +1,68 @@ -import express, { type NextFunction, type Request, type Response } from "express"; -import type { Logger } from "pino"; -import { formatUsdtAmount } from "../domain/amount.js"; -import type { EthereumGateway } from "../ethereum/EthereumGateway.js"; +import express,{type NextFunction,type Request,type Response} from 'express'; +import type {Logger} from 'pino'; +import type {PostgresClient} from '../db/PostgresClient.js'; +import {formatUsdtAmount} from '../domain/amount.js'; +import type {EthereumGateway} from '../ethereum/EthereumGateway.js'; +import type {AmqpClient} from '../queue/AmqpClient.js'; interface AppOptions { ethereum: EthereumGateway; + db: PostgresClient; + amqp: AmqpClient; logger: Logger; } export function createApp(options: AppOptions) { const app = express(); - const { ethereum, logger } = options; + const { ethereum, db, amqp, logger } = options; - app.get("/health", async (_req, res) => { - try { - const hotWalletAddress = ethereum.getHotWalletAddress(); - const [blockNumber, ethBalanceWei, usdtBalanceUnits] = await Promise.all([ - ethereum.getCurrentBlockNumber(), - ethereum.getEthBalance(hotWalletAddress), - ethereum.getUsdtBalance(hotWalletAddress) - ]); + app.get('/health', async (_req, res) => { + const hotWalletAddress = ethereum.getHotWalletAddress(); + const [blockNumber, ethBalanceWei, usdtBalanceUnits, postgres] = await Promise.allSettled([ + ethereum.getCurrentBlockNumber(), + ethereum.getEthBalance(hotWalletAddress), + ethereum.getUsdtBalance(hotWalletAddress), + db.ping() + ]); + const rabbitmqConnected = amqp.isConnected(); + if ( + blockNumber.status === 'fulfilled' + && ethBalanceWei.status === 'fulfilled' + && usdtBalanceUnits.status === 'fulfilled' + && postgres.status === 'fulfilled' + && rabbitmqConnected + ) { res.json({ - status: "ok", - blockNumber, + status: 'ok', + blockNumber: blockNumber.value, hotWalletAddress, - hotWalletEthBalanceWei: ethBalanceWei.toString(), - hotWalletUsdtBalanceUnits: usdtBalanceUnits.toString(), - hotWalletUsdtBalance: formatUsdtAmount(usdtBalanceUnits) - }); - } catch (error) { - logger.error({ err: error }, "/health degraded"); - res.status(503).json({ - status: "degraded", - error: error instanceof Error ? error.message : String(error) + hotWalletEthBalanceWei: ethBalanceWei.value.toString(), + hotWalletUsdtBalanceUnits: usdtBalanceUnits.value.toString(), + hotWalletUsdtBalance: formatUsdtAmount(usdtBalanceUnits.value), + postgres: 'ok', + rabbitmq: 'ok' }); + return; } + + const error = { + ethereum: firstRejectedReason([blockNumber, ethBalanceWei, usdtBalanceUnits]), + postgres: postgres.status === 'rejected' ? getErrorMessage(postgres.reason) : undefined, + rabbitmq: rabbitmqConnected ? undefined : 'not connected' + }; + logger.error({ err: error }, '/health degraded'); + res.status(503).json({ + status: 'degraded', + hotWalletAddress, + postgres: postgres.status === 'fulfilled' ? 'ok' : 'degraded', + rabbitmq: rabbitmqConnected ? 'ok' : 'degraded', + error + }); }); app.use((error: unknown, _req: Request, res: Response, _next: NextFunction) => { - logger.error({ err: error }, "unhandled http error"); + logger.error({ err: error }, 'unhandled http error'); res.status(500).json({ error: error instanceof Error ? error.message : String(error) }); @@ -47,3 +70,14 @@ export function createApp(options: AppOptions) { return app; } + + +function firstRejectedReason(results: PromiseSettledResult[]): string | undefined { + const failed = results.find((result) => result.status === 'rejected'); + return failed?.status === 'rejected' ? getErrorMessage(failed.reason) : undefined; +} + + +function getErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/src/index.ts b/src/index.ts index 13ff47a..2554af8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,18 +1,18 @@ -import "dotenv/config"; -import type { Server } from "node:http"; -import { loadConfig } from "./config.js"; -import { PostgresClient } from "./db/PostgresClient.js"; -import { EthersEthereumGateway } from "./ethereum/EthersEthereumGateway.js"; -import { createApp } from "./http/app.js"; -import { createLogger } from "./logger.js"; -import { AmqpClient } from "./queue/AmqpClient.js"; -import { VaultClient } from "./secrets/VaultClient.js"; -import { TransferOrchestrator } from "./services/TransferOrchestrator.js"; +import 'dotenv/config'; +import type {Server} from 'node:http'; +import {loadConfig,resolveEthereumConfig} from './config.js'; +import {PostgresClient} from './db/PostgresClient.js'; +import {EthersEthereumGateway} from './ethereum/EthersEthereumGateway.js'; +import {createApp} from './http/app.js'; +import {createLogger} from './logger.js'; +import {AmqpClient} from './queue/AmqpClient.js'; +import {VaultClient} from './secrets/VaultClient.js'; +import {TransferOrchestrator} from './services/TransferOrchestrator.js'; async function main(): Promise { const config = loadConfig(); const logger = createLogger(config.logLevel); - logger.info({ event: "bootstrap.start" }, "starting usdt-transfer service"); + logger.info({ event: 'bootstrap.start' }, 'starting usdt-transfer service'); const vault = new VaultClient({ addr: config.vaultAddr, @@ -21,16 +21,17 @@ async function main(): Promise { secretId: config.vaultSecretId }); const secrets = await vault.bootstrap(); - logger.info({ event: "bootstrap.vault_loaded" }, "vault secrets loaded"); + logger.info({ event: 'bootstrap.vault_loaded' }, 'vault secrets loaded'); + const ethereumConfig = resolveEthereumConfig(config, secrets.ethereum); const db = new PostgresClient(secrets.database); await db.ping(); - logger.info({ event: "bootstrap.pg_ready" }, "postgres pool ready"); + logger.info({ event: 'bootstrap.pg_ready' }, 'postgres pool ready'); const ethereum = new EthersEthereumGateway({ - rpcUrl: config.ethRpcUrl, - privateKey: config.hotWalletPrivateKey, - usdtContractAddress: config.usdtContractAddress + rpcUrl: ethereumConfig.ethRpcUrl, + privateKey: ethereumConfig.hotWalletPrivateKey, + usdtContractAddress: ethereumConfig.usdtContractAddress }); let triggerShutdown: (reason: string) => void = () => {}; @@ -38,7 +39,7 @@ async function main(): Promise { const amqp = new AmqpClient({ secret: secrets.rabbitmq, logger, - onConnectionLost: () => triggerShutdown("amqp_connection_lost") + onConnectionLost: () => triggerShutdown('amqp_connection_lost') }); await amqp.connect(); @@ -51,11 +52,11 @@ async function main(): Promise { await amqp.startConsumer((message, log) => orchestrator.handle(message, log)); - const app = createApp({ ethereum, logger }); + const app = createApp({ ethereum, db, amqp, logger }); const server: Server = await new Promise((resolve) => { const s = app.listen(config.port, () => resolve(s)); }); - logger.info({ event: "bootstrap.http_listening", port: config.port }, "http listening"); + logger.info({ event: 'bootstrap.http_listening', port: config.port }, 'http listening'); let shuttingDown = false; triggerShutdown = (reason: string) => { @@ -63,29 +64,27 @@ async function main(): Promise { return; } shuttingDown = true; - logger.warn({ event: "shutdown.start", reason }, "shutting down"); + logger.warn({ event: 'shutdown.start', reason }, 'shutting down'); void (async () => { try { await new Promise((resolve) => server.close(() => resolve())); await amqp.close(); await db.close(); - logger.info({ event: "shutdown.done" }, "shutdown complete"); - process.exit(reason === "amqp_connection_lost" ? 1 : 0); + logger.info({ event: 'shutdown.done' }, 'shutdown complete'); + process.exit(reason === 'amqp_connection_lost' ? 1 : 0); } catch (error) { - logger.error({ event: "shutdown.error", err: error }, "shutdown error"); + logger.error({ event: 'shutdown.error', err: error }, 'shutdown error'); process.exit(1); } })(); }; - process.on("SIGINT", () => triggerShutdown("SIGINT")); - process.on("SIGTERM", () => triggerShutdown("SIGTERM")); + process.on('SIGINT', () => triggerShutdown('SIGINT')); + process.on('SIGTERM', () => triggerShutdown('SIGTERM')); } main().catch((error) => { - // Logger may not be ready yet; fall back to stderr. - // eslint-disable-next-line no-console - console.error("fatal bootstrap error:", error); + console.error('fatal bootstrap error:', error); process.exit(1); }); diff --git a/src/queue/AmqpClient.ts b/src/queue/AmqpClient.ts index 8bc172b..a50d322 100644 --- a/src/queue/AmqpClient.ts +++ b/src/queue/AmqpClient.ts @@ -1,11 +1,12 @@ -import amqplib, { type ChannelModel, type Channel, type ConsumeMessage } from "amqplib"; -import type { Logger } from "pino"; -import type { RabbitMqSecret } from "../secrets/VaultClient.js"; -import type { CryptoTransferCompleted, CryptoTransferRequest } from "./messageSchema.js"; -import { parseCryptoTransferRequest } from "./messageSchema.js"; +import {once} from 'node:events'; +import amqplib,{type ChannelModel,type ConfirmChannel,type ConsumeMessage} from 'amqplib'; +import type {Logger} from 'pino'; +import type {RabbitMqSecret} from '../secrets/VaultClient.js'; +import type {CryptoTransferCompleted,CryptoTransferRequest} from './messageSchema.js'; +import {parseCryptoTransferRequest} from './messageSchema.js'; -export const REQUEST_QUEUE = "crypto.transfer.requested"; -export const COMPLETED_QUEUE = "crypto.transfer.completed"; +export const REQUEST_QUEUE = 'crypto.transfer.requested'; +export const COMPLETED_QUEUE = 'crypto.transfer.completed'; export type TransferRequestHandler = ( message: CryptoTransferRequest, @@ -23,17 +24,18 @@ export class AmqpClient { private readonly secret: RabbitMqSecret; private readonly onConnectionLost: () => void; private connection?: ChannelModel; - private channel?: Channel; + private channel?: ConfirmChannel; + private connected = false; constructor(options: AmqpClientOptions) { this.secret = options.secret; - this.logger = options.logger.child({ component: "amqp" }); + this.logger = options.logger.child({ component: 'amqp' }); this.onConnectionLost = options.onConnectionLost; } async connect(): Promise { this.connection = await amqplib.connect({ - protocol: "amqp", + protocol: 'amqp', hostname: this.secret.host, port: this.secret.port, username: this.secret.user, @@ -41,28 +43,30 @@ export class AmqpClient { vhost: this.secret.vhost }); - this.connection.on("error", (error) => { - this.logger.error({ err: error }, "amqp connection error"); + this.connection.on('error', (error) => { + this.logger.error({ err: error }, 'amqp connection error'); }); - this.connection.on("close", () => { - this.logger.error("amqp connection closed"); + this.connection.on('close', () => { + this.connected = false; + this.logger.error('amqp connection closed'); this.onConnectionLost(); }); - this.channel = await this.connection.createChannel(); + this.channel = await this.connection.createConfirmChannel(); await this.channel.prefetch(1); await this.channel.assertQueue(REQUEST_QUEUE, { durable: true }); await this.channel.assertQueue(COMPLETED_QUEUE, { durable: true }); + this.connected = true; this.logger.info( { host: this.secret.host, port: this.secret.port, vhost: this.secret.vhost }, - "amqp connected" + 'amqp connected' ); } async startConsumer(handler: TransferRequestHandler): Promise { if (!this.channel) { - throw new Error("AmqpClient not connected"); + throw new Error('AmqpClient not connected'); } await this.channel.consume(REQUEST_QUEUE, (msg) => { @@ -72,32 +76,37 @@ export class AmqpClient { void this.dispatch(msg, handler); }); - this.logger.info({ queue: REQUEST_QUEUE }, "amqp consumer started"); + this.logger.info({ queue: REQUEST_QUEUE }, 'amqp consumer started'); } - publishCompleted(payload: CryptoTransferCompleted): boolean { + async publishCompleted(payload: CryptoTransferCompleted): Promise { if (!this.channel) { - throw new Error("AmqpClient not connected"); + throw new Error('AmqpClient not connected'); } - return this.channel.sendToQueue( + const accepted = this.channel.sendToQueue( COMPLETED_QUEUE, Buffer.from(JSON.stringify(payload)), - { contentType: "application/json", persistent: true } + { contentType: 'application/json', persistent: true } ); + if (!accepted) { + await once(this.channel, 'drain'); + } + await this.channel.waitForConfirms(); + } + + isConnected(): boolean { + return this.connected; } async close(): Promise { + this.connected = false; try { await this.channel?.close(); - } catch { - /* ignore */ - } + } catch {} try { await this.connection?.close(); - } catch { - /* ignore */ - } + } catch {} } private async dispatch(msg: ConsumeMessage, handler: TransferRequestHandler): Promise { @@ -108,12 +117,12 @@ export class AmqpClient { let parsed: CryptoTransferRequest; try { - const body = JSON.parse(msg.content.toString("utf8")) as unknown; + const body = JSON.parse(msg.content.toString('utf8')) as unknown; parsed = parseCryptoTransferRequest(body); } catch (error) { this.logger.error( { err: error, raw: safeRaw(msg) }, - "rejected malformed message" + 'rejected malformed message' ); channel.ack(msg); return; @@ -126,22 +135,23 @@ export class AmqpClient { message_id: parsed.message_id }); - log.info({ event: "transfer.requested.received" }, "received from queue"); + log.info({ event: 'transfer.requested.received' }, 'received from queue'); try { await handler(parsed, log); channel.ack(msg); } catch (error) { - log.error({ err: error }, "infrastructure error during message handling, requeue=false"); - channel.nack(msg, false, false); + log.error({ err: error }, 'infrastructure error during message handling, requeue=true'); + channel.nack(msg, false, true); } } } + function safeRaw(msg: ConsumeMessage): string { try { - return msg.content.toString("utf8").slice(0, 512); + return msg.content.toString('utf8').slice(0, 512); } catch { - return ""; + return ''; } } diff --git a/src/secrets/VaultClient.ts b/src/secrets/VaultClient.ts index 08df37f..b72eaac 100644 --- a/src/secrets/VaultClient.ts +++ b/src/secrets/VaultClient.ts @@ -22,9 +22,16 @@ export interface RabbitMqSecret { password: string; } +export interface EthereumSecret { + rpcUrl?: string; + hotWalletPrivateKey?: string; + usdtContractAddress?: string; +} + export interface BootstrappedSecrets { database: DatabaseSecret; rabbitmq: RabbitMqSecret; + ethereum?: EthereumSecret; } export class VaultClient { @@ -36,7 +43,7 @@ export class VaultClient { private token?: string; constructor(options: VaultClientOptions) { - this.baseUrl = options.addr.replace(/\/+$/, ""); + this.baseUrl = options.addr.replace(/\/+$/, ''); this.mountPoint = options.mountPoint; this.roleId = options.roleId; this.secretId = options.secretId; @@ -45,58 +52,71 @@ export class VaultClient { async bootstrap(): Promise { await this.login(); - const [database, rabbitmq] = await Promise.all([ - this.readSecret("database"), - this.readSecret("rabbitmq") + const [database, rabbitmq, ethereum] = await Promise.all([ + this.readSecret('database'), + this.readSecret('rabbitmq'), + this.readOptionalSecret('ethereum') ]); return { database: parseDatabaseSecret(database), - rabbitmq: parseRabbitMqSecret(rabbitmq) + rabbitmq: parseRabbitMqSecret(rabbitmq), + ethereum: ethereum ? parseEthereumSecret(ethereum) : undefined }; } private async login(): Promise { const response = await this.fetch(`${this.baseUrl}/v1/auth/approle/login`, { - method: "POST", - headers: { "content-type": "application/json" }, + method: 'POST', + headers: { 'content-type': 'application/json' }, body: JSON.stringify({ role_id: this.roleId, secret_id: this.secretId }) }); const json = (await response.json()) as { auth?: { client_token?: string } }; const token = json.auth?.client_token; if (!token) { - throw new Error("Vault AppRole login did not return a client_token"); + throw new Error('Vault AppRole login did not return a client_token'); } this.token = token; } private async readSecret(path: string): Promise> { if (!this.token) { - throw new Error("Vault token is missing; call bootstrap() first"); + throw new Error('Vault token is missing; call bootstrap() first'); } const url = `${this.baseUrl}/v1/${this.mountPoint}/data/${path}`; const response = await this.fetch(url, { - method: "GET", - headers: { "X-Vault-Token": this.token } + method: 'GET', + headers: { 'X-Vault-Token': this.token } }); const json = (await response.json()) as { data?: { data?: Record } }; const data = json.data?.data; - if (!data || typeof data !== "object") { + if (!data || typeof data !== 'object') { throw new Error(`Vault secret ${this.mountPoint}/${path} is empty or malformed`); } return data; } + private async readOptionalSecret(path: string): Promise | null> { + try { + return await this.readSecret(path); + } catch (error) { + if (error instanceof Error && error.message.includes('HTTP 404')) { + return null; + } + throw error; + } + } + private async fetch(url: string, init: RequestInit): Promise { const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), this.timeoutMs); try { const response = await fetch(url, { ...init, signal: controller.signal }); if (!response.ok) { - const text = await response.text().catch(() => ""); + const text = await response.text().catch(() => ''); throw new Error(`Vault request to ${url} failed with HTTP ${response.status}: ${text}`); } return response; @@ -106,37 +126,62 @@ export class VaultClient { } } + function parseDatabaseSecret(raw: Record): DatabaseSecret { return { - host: requireString(raw, "host", "database"), - port: requirePort(raw, "port", "database"), - name: requireString(raw, "name", "database"), - user: requireString(raw, "user", "database"), - password: requireString(raw, "password", "database") + host: requireString(raw, 'host', 'database'), + port: requirePort(raw, 'port', 'database'), + name: requireString(raw, 'name', 'database'), + user: requireString(raw, 'user', 'database'), + password: requireString(raw, 'password', 'database') }; } + function parseRabbitMqSecret(raw: Record): RabbitMqSecret { return { - host: requireString(raw, "host", "rabbitmq"), - port: requirePort(raw, "port", "rabbitmq"), - vhost: requireString(raw, "vhost", "rabbitmq"), - user: requireString(raw, "user", "rabbitmq"), - password: requireString(raw, "password", "rabbitmq") + host: requireString(raw, 'host', 'rabbitmq'), + port: requirePort(raw, 'port', 'rabbitmq'), + vhost: requireString(raw, 'vhost', 'rabbitmq'), + user: requireString(raw, 'user', 'rabbitmq'), + password: requireString(raw, 'password', 'rabbitmq') }; } + +function parseEthereumSecret(raw: Record): EthereumSecret { + return { + rpcUrl: optionalString(raw, 'rpc_url', 'ethereum'), + hotWalletPrivateKey: optionalString(raw, 'hot_wallet_private_key', 'ethereum'), + usdtContractAddress: optionalString(raw, 'usdt_contract_address', 'ethereum') + }; +} + + function requireString(raw: Record, key: string, secretName: string): string { const value = raw[key]; - if (typeof value !== "string" || !value) { + if (typeof value !== 'string' || !value) { throw new Error(`Vault secret '${secretName}' is missing string field '${key}'`); } return value; } + +function optionalString(raw: Record, key: string, secretName: string): string | undefined { + const value = raw[key]; + if (value === undefined || value === null || value === '') { + return undefined; + } + if (typeof value !== 'string') { + throw new Error(`Vault secret '${secretName}' field '${key}' must be a string`); + } + return value; +} + + function requirePort(raw: Record, key: string, secretName: string): number { const value = raw[key]; - const numeric = typeof value === "number" ? value : Number(value); + const numeric = typeof value === 'number' ? value : Number(value); if (!Number.isFinite(numeric) || numeric <= 0 || numeric > 65535) { throw new Error(`Vault secret '${secretName}' has invalid port '${String(value)}'`); } diff --git a/src/services/TransferOrchestrator.ts b/src/services/TransferOrchestrator.ts index d395e5e..a680bca 100644 --- a/src/services/TransferOrchestrator.ts +++ b/src/services/TransferOrchestrator.ts @@ -1,11 +1,11 @@ -import { isAddress } from "ethers"; -import type { Logger } from "pino"; -import { ulid } from "ulid"; -import { parseUsdtAmount } from "../domain/amount.js"; -import type { PostgresClient } from "../db/PostgresClient.js"; -import type { EthereumGateway } from "../ethereum/EthereumGateway.js"; -import type { AmqpClient } from "../queue/AmqpClient.js"; -import type { CryptoTransferRequest } from "../queue/messageSchema.js"; +import {isAddress} from 'ethers'; +import type {Logger} from 'pino'; +import {ulid} from 'ulid'; +import {parseUsdtAmount} from '../domain/amount.js'; +import type {PostgresClient} from '../db/PostgresClient.js'; +import type {EthereumGateway} from '../ethereum/EthereumGateway.js'; +import type {AmqpClient} from '../queue/AmqpClient.js'; +import type {CryptoTransferRequest} from '../queue/messageSchema.js'; export interface TransferOrchestratorOptions { maxTransferAmountUnits: bigint; @@ -30,26 +30,27 @@ export class TransferOrchestrator { async handle(message: CryptoTransferRequest, log: Logger): Promise { const existing = await this.db.getPaymentByOrderId(message.order_id); if (!existing) { - log.error({ event: "payment.missing" }, "no payment row for order_id, ack"); + log.error({ event: 'payment.missing' }, 'no payment row for order_id, ack'); return; } - if (existing.status === "usdt_delivered") { - log.info({ event: "payment.already_delivered" }, "skip — already delivered"); + if (existing.status === 'usdt_delivered') { + log.info({ event: 'payment.already_delivered' }, 'already delivered, publishing completion'); + await this.publishCompleted(message, log); return; } - if (existing.status === "web3_hash_error" || existing.status === "web3_balance_problem") { + if (existing.status === 'web3_hash_error' || existing.status === 'web3_balance_problem') { log.warn( - { event: "payment.terminal_state", status: existing.status }, - "skip — payment already in terminal failure state" + { event: 'payment.terminal_state', status: existing.status }, + 'skip - payment already in terminal failure state' ); return; } const usdtAmountString = await this.db.getOrderUsdtAmount(message.order_id); if (!usdtAmountString) { - log.error({ event: "order.not_found" }, "order missing for order_id"); + log.error({ event: 'order.not_found' }, 'order missing for order_id'); await this.db.markPaymentHashError(message.order_id); return; } @@ -59,8 +60,8 @@ export class TransferOrchestrator { amountUnits = parseUsdtAmount(usdtAmountString); } catch (error) { log.error( - { event: "order.invalid_amount", err: error, usdt_amount: usdtAmountString }, - "could not parse usdt_amount" + { event: 'order.invalid_amount', err: error, usdt_amount: usdtAmountString }, + 'could not parse usdt_amount' ); await this.db.markPaymentHashError(message.order_id); return; @@ -68,8 +69,8 @@ export class TransferOrchestrator { if (amountUnits > this.options.maxTransferAmountUnits) { log.error( - { event: "transfer.amount_exceeds_limit", amount_units: amountUnits.toString() }, - "usdt_amount exceeds MAX_TRANSFER_USDT" + { event: 'transfer.amount_exceeds_limit', amount_units: amountUnits.toString() }, + 'usdt_amount exceeds MAX_TRANSFER_USDT' ); await this.db.markPaymentHashError(message.order_id); return; @@ -77,7 +78,7 @@ export class TransferOrchestrator { const recipient = await this.db.getUserEthWalletAddress(message.user_id); if (!recipient || !isAddress(recipient)) { - log.error({ event: "wallet.not_found_or_invalid", recipient }, "no valid ETH wallet for user"); + log.error({ event: 'wallet.not_found_or_invalid', recipient }, 'no valid ETH wallet for user'); await this.db.markPaymentHashError(message.order_id); return; } @@ -85,7 +86,7 @@ export class TransferOrchestrator { try { await this.assertHotWalletReady(amountUnits); } catch (error) { - log.error({ event: "hot_wallet.not_ready", err: error }, "hot wallet pre-check failed"); + log.error({ event: 'hot_wallet.not_ready', err: error }, 'hot wallet pre-check failed'); await this.db.markPaymentHashError(message.order_id); return; } @@ -94,14 +95,14 @@ export class TransferOrchestrator { try { preBalance = await this.ethereum.getUsdtBalance(recipient); } catch (error) { - log.error({ event: "rpc.pre_balance_failed", err: error }, "failed to read recipient pre-balance"); + log.error({ event: 'rpc.pre_balance_failed', err: error }, 'failed to read recipient pre-balance'); await this.db.markPaymentHashError(message.order_id); return; } log.info( - { event: "transfer.pre_balance", pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() }, - "captured pre-balance" + { event: 'transfer.pre_balance', pre_balance_units: preBalance.toString(), amount_units: amountUnits.toString() }, + 'captured pre-balance' ); let txHash: string; @@ -109,35 +110,24 @@ export class TransferOrchestrator { const tx = await this.ethereum.sendUsdtTransfer(recipient, amountUnits); txHash = tx.hash; } catch (error) { - log.error({ event: "transfer.broadcast_failed", err: error }, "broadcast failed"); + log.error({ event: 'transfer.broadcast_failed', err: error }, 'broadcast failed'); await this.db.markPaymentHashError(message.order_id); return; } - log.info({ event: "transfer.broadcasted", tx_hash: txHash }, "broadcast OK"); + log.info({ event: 'transfer.broadcasted', tx_hash: txHash }, 'broadcast OK'); await this.db.setPaymentTxHash(message.order_id, txHash); const delivered = await this.pollForBalance(recipient, preBalance, amountUnits, log); if (!delivered) { - log.error({ event: "transfer.balance_problem", tx_hash: txHash }, "balance did not increase after 3 checks"); + log.error({ event: 'transfer.balance_problem', tx_hash: txHash }, 'balance did not increase after 3 checks'); await this.db.markPaymentBalanceProblem(message.order_id, txHash); return; } await this.db.markPaymentDelivered(message.order_id, txHash); - log.info({ event: "transfer.delivered", tx_hash: txHash }, "marked usdt_delivered"); - - const completedMessageId = ulid(); - this.amqp.publishCompleted({ - user_id: message.user_id, - order_id: message.order_id, - trace_id: message.trace_id, - message_id: completedMessageId - }); - log.info( - { event: "transfer.completed.published", message_id: completedMessageId }, - "published crypto.transfer.completed" - ); + log.info({ event: 'transfer.delivered', tx_hash: txHash }, 'marked usdt_delivered'); + await this.publishCompleted(message, log); } private async assertHotWalletReady(amountUnits: bigint): Promise { @@ -148,10 +138,10 @@ export class TransferOrchestrator { ]); if (ethBalanceWei < this.options.minEthBalanceWei) { - throw new Error("Hot wallet ETH balance is below gas threshold"); + throw new Error('Hot wallet ETH balance is below gas threshold'); } if (usdtBalanceUnits < amountUnits) { - throw new Error("Hot wallet USDT balance is insufficient"); + throw new Error('Hot wallet USDT balance is insufficient'); } } @@ -169,8 +159,8 @@ export class TransferOrchestrator { postBalance = await this.ethereum.getUsdtBalance(recipient); } catch (error) { log.warn( - { event: "balance_check.rpc_error", attempt, err: error }, - "balance check RPC error" + { event: 'balance_check.rpc_error', attempt, err: error }, + 'balance check RPC error' ); continue; } @@ -178,13 +168,13 @@ export class TransferOrchestrator { const delta = postBalance - preBalance; log.info( { - event: "balance_check.result", + event: 'balance_check.result', attempt, attempts_total: this.options.balanceCheckAttempts, post_balance_units: postBalance.toString(), delta_units: delta.toString() }, - "balance check" + 'balance check' ); if (delta >= amountUnits) { @@ -194,8 +184,23 @@ export class TransferOrchestrator { return false; } + + private async publishCompleted(message: CryptoTransferRequest, log: Logger): Promise { + const completedMessageId = ulid(); + await this.amqp.publishCompleted({ + user_id: message.user_id, + order_id: message.order_id, + trace_id: message.trace_id, + message_id: completedMessageId + }); + log.info( + { event: 'transfer.completed.published', message_id: completedMessageId }, + 'published crypto.transfer.completed' + ); + } } + function defaultSleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); }