This commit is contained in:
2026-06-03 13:52:45 +03:00
commit f7309c4b4a
140 changed files with 7134 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
from src.presentation.middleware.trace_id import TraceIDMiddleware
from src.presentation.middleware.security_headers import SecurityHeadersMiddleware

View File

@@ -0,0 +1,51 @@
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app,
*,
hsts: bool = True,
hsts_max_age: int = 31536000, # 1 год
hsts_include_subdomains: bool = True,
hsts_preload: bool = False,
frame_options: str = 'DENY', # или 'SAMEORIGIN'
referrer_policy: str = 'strict-origin-when-cross-origin',
content_security_policy: str | None = None,
):
super().__init__(app)
self.hsts = hsts
self.hsts_max_age = hsts_max_age
self.hsts_include_subdomains = hsts_include_subdomains
self.hsts_preload = hsts_preload
self.frame_options = frame_options
self.referrer_policy = referrer_policy
self.csp = content_security_policy
async def dispatch(self, request: Request, call_next) -> Response:
response: Response = await call_next(request)
if request.url.path in ('/docs', '/redoc', '/openapi.json'):
return response
if self.hsts and request.url.scheme == 'https':
hsts = f'max-age={self.hsts_max_age}'
if self.hsts_include_subdomains:
hsts += '; includeSubDomains'
if self.hsts_preload:
hsts += '; preload'
response.headers['Strict-Transport-Security'] = hsts
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = self.frame_options
response.headers['Referrer-Policy'] = self.referrer_policy
if self.csp:
response.headers['Content-Security-Policy'] = self.csp
return response

View File

@@ -0,0 +1,69 @@
from __future__ import annotations
from typing import Optional
from contextvars import Token
from starlette.requests import Request
from starlette.types import ASGIApp, Message, Receive, Scope, Send
from ulid import ULID
from src.application.contracts import ILogger
from src.infrastructure.config import settings
from src.infrastructure.context_vars import trace_id_var
class TraceIDMiddleware:
def __init__(
self,
app: ASGIApp,
logger: ILogger,
response_header_name: str = "X-Trace-ID",
attach_response_header: bool = True,
) -> None:
self.app = app
self.logger = logger
self.response_header_name = response_header_name
self.attach_response_header = attach_response_header
def _is_excluded(self, path: str) -> bool:
return any(path == p or path.startswith(p.rstrip("/") + "/") for p in settings.EXCLUDED_PATHS)
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope)
if self._is_excluded(request.url.path):
await self.app(scope, receive, send)
return
trace_id = request.headers.get("X-Trace-ID") or request.headers.get("X-Request-ID")
if not trace_id:
trace_id = str(ULID())
request.state.trace_id = trace_id
token: Token = trace_id_var.set(trace_id)
self.logger.debug(f"Request started: {request.method} {request.url} - TraceID: {trace_id}")
status_code_holder: dict[str, Optional[int]] = {"status": None}
async def send_wrapper(message: Message) -> None:
if message["type"] == "http.response.start":
status_code_holder["status"] = int(message["status"])
if self.attach_response_header:
headers = list(message.get("headers", []))
headers.append((self.response_header_name.lower().encode(), trace_id.encode()))
message["headers"] = headers
await send(message)
try:
await self.app(scope, receive, send_wrapper)
finally:
status = status_code_holder["status"]
status_part = f"{status}" if status is not None else "unknown"
self.logger.debug(
f"Request finished: {request.method} {request.url} - TraceID: {trace_id} - Status: {status_part}"
)
trace_id_var.reset(token)