Skip to content

Secrets

Capability-token-gated secret resolution.

secrets

Secret backend integrations.

AWSSecretBackend

Resolve secrets from AWS Secrets Manager.

Source code in safeai/secrets/aws.py
class AWSSecretBackend:
    """Resolve secrets from AWS Secrets Manager."""

    def __init__(
        self,
        *,
        client: Any | None = None,
        region_name: str | None = None,
    ) -> None:
        self._client = client or self._build_client(region_name=region_name)

    def get_secret(self, key: str) -> str:
        secret_id, field = _parse_key(key)
        try:
            response = self._client.get_secret_value(SecretId=secret_id)
        except Exception as exc:  # pragma: no cover - exercised through tests with fake clients
            raise KeyError(f"Secret not found: {secret_id}") from exc

        if response.get("SecretString") is not None:
            value = str(response["SecretString"])
            if not field:
                return value
            try:
                parsed = json.loads(value)
            except json.JSONDecodeError as exc:
                raise KeyError(
                    f"Secret '{secret_id}' is not JSON and cannot resolve field '{field}'"
                ) from exc
            if not isinstance(parsed, dict) or field not in parsed:
                raise KeyError(f"Secret '{secret_id}' has no field '{field}'")
            return str(parsed[field])

        if response.get("SecretBinary") is not None:
            raw = response["SecretBinary"]
            if field:
                raise KeyError(f"Binary secret '{secret_id}' cannot resolve field '{field}'")
            decoded = raw if isinstance(raw, (bytes, bytearray)) else base64.b64decode(raw)
            return decoded.decode("utf-8")

        raise KeyError(f"Secret '{secret_id}' returned no data")

    @staticmethod
    def _build_client(*, region_name: str | None) -> Any:
        try:
            import boto3  # type: ignore
        except Exception as exc:  # pragma: no cover - depends on environment extras
            raise RuntimeError("AWS backend requires optional dependency 'boto3'") from exc

        resolved_region = region_name or os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION")
        if not resolved_region:
            raise ValueError(
                "AWS region is required (pass region_name=... or set AWS_REGION/AWS_DEFAULT_REGION)"
            )
        return boto3.client("secretsmanager", region_name=resolved_region)

SecretAccessDeniedError

Bases: SecretError

Raised when capability scope does not allow secret resolution.

Source code in safeai/secrets/base.py
class SecretAccessDeniedError(SecretError):
    """Raised when capability scope does not allow secret resolution.

    ``SecretManager.resolve_secret`` raises this when token validation is
    rejected, when the token's scope lists no secret keys, or when the
    requested key is not among the keys the scope lists.
    """

SecretBackend

Bases: Protocol

Source code in safeai/secrets/base.py
@runtime_checkable
class SecretBackend(Protocol):
    """Structural interface every secret backend must satisfy.

    Any object with a ``get_secret(key) -> str`` method qualifies; no
    inheritance is required. ``runtime_checkable`` allows ``isinstance``
    checks, which only test that the method exists.
    """

    def get_secret(self, key: str) -> str:
        """Return a secret value for the key or raise KeyError.

        Implementations signal a missing or unresolvable secret with
        ``KeyError``.
        """

get_secret

get_secret(key: str) -> str

Return a secret value for the key or raise KeyError.

Source code in safeai/secrets/base.py
def get_secret(self, key: str) -> str:
    """Return a secret value for the key or raise KeyError.

    This is the protocol stub: it has no body of its own, and concrete
    backends supply the lookup. A missing or unresolvable secret is
    signalled with ``KeyError``.
    """

SecretBackendNotFoundError

Bases: SecretError

Raised when a configured secret backend is not registered.

Source code in safeai/secrets/base.py
class SecretBackendNotFoundError(SecretError):
    """Raised when a configured secret backend is not registered.

    ``SecretManager`` raises this when the requested backend name, after
    normalization, has no entry in its backend registry.
    """

SecretError

Bases: RuntimeError

Base class for all secret-resolution errors.

Source code in safeai/secrets/base.py
class SecretError(RuntimeError):
    """Base class for all secret-resolution errors.

    ``SecretAccessDeniedError``, ``SecretBackendNotFoundError`` and
    ``SecretNotFoundError`` all derive from it, so catching ``SecretError``
    handles every one of them.
    """

SecretNotFoundError

Bases: SecretError

Raised when a backend cannot find a requested secret key.

Source code in safeai/secrets/base.py
class SecretNotFoundError(SecretError):
    """Raised when a backend cannot find a requested secret key.

    ``SecretManager.resolve_secret`` raises this when the backend's
    ``get_secret`` raises ``KeyError``; the original ``KeyError`` is chained
    as the cause.
    """

