Skip to content

Middleware

Framework adapters for LangChain, CrewAI, AutoGen, Claude ADK, and Google ADK.

middleware

Framework adapter interfaces.

SafeAIAutoGenAdapter

Bases: BaseMiddleware

Adapter that wraps AutoGen tool execution paths.

Source code in safeai/middleware/autogen.py
class SafeAIAutoGenAdapter(BaseMiddleware):
    """Adapter that routes AutoGen tool calls through SafeAI policy checks."""

    def wrap_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Return a guarded version of a synchronous AutoGen tool callable.

        The wrapper screens the call arguments before invoking ``fn`` and
        screens the result afterwards, raising ``SafeAIBlockedError``
        whenever a policy decision disallows either side of the exchange.
        """

        @wraps(fn)
        def _guarded(*call_args: Any, **call_kwargs: Any) -> Any:
            params, call_shape = _normalize_input(call_args, call_kwargs)
            data_tags = list(request_data_tags or [])
            # Pre-execution screening: every argument goes through policy
            # evaluation before the underlying tool runs.
            screened = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=data_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="autogen_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            verdict = screened.decision
            # Anything other than an explicit "allow" aborts the call.
            if verdict.action != "allow":
                raise SafeAIBlockedError(
                    action=verdict.action,
                    policy_name=verdict.policy_name,
                    reason=verdict.reason,
                )

            # Run the tool with the (possibly filtered) parameter set.
            raw_result = _invoke_with_shape(fn, screened.filtered_params, call_shape)
            # Post-execution screening of the tool's output.
            checked = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw_result),
                agent_id=agent_id,
                request_data_tags=data_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="autogen_tool",
            )
            out_verdict = checked.decision
            if out_verdict.action in ("block", "require_approval"):
                raise SafeAIBlockedError(
                    action=out_verdict.action,
                    policy_name=out_verdict.policy_name,
                    reason=out_verdict.reason,
                )
            # Hand back the filtered response in the caller's original shape.
            return _restore_response_shape(raw_result, checked.filtered_response)

        return _guarded

    def wrap_async_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Return a guarded version of an asynchronous AutoGen tool callable.

        Async counterpart of :meth:`wrap_tool`: identical screening on both
        request and response, with the tool awaited in between.
        """

        @wraps(fn)
        async def _guarded(*call_args: Any, **call_kwargs: Any) -> Any:
            params, call_shape = _normalize_input(call_args, call_kwargs)
            data_tags = list(request_data_tags or [])
            # Pre-execution screening of the call arguments.
            screened = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=data_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="autogen_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            verdict = screened.decision
            if verdict.action != "allow":
                raise SafeAIBlockedError(
                    action=verdict.action,
                    policy_name=verdict.policy_name,
                    reason=verdict.reason,
                )

            # Await the tool with the (possibly filtered) parameter set.
            raw_result = await _ainvoke_with_shape(fn, screened.filtered_params, call_shape)
            # Post-execution screening of the tool's output.
            checked = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw_result),
                agent_id=agent_id,
                request_data_tags=data_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="autogen_tool",
            )
            out_verdict = checked.decision
            if out_verdict.action in ("block", "require_approval"):
                raise SafeAIBlockedError(
                    action=out_verdict.action,
                    policy_name=out_verdict.policy_name,
                    reason=out_verdict.reason,
                )
            return _restore_response_shape(raw_result, checked.filtered_response)

        return _guarded

wrap_tool

wrap_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap a synchronous AutoGen tool callable.

