153 lines
4.9 KiB
TypeScript
153 lines
4.9 KiB
TypeScript
/**
|
||
* Idempotency-Key handling — anti-double-spend на retry.
|
||
*
|
||
* Storage: KeyDB / Redis (см. `config/redis.ts`).
|
||
*
|
||
* Контракт:
|
||
* Client передаёт header `Idempotency-Key: <opaque-string-up-to-128-chars>`.
|
||
* Server:
|
||
* 1. `SET NX EX 600 idem:{userId}:{key} '{requestHash,status:null,body:null}'`
|
||
* - NX (only-if-not-exists) → atomic claim
|
||
* - EX 600 → 10 минут TTL
|
||
* 2. Если NX вернул OK → fresh claim, caller proceed'ит mutation.
|
||
* 3. Если NX вернул null → retry detected. GET значение и:
|
||
* - request_hash отличается → 409 "key reuse with different body"
|
||
* - status null → 409 "in-flight, retry after a few seconds"
|
||
* - status set → return cached response (no double-broadcast)
|
||
*
|
||
* После mutation client вызывает `saveIdempotencyResponse(userId, key, status, body)`
|
||
* чтобы cache последующих retry'ев на тот же key.
|
||
*
|
||
* Trade-off vs DB:
|
||
* + Latency <1ms (single Redis round-trip vs ~5ms DB)
|
||
* + No DB pressure
|
||
* + Auto-expiry via Redis EX
|
||
* + Distributed (multi-replica work через shared cache)
|
||
* – KeyDB single point of failure → API падает на startup ping (fail-fast)
|
||
*/
|
||
|
||
import { createHash } from 'crypto';
|
||
import { getRedis } from '../config/redis';
|
||
|
||
const TTL_SECONDS = 10 * 60; // 10 minutes
|
||
|
||
interface CacheEntry {
|
||
requestHash: string;
|
||
responseStatus: number | null; // null = in-flight
|
||
responseBody: string | null;
|
||
}
|
||
|
||
export interface IdempotencyClaim {
|
||
fresh: boolean;
|
||
cached?: { status: number; body: string };
|
||
}
|
||
|
||
function cacheKey(userId: string, key: string): string {
|
||
return `idem:${userId}:${key}`;
|
||
}
|
||
|
||
/**
|
||
* Atomic claim. Returns:
|
||
* - fresh=true → caller обязан proceed mutation и save response
|
||
* - fresh=false + cached → return cached response без mutation (retry case)
|
||
*
|
||
* Throws при:
|
||
* - in-flight (другой attempt ещё не save'нул response)
|
||
* - body hash mismatch (replay с другим body на тот же key)
|
||
*/
|
||
export async function claimIdempotency(
|
||
userId: string,
|
||
key: string,
|
||
requestBody: unknown,
|
||
): Promise<IdempotencyClaim> {
|
||
const requestHash = createHash('sha256')
|
||
.update(JSON.stringify(requestBody ?? {}))
|
||
.digest('hex');
|
||
|
||
const redis = getRedis();
|
||
const k = cacheKey(userId, key);
|
||
const initial: CacheEntry = {
|
||
requestHash,
|
||
responseStatus: null,
|
||
responseBody: null,
|
||
};
|
||
|
||
// SET key value NX EX seconds — atomic claim. Returns 'OK' if set, null if existed.
|
||
const setResult = await redis.set(k, JSON.stringify(initial), 'EX', TTL_SECONDS, 'NX');
|
||
if (setResult === 'OK') {
|
||
return { fresh: true };
|
||
}
|
||
|
||
// Already exists — это retry. Читаем.
|
||
const raw = await redis.get(k);
|
||
if (!raw) {
|
||
// Race: между NX и GET значение expired. Перепопытка как fresh.
|
||
const retry = await redis.set(k, JSON.stringify(initial), 'EX', TTL_SECONDS, 'NX');
|
||
if (retry === 'OK') return { fresh: true };
|
||
throw new Error('Idempotency cache race; retry after a few seconds.');
|
||
}
|
||
|
||
let entry: CacheEntry;
|
||
try {
|
||
entry = JSON.parse(raw) as CacheEntry;
|
||
} catch {
|
||
throw new Error('Idempotency cache entry corrupt');
|
||
}
|
||
|
||
if (entry.requestHash !== requestHash) {
|
||
throw new Error('Idempotency-Key reuse with different request body. Use a new key.');
|
||
}
|
||
|
||
if (entry.responseStatus === null) {
|
||
throw new Error('Operation already in flight; retry after a few seconds.');
|
||
}
|
||
|
||
return {
|
||
fresh: false,
|
||
cached: {
|
||
status: entry.responseStatus,
|
||
body: entry.responseBody ?? '',
|
||
},
|
||
};
|
||
}
|
||
|
||
/**
|
||
* Сохранить response в cache после mutation (success или failure).
|
||
* Best-effort: если Redis недоступен — log error, не throw (mutation уже произошла,
|
||
* cache update — UX optimization для retry'ев).
|
||
*/
|
||
export async function saveIdempotencyResponse(
|
||
userId: string,
|
||
key: string,
|
||
status: number,
|
||
body: string,
|
||
): Promise<void> {
|
||
try {
|
||
const redis = getRedis();
|
||
const k = cacheKey(userId, key);
|
||
const raw = await redis.get(k);
|
||
if (!raw) return; // expired — skip
|
||
let entry: CacheEntry;
|
||
try {
|
||
entry = JSON.parse(raw) as CacheEntry;
|
||
} catch {
|
||
return;
|
||
}
|
||
entry.responseStatus = status;
|
||
entry.responseBody = body;
|
||
// Re-set with refreshed TTL чтобы retry мог получить cached response
|
||
await redis.set(k, JSON.stringify(entry), 'EX', TTL_SECONDS);
|
||
} catch {
|
||
// Cache update — non-critical
|
||
}
|
||
}
|
||
|
||
/** Validate header format. Returns null if missing/invalid. */
|
||
export function extractIdempotencyKey(headerValue: unknown): string | null {
|
||
if (typeof headerValue !== 'string') return null;
|
||
const v = headerValue.trim();
|
||
if (!v) return null;
|
||
if (!/^[A-Za-z0-9_-]{1,128}$/.test(v)) return null;
|
||
return v;
|
||
}
|