from __future__ import annotations from typing import Any import hvac def _vault_token_renew_failed(exception: Exception) -> bool: if isinstance(exception, (hvac.exceptions.Forbidden, hvac.exceptions.Unauthorized)): return True message = getattr(exception, 'message', None) or str(exception) if isinstance(message, str): lower = message.lower() return 'permission denied' in lower or 'invalid token' in lower or '403' in lower return False class VaultClient: def __init__( self, *, addr: str, role_id: str, secret_id: str, namespace: str | None, mount_point: str, ) -> None: self._mount_point = mount_point self._addr = addr self._role_id = role_id self._secret_id = secret_id self._namespace = namespace self._client = hvac.Client(url=addr, namespace=namespace) self._approle_login() def _approle_login(self) -> None: self._client.auth.approle.login(role_id=self._role_id, secret_id=self._secret_id) def _renew_or_login(self) -> None: try: self._client.auth.token.renew_self() except Exception: self._approle_login() def read_secret(self, path: str) -> dict[str, Any]: for attempt in range(2): try: secret = self._client.secrets.kv.v2.read_secret_version( path=path, mount_point=self._mount_point, ) return dict(secret.get('data', {}).get('data', {})) except Exception as exc: if attempt == 0 and _vault_token_renew_failed(exc): self._renew_or_login() continue raise def read_secret_optional(self, path: str) -> dict[str, Any]: if not path: return {} try: return self.read_secret(path) except (hvac.exceptions.InvalidPath, hvac.exceptions.Forbidden, hvac.exceptions.Unauthorized): return {}