Source code in safeai/middleware/autogen.py
def wrap_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap a synchronous AutoGen tool callable."""

    @wraps(fn)
    def _wrapped(*args: Any, **kwargs: Any) -> Any:
        # Flatten the call arguments into one payload and remember the
        # original call shape so it can be restored for the real invocation.
        payload, shape = _normalize_input(args, kwargs)
        tags = list(request_data_tags or [])
        # Pre-execution policy check on the tool parameters.
        request = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=payload,
            data_tags=tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="autogen_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        # Any decision other than "allow" aborts the call before execution.
        if request.decision.action != "allow":
            raise SafeAIBlockedError(
                action=request.decision.action,
                policy_name=request.decision.policy_name,
                reason=request.decision.reason,
            )

        # Execute the tool with the (possibly filtered) parameters.
        result = _invoke_with_shape(fn, request.filtered_params, shape)
        # Post-execution policy check on the tool response.
        guarded = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(result),
            agent_id=agent_id,
            request_data_tags=tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="autogen_tool",
        )
        if guarded.decision.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=guarded.decision.action,
                policy_name=guarded.decision.policy_name,
                reason=guarded.decision.reason,
            )
        # Return the filtered response, restored to the original result shape.
        return _restore_response_shape(result, guarded.filtered_response)

    return _wrapped

wrap_async_tool

wrap_async_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap an asynchronous AutoGen tool callable.

Source code in safeai/middleware/autogen.py
def wrap_async_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap an asynchronous AutoGen tool callable."""

    @wraps(fn)
    async def _wrapped(*args: Any, **kwargs: Any) -> Any:
        # Flatten the call arguments into one payload and remember the
        # original call shape so it can be restored for the real invocation.
        payload, shape = _normalize_input(args, kwargs)
        tags = list(request_data_tags or [])
        # Pre-execution policy check on the tool parameters.
        request = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=payload,
            data_tags=tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="autogen_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        # Any decision other than "allow" aborts the call before execution.
        if request.decision.action != "allow":
            raise SafeAIBlockedError(
                action=request.decision.action,
                policy_name=request.decision.policy_name,
                reason=request.decision.reason,
            )

        # Await the tool with the (possibly filtered) parameters.
        result = await _ainvoke_with_shape(fn, request.filtered_params, shape)
        # Post-execution policy check on the tool response.
        guarded = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(result),
            agent_id=agent_id,
            request_data_tags=tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="autogen_tool",
        )
        if guarded.decision.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=guarded.decision.action,
                policy_name=guarded.decision.policy_name,
                reason=guarded.decision.reason,
            )
        # Return the filtered response, restored to the original result shape.
        return _restore_response_shape(result, guarded.filtered_response)

    return _wrapped

SafeAIClaudeADKAdapter

Bases: BaseMiddleware

Adapter that wraps Claude ADK tool execution paths.

Source code in safeai/middleware/claude_adk.py
class SafeAIClaudeADKAdapter(BaseMiddleware):
    """Adapter that routes Claude ADK tool calls through SafeAI policy checks."""

    def wrap_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Return a guarded version of a synchronous Claude ADK tool callable.

        The wrapper screens the call arguments before invoking ``fn`` and
        screens the result afterwards, raising ``SafeAIBlockedError``
        whenever a policy decision disallows either side of the exchange.
        """

        @wraps(fn)
        def _guarded(*call_args: Any, **call_kwargs: Any) -> Any:
            params, call_shape = _normalize_input(call_args, call_kwargs)
            data_tags = list(request_data_tags or [])
            # Pre-execution screening: every argument goes through policy
            # evaluation before the underlying tool runs.
            screened = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=data_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="claude_adk_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            verdict = screened.decision
            # Anything other than an explicit "allow" aborts the call.
            if verdict.action != "allow":
                raise SafeAIBlockedError(
                    action=verdict.action,
                    policy_name=verdict.policy_name,
                    reason=verdict.reason,
                )

            # Run the tool with the (possibly filtered) parameter set.
            raw_result = _invoke_with_shape(fn, screened.filtered_params, call_shape)
            # Post-execution screening of the tool's output.
            checked = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw_result),
                agent_id=agent_id,
                request_data_tags=data_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="claude_adk_tool",
            )
            out_verdict = checked.decision
            if out_verdict.action in ("block", "require_approval"):
                raise SafeAIBlockedError(
                    action=out_verdict.action,
                    policy_name=out_verdict.policy_name,
                    reason=out_verdict.reason,
                )
            # Hand back the filtered response in the caller's original shape.
            return _restore_response_shape(raw_result, checked.filtered_response)

        return _guarded

    def wrap_async_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Return a guarded version of an asynchronous Claude ADK tool callable.

        Async counterpart of :meth:`wrap_tool`: identical screening on both
        request and response, with the tool awaited in between.
        """

        @wraps(fn)
        async def _guarded(*call_args: Any, **call_kwargs: Any) -> Any:
            params, call_shape = _normalize_input(call_args, call_kwargs)
            data_tags = list(request_data_tags or [])
            # Pre-execution screening of the call arguments.
            screened = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=data_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="claude_adk_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            verdict = screened.decision
            if verdict.action != "allow":
                raise SafeAIBlockedError(
                    action=verdict.action,
                    policy_name=verdict.policy_name,
                    reason=verdict.reason,
                )

            # Await the tool with the (possibly filtered) parameter set.
            raw_result = await _ainvoke_with_shape(fn, screened.filtered_params, call_shape)
            # Post-execution screening of the tool's output.
            checked = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw_result),
                agent_id=agent_id,
                request_data_tags=data_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="claude_adk_tool",
            )
            out_verdict = checked.decision
            if out_verdict.action in ("block", "require_approval"):
                raise SafeAIBlockedError(
                    action=out_verdict.action,
                    policy_name=out_verdict.policy_name,
                    reason=out_verdict.reason,
                )
            return _restore_response_shape(raw_result, checked.filtered_response)

        return _guarded

