This commit is contained in:
ZOMBIIIIIII
2026-05-13 12:07:48 +03:00
parent 3a890b79ee
commit 762a46871b
10 changed files with 375 additions and 155 deletions

View File

@@ -53,6 +53,12 @@ export let env = {
allowCredentials: p.CORS_ALLOW_CREDENTIALS === 'true',
},
port: parseInt(p.API_PORT || '3001'),
redis: {
host: p.REDIS_HOST || 'keydb',
port: parseInt(p.REDIS_PORT || '6379'),
password: p.REDIS_PASSWORD || '',
db: parseInt(p.REDIS_DB || '0'),
},
relayApiKey: p.RELAY_API_KEY || null,
tronApiKey: p.TRON_API_KEY || null,
jupiterApiKey: p.JUPITER_API_KEY || null,

View File

@@ -0,0 +1,86 @@
/**
* KeyDB / Redis singleton client.
*
* Используется для idempotency cache (см. `lib/idempotency.ts`).
*
* Connection:
* REDIS_HOST=keydb (docker service name) / REDIS_PORT=6379 / REDIS_PASSWORD / REDIS_DB=0
*
* Startup contract: `pingRedis()` вызывается из `index.ts` и throws если KeyDB
* unreachable — fail-fast, потому что idempotency critical для money flow.
*/
import Redis, { type RedisOptions } from 'ioredis';
import { logger } from '../lib/logger';
let _client: Redis | null = null;
function buildClient(): Redis {
const host = process.env.REDIS_HOST || 'keydb';
const port = parseInt(process.env.REDIS_PORT || '6379', 10);
const password = process.env.REDIS_PASSWORD || '';
const db = parseInt(process.env.REDIS_DB || '0', 10);
if (!Number.isFinite(port) || port < 1 || port > 65535) {
throw new Error(`Invalid REDIS_PORT ${process.env.REDIS_PORT}`);
}
if (!Number.isFinite(db) || db < 0 || db > 15) {
throw new Error(`Invalid REDIS_DB ${process.env.REDIS_DB} (must be 0-15)`);
}
const opts: RedisOptions = {
host,
port,
db,
lazyConnect: true,
// Не зависать forever — fail-fast если cache недоступен
connectTimeout: 5000,
maxRetriesPerRequest: 3,
// Reconnect strategy: exponential backoff, max 5s
retryStrategy: (times) => Math.min(times * 200, 5000),
};
if (password) opts.password = password;
const client = new Redis(opts);
client.on('error', (err) => {
// Не логируем secret в случае конфигурационной ошибки
logger.error(`Redis client error: ${err.message}`);
});
client.on('connect', () => logger.info(`Redis connected (host=${host}:${port} db=${db})`));
client.on('reconnecting', (delay: number) => logger.warn(`Redis reconnecting in ${delay}ms`));
return client;
}
/** Lazily initialised singleton. */
export function getRedis(): Redis {
if (!_client) {
_client = buildClient();
}
return _client;
}
/**
* Startup ping. Throws on failure → caller process.exit(1).
* Connect-on-demand (lazyConnect=true), .ping() триггерит connect + первый round-trip.
*/
export async function pingRedis(): Promise<void> {
const client = getRedis();
try {
const pong = await client.ping();
if (pong !== 'PONG') {
throw new Error(`Redis PING returned ${pong} (expected PONG)`);
}
} catch (err: any) {
throw new Error(`Redis ping failed: ${err.message}`);
}
}
/** Graceful shutdown — closes connection cleanly. */
export async function closeRedis(): Promise<void> {
if (_client) {
await _client.quit().catch(() => _client?.disconnect());
_client = null;
}
}

View File