CapabilityTokenManager

In-memory capability token issuer and validator.

Source code in safeai/secrets/capability.py
class CapabilityTokenManager:
    """Issue, look up, validate, revoke and purge capability tokens.

    Tokens live only in a dictionary on this instance, keyed by token id.
    """

    def __init__(self, *, clock: Clock | None = None) -> None:
        """Create an empty manager.

        Args:
            clock: Callable returning the current time. When omitted, the
                current UTC time from ``datetime.now`` is used.
        """
        if clock:
            self._clock = clock
        else:
            self._clock = lambda: datetime.now(timezone.utc)
        self._tokens: dict[str, CapabilityTokenModel] = {}

    def issue(
        self,
        *,
        agent_id: str,
        tool_name: str,
        actions: list[str],
        ttl: str = "10m",
        secret_keys: list[str] | None = None,
        session_id: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> CapabilityTokenModel:
        """Create, store and return a new token.

        The expiry is the current clock time plus ``ttl`` as interpreted by
        ``_parse_duration``. The scope binds the token to ``tool_name``,
        ``actions`` and ``secret_keys`` (an empty list when omitted).
        """
        now = self._clock()
        lifetime = _parse_duration(ttl)
        scope = CapabilityScopeModel(
            tool_name=tool_name,
            actions=actions,
            secret_keys=secret_keys or [],
        )
        minted = CapabilityTokenModel(
            token_id=f"cap_{uuid4().hex[:24]}",
            agent_id=agent_id,
            issued_at=now,
            expires_at=now + lifetime,
            session_id=session_id,
            scope=scope,
            metadata=dict(metadata or {}),
        )
        self._tokens[minted.token_id] = minted
        return minted

    def get(self, token_id: str) -> CapabilityTokenModel | None:
        """Return the token if it exists and is neither revoked nor expired, else ``None``."""
        found = self._tokens.get(str(token_id).strip())
        if found is None or found.revoked_at is not None:
            return None
        return None if _is_expired(found, now=self._clock()) else found

    def validate(
        self,
        token_id: str,
        *,
        agent_id: str,
        tool_name: str,
        action: str = "invoke",
        session_id: str | None = None,
    ) -> CapabilityValidationResult:
        """Check a token against the caller's claimed bindings.

        Checks run in this order and the first failure is reported:
        existence, revocation, expiry, agent, tool, action, session. The
        session is only compared when the token carries a session id.
        """

        def deny(reason, subject):
            return CapabilityValidationResult(allowed=False, reason=reason, token=subject)

        token = self._tokens.get(str(token_id).strip())
        if token is None:
            return deny(f"capability token '{token_id}' not found", None)
        if token.revoked_at is not None:
            return deny(f"capability token '{token_id}' is revoked", token)
        if _is_expired(token, now=self._clock()):
            return deny(f"capability token '{token_id}' is expired", token)
        if token.agent_id != str(agent_id).strip():
            return deny("capability token agent binding mismatch", token)
        if token.scope.tool_name != str(tool_name).strip():
            return deny("capability token tool binding mismatch", token)

        wanted_action = str(action).strip().lower()
        if wanted_action not in token.scope.actions:
            return deny(f"capability token does not allow action '{wanted_action}'", token)

        wanted_session = str(session_id).strip() if session_id else None
        if token.session_id and token.session_id != wanted_session:
            return deny("capability token session binding mismatch", token)

        return CapabilityValidationResult(
            allowed=True,
            reason="capability token valid",
            token=token,
        )

    def revoke(self, token_id: str) -> bool:
        """Mark a token revoked; return ``False`` if it is unknown or already revoked."""
        current = self._tokens.get(str(token_id).strip())
        if current is None or current.revoked_at is not None:
            return False
        revoked = current.model_copy(update={"revoked_at": self._clock()})
        # Round-trip through dump/validate so the stored copy is re-validated.
        self._tokens[revoked.token_id] = CapabilityTokenModel.model_validate(revoked.model_dump())
        return True

    def purge_expired(self) -> int:
        """Drop every revoked or expired token and return how many were removed."""
        now = self._clock()
        removed = 0
        # Iterate over a snapshot because entries are removed along the way.
        for token_id, token in list(self._tokens.items()):
            stale = token.revoked_at is not None or _is_expired(token, now=now)
            if not stale:
                continue
            self._tokens.pop(token_id, None)
            removed += 1
        return removed

    def list_active(
        self,
        *,
        agent_id: str | None = None,
        tool_name: str | None = None,
    ) -> list[CapabilityTokenModel]:
        """Return live tokens, newest ``issued_at`` first.

        Revoked and expired tokens are skipped. ``agent_id`` and
        ``tool_name`` narrow the result when given.
        """
        now = self._clock()
        wanted_agent = str(agent_id).strip() if agent_id else None
        wanted_tool = str(tool_name).strip() if tool_name else None

        def is_match(token):
            if token.revoked_at is not None or _is_expired(token, now=now):
                return False
            if agent_id and token.agent_id != wanted_agent:
                return False
            if tool_name and token.scope.tool_name != wanted_tool:
                return False
            return True

        live = [token for token in self._tokens.values() if is_match(token)]
        return sorted(live, key=lambda token: token.issued_at, reverse=True)

ResolvedSecret dataclass

Secret payload and metadata for controlled tool injection.

Source code in safeai/secrets/manager.py
@dataclass(frozen=True, repr=False)
class ResolvedSecret:
    """Secret payload and metadata for controlled tool injection."""

    key: str
    value: str
    backend: str
    token_id: str
    agent_id: str
    tool_name: str
    action: str
    session_id: str | None

    def __repr__(self) -> str:
        return (
            "ResolvedSecret("
            f"key={self.key!r}, "
            f"backend={self.backend!r}, "
            f"token_id={self.token_id!r}, "
            f"agent_id={self.agent_id!r}, "
            f"tool_name={self.tool_name!r}, "
            f"action={self.action!r}, "
            f"session_id={self.session_id!r}, "
            "value='***'"
            ")"
        )

SecretManager

Resolves scoped secrets from registered backends.

Source code in safeai/secrets/manager.py
class SecretManager:
    """Hand out secrets from named backends, gated by capability tokens."""

    def __init__(
        self,
        *,
        capability_manager: CapabilityTokenManager | None = None,
        backends: Mapping[str, SecretBackend] | None = None,
    ) -> None:
        """Set up the token manager and the backend registry.

        An ``"env"`` backend is always registered first; entries in
        ``backends`` are then registered with ``replace=True``, so they may
        override it.
        """
        self._capabilities = capability_manager or CapabilityTokenManager()
        self._backends: dict[str, SecretBackend] = {"env": EnvSecretBackend()}
        for backend_name, instance in (backends or {}).items():
            self.register_backend(backend_name, instance, replace=True)

    def register_backend(self, name: str, backend: SecretBackend, *, replace: bool = False) -> None:
        """Register ``backend`` under the normalized form of ``name``.

        Raises:
            ValueError: If the name is already taken and ``replace`` is false.
        """
        normalized = _normalize_backend_name(name)
        if not replace and normalized in self._backends:
            raise ValueError(f"secret backend '{normalized}' is already registered")
        self._backends[normalized] = backend

    def list_backends(self) -> list[str]:
        """Return the registered backend names in sorted order."""
        return sorted(self._backends)

    def has_backend(self, name: str) -> bool:
        """Report whether the normalized form of ``name`` is registered."""
        normalized = _normalize_backend_name(name)
        return normalized in self._backends

    def resolve_secret(
        self,
        *,
        token_id: str,
        secret_key: str,
        agent_id: str,
        tool_name: str,
        action: str = "invoke",
        session_id: str | None = None,
        backend: str = "env",
    ) -> ResolvedSecret:
        """Resolve one secret after checking the capability token.

        The key is normalized and the backend looked up first; the token is
        then validated and its scope checked for the key before the backend
        is queried.

        Raises:
            SecretBackendNotFoundError: If the backend is not registered.
            SecretAccessDeniedError: If the token is rejected or its scope
                does not cover the key.
            SecretNotFoundError: If the backend raises ``KeyError``.
        """
        normalized_key = _normalize_secret_key(secret_key)
        store = self._get_backend(backend)
        token = self._authorize(
            token_id=token_id,
            normalized_key=normalized_key,
            agent_id=agent_id,
            tool_name=tool_name,
            action=action,
            session_id=session_id,
        )

        try:
            value = store.get_secret(normalized_key)
        except KeyError as exc:
            raise SecretNotFoundError(f"secret '{normalized_key}' not found in backend '{backend}'") from exc

        return ResolvedSecret(
            key=normalized_key,
            value=str(value),
            backend=_normalize_backend_name(backend),
            token_id=token.token_id,
            agent_id=token.agent_id,
            tool_name=token.scope.tool_name,
            action=str(action).strip().lower(),
            session_id=token.session_id,
        )

    def resolve_secrets(
        self,
        *,
        token_id: str,
        secret_keys: list[str],
        agent_id: str,
        tool_name: str,
        action: str = "invoke",
        session_id: str | None = None,
        backend: str = "env",
    ) -> dict[str, ResolvedSecret]:
        """Resolve several keys with the same token and bindings.

        Each key goes through ``resolve_secret`` in order; the first failure
        propagates. The result is keyed by each resolved secret's ``key``.
        """
        shared = {
            "token_id": token_id,
            "agent_id": agent_id,
            "tool_name": tool_name,
            "action": action,
            "session_id": session_id,
            "backend": backend,
        }
        resolved_rows = (self.resolve_secret(secret_key=name, **shared) for name in secret_keys)
        return {row.key: row for row in resolved_rows}

    def _authorize(
        self,
        *,
        token_id: str,
        normalized_key: str,
        agent_id: str,
        tool_name: str,
        action: str,
        session_id: str | None,
    ):
        """Validate the token and confirm its scope grants ``normalized_key``; return the token."""
        verdict = self._capabilities.validate(
            token_id,
            agent_id=agent_id,
            tool_name=tool_name,
            action=action,
            session_id=session_id,
        )
        if not verdict.allowed:
            raise SecretAccessDeniedError(verdict.reason)
        token = verdict.token
        if token is None:
            raise SecretAccessDeniedError("capability token is unavailable for secret resolution")

        granted = set(token.scope.secret_keys)
        if not granted:
            raise SecretAccessDeniedError("capability token does not grant secret-key access")
        if normalized_key not in granted:
            raise SecretAccessDeniedError(
                f"capability token does not allow secret key '{normalized_key}'"
            )
        return token

    def _get_backend(self, name: str) -> SecretBackend:
        """Return the backend registered under ``name`` or raise ``SecretBackendNotFoundError``."""
        normalized = _normalize_backend_name(name)
        found = self._backends.get(normalized)
        if found is not None:
            return found
        raise SecretBackendNotFoundError(f"secret backend '{normalized}' is not registered")

VaultSecretBackend

Resolve secrets from HashiCorp Vault KV mounts.

Source code in safeai/secrets/vault.py
class VaultSecretBackend:
    """Resolve secrets from HashiCorp Vault KV mounts."""

    def __init__(
        self,
        *,
        client: Any | None = None,
        url: str | None = None,
        token: str | None = None,
        namespace: str | None = None,
        verify: bool | str = True,
        timeout: int = 5,
        mount_point: str = "secret",
        kv_version: int = 2,
    ) -> None:
        if kv_version not in {1, 2}:
            raise ValueError("kv_version must be 1 or 2")
        self.mount_point = str(mount_point).strip() or "secret"
        self.kv_version = kv_version
        self._client = client or self._build_client(
            url=url,
            token=token,
            namespace=namespace,
            verify=verify,
            timeout=timeout,
        )
        self._assert_authenticated()

    def get_secret(self, key: str) -> str:
        path, field = _parse_key(key)
        payload = self._read_payload(path)
        if field not in payload:
            raise KeyError(f"Secret not found: {path}#{field}")
        return str(payload[field])

    def _read_payload(self, path: str) -> dict[str, Any]:
        try:
            if self.kv_version == 2:
                response = self._client.secrets.kv.v2.read_secret_version(
                    path=path,
                    mount_point=self.mount_point,
                )
                data = response.get("data", {})
                payload = data.get("data", {})
            else:
                response = self._client.secrets.kv.v1.read_secret(
                    path=path,
                    mount_point=self.mount_point,
                )
                payload = response.get("data", {})
        except Exception as exc:  # pragma: no cover - exercised through tests with fake clients
            raise KeyError(f"Secret not found: {path}") from exc

        if not isinstance(payload, dict):
            raise KeyError(f"Secret payload is invalid for path '{path}'")
        return payload

    def _assert_authenticated(self) -> None:
        checker = getattr(self._client, "is_authenticated", None)
        if checker is None:
            return
        try:
            authenticated = bool(checker())
        except Exception as exc:  # pragma: no cover - defensive path
            raise RuntimeError("Vault authentication check failed") from exc
        if not authenticated:
            raise RuntimeError("Vault authentication failed")

    @staticmethod
    def _build_client(
        *,
        url: str | None,
        token: str | None,
        namespace: str | None,
        verify: bool | str,
        timeout: int,
    ) -> Any:
        try:
            import hvac  # type: ignore
        except Exception as exc:  # pragma: no cover - depends on environment extras
            raise RuntimeError("Vault backend requires optional dependency 'hvac'") from exc

        resolved_url = url or os.getenv("VAULT_ADDR")
        resolved_token = token or os.getenv("VAULT_TOKEN")
        if not resolved_url:
            raise ValueError("Vault URL is required (pass url=... or set VAULT_ADDR)")
        if not resolved_token:
            raise ValueError("Vault token is required (pass token=... or set VAULT_TOKEN)")
        return hvac.Client(
            url=resolved_url,
            token=resolved_token,
            namespace=namespace,
            verify=verify,
            timeout=timeout,
        )