67 lines
2.3 KiB
Python
67 lines
2.3 KiB
Python
from __future__ import annotations
|
|
import secrets
|
|
from typing import Any, Optional, Mapping
|
|
from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature
|
|
from src.application.contracts import ICsrfService
|
|
from src.application.domain.exceptions import ForbiddenException
|
|
from src.infrastructure.config.settings import settings
|
|
|
|
|
|
class CsrfService(ICsrfService):
    """Issue and validate signed, time-limited CSRF tokens.

    Tokens are produced by an ``itsdangerous.URLSafeTimedSerializer`` keyed
    with ``settings.CSRF_SECRET_KEY`` and salted with ``SALT``. A request is
    accepted by ``verify_pair`` when the token found in the cookie equals the
    token found in the header and that token passes ``verify``.
    """

    # Cookie key read by ``extract``.
    COOKIE_NAME = 'csrf_token'
    # Request header read by ``extract``.
    HEADER_NAME = 'X-CSRF-Token'
    # Salt handed to the serializer.
    SALT = 'csrf'
    # Maximum accepted token age, passed as ``max_age`` when loading a token.
    TTL_SECONDS = 3600

    def __init__(self) -> None:
        """Build the serializer from the configured secret key and ``SALT``."""
        self._serializer = URLSafeTimedSerializer(
            secret_key=settings.CSRF_SECRET_KEY,
            salt=self.SALT,
        )

    @property
    def cookie_name(self) -> str:
        """Name of the cookie that carries the CSRF token."""
        return self.COOKIE_NAME

    @property
    def header_name(self) -> str:
        """Name of the request header that carries the CSRF token."""
        return self.HEADER_NAME

    @property
    def ttl_seconds(self) -> int:
        """Maximum token age, in seconds, accepted by ``verify``."""
        return self.TTL_SECONDS

    def issue(self, subject: Optional[str] = None) -> str:
        """Return a freshly signed token.

        The signed payload holds ``sub`` (the given *subject*, possibly
        ``None``) and ``nonce`` (a new random URL-safe string), so two tokens
        issued for the same subject differ.
        """
        payload = {
            'sub': subject,
            'nonce': secrets.token_urlsafe(32),
        }
        return self._serializer.dumps(payload)

    def verify(self, token: str, expected_subject: Optional[str] = None) -> dict[str, Any]:
        """Validate *token* and return its decoded payload.

        Args:
            token: Serialized token as produced by ``issue``.
            expected_subject: When not ``None``, the payload's ``sub`` value
                must equal it.

        Returns:
            The decoded payload dictionary.

        Raises:
            ForbiddenException: If the token is older than ``TTL_SECONDS``,
                its signature is bad, its payload is not a dictionary, or
                its subject differs from *expected_subject*.
        """
        try:
            data = self._serializer.loads(token, max_age=self.TTL_SECONDS)
        # SignatureExpired must be handled before BadSignature: per the
        # itsdangerous exception hierarchy it is a subclass of it.
        except SignatureExpired as exc:
            raise ForbiddenException(message='CSRF token expired') from exc
        except BadSignature as exc:
            raise ForbiddenException(message='CSRF token invalid') from exc

        # A validly signed payload of another shape would otherwise surface
        # as an AttributeError on ``.get`` below.
        if not isinstance(data, dict):
            raise ForbiddenException(message='CSRF token invalid')

        if expected_subject is not None and data.get('sub') != expected_subject:
            raise ForbiddenException(message='CSRF token subject mismatch')

        return data

    def extract(
        self,
        cookies: Mapping[str, str],
        headers: Mapping[str, str],
    ) -> tuple[Optional[str], Optional[str]]:
        """Return ``(cookie_token, header_token)`` read from the request.

        The cookie is looked up under ``COOKIE_NAME``. The header is looked
        up under ``HEADER_NAME`` first; if that exact key is absent, the
        header names are compared case-insensitively, because HTTP field
        names are case-insensitive and a plain mapping may hold them
        lowercased. Either element is ``None`` when not found.
        """
        cookie_token = cookies.get(self.COOKIE_NAME)
        header_token = headers.get(self.HEADER_NAME)

        if header_token is None:
            wanted = self.HEADER_NAME.lower()
            header_token = next(
                (
                    value
                    for name, value in headers.items()
                    if isinstance(name, str) and name.lower() == wanted
                ),
                None,
            )

        return cookie_token, header_token

    def verify_pair(
        self,
        cookie_token: Optional[str],
        header_token: Optional[str],
        expected_subject: Optional[str] = None,
    ) -> None:
        """Check that the cookie and header tokens match and are valid.

        Args:
            cookie_token: Token taken from the cookie, or ``None``.
            header_token: Token taken from the header, or ``None``.
            expected_subject: Forwarded to ``verify``.

        Raises:
            ForbiddenException: If either token is missing or empty, the two
                tokens differ, or ``verify`` rejects the token.
        """
        if not cookie_token or not header_token:
            raise ForbiddenException(message='CSRF token missing')

        # compare_digest raises TypeError for str arguments containing
        # non-ASCII characters; comparing bytes keeps the constant-time check
        # while turning such input into an ordinary mismatch.
        if not secrets.compare_digest(
            self._as_bytes(cookie_token),
            self._as_bytes(header_token),
        ):
            raise ForbiddenException(message='CSRF token mismatch')

        self.verify(cookie_token, expected_subject=expected_subject)

    @staticmethod
    def _as_bytes(value: Any) -> bytes:
        """Encode a str token to UTF-8 bytes; pass other values through unchanged."""
        if isinstance(value, str):
            # surrogatepass keeps the encoding total for strings that carry
            # lone surrogates, so comparison never fails on encoding.
            return value.encode('utf-8', 'surrogatepass')
        return value
|