wrap_tool

wrap_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap a synchronous Claude ADK tool callable.

Source code in safeai/middleware/claude_adk.py
def wrap_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap a synchronous Claude ADK tool callable."""

    @wraps(fn)
    def _wrapped(*args: Any, **kwargs: Any) -> Any:
        # Flatten the call arguments into one payload and remember the
        # original call shape so it can be restored for the real invocation.
        payload, shape = _normalize_input(args, kwargs)
        tags = list(request_data_tags or [])

        # Pre-execution policy check on the tool parameters.
        request = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=payload,
            data_tags=tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="claude_adk_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        # Any decision other than "allow" aborts the call before execution.
        if request.decision.action != "allow":
            raise SafeAIBlockedError(
                action=request.decision.action,
                policy_name=request.decision.policy_name,
                reason=request.decision.reason,
            )

        # Execute the tool with the (possibly filtered) parameters.
        result = _invoke_with_shape(fn, request.filtered_params, shape)
        # Post-execution policy check on the tool response.
        guarded = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(result),
            agent_id=agent_id,
            request_data_tags=tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="claude_adk_tool",
        )
        if guarded.decision.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=guarded.decision.action,
                policy_name=guarded.decision.policy_name,
                reason=guarded.decision.reason,
            )
        # Return the filtered response, restored to the original result shape.
        return _restore_response_shape(result, guarded.filtered_response)

    return _wrapped

wrap_async_tool

wrap_async_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap an asynchronous Claude ADK tool callable.

Source code in safeai/middleware/claude_adk.py
def wrap_async_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap an asynchronous Claude ADK tool callable."""

    @wraps(fn)
    async def _wrapped(*args: Any, **kwargs: Any) -> Any:
        # Flatten the call arguments into one payload and remember the
        # original call shape so it can be restored for the real invocation.
        payload, shape = _normalize_input(args, kwargs)
        tags = list(request_data_tags or [])
        # Pre-execution policy check on the tool parameters.
        request = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=payload,
            data_tags=tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="claude_adk_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        # Any decision other than "allow" aborts the call before execution.
        if request.decision.action != "allow":
            raise SafeAIBlockedError(
                action=request.decision.action,
                policy_name=request.decision.policy_name,
                reason=request.decision.reason,
            )

        # Await the tool with the (possibly filtered) parameters.
        result = await _ainvoke_with_shape(fn, request.filtered_params, shape)
        # Post-execution policy check on the tool response.
        guarded = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(result),
            agent_id=agent_id,
            request_data_tags=tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="claude_adk_tool",
        )
        if guarded.decision.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=guarded.decision.action,
                policy_name=guarded.decision.policy_name,
                reason=guarded.decision.reason,
            )
        # Return the filtered response, restored to the original result shape.
        return _restore_response_shape(result, guarded.filtered_response)

    return _wrapped

SafeAICrewAIAdapter

Bases: BaseMiddleware

Adapter that wraps CrewAI tool execution paths.

