USDT Transfer Worker
Production worker that consumes crypto.transfer.requested from RabbitMQ, sends an exact-amount ERC-20 USDT transfer on Ethereum mainnet from a hot wallet, verifies that the recipient's balance increased, and publishes crypto.transfer.completed on success. The status of every order is written to the payments table in Postgres.
Architecture
RabbitMQ (crypto.transfer.requested)
      │
      ▼
AmqpConsumer ──▶ TransferOrchestrator
                      │
                      ├─ Postgres (orders, wallets, payments)
                      ├─ Ethereum RPC (USDT balanceOf, transfer)
                      └─ AmqpPublisher (crypto.transfer.completed)
Postgres and RabbitMQ credentials are pulled from HashiCorp Vault (KV v2, AppRole auth). The hot-wallet private key, ETH RPC URL and USDT contract address can also be pulled from Vault via the optional ethereum secret; .env remains the fallback for those three fields and also holds the runtime limits.
Message contract
Inbound — queue crypto.transfer.requested:
{
  "order_id": "01K...",
  "user_id": "01K...",
  "trace_id": "01K...",
  "message_id": "01K..."
}
All four IDs are ULIDs (26 chars, Crockford base32).
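A minimal validation sketch for the inbound payload, assuming IDs arrive in canonical uppercase form. The helper name parseTransferRequested is hypothetical and not taken from the worker's source; only the four field names and the ULID shape come from the contract above.

```typescript
// Crockford base32 omits I, L, O and U. A ULID is 26 such characters, and the
// first one is at most 7 because the value has to fit into 128 bits.
const ULID_PATTERN = /^[0-7][0-9A-HJKMNP-TV-Z]{25}$/;

interface TransferRequested {
  order_id: string;
  user_id: string;
  trace_id: string;
  message_id: string;
}

const REQUIRED_FIELDS = ["order_id", "user_id", "trace_id", "message_id"] as const;

// Hypothetical helper: parses a raw message body and rejects anything that is
// not a JSON object carrying four well-formed ULIDs.
function parseTransferRequested(raw: string): TransferRequested {
  const data: unknown = JSON.parse(raw);
  if (typeof data !== "object" || data === null) {
    throw new Error("payload must be a JSON object");
  }
  const record = data as Record<string, unknown>;
  for (const field of REQUIRED_FIELDS) {
    const value = record[field];
    if (typeof value !== "string" || !ULID_PATTERN.test(value)) {
      throw new Error(`${field} is not a valid ULID`);
    }
  }
  return {
    order_id: record.order_id as string,
    user_id: record.user_id as string,
    trace_id: record.trace_id as string,
    message_id: record.message_id as string,
  };
}
```

Rejecting malformed messages before any database or RPC call keeps a bad payload from reaching the transfer path.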
Outbound — queue crypto.transfer.completed (only on success):
{
  "user_id": "01K...",
  "order_id": "01K...",
  "trace_id": "01K...",
  "message_id": "01K..."
}
message_id in the outbound payload is freshly generated; the other three are echoed from the request.
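The echo rule can be sketched as a small pure function. The name buildCompletedMessage and the injected newId generator are assumptions made for illustration; in practice newId would be whatever ULID generator the worker uses.

```typescript
interface TransferMessage {
  user_id: string;
  order_id: string;
  trace_id: string;
  message_id: string;
}

// Hypothetical helper: copies user_id, order_id and trace_id from the request
// and asks the injected generator for a fresh message_id.
function buildCompletedMessage(
  request: TransferMessage,
  newId: () => string,
): TransferMessage {
  return {
    user_id: request.user_id,
    order_id: request.order_id,
    trace_id: request.trace_id,
    message_id: newId(),
  };
}
```

Passing the generator in as a parameter keeps the function deterministic under test.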
Processing flow
- Read the payments row by order_id. If the status is already usdt_delivered, skip. If it is a terminal failure (web3_hash_error / web3_balance_problem), also skip.
- SELECT usdt_amount FROM orders WHERE id = $order_id.
- SELECT address FROM wallets WHERE user_id = $user_id AND chain = 'ETH' LIMIT 1.
- Pre-checks: hot-wallet ETH and USDT balances, MAX_TRANSFER_USDT cap.
- Capture the pre-balance of the recipient (usdt.balanceOf).
- Broadcast usdt.transfer(recipient, amount). If the RPC call fails, set payments.status = web3_hash_error and stop.
- Persist the tx hash into payments.web3_transaction_hash immediately.
- Balance loop: 3 attempts × 60 s. On the first attempt where post − pre ≥ amount, mark payments.status = usdt_delivered, set paid_at, and publish to crypto.transfer.completed.
- If all 3 attempts fail to observe the increase: payments.status = web3_balance_problem (the hash is still kept).
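The skip rule in the first step is what makes a redelivered message harmless. A minimal sketch, assuming the status is read as a plain string; the name shouldProcess is hypothetical.

```typescript
// Statuses after which the worker must not attempt another transfer:
// one success state and the two terminal failure states from the flow above.
const SKIP_STATUSES = new Set<string>([
  "usdt_delivered",
  "web3_hash_error",
  "web3_balance_problem",
]);

// Hypothetical helper: returns true only when the order has not yet reached
// a delivered or terminal-failure status.
function shouldProcess(currentStatus: string): boolean {
  return !SKIP_STATUSES.has(currentStatus);
}
```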
Failure paths only update Postgres — the spec asks for queue publishing on success only.
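The balance loop can be sketched with its dependencies injected. This is an illustration under stated assumptions, not the worker's actual code: getBalance stands in for a call to usdt.balanceOf, sleep for a timer, and the sketch assumes the wait happens before each check, which the description above does not specify.

```typescript
type VerificationResult = "usdt_delivered" | "web3_balance_problem";

interface VerifyOptions {
  preBalance: bigint;                   // recipient balance captured before broadcast
  amount: bigint;                       // expected increase in USDT base units
  getBalance: () => Promise<bigint>;    // hypothetical wrapper around usdt.balanceOf
  sleep: (ms: number) => Promise<void>; // hypothetical timer, injected for testing
  attempts?: number;                    // defaults to 3
  intervalMs?: number;                  // defaults to 60 s
}

// Polls the recipient balance until it has grown by at least `amount`,
// or gives up after the configured number of attempts.
async function verifyDelivery(opts: VerifyOptions): Promise<VerificationResult> {
  const attempts = opts.attempts ?? 3;
  const intervalMs = opts.intervalMs ?? 60_000;
  for (let attempt = 0; attempt < attempts; attempt++) {
    // Assumption: wait first, then check.
    await opts.sleep(intervalMs);
    const postBalance = await opts.getBalance();
    if (postBalance - opts.preBalance >= opts.amount) {
      return "usdt_delivered";
    }
  }
  return "web3_balance_problem";
}
```

The caller would map the returned value onto payments.status and publish to crypto.transfer.completed only for usdt_delivered.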
Environment
Required .env (see .env.example):
PORT=3000
ETH_RPC_URL=https://ethereum-rpc.publicnode.com
HOT_WALLET_PRIVATE_KEY=
USDT_CONTRACT_ADDRESS=0xdAC17F958D2ee523a2206206994597C13D831ec7
VAULT_MOUNT_POINT=dev-secrets
VAULT_ADDR=https://corp.vault.elcsa.ru
VAULT_ROLE_ID=...
VAULT_SECRET_ID=...
LOG_LEVEL=INFO
REQUIRED_CONFIRMATIONS=12
MISMATCH_TIMEOUT_MS=300000
POLL_INTERVAL_MS=15000
MAX_TRANSFER_USDT=1000
MIN_ETH_BALANCE_WEI=1
WEBHOOK_TIMEOUT_MS=10000
REQUIRED_CONFIRMATIONS, MISMATCH_TIMEOUT_MS, POLL_INTERVAL_MS, WEBHOOK_TIMEOUT_MS are kept in the .env for compatibility with infra tooling but are not consumed by the worker — the verification cadence is fixed at 3 attempts × 60 s.
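The two limits that do take part in the pre-checks could be loaded and applied roughly as follows. This sketch assumes MAX_TRANSFER_USDT is expressed in whole USDT and relies on mainnet USDT having 6 decimals; loadLimits, preCheck and the error strings are hypothetical names, not the worker's own.

```typescript
interface Limits {
  maxTransferBaseUnits: bigint; // MAX_TRANSFER_USDT converted to 6-decimal base units
  minEthBalanceWei: bigint;     // MIN_ETH_BALANCE_WEI as read from the environment
}

type Env = Record<string, string | undefined>;

// Reads a non-negative integer from the environment. The explicit pattern
// check matters because BigInt("") silently evaluates to 0n.
function requireInteger(env: Env, key: string): bigint {
  const raw = env[key];
  if (raw === undefined || !/^\d+$/.test(raw)) {
    throw new Error(`${key} must be a non-negative integer`);
  }
  return BigInt(raw);
}

function loadLimits(env: Env): Limits {
  return {
    // Assumption: the cap is given in whole USDT, and 1 USDT = 1_000_000 base units.
    maxTransferBaseUnits: requireInteger(env, "MAX_TRANSFER_USDT") * 1_000_000n,
    minEthBalanceWei: requireInteger(env, "MIN_ETH_BALANCE_WEI"),
  };
}

// Returns a reason string when a pre-check fails, or null when all pass.
function preCheck(
  amount: bigint,
  hotWalletEthWei: bigint,
  hotWalletUsdt: bigint,
  limits: Limits,
): string | null {
  if (amount > limits.maxTransferBaseUnits) return "amount exceeds MAX_TRANSFER_USDT";
  if (hotWalletEthWei < limits.minEthBalanceWei) return "hot wallet ETH below MIN_ETH_BALANCE_WEI";
  if (hotWalletUsdt < amount) return "hot wallet USDT balance is insufficient";
  return null;
}
```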
Vault layout (dev-secrets/ mount, KV v2)
dev-secrets/database:
| key | value |
|---|---|
| host | postgres host |
| port | postgres port |
| name | database name |
| user | username |
| password | password |
dev-secrets/rabbitmq:
| key | value |
|---|---|
| host | rabbit host |
| port | amqp port |
| vhost | virtual host |
| user | username |
| password | password |
Optional dev-secrets/ethereum:
| key | value |
|---|---|
| rpc_url | Ethereum RPC URL |
| hot_wallet_private_key | company wallet private key |
| usdt_contract_address | USDT contract address |
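Because the ethereum secret is optional, each of its three fields has to be resolved with a fallback to .env. A minimal sketch of that resolution, assuming the secret has already been fetched into a plain key/value object; resolveEthereumSettings is a hypothetical name, and the Vault client call itself is deliberately left out.

```typescript
interface EthereumSettings {
  rpcUrl: string;
  hotWalletPrivateKey: string;
  usdtContractAddress: string;
}

type StringMap = Record<string, string | undefined>;

// Prefers the value from the optional Vault secret and falls back to the
// matching .env variable. Fails if neither source provides a non-empty value.
function resolveEthereumSettings(
  vaultSecret: StringMap | undefined,
  env: StringMap,
): EthereumSettings {
  const pick = (vaultKey: string, envKey: string): string => {
    const value = vaultSecret?.[vaultKey] || env[envKey];
    if (!value) {
      throw new Error(`missing ${vaultKey} in Vault and ${envKey} in .env`);
    }
    return value;
  };
  return {
    rpcUrl: pick("rpc_url", "ETH_RPC_URL"),
    hotWalletPrivateKey: pick("hot_wallet_private_key", "HOT_WALLET_PRIVATE_KEY"),
    usdtContractAddress: pick("usdt_contract_address", "USDT_CONTRACT_ADDRESS"),
  };
}
```

Failing at startup when a field is missing from both sources is a design choice of this sketch; it surfaces a misconfigured wallet before any message is consumed.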
HTTP
A single endpoint is exposed for liveness/readiness probes:
GET /health reports the RPC block number, hot-wallet ETH/USDT balances, Postgres ping and RabbitMQ connection state. It returns 503 if any dependency is degraded.
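The 503 rule amounts to folding per-dependency results into one status code. A sketch under the assumption that each check has already been reduced to a boolean and that a fully healthy response is 200; summarizeHealth and the check names are hypothetical.

```typescript
type DependencyChecks = Record<string, boolean>;

interface HealthSummary {
  statusCode: 200 | 503;
  degraded: string[]; // names of the dependencies whose check failed
}

// Returns 503 as soon as at least one dependency check is false.
function summarizeHealth(checks: DependencyChecks): HealthSummary {
  const degraded = Object.keys(checks).filter((name) => !checks[name]);
  return {
    statusCode: degraded.length === 0 ? 200 : 503,
    degraded,
  };
}
```

For example, summarizeHealth({ rpc: true, postgres: true, rabbitmq: false }) yields status 503 with rabbitmq listed as degraded.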
Local development
npm install
cp .env.example .env # fill VAULT_ROLE_ID / VAULT_SECRET_ID, etc.
npm run build
npm start
With Docker:
docker compose up --build
docker-compose.yml brings up Postgres and RabbitMQ alongside the app. Vault stays external — point VAULT_ADDR at your real instance.
Tests
npm test
Covers amount parsing and gas error humanization. Integration testing (Postgres + RabbitMQ + RPC) is performed in the deploy environment.
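To make concrete what an amount-parsing test exercises, here is a hypothetical parser and the cases it handles. It is not the repository's implementation; it relies only on mainnet USDT using 6 decimals and assumes amounts arrive as decimal strings.

```typescript
const USDT_DECIMALS = 6;

// Hypothetical parser: converts a decimal string such as "12.5" into USDT
// base units as a bigint, rejecting input with more than 6 fractional digits.
function parseUsdtAmount(text: string): bigint {
  const match = /^(\d+)(?:\.(\d{1,6}))?$/.exec(text.trim());
  if (match === null) {
    throw new Error(`invalid USDT amount: ${text}`);
  }
  const whole = match[1];
  const fraction = (match[2] ?? "").padEnd(USDT_DECIMALS, "0");
  const units = BigInt(whole + fraction);
  if (units === 0n) {
    throw new Error("USDT amount must be greater than zero");
  }
  return units;
}
```

Working on the string instead of going through a floating-point number avoids rounding errors, which matters for an exact-amount transfer.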