43 lines
1.7 KiB
Python
43 lines
1.7 KiB
Python
from fastapi import Depends, Request
|
|
from fastapi.security.utils import get_authorization_scheme_param
|
|
from src.application.contracts import IJwtService
|
|
from src.application.domain.exceptions import InvalidTokenException,NotAuthenticatedException
|
|
from src.application.domain.dto import AccessTokenPayload, AuthContext
|
|
from src.presentation.dependencies import get_jwt_service
|
|
from src.presentation.dependencies.logger import get_logger
|
|
|
|
|
|
def _extract_access_token(request: Request) -> str | None:
|
|
token = request.cookies.get('access_token')
|
|
|
|
if token:
|
|
return token
|
|
|
|
auth = request.headers.get('Authorization')
|
|
if auth:
|
|
scheme, param = get_authorization_scheme_param(auth)
|
|
if scheme.lower() == 'bearer' and param:
|
|
return param
|
|
|
|
return None
|
|
|
|
|
|
async def require_access_token(
|
|
request: Request,
|
|
jwt_service: IJwtService = Depends(get_jwt_service),
|
|
) -> AuthContext:
|
|
logger = get_logger()
|
|
logger.info(f'JWT auth started path={request.url.path}')
|
|
token = _extract_access_token(request)
|
|
if not token:
|
|
logger.warning(f'JWT auth failed path={request.url.path} reason=missing_token')
|
|
raise NotAuthenticatedException()
|
|
|
|
payload: AccessTokenPayload = await jwt_service.decode_access_token(token)
|
|
if payload.type != 'access':
|
|
logger.warning(f'JWT auth failed path={request.url.path} user_id={payload.sub} reason=invalid_token_type type={payload.type}')
|
|
raise InvalidTokenException('Invalid token type')
|
|
|
|
logger.info(f'JWT auth completed path={request.url.path} user_id={payload.sub} sid={payload.sid}')
|
|
return AuthContext(user_id=payload.sub, sid=payload.sid, token=payload)
|