Source code in safeai/middleware/crewai.py
class SafeAICrewAIAdapter(BaseMiddleware):
    """Adapter that routes CrewAI tool calls through SafeAI policy checks."""

    def wrap_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Return a guarded version of a synchronous CrewAI tool callable.

        The wrapper screens the call arguments before invoking ``fn`` and
        screens the result afterwards, raising ``SafeAIBlockedError``
        whenever a policy decision disallows either side of the exchange.
        """

        @wraps(fn)
        def _guarded(*call_args: Any, **call_kwargs: Any) -> Any:
            params, call_shape = _normalize_input(call_args, call_kwargs)
            data_tags = list(request_data_tags or [])
            # Pre-execution screening: every argument goes through policy
            # evaluation before the underlying tool runs.
            screened = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=data_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="crewai_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            verdict = screened.decision
            # Anything other than an explicit "allow" aborts the call.
            if verdict.action != "allow":
                raise SafeAIBlockedError(
                    action=verdict.action,
                    policy_name=verdict.policy_name,
                    reason=verdict.reason,
                )

            # Run the tool with the (possibly filtered) parameter set.
            raw_result = _invoke_with_shape(fn, screened.filtered_params, call_shape)
            # Post-execution screening of the tool's output.
            checked = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw_result),
                agent_id=agent_id,
                request_data_tags=data_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="crewai_tool",
            )
            out_verdict = checked.decision
            if out_verdict.action in ("block", "require_approval"):
                raise SafeAIBlockedError(
                    action=out_verdict.action,
                    policy_name=out_verdict.policy_name,
                    reason=out_verdict.reason,
                )
            # Hand back the filtered response in the caller's original shape.
            return _restore_response_shape(raw_result, checked.filtered_response)

        return _guarded

    def wrap_async_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Return a guarded version of an asynchronous CrewAI tool callable.

        Async counterpart of :meth:`wrap_tool`: identical screening on both
        request and response, with the tool awaited in between.
        """

        @wraps(fn)
        async def _guarded(*call_args: Any, **call_kwargs: Any) -> Any:
            params, call_shape = _normalize_input(call_args, call_kwargs)
            data_tags = list(request_data_tags or [])
            # Pre-execution screening of the call arguments.
            screened = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=data_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="crewai_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            verdict = screened.decision
            if verdict.action != "allow":
                raise SafeAIBlockedError(
                    action=verdict.action,
                    policy_name=verdict.policy_name,
                    reason=verdict.reason,
                )

            # Await the tool with the (possibly filtered) parameter set.
            raw_result = await _ainvoke_with_shape(fn, screened.filtered_params, call_shape)
            # Post-execution screening of the tool's output.
            checked = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw_result),
                agent_id=agent_id,
                request_data_tags=data_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="crewai_tool",
            )
            out_verdict = checked.decision
            if out_verdict.action in ("block", "require_approval"):
                raise SafeAIBlockedError(
                    action=out_verdict.action,
                    policy_name=out_verdict.policy_name,
                    reason=out_verdict.reason,
                )
            return _restore_response_shape(raw_result, checked.filtered_response)

        return _guarded

wrap_tool

wrap_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap a synchronous CrewAI tool callable.

Source code in safeai/middleware/crewai.py
def wrap_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap a synchronous CrewAI tool callable."""

    @wraps(fn)
    def _wrapped(*args: Any, **kwargs: Any) -> Any:
        # Flatten the call arguments into one payload and remember the
        # original call shape so it can be restored for the real invocation.
        payload, shape = _normalize_input(args, kwargs)
        tags = list(request_data_tags or [])
        # Pre-execution policy check on the tool parameters.
        request = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=payload,
            data_tags=tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="crewai_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        # Any decision other than "allow" aborts the call before execution.
        if request.decision.action != "allow":
            raise SafeAIBlockedError(
                action=request.decision.action,
                policy_name=request.decision.policy_name,
                reason=request.decision.reason,
            )

        # Execute the tool with the (possibly filtered) parameters.
        result = _invoke_with_shape(fn, request.filtered_params, shape)
        # Post-execution policy check on the tool response.
        guarded = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(result),
            agent_id=agent_id,
            request_data_tags=tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="crewai_tool",
        )
        if guarded.decision.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=guarded.decision.action,
                policy_name=guarded.decision.policy_name,
                reason=guarded.decision.reason,
            )
        # Return the filtered response, restored to the original result shape.
        return _restore_response_shape(result, guarded.filtered_response)

    return _wrapped

wrap_async_tool

wrap_async_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap an asynchronous CrewAI tool callable.

Source code in safeai/middleware/crewai.py
def wrap_async_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap an asynchronous CrewAI tool callable."""

    @wraps(fn)
    async def _wrapped(*args: Any, **kwargs: Any) -> Any:
        # Flatten the call arguments into one payload and remember the
        # original call shape so it can be restored for the real invocation.
        payload, shape = _normalize_input(args, kwargs)
        tags = list(request_data_tags or [])
        # Pre-execution policy check on the tool parameters.
        request = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=payload,
            data_tags=tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="crewai_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        # Any decision other than "allow" aborts the call before execution.
        if request.decision.action != "allow":
            raise SafeAIBlockedError(
                action=request.decision.action,
                policy_name=request.decision.policy_name,
                reason=request.decision.reason,
            )

        # Await the tool with the (possibly filtered) parameters.
        result = await _ainvoke_with_shape(fn, request.filtered_params, shape)
        # Post-execution policy check on the tool response.
        guarded = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(result),
            agent_id=agent_id,
            request_data_tags=tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="crewai_tool",
        )
        if guarded.decision.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=guarded.decision.action,
                policy_name=guarded.decision.policy_name,
                reason=guarded.decision.reason,
            )
        # Return the filtered response, restored to the original result shape.
        return _restore_response_shape(result, guarded.filtered_response)

    return _wrapped

