fix: delete keydb

This commit is contained in:
2026-05-12 00:03:51 +03:00
parent ce61829ea2
commit 35f09a0fbd
15 changed files with 16 additions and 449 deletions

View File

@@ -18,7 +18,6 @@ dependencies = [
"pydantic-settings==2.12.0",
"python-jose==3.5.0",
"python-ulid==3.1.0",
"redis==7.2.0",
"sqlalchemy==2.0.46",
"uvloop==0.22.1; platform_system != 'Windows'",
]

View File

@@ -1,8 +1,7 @@
from __future__ import annotations
from datetime import datetime,timedelta,timezone
import orjson
from src.application.abstractions import IUnitOfWork
from src.application.contracts import IBeorgService,ICache,ILogger
from src.application.contracts import IBeorgService,ILogger
from src.application.domain.dto import BeorgKycCreateResponse,BeorgKycResultResponse,KycSessionResponse
from src.application.domain.exceptions import ApplicationException
from src.application.services import ensure_adult,extract_personal_data,parse_birth_date
@@ -18,12 +17,10 @@ class PassKycCommand:
*,
unit_of_work: IUnitOfWork,
logger: ILogger,
cache: ICache,
beorg_service: IBeorgService,
) -> None:
self._unit_of_work = unit_of_work
self._logger = logger
self._cache = cache
self._beorg_service = beorg_service
@@ -40,7 +37,6 @@ class PassKycCommand:
expires_at=expires_at,
error=result.error,
)
await self._cache.set(f'kyc:session:{user_id}',result.model_dump_json(),ttl=KYC_SESSION_TTL)
self._logger.info(f'KYC started for user {user_id}')
return result
@@ -52,12 +48,10 @@ class CompleteKycCommand:
*,
unit_of_work: IUnitOfWork,
logger: ILogger,
cache: ICache,
beorg_service: IBeorgService,
) -> None:
self._unit_of_work = unit_of_work
self._logger = logger
self._cache = cache
self._beorg_service = beorg_service
@@ -87,7 +81,7 @@ class CompleteKycCommand:
ensure_adult(birth_date)
async with self._unit_of_work as unit_of_work:
user = await unit_of_work.user_repository.update_kyc_data(
await unit_of_work.user_repository.update_kyc_data(
user_id=user_id,
first_name=personal_data.first_name,
last_name=personal_data.last_name,
@@ -104,17 +98,11 @@ class CompleteKycCommand:
result_data=result.data,
error=None,
)
await self._cache.set_user(user_id,user,ttl=KYC_SESSION_TTL)
await self._cache.delete(f'kyc:session:{user_id}')
self._logger.info(f'KYC completed for user {user_id}')
return result
async def _get_session(self,user_id: str) -> BeorgKycCreateResponse:
raw = await self._cache.get(f'kyc:session:{user_id}')
if raw is not None:
return BeorgKycCreateResponse.model_validate(orjson.loads(raw))
now = _utc_now()
async with self._unit_of_work as unit_of_work:
await unit_of_work.kyc_repository.expire_started_sessions(user_id=user_id,now=now)

View File

@@ -1,6 +1,5 @@
from src.application.contracts.i_logger import ILogger
from src.application.contracts.i_jwt_service import IJwtService
from src.application.contracts.i_csrf_service import ICsrfService
from src.application.contracts.i_cache import ICache
from src.application.contracts.i_hash_service import IHashService
from src.application.contracts.i_beorg_service import IBeorgService

View File

@@ -1,34 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from src.application.domain.entities.user import UserEntity
class ICache(ABC):
@abstractmethod
async def set(self, key: str, value: str, ttl: int) -> bool:
raise NotImplementedError
@abstractmethod
async def set_nx(self, key: str, value: str, ttl: int) -> bool:
raise NotImplementedError
@abstractmethod
async def get(self, key: str) -> str | None:
raise NotImplementedError
@abstractmethod
async def hget(self, key: str, field: str) -> str | None:
raise NotImplementedError
@abstractmethod
async def delete(self, key: str) -> bool:
raise NotImplementedError
@abstractmethod
async def get_user(self, user_id: str) -> dict | None:
raise NotImplementedError
@abstractmethod
async def set_user(self, user_id: str, user: UserEntity, ttl: int = 300) -> None:
raise NotImplementedError

View File

@@ -1,5 +0,0 @@
from src.infrastructure.cache.client import create_redis_client
from src.infrastructure.cache.keydb_client import KeydbCache
from src.infrastructure.cache.remote_cache import RemoteCache
__all__ = ['create_redis_client', 'KeydbCache', 'RemoteCache']

View File

@@ -1,5 +0,0 @@
from redis.asyncio import Redis
def create_redis_client(url: str) -> Redis:
return Redis.from_url(url,decode_responses=True)

View File

@@ -1,55 +0,0 @@
from __future__ import annotations
import orjson
from redis.asyncio.client import Redis
from src.application.contracts import ICache
from src.application.domain.entities.user import UserEntity
class KeydbCache(ICache):
USER_PREFIX = 'user:me'
def __init__(self, redis_client: Redis):
self._r = redis_client
async def set(self, key: str, value: str, ttl: int) -> bool:
return bool(await self._r.set(key, value, ex=ttl))
async def set_nx(self, key: str, value: str, ttl: int) -> bool:
return bool(await self._r.set(key, value, ex=ttl, nx=True))
async def get(self, key: str) -> str | None:
return await self._r.get(key)
async def hget(self, key: str, field: str) -> str | None:
return await self._r.hget(key, field)
async def delete(self, key: str) -> bool:
return (await self._r.delete(key)) > 0
async def get_user(self, user_id: str) -> dict | None:
raw = await self._r.get(f'{self.USER_PREFIX}:{user_id}')
if raw is None:
return None
return orjson.loads(raw)
async def set_user(self, user_id: str, user: UserEntity, ttl: int = 300) -> None:
data = orjson.dumps({
'id': user.id,
'email': user.email,
'first_name': user.first_name,
'middle_name': user.middle_name,
'last_name': user.last_name,
'birth_date': str(user.birth_date) if user.birth_date else None,
'crypto_wallet': user.crypto_wallet,
'phone': user.phone,
'bik': user.bik,
'account_number': user.account_number,
'card_number': user.card_number,
'inn': user.inn,
'kyc_verified': user.kyc_verified,
'is_deleted': user.is_deleted,
'created_at': user.created_at.isoformat() if user.created_at else None,
'updated_at': user.updated_at.isoformat() if user.updated_at else None,
'kyc_verified_at': user.kyc_verified_at.isoformat() if user.kyc_verified_at else None,
})
await self._r.set(f'{self.USER_PREFIX}:{user_id}', data, ex=ttl)

View File

@@ -1,65 +0,0 @@
from __future__ import annotations
import orjson
from redis.asyncio.client import Redis
from src.application.contracts import ICache
from src.application.domain.entities.user import UserEntity
class RemoteCache(ICache):
USER_PREFIX = 'user:me'
def __init__(self,redis_client: Redis) -> None:
self._r = redis_client
async def set(self,key: str,value: str,ttl: int) -> bool:
return bool(await self._r.set(key,value,ex=ttl))
async def set_nx(self,key: str,value: str,ttl: int) -> bool:
return bool(await self._r.set(key,value,ex=ttl,nx=True))
async def get(self,key: str) -> str | None:
return await self._r.get(key)
async def hget(self,key: str,field: str) -> str | None:
return await self._r.hget(key,field)
async def delete(self,key: str) -> bool:
return (await self._r.delete(key)) > 0
async def get_user(self,user_id: str) -> dict | None:
raw = await self._r.get(f'{self.USER_PREFIX}:{user_id}')
if raw is None:
return None
return orjson.loads(raw)
async def set_user(self,user_id: str,user: UserEntity,ttl: int = 300) -> None:
data = orjson.dumps({
'id': user.id,
'email': user.email,
'first_name': user.first_name,
'middle_name': user.middle_name,
'last_name': user.last_name,
'birth_date': str(user.birth_date) if user.birth_date else None,
'crypto_wallet': user.crypto_wallet,
'phone': user.phone,
'bik': user.bik,
'account_number': user.account_number,
'card_number': user.card_number,
'inn': user.inn,
'kyc_verified': user.kyc_verified,
'is_deleted': user.is_deleted,
'created_at': user.created_at.isoformat() if user.created_at else None,
'updated_at': user.updated_at.isoformat() if user.updated_at else None,
'kyc_verified_at': user.kyc_verified_at.isoformat() if user.kyc_verified_at else None,
})
await self._r.set(f'{self.USER_PREFIX}:{user_id}',data,ex=ttl)

View File

@@ -9,10 +9,10 @@ from src.infrastructure.vault import VaultClient
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file='.env',extra='ignore')
_vault_beorg_secrets: dict[str,Any] = PrivateAttr(default_factory=dict)
_vault_database_secrets: dict[str,Any] = PrivateAttr(default_factory=dict)
DOCS_USERNAME: str = 'admin'
DOCS_PASSWORD: str = 'admin'
KEYDB_URL: str = 'redis://localhost:6379/0'
VAULT_ADDR: str = 'https://corp.vault.elcsa.ru'
VAULT_ROLE_ID: str = ''
VAULT_SECRET_ID: str = ''
@@ -30,7 +30,6 @@ class Settings(BaseSettings):
JWT_ALGORITHM: str = 'RS256'
JWT_AUDIENCE: str | None = None
JWT_ISSUER: str | None = None
DATABASE_URL: str = 'postgresql+asyncpg://postgres:postgres@localhost:5432/kyc'
DATABASE_POOL_SIZE: int = 5
DATABASE_MAX_OVERFLOW: int = 10
DATABASE_POOL_TIMEOUT: int = 30
@@ -66,6 +65,11 @@ class Settings(BaseSettings):
])
@property
def DATABASE_URL(self) -> str:
return self._get_vault_secret(self._vault_database_secrets,'DATABASE_URL','database_url')
@property
def BEORG_PROJECT_ID(self) -> str:
return self._get_beorg_secret('project_id','BEORG_PROJECT_ID')
@@ -82,8 +86,12 @@ class Settings(BaseSettings):
def _get_beorg_secret(self,*keys: str) -> str:
return self._get_vault_secret(self._vault_beorg_secrets,*keys)
def _get_vault_secret(self,secrets: dict[str,Any],*keys: str) -> str:
for key in keys:
value = self._vault_beorg_secrets.get(key)
value = secrets.get(key)
if value is not None:
return str(value)
return ''
@@ -101,6 +109,7 @@ class Settings(BaseSettings):
mount_point=self.VAULT_MOUNT_POINT,
)
object.__setattr__(self,'_vault_beorg_secrets',client.read_many(self.VAULT_BEORG_SECRET_PATH))
object.__setattr__(self,'_vault_database_secrets',client.read_many(self.VAULT_DATABASE_SECRET_PATH))
secrets = client.read_many(
self.VAULT_BEORG_SECRET_PATH,
self.VAULT_DATABASE_SECRET_PATH,
@@ -108,7 +117,7 @@ class Settings(BaseSettings):
self.VAULT_DOCS_SECRET_PATH,
)
for field in type(self).model_fields:
if field.startswith('VAULT_') or field == 'KEYDB_URL':
if field.startswith('VAULT_'):
continue
value = secrets.get(field,secrets.get(field.lower()))
if value is None:

View File

@@ -8,7 +8,6 @@ from fastapi.responses import HTMLResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from src.application.domain.enums import LogFormat,LogLevel
from src.application.domain.exceptions import ApplicationException
from src.infrastructure.cache import create_redis_client
from src.infrastructure.config.settings import get_settings
from src.infrastructure.vault import JwtKeyStore, start_jwt_keys_scheduler
from src.infrastructure.utils import generate_instance_id
@@ -42,8 +41,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
logger.set_instance_id(instance_id)
logger.info(f'Users service instance started with id {instance_id}')
app.state.redis = create_redis_client(settings.KEYDB_URL)
jwt_store = JwtKeyStore(
vault_addr=settings.VAULT_ADDR,
vault_role_id=settings.VAULT_ROLE_ID,
@@ -61,7 +58,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
app.state.jwt_key_store = jwt_store
app.state.jwt_keys_scheduler = jwt_scheduler
yield
await app.state.redis.aclose()
logger.info(f'Users service instance ended with id {instance_id}')

View File

@@ -1,45 +0,0 @@
from __future__ import annotations
import functools
from typing import Any,Awaitable,Callable
from fastapi import Request
from fastapi.responses import ORJSONResponse
from src.presentation.dependencies.cache import get_cache_remote
from src.presentation.dependencies.logger import get_logger
def cached(*, prefix: str) -> Callable:
def decorator(func: Callable[..., Awaitable[Any]]):
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
logger = get_logger()
request = kwargs.get('request')
if not isinstance(request, Request):
for a in args:
if isinstance(a, Request):
request = a
break
auth = kwargs.get('auth')
user_id = getattr(auth, 'user_id', None) if auth else None
if request is None or user_id is None:
return await func(*args, **kwargs)
cache_key = f'{prefix}:{user_id}'
try:
cache = get_cache_remote(request)
hit = await cache.get_user(user_id)
if hit is not None:
logger.debug(f'Cache hit key={cache_key}')
return ORJSONResponse(status_code=200, content=hit)
except Exception as e:
logger.warning(f'Cache read failed key={cache_key} error={e}')
return await func(*args, **kwargs)
return wrapper
return decorator

View File

@@ -1,171 +0,0 @@
from __future__ import annotations
import functools
import inspect
import hashlib
from typing import Any, Awaitable, Callable, Literal, Optional, Protocol, runtime_checkable
from fastapi import Request
from redis.asyncio.client import Redis
from src.application.contracts import ILogger
from src.application.domain.exceptions import ApplicationException
from src.infrastructure.logger import get_logger
from src.presentation.dependencies import get_redis
def _find_request(args: tuple[Any, ...], kwargs: dict[str, Any]) -> Request:
req = kwargs.get('request')
if isinstance(req, Request):
return req
for a in args:
if isinstance(a, Request):
return a
raise RuntimeError('rate_limit decorator requires fastapi.Request argument')
def _client_ip(request: Request) -> str:
xff = request.headers.get('x-forwarded-for')
if xff:
return xff.split(',')[0].strip()
if request.client:
return request.client.host
return 'unknown'
_LUA_INCR_EXPIRE_TTL = '''
local key = KEYS[1]
local window = tonumber(ARGV[1])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
local ttl = redis.call('TTL', key)
return { current, ttl }
'''
Scope = Literal['ip', 'device', 'user', 'key']
@runtime_checkable
class KeyBuilder1(Protocol):
def __call__(self, request: Request) -> str: ...
@runtime_checkable
class KeyBuilder3(Protocol):
def __call__(self, request: Request, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str: ...
KeyBuilder = KeyBuilder1 | KeyBuilder3
def _call_key_builder(builder: KeyBuilder, request: Request, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
try:
sig = inspect.signature(builder)
if len(sig.parameters) >= 3:
return builder(request, args, kwargs)
return builder(request)
except Exception as e:
try:
return builder(request, args, kwargs)
except Exception:
raise e
def _email_rl_key(request: Request, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
body = kwargs.get('body')
if body is None and args:
for a in args:
if hasattr(a, 'email'):
body = a
break
email = (getattr(body, 'email', '') or '').strip().lower()
if not email:
email = _client_ip(request)
digest = hashlib.sha256(email.encode('utf-8')).hexdigest()[:24]
return f'email:{digest}'
def rate_limit(
*,
limit: int,
window_seconds: int,
scope: Scope = 'ip',
key_prefix: str = 'rl',
key_builder: Optional[KeyBuilder] = None,
fail_open: bool = True,
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
if limit <= 0:
raise ValueError('rate_limit: limit must be > 0')
if window_seconds <= 0:
raise ValueError('rate_limit: window_seconds must be > 0')
if scope == 'key' and not key_builder:
raise ValueError('rate_limit: scope="key" requires key_builder')
def decorator(func: Callable[..., Awaitable[Any]]):
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any):
request = _find_request(args, kwargs)
logger: ILogger = get_logger()
if scope == 'ip':
ident = _client_ip(request)
elif scope == 'device':
ident = request.cookies.get('device_id') or _client_ip(request)
elif scope == 'user':
user = getattr(request.state, 'user', None)
user_id = getattr(user, 'id', None) if user else None
ident = str(user_id) if user_id else _client_ip(request)
else:
try:
ident = _call_key_builder(key_builder, request, args, kwargs) # type: ignore[arg-type]
except Exception as e:
logger.error(f'RateLimit key_builder failed error={str(e)}')
raise ApplicationException(500, 'Rate limiter key_builder failed')
route = request.url.path
method = request.method
redis_key = f'{key_prefix}:{scope}:{method}:{route}:{ident}'
logger.debug(f'RateLimit check key={redis_key} limit={limit} window={window_seconds}')
try:
redis: Redis = get_redis(request)
result = await redis.eval(
_LUA_INCR_EXPIRE_TTL,
1,
redis_key,
str(window_seconds),
)
count = int(result[0])
ttl_raw = int(result[1]) if result and len(result) > 1 else window_seconds
ttl = window_seconds if ttl_raw < 0 else ttl_raw
except Exception as e:
logger.error(f'RateLimit redis failure key={redis_key} error={str(e)}')
if fail_open:
logger.warning(f'RateLimit fail-open activated key={redis_key}')
return await func(*args, **kwargs)
raise ApplicationException(503, 'Rate limiter unavailable')
if count > limit:
retry_after = max(ttl, 0)
logger.warning(f'RateLimit exceeded key={redis_key} count={count} limit={limit} retry_after={retry_after}')
raise ApplicationException(
status_code=429,
message='Too Many Requests',
headers={'Retry-After': str(retry_after)},
)
logger.debug(f'RateLimit passed key={redis_key} count={count}')
return await func(*args, **kwargs)
return wrapper
return decorator

View File

@@ -1,28 +0,0 @@
from __future__ import annotations
from fastapi import Depends,Request
from redis.asyncio.client import Redis
from src.application.contracts import ICache
from src.infrastructure.cache import KeydbCache
def get_redis_remote(request: Request) -> Redis:
return request.app.state.redis
def get_redis(request: Request) -> Redis:
return request.app.state.redis
def get_cache_remote(redis_client: Redis = Depends(get_redis_remote)) -> ICache:
return KeydbCache(redis_client)
def get_remote_cache(redis_client: Redis = Depends(get_redis_remote)) -> ICache:
return KeydbCache(redis_client)
def get_cache(redis_client: Redis = Depends(get_redis)) -> ICache:
return KeydbCache(redis_client)

View File

@@ -2,10 +2,9 @@ from __future__ import annotations
from fastapi import Depends
from src.application.abstractions import IUnitOfWork
from src.application.commands import CompleteKycCommand,GetKycSessionCommand,PassKycCommand
from src.application.contracts import IBeorgService,ICache,ILogger
from src.application.contracts import IBeorgService,ILogger
from src.infrastructure.config import settings
from src.infrastructure.beorg import BeorgService
from src.presentation.dependencies.cache import get_cache
from src.presentation.dependencies.logger import get_logger
from src.presentation.dependencies.unit_of_work import get_unit_of_work
@@ -23,13 +22,11 @@ def get_beorg_service() -> IBeorgService:
def get_pass_kyc_command(
logger: ILogger = Depends(get_logger),
unit_of_work: IUnitOfWork = Depends(get_unit_of_work),
cache: ICache = Depends(get_cache),
beorg_service: IBeorgService = Depends(get_beorg_service),
) -> PassKycCommand:
return PassKycCommand(
unit_of_work=unit_of_work,
logger=logger,
cache=cache,
beorg_service=beorg_service,
)
@@ -45,12 +42,10 @@ def get_kyc_session_command(
def get_complete_kyc_command(
logger: ILogger = Depends(get_logger),
unit_of_work: IUnitOfWork = Depends(get_unit_of_work),
cache: ICache = Depends(get_cache),
beorg_service: IBeorgService = Depends(get_beorg_service),
) -> CompleteKycCommand:
return CompleteKycCommand(
unit_of_work=unit_of_work,
logger=logger,
cache=cache,
beorg_service=beorg_service,
)

11
uv.lock generated
View File

@@ -393,7 +393,6 @@ dependencies = [
{ name = "pydantic-settings" },
{ name = "python-jose" },
{ name = "python-ulid" },
{ name = "redis" },
{ name = "sqlalchemy" },
{ name = "uvloop", marker = "sys_platform != 'win32'" },
]
@@ -414,7 +413,6 @@ requires-dist = [
{ name = "pydantic-settings", specifier = "==2.12.0" },
{ name = "python-jose", specifier = "==3.5.0" },
{ name = "python-ulid", specifier = "==3.1.0" },
{ name = "redis", specifier = "==7.2.0" },
{ name = "sqlalchemy", specifier = "==2.0.46" },
{ name = "uvloop", marker = "sys_platform != 'win32'", specifier = "==0.22.1" },
]
@@ -593,15 +591,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6c/a0/4ed6632b70a52de845df056654162acdebaf97c20e3212c559ac43e7216e/python_ulid-3.1.0-py3-none-any.whl", hash = "sha256:e2cdc979c8c877029b4b7a38a6fba3bc4578e4f109a308419ff4d3ccf0a46619", size = 11577, upload-time = "2025-08-18T16:09:25.047Z" },
]
[[package]]
name = "redis"
version = "7.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/9f/32/6fac13a11e73e1bc67a2ae821a72bfe4c2d8c4c48f0267e4a952be0f1bae/redis-7.2.0.tar.gz", hash = "sha256:4dd5bf4bd4ae80510267f14185a15cba2a38666b941aff68cccf0256b51c1f26", size = 4901247, upload-time = "2026-02-16T17:16:22.797Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/86/cf/f6180b67f99688d83e15c84c5beda831d1d341e95872d224f87ccafafe61/redis-7.2.0-py3-none-any.whl", hash = "sha256:01f591f8598e483f1842d429e8ae3a820804566f1c73dca1b80e23af9fba0497", size = 394898, upload-time = "2026-02-16T17:16:20.693Z" },
]
[[package]]
name = "requests"
version = "2.33.1"