@@ -3,6 +3,7 @@ import { env, initEnv } from './config/env';
import { refreshAllKeys, startKeyRotation, stopKeyRotation } from './services/key-rotation.service';
import { isCryptoReady, decryptMnemonic, encryptMnemonic } from './services/crypto.service';
import { db } from './config/database';
import { pingRedis, closeRedis } from './config/redis';
import { logger } from './lib/logger';
// Global error handlers — иначе unhandled errors идут в stderr без sanitization (leak secrets)
@@ -36,6 +37,15 @@ async function main() {
// и любой /send или /sign будет тихо валиться с GCM auth error. Лучше упасть сразу.
await runCryptoIntegritySelfTest();
// KeyDB / Redis ping — idempotency critical для money flow; fail-fast если недоступен.
try {
await pingRedis();
logger.info('KeyDB/Redis self-test: PASSED');
} catch (err: any) {
logger.error(`KeyDB/Redis ping failed: ${err.message}. Refusing to start (idempotency unavailable).`);
process.exit(1);
}
startKeyRotation();
const server = app.listen(env.port, () => {
@@ -45,6 +55,7 @@ async function main() {
const shutdown = (signal: string) => {
logger.info(`${signal} received, shutting down gracefully`);
stopKeyRotation();
void closeRedis();
server.close(() => process.exit(0));
// Force exit if shutdown takes too long
setTimeout(() => process.exit(1), 10_000).unref();

View File

@@ -1,20 +1,24 @@
/**
* Audit log — durable durable durable.
* Audit log — STDOUT ONLY (best-effort).
*
* Two sinks:
* 1. **DB `audit_log` table** — primary, used by `auditLogStrict` для critical
* операций. INSERT pending → mutation → UPDATE success/failure с txid.
* Если INSERT fails — operation must NOT proceed (fail-secure).
* 2. **stdout JSON line** — для log-aggregator (Docker logs / Loki etc).
* Best-effort, всегда (даже если DB sink fails).
* ⚠️ DURABLE AUDIT REMOVED. Per design choice: `audit_log` DB-таблица убрана,
* pre-mutation INSERT pattern → not used. Audit-trail доступен только в Docker
* stdout (`level=audit` JSON lines), который log-aggregator (Loki/CloudWatch/etc.)
* подбирает.
*
* НИКОГДА не логирует mnemonic / privkey / encrypted blob.
* Trade-off: stdout не обеспечивает strict fail-secure семантику. Если Docker
* log driver buffer переполнится или log-aggregator down — записи могут потеряться.
* Если потребуется restore compliance-grade audit — вернуть `audit_log` table
* и pre-mutation INSERT/UPDATE pattern (см. git history).
*
* Public API сохраняет signatures из предыдущей DB-версии для backward compat
* без рефакторинга wallet.controller.ts callers:
* - `auditLog(entry)` — best-effort, returns void
* - `auditLogStrict(entry)` — now == auditLog + returns dummy ID для compat
* - `completeAudit(id, ...)` — теперь stdout-mirror update event
*/
import { ulid } from 'ulidx';
import { db } from '../config/database';
import { getTraceId } from './trace-store';
import { logger } from './logger';
export interface AuditEntry {
event: string;
@@ -25,7 +29,7 @@ export interface AuditEntry {
errorCode?: string;
}
function buildStdoutLine(entry: AuditEntry, status: 'pending' | 'success' | 'failure'): string {
function buildLine(entry: AuditEntry, status: string): string {
return JSON.stringify({
level: 'audit',
status,
@@ -39,54 +43,36 @@ function writeStdoutBestEffort(line: string): void {
try {
process.stdout.write(line);
} catch {
// swallow
// EPIPE / closed — swallow
}
}
/**
* Best-effort: stdout only. Используется для info-level событий
* (wallet.create success, lookup, etc). Не блокирует request на DB.
*/
/** Best-effort: stdout only. */
export async function auditLog(entry: AuditEntry): Promise<void> {
const status: 'success' | 'failure' = entry.result === 'failure' ? 'failure' : 'success';
writeStdoutBestEffort(buildStdoutLine(entry, status));
writeStdoutBestEffort(buildLine(entry, status));
}
/**
* Fail-secure audit для critical custodial операций (mnemonic.reveal, wallet.send,
* wallet.sign_raw_evm).
* Backward-compat shim. Раньше это был pre-mutation DB INSERT (fail-secure).
* Сейчас — просто stdout audit + возвращает opaque ID для совместимости с callers
* которые передают его в `completeAudit()`.
*
* Семантика: INSERT row в `audit_log` table перед mutation. Если INSERT FAILS
* (DB down, connection pool exhausted, constraint violation) — throws.
* Caller ОБЯЗАН abort'нуть mutation, не вернуть response с funds-action.
*
* Возвращает audit row id — caller использует его в `completeAudit()` после mutation.
* Никогда не throws (раньше throw'ил при DB failure → caller отказывал в operation).
* Returns timestamp-based ID; не reliable identifier, чисто для completeAudit pairing.
*/
export async function auditLogStrict(entry: AuditEntry & { status?: 'pending' | 'success' | 'failure' }): Promise<string> {
const id = ulid();
export async function auditLogStrict(entry: AuditEntry & { status?: string }): Promise<string> {
const status = entry.status ?? 'pending';
// DB INSERT — fail-secure (throws on DB failure)
await db('audit_log').insert({
id,
user_id: entry.userId,
event: entry.event,
status,
error_code: entry.errorCode ?? null,
ip: entry.ip ?? null,
trace_id: getTraceId() ?? null,
meta: entry.meta ? JSON.stringify(entry.meta) : null,
});
// Mirror to stdout (best-effort, не критично)
writeStdoutBestEffort(buildStdoutLine(entry, status));
return id;
writeStdoutBestEffort(buildLine(entry, status));
// Opaque ID: timestamp-ms + random suffix. Не store'им — только для symmetry call-site.
return `audit-${Date.now()}-${Math.floor(Math.random() * 1_000_000)}`;
}
/**
* Update audit row после mutation (success или failure с txid/error).
* Best-effort — если update fails, операция уже произошла, мы just log warning.
* Backward-compat: завершающий event audit. Раньше — DB UPDATE row.
* Сейчас — просто stdout write parallel event.
*
* `auditId` параметр игнорируется (его не было где writer'у искать в БД).
*/
export async function completeAudit(
auditId: string,
@@ -94,21 +80,10 @@ export async function completeAudit(
meta?: Record<string, unknown>,
errorCode?: string,
): Promise<void> {
try {
await db('audit_log')
.where({ id: auditId })
.update({
status: result,
error_code: errorCode ?? null,
meta: meta ? JSON.stringify(meta) : db.raw('meta'),
updated_at: db.fn.now(),
});
} catch (err: any) {
logger.error(`completeAudit failed for ${auditId}: ${err?.message}`);
}
// Mirror to stdout
writeStdoutBestEffort(buildStdoutLine(
{ event: `audit.update.${auditId}`, userId: '<see-audit-row>', meta, errorCode, result },
result,
));
writeStdoutBestEffort(
buildLine(
{ event: `audit.complete:${auditId}`, userId: '<see-original-event>', meta, errorCode, result },
result,
),
);
}

View File

@@ -1,28 +1,59 @@
/**
* Idempotency-Key handling — C3 защита от double-spend при retry.
* Idempotency-Key handling — anti-double-spend на retry.
*
* Storage: KeyDB / Redis (см. `config/redis.ts`).
*
* Контракт:
* Client передаёт header `Idempotency-Key: <opaque-string-up-to-128-chars>`.
* Server:
* 1. INSERT row (user_id, key, request_hash) — PK conflict = retry detected.
* 2. На retry: SELECT existing row. Если response_status is null — operation
* ещё in-flight → return 409 "retry too soon". Если response_status set →
* return cached response (same status, same body).
* Retention: 24h. Cleanup via cron.
* 1. `SET NX EX 600 idem:{userId}:{key} '{requestHash,status:null,body:null}'`
* - NX (only-if-not-exists) → atomic claim
* - EX 600 → 10 минут TTL
* 2. Если NX вернул OK → fresh claim, caller proceed'ит mutation.
* 3. Если NX вернул null → retry detected. GET значение и:
* - request_hash отличается → 409 "key reuse with different body"
* - status null → 409 "in-flight, retry after a few seconds"
* - status set → return cached response (no double-broadcast)
*
* После mutation client вызывает `saveIdempotencyResponse(userId, key, status, body)`
* чтобы cache последующих retry'ев на тот же key.
*
* Trade-off vs DB:
* + Latency <1ms (single Redis round-trip vs ~5ms DB)
* + No DB pressure
* + Auto-expiry via Redis EX
* + Distributed (multi-replica work через shared cache)
* KeyDB single point of failure → API падает на startup ping (fail-fast)
*/
import { createHash } from 'crypto';
import { db } from '../config/database';
import { getRedis } from '../config/redis';
const TTL_SECONDS = 10 * 60; // 10 minutes
interface CacheEntry {
requestHash: string;
responseStatus: number | null; // null = in-flight
responseBody: string | null;
}
export interface IdempotencyClaim {
fresh: boolean;
cached?: { status: number; body: string };
}
function cacheKey(userId: string, key: string): string {
return `idem:${userId}:${key}`;
}
/**
* Try to claim the key. If first time → fresh=true, caller proceeds with mutation.
* If duplicate с existing response → fresh=false + cached response.
* If duplicate с pending in-flight → throws (caller returns 409).
* Atomic claim. Returns:
* - fresh=true → caller обязан proceed mutation и save response
* - fresh=false + cached → return cached response без mutation (retry case)
*
* Throws при:
* - in-flight (другой attempt ещё не save'нул response)
* - body hash mismatch (replay с другим body на тот же key)
*/
export async function claimIdempotency(
userId: string,
@@ -33,60 +64,89 @@ export async function claimIdempotency(
.update(JSON.stringify(requestBody ?? {}))
.digest('hex');
try {
await db('idempotency_keys').insert({
user_id: userId,
key,
request_hash: requestHash,
});
const redis = getRedis();
const k = cacheKey(userId, key);
const initial: CacheEntry = {
requestHash,
responseStatus: null,
responseBody: null,
};
// SET key value NX EX seconds — atomic claim. Returns 'OK' if set, null if existed.
const setResult = await redis.set(k, JSON.stringify(initial), 'EX', TTL_SECONDS, 'NX');
if (setResult === 'OK') {
return { fresh: true };
} catch (err: any) {
// PK violation = retry
const existing = await db('idempotency_keys')
.where({ user_id: userId, key })
.first();
if (!existing) throw err;
// Verify request body matches (защита от replay с другим body)
if (existing.request_hash !== requestHash) {
throw new Error(`Idempotency-Key reuse with different request body. Use a new key.`);
}
if (existing.response_status === null || existing.response_status === undefined) {
throw new Error('Operation already in flight; retry after a few seconds.');
}
return {
fresh: false,
cached: {
status: existing.response_status as number,
body: existing.response_body as string,
},
};
}
// Already exists — это retry. Читаем.
const raw = await redis.get(k);
if (!raw) {
// Race: между NX и GET значение expired. Перепопытка как fresh.
const retry = await redis.set(k, JSON.stringify(initial), 'EX', TTL_SECONDS, 'NX');
if (retry === 'OK') return { fresh: true };
throw new Error('Idempotency cache race; retry after a few seconds.');
}
let entry: CacheEntry;
try {
entry = JSON.parse(raw) as CacheEntry;
} catch {
throw new Error('Idempotency cache entry corrupt');
}
if (entry.requestHash !== requestHash) {
throw new Error('Idempotency-Key reuse with different request body. Use a new key.');
}
if (entry.responseStatus === null) {
throw new Error('Operation already in flight; retry after a few seconds.');
}
return {
fresh: false,
cached: {
status: entry.responseStatus,
body: entry.responseBody ?? '',
},
};
}
/** Сохранить response в idempotency row (после mutation succeeds/fails). */
/**
* Сохранить response в cache после mutation (success или failure).
* Best-effort: если Redis недоступен — log error, не throw (mutation уже произошла,
* cache update — UX optimization для retry'ев).
*/
export async function saveIdempotencyResponse(
userId: string,
key: string,
status: number,
body: string,
): Promise<void> {
await db('idempotency_keys')
.where({ user_id: userId, key })
.update({
response_status: status,
response_body: body,
});
try {
const redis = getRedis();
const k = cacheKey(userId, key);
const raw = await redis.get(k);
if (!raw) return; // expired — skip
let entry: CacheEntry;
try {
entry = JSON.parse(raw) as CacheEntry;
} catch {
return;
}
entry.responseStatus = status;
entry.responseBody = body;
// Re-set with refreshed TTL чтобы retry мог получить cached response
await redis.set(k, JSON.stringify(entry), 'EX', TTL_SECONDS);
} catch {
// Cache update — non-critical
}
}
/** Validate header format. Returns null if missing/invalid (caller may make mandatory). */
/** Validate header format. Returns null if missing/invalid. */
export function extractIdempotencyKey(headerValue: unknown): string | null {
if (typeof headerValue !== 'string') return null;
const v = headerValue.trim();
if (!v) return null;
// Restrict charset: alphanum + dash/underscore, max 128
if (!/^[A-Za-z0-9_-]{1,128}$/.test(v)) return null;
return v;
}