SafeAIGoogleADKAdapter

Bases: BaseMiddleware

Adapter that wraps Google ADK tool execution paths.

Source code in safeai/middleware/google_adk.py
class SafeAIGoogleADKAdapter(BaseMiddleware):
    """Adapter that wraps Google ADK tool execution paths."""

    @staticmethod
    def _deny(decision: Any) -> None:
        """Raise ``SafeAIBlockedError`` carrying the policy decision details."""
        raise SafeAIBlockedError(
            action=decision.action,
            policy_name=decision.policy_name,
            reason=decision.reason,
        )

    def wrap_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Wrap a synchronous Google ADK tool callable.

        The returned callable validates the request with SafeAI, runs the
        tool on the filtered parameters, then filters the response before
        handing it back to the caller.
        """

        @wraps(fn)
        def _wrapped(*args: Any, **kwargs: Any) -> Any:
            # Flatten positional/keyword input into a parameter dict while
            # remembering the original call shape so it can be replayed.
            params, call_shape = _normalize_input(args, kwargs)
            active_tags = list(request_data_tags or [])
            checked = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=active_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="google_adk_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            if checked.decision.action != "allow":
                self._deny(checked.decision)

            # Execute the tool using the (possibly filtered) parameters.
            raw = _invoke_with_shape(fn, checked.filtered_params, call_shape)
            screened = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw),
                agent_id=agent_id,
                request_data_tags=active_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="google_adk_tool",
            )
            if screened.decision.action in {"block", "require_approval"}:
                self._deny(screened.decision)
            return _restore_response_shape(raw, screened.filtered_response)

        return _wrapped

    def wrap_async_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Wrap an asynchronous Google ADK tool callable.

        Mirrors :meth:`wrap_tool` but awaits the underlying coroutine.
        """

        @wraps(fn)
        async def _wrapped(*args: Any, **kwargs: Any) -> Any:
            params, call_shape = _normalize_input(args, kwargs)
            active_tags = list(request_data_tags or [])
            checked = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=active_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="google_adk_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            if checked.decision.action != "allow":
                self._deny(checked.decision)

            raw = await _ainvoke_with_shape(fn, checked.filtered_params, call_shape)
            screened = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw),
                agent_id=agent_id,
                request_data_tags=active_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="google_adk_tool",
            )
            if screened.decision.action in {"block", "require_approval"}:
                self._deny(screened.decision)
            return _restore_response_shape(raw, screened.filtered_response)

        return _wrapped

wrap_tool

wrap_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap a synchronous Google ADK tool callable.

Source code in safeai/middleware/google_adk.py
def wrap_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap a synchronous Google ADK tool callable.

    The wrapper validates the call with SafeAI, executes the tool on the
    filtered parameters, and filters the response before returning it.
    """

    @wraps(fn)
    def _wrapped(*args: Any, **kwargs: Any) -> Any:
        # Flatten the call into a parameter dict; keep the call shape so the
        # original positional/keyword form can be replayed.
        params, call_shape = _normalize_input(args, kwargs)
        active_tags = list(request_data_tags or [])
        checked = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=params,
            data_tags=active_tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="google_adk_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        verdict = checked.decision
        if verdict.action != "allow":
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )

        raw = _invoke_with_shape(fn, checked.filtered_params, call_shape)
        screened = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(raw),
            agent_id=agent_id,
            request_data_tags=active_tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="google_adk_tool",
        )
        verdict = screened.decision
        if verdict.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )
        return _restore_response_shape(raw, screened.filtered_response)

    return _wrapped

wrap_async_tool

wrap_async_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap an asynchronous Google ADK tool callable.

Source code in safeai/middleware/google_adk.py
def wrap_async_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap an asynchronous Google ADK tool callable.

    Same contract as the synchronous wrapper, but the underlying coroutine
    is awaited.
    """

    @wraps(fn)
    async def _wrapped(*args: Any, **kwargs: Any) -> Any:
        params, call_shape = _normalize_input(args, kwargs)
        active_tags = list(request_data_tags or [])
        checked = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=params,
            data_tags=active_tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="google_adk_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        verdict = checked.decision
        if verdict.action != "allow":
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )

        raw = await _ainvoke_with_shape(fn, checked.filtered_params, call_shape)
        screened = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(raw),
            agent_id=agent_id,
            request_data_tags=active_tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="google_adk_tool",
        )
        verdict = screened.decision
        if verdict.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )
        return _restore_response_shape(raw, screened.filtered_response)

    return _wrapped

SafeAIBlockedError

Bases: RuntimeError

Raised when SafeAI blocks a LangChain tool invocation.

Source code in safeai/middleware/langchain.py
class SafeAIBlockedError(RuntimeError):
    """Raised when SafeAI blocks a LangChain tool invocation."""

    def __init__(self, *, action: str, policy_name: str | None, reason: str) -> None:
        # Keep a human-readable summary on the exception message while
        # exposing the structured decision fields as attributes.
        message = f"SafeAI blocked tool call ({action}): {reason}"
        super().__init__(message)
        self.action = action
        self.policy_name = policy_name
        self.reason = reason

SafeAICallback

Bases: BaseCallbackHandler

LangChain callback helper for explicit request/response interception.

Source code in safeai/middleware/langchain.py
class SafeAICallback(BaseCallbackHandler):
    """LangChain callback helper for explicit request/response interception."""

    def __init__(
        self,
        safeai: Any,
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
    ) -> None:
        # Store the SafeAI client plus the routing identity reused on every
        # interception call.
        self.safeai = safeai
        self.agent_id = agent_id
        self.session_id = session_id
        self.source_agent_id = source_agent_id
        self.destination_agent_id = destination_agent_id

    def intercept_tool_call(
        self,
        *,
        tool_name: str,
        parameters: dict[str, Any],
        response: dict[str, Any],
        data_tags: list[str],
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> dict[str, Any]:
        """Run explicit request + response interception for callback-driven flows.

        Raises :class:`SafeAIBlockedError` when either the request or the
        response is not allowed; otherwise returns the filtered response.
        """
        inbound = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=parameters,
            data_tags=data_tags,
            agent_id=self.agent_id,
            session_id=self.session_id,
            source_agent_id=self.source_agent_id,
            destination_agent_id=self.destination_agent_id,
            action_type="langchain_callback",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        in_decision = inbound.decision
        if in_decision.action != "allow":
            raise SafeAIBlockedError(
                action=in_decision.action,
                policy_name=in_decision.policy_name,
                reason=in_decision.reason,
            )

        outbound = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=response,
            agent_id=self.agent_id,
            request_data_tags=data_tags,
            session_id=self.session_id,
            source_agent_id=self.source_agent_id,
            destination_agent_id=self.destination_agent_id,
            action_type="langchain_callback",
        )
        out_decision = outbound.decision
        if out_decision.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=out_decision.action,
                policy_name=out_decision.policy_name,
                reason=out_decision.reason,
            )
        return outbound.filtered_response

intercept_tool_call

intercept_tool_call(*, tool_name: str, parameters: dict[str, Any], response: dict[str, Any], data_tags: list[str], capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> dict[str, Any]

Run explicit request + response interception for callback-driven flows.

Source code in safeai/middleware/langchain.py
def intercept_tool_call(
    self,
    *,
    tool_name: str,
    parameters: dict[str, Any],
    response: dict[str, Any],
    data_tags: list[str],
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> dict[str, Any]:
    """Run explicit request + response interception for callback-driven flows.

    Raises ``SafeAIBlockedError`` when either leg is not allowed; otherwise
    returns the filtered response payload.
    """
    inbound = self.safeai.intercept_tool_request(
        tool_name=tool_name,
        parameters=parameters,
        data_tags=data_tags,
        agent_id=self.agent_id,
        session_id=self.session_id,
        source_agent_id=self.source_agent_id,
        destination_agent_id=self.destination_agent_id,
        action_type="langchain_callback",
        capability_token_id=capability_token_id,
        capability_action=capability_action,
        approval_request_id=approval_request_id,
    )
    in_decision = inbound.decision
    if in_decision.action != "allow":
        raise SafeAIBlockedError(
            action=in_decision.action,
            policy_name=in_decision.policy_name,
            reason=in_decision.reason,
        )

    outbound = self.safeai.intercept_tool_response(
        tool_name=tool_name,
        response=response,
        agent_id=self.agent_id,
        request_data_tags=data_tags,
        session_id=self.session_id,
        source_agent_id=self.source_agent_id,
        destination_agent_id=self.destination_agent_id,
        action_type="langchain_callback",
    )
    out_decision = outbound.decision
    if out_decision.action in {"block", "require_approval"}:
        raise SafeAIBlockedError(
            action=out_decision.action,
            policy_name=out_decision.policy_name,
            reason=out_decision.reason,
        )
    return outbound.filtered_response

SafeAILangChainAdapter

Bases: BaseMiddleware

Adapter that wraps LangChain tool invocations with SafeAI checks.

Source code in safeai/middleware/langchain.py
class SafeAILangChainAdapter(BaseMiddleware):
    """Adapter that wraps LangChain tool invocations with SafeAI checks."""

    @staticmethod
    def _deny(decision: Any) -> None:
        """Raise ``SafeAIBlockedError`` carrying the policy decision details."""
        raise SafeAIBlockedError(
            action=decision.action,
            policy_name=decision.policy_name,
            reason=decision.reason,
        )

    def wrap_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        tag_extractor: TagExtractor | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Wrap a synchronous tool callable.

        Input payload is normalized to key/value form, validated on request,
        then the tool response is filtered before returning to caller.
        """

        @wraps(fn)
        def _wrapped(*args: Any, **kwargs: Any) -> Any:
            params, call_shape = _normalize_input(args, kwargs)
            # Explicit tags win; otherwise fall back to inferred ones.
            fallback_tags = _infer_tags(params, safeai=self.safeai, extractor=tag_extractor)
            active_tags = list(request_data_tags or fallback_tags)

            checked = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=active_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="langchain_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            if checked.decision.action != "allow":
                self._deny(checked.decision)

            raw = _invoke_with_shape(fn, checked.filtered_params, call_shape)
            screened = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw),
                agent_id=agent_id,
                request_data_tags=active_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="langchain_tool",
            )
            if screened.decision.action in {"block", "require_approval"}:
                self._deny(screened.decision)
            return _restore_response_shape(raw, screened.filtered_response)

        return _wrapped

    def wrap_async_tool(
        self,
        tool_name: str,
        fn: Callable[..., Any],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        tag_extractor: TagExtractor | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Callable[..., Any]:
        """Wrap an async tool callable."""

        @wraps(fn)
        async def _wrapped(*args: Any, **kwargs: Any) -> Any:
            params, call_shape = _normalize_input(args, kwargs)
            fallback_tags = _infer_tags(params, safeai=self.safeai, extractor=tag_extractor)
            active_tags = list(request_data_tags or fallback_tags)

            checked = self.safeai.intercept_tool_request(
                tool_name=tool_name,
                parameters=params,
                data_tags=active_tags,
                agent_id=agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="langchain_tool",
                capability_token_id=capability_token_id,
                capability_action=capability_action,
                approval_request_id=approval_request_id,
            )
            if checked.decision.action != "allow":
                self._deny(checked.decision)

            raw = await _ainvoke_with_shape(fn, checked.filtered_params, call_shape)
            screened = self.safeai.intercept_tool_response(
                tool_name=tool_name,
                response=_normalize_response(raw),
                agent_id=agent_id,
                request_data_tags=active_tags,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                action_type="langchain_tool",
            )
            if screened.decision.action in {"block", "require_approval"}:
                self._deny(screened.decision)
            return _restore_response_shape(raw, screened.filtered_response)

        return _wrapped

    def wrap_langchain_tool(
        self,
        tool: Any,
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        request_data_tags: list[str] | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> Any:
        """Patch `invoke`/`ainvoke` on a LangChain-like tool object in place."""
        raw_name = getattr(tool, "name", "") or getattr(tool, "__name__", "tool")
        resolved_name = str(raw_name).strip() or "tool"
        shared_kwargs: dict[str, Any] = {
            "agent_id": agent_id,
            "session_id": session_id,
            "source_agent_id": source_agent_id,
            "destination_agent_id": destination_agent_id,
            "request_data_tags": request_data_tags,
            "capability_token_id": capability_token_id,
            "capability_action": capability_action,
            "approval_request_id": approval_request_id,
        }
        sync_target = getattr(tool, "invoke", None)
        if callable(sync_target):
            tool.invoke = self.wrap_tool(
                tool_name=resolved_name,
                fn=sync_target,
                **shared_kwargs,
            )
        async_target = getattr(tool, "ainvoke", None)
        if callable(async_target):
            tool.ainvoke = self.wrap_async_tool(
                tool_name=resolved_name,
                fn=async_target,
                **shared_kwargs,
            )
        return tool

wrap_tool

wrap_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, tag_extractor: TagExtractor | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap a synchronous tool callable.

Input payload is normalized to key/value form, validated on request, then the tool response is filtered before returning to caller.

Source code in safeai/middleware/langchain.py
def wrap_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    tag_extractor: TagExtractor | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap a synchronous tool callable.

    Input payload is normalized to key/value form, validated on request,
    then the tool response is filtered before returning to caller.
    """

    @wraps(fn)
    def _wrapped(*args: Any, **kwargs: Any) -> Any:
        params, call_shape = _normalize_input(args, kwargs)
        # Explicit tags take precedence over inferred ones.
        fallback_tags = _infer_tags(params, safeai=self.safeai, extractor=tag_extractor)
        active_tags = list(request_data_tags or fallback_tags)

        checked = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=params,
            data_tags=active_tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="langchain_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        verdict = checked.decision
        if verdict.action != "allow":
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )

        raw = _invoke_with_shape(fn, checked.filtered_params, call_shape)
        screened = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(raw),
            agent_id=agent_id,
            request_data_tags=active_tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="langchain_tool",
        )
        verdict = screened.decision
        if verdict.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )
        return _restore_response_shape(raw, screened.filtered_response)

    return _wrapped

wrap_async_tool

wrap_async_tool(tool_name: str, fn: Callable[..., Any], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, tag_extractor: TagExtractor | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Callable[..., Any]

Wrap an async tool callable.

Source code in safeai/middleware/langchain.py
def wrap_async_tool(
    self,
    tool_name: str,
    fn: Callable[..., Any],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    tag_extractor: TagExtractor | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Callable[..., Any]:
    """Wrap an async tool callable."""

    @wraps(fn)
    async def _wrapped(*args: Any, **kwargs: Any) -> Any:
        params, call_shape = _normalize_input(args, kwargs)
        fallback_tags = _infer_tags(params, safeai=self.safeai, extractor=tag_extractor)
        active_tags = list(request_data_tags or fallback_tags)

        checked = self.safeai.intercept_tool_request(
            tool_name=tool_name,
            parameters=params,
            data_tags=active_tags,
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="langchain_tool",
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        verdict = checked.decision
        if verdict.action != "allow":
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )

        raw = await _ainvoke_with_shape(fn, checked.filtered_params, call_shape)
        screened = self.safeai.intercept_tool_response(
            tool_name=tool_name,
            response=_normalize_response(raw),
            agent_id=agent_id,
            request_data_tags=active_tags,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type="langchain_tool",
        )
        verdict = screened.decision
        if verdict.action in {"block", "require_approval"}:
            raise SafeAIBlockedError(
                action=verdict.action,
                policy_name=verdict.policy_name,
                reason=verdict.reason,
            )
        return _restore_response_shape(raw, screened.filtered_response)

    return _wrapped

wrap_langchain_tool

wrap_langchain_tool(tool: Any, *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, request_data_tags: list[str] | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> Any

Patch invoke/ainvoke on a LangChain-like tool object in place.

Source code in safeai/middleware/langchain.py
def wrap_langchain_tool(
    self,
    tool: Any,
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    request_data_tags: list[str] | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> Any:
    """Patch `invoke`/`ainvoke` on a LangChain-like tool object in place."""
    # Prefer the tool's `name` attribute, fall back to its callable name,
    # and finally the literal "tool" when both are empty/missing.
    raw_name = getattr(tool, "name", "") or getattr(tool, "__name__", "tool")
    resolved_name = str(raw_name).strip() or "tool"
    shared_kwargs: dict[str, Any] = {
        "agent_id": agent_id,
        "session_id": session_id,
        "source_agent_id": source_agent_id,
        "destination_agent_id": destination_agent_id,
        "request_data_tags": request_data_tags,
        "capability_token_id": capability_token_id,
        "capability_action": capability_action,
        "approval_request_id": approval_request_id,
    }
    sync_target = getattr(tool, "invoke", None)
    if callable(sync_target):
        tool.invoke = self.wrap_tool(
            tool_name=resolved_name,
            fn=sync_target,
            **shared_kwargs,
        )
    async_target = getattr(tool, "ainvoke", None)
    if callable(async_target):
        tool.ainvoke = self.wrap_async_tool(
            tool_name=resolved_name,
            fn=async_target,
            **shared_kwargs,
        )
    return tool