Skip to content

Intelligence Layer

API reference for the intelligence advisory agents.

Backend

backend

BYOM backend abstraction for AI inference.

AIBackendNotConfiguredError

Bases: RuntimeError

Raised when no AI backend is configured.

Source code in safeai/intelligence/backend.py
class AIBackendNotConfiguredError(RuntimeError):
    """Signals that AI inference was requested before any backend was registered."""

OllamaBackend

Local inference via Ollama REST API.

Source code in safeai/intelligence/backend.py
class OllamaBackend:
    """Local inference via Ollama REST API."""

    def __init__(self, model: str = "llama3.2", base_url: str = "http://localhost:11434") -> None:
        self._model = model
        # Normalize so URL joins below never produce a double slash.
        self._base_url = base_url.rstrip("/")

    @property
    def model_name(self) -> str:
        """Name of the Ollama model this backend targets."""
        return self._model

    def complete(self, messages: list[AIMessage], **kwargs: Any) -> AIResponse:
        """POST the chat transcript to Ollama and wrap the reply in an AIResponse.

        Extra keyword arguments are merged into the request body and may
        override the defaults (including streaming, which is off by default).
        """
        request_body: dict[str, Any] = {
            "model": self._model,
            "messages": [{"role": msg.role, "content": msg.content} for msg in messages],
            "stream": False,
            **kwargs,
        }
        with httpx.Client(timeout=120.0) as client:
            reply = client.post(f"{self._base_url}/api/chat", json=request_body)
            reply.raise_for_status()
            body = reply.json()
        assistant_msg = body.get("message", {})
        token_usage = {
            "prompt_tokens": body.get("prompt_eval_count", 0),
            "completion_tokens": body.get("eval_count", 0),
        }
        return AIResponse(
            content=assistant_msg.get("content", ""),
            model=body.get("model", self._model),
            usage=token_usage,
            raw=body,
        )

OpenAICompatibleBackend

OpenAI-compatible chat completions endpoint (OpenAI, Anthropic, Azure, vLLM, etc.).

Source code in safeai/intelligence/backend.py
class OpenAICompatibleBackend:
    """OpenAI-compatible chat completions endpoint (OpenAI, Anthropic, Azure, vLLM, etc.)."""

    def __init__(
        self,
        model: str,
        api_key: str = "",
        base_url: str = "https://api.openai.com/v1",
    ) -> None:
        self._model = model
        self._api_key = api_key
        # Normalize so the /chat/completions join never double-slashes.
        self._base_url = base_url.rstrip("/")

    @property
    def model_name(self) -> str:
        """Name of the model sent with every completion request."""
        return self._model

    def _raise_http_error(self, resp: Any) -> None:
        """Translate a non-200 response into a RuntimeError with a remediation hint."""
        try:
            detail = resp.json()
            err_msg = detail.get("error", {}).get("message", "") or resp.text
        except Exception:
            # Body was not JSON (or not the expected shape) — fall back to raw text.
            err_msg = resp.text
        status = resp.status_code
        hints = {
            401: "Fix: Verify your API key is set and valid for this provider.",
            403: "Fix: Your API key lacks permission for this model or endpoint.",
            404: f"Fix: Check that model '{self._model}' exists on this endpoint.",
            429: "Fix: Rate limit hit. Reduce request frequency or upgrade your plan.",
        }
        hint = hints.get(status, "Fix: Check service health and provider documentation.")
        raise RuntimeError(
            f"AI backend error (HTTP {status}) from {self._base_url}: {err_msg}\n"
            f"{hint}"
        )

    def complete(self, messages: list[AIMessage], **kwargs: Any) -> AIResponse:
        """POST the chat transcript and parse the OpenAI-style response.

        Extra keyword arguments are merged into the request body and may
        override the defaults. Raises RuntimeError on any non-200 status.
        """
        request_headers: dict[str, str] = {"Content-Type": "application/json"}
        if self._api_key:
            request_headers["Authorization"] = f"Bearer {self._api_key}"
        request_body: dict[str, Any] = {
            "model": self._model,
            "messages": [{"role": msg.role, "content": msg.content} for msg in messages],
            **kwargs,
        }
        with httpx.Client(timeout=120.0) as client:
            resp = client.post(
                f"{self._base_url}/chat/completions",
                json=request_body,
                headers=request_headers,
            )
            if resp.status_code != 200:
                self._raise_http_error(resp)
            data = resp.json()
        choices = data.get("choices", [])
        content = choices[0]["message"]["content"] if choices else ""
        usage_data = data.get("usage", {})
        return AIResponse(
            content=content,
            model=data.get("model", self._model),
            usage={
                "prompt_tokens": usage_data.get("prompt_tokens", 0),
                "completion_tokens": usage_data.get("completion_tokens", 0),
            },
            raw=data,
        )

AIBackendRegistry

Named registry of AI backends with a default.

Source code in safeai/intelligence/backend.py
class AIBackendRegistry:
    """Named registry of AI backends with a default.

    The first backend registered becomes the default; a later registration
    can claim the default slot explicitly with ``default=True``.
    """

    def __init__(self) -> None:
        self._backends: dict[str, AIBackend] = {}
        self._default: str | None = None

    def register(self, name: str, backend: AIBackend, *, default: bool = False) -> None:
        """Register *backend* under *name*; the first registration becomes default."""
        self._backends[name] = backend
        if default or len(self._backends) == 1:
            self._default = name

    def get(self, name: str | None = None) -> AIBackend:
        """Return the backend registered under *name*, or the default when omitted.

        Raises:
            AIBackendNotConfiguredError: if nothing is registered, or if *name*
                does not match any registered backend.
        """
        key = name or self._default
        if key is None:
            # Nothing registered at all — point the user at setup.
            raise AIBackendNotConfiguredError(
                "No AI backend configured. Register one with "
                "safeai.register_ai_backend() or set intelligence.backend in safeai.yaml."
            )
        if key not in self._backends:
            # A specific name was requested (or the default is stale) — say
            # which name is missing rather than claiming nothing is configured.
            available = ", ".join(sorted(self._backends)) or "none"
            raise AIBackendNotConfiguredError(
                f"AI backend '{key}' is not registered (available: {available}). "
                "Register it with safeai.register_ai_backend() or set "
                "intelligence.backend in safeai.yaml."
            )
        return self._backends[key]

    def list_backends(self) -> list[str]:
        """Return the names of all registered backends in registration order."""
        return list(self._backends.keys())

    @property
    def default_name(self) -> str | None:
        """Name of the current default backend, or None when empty."""
        return self._default

Sanitizer

sanitizer

Metadata sanitizer — ensures AI agents never see raw protected data.

CodebaseStructure dataclass

Structural metadata extracted from a project — no file body content.

Source code in safeai/intelligence/sanitizer.py
@dataclass(frozen=True)
class CodebaseStructure:
    """Structural metadata extracted from a project — no file body content."""

    # Project-relative paths of discovered source and config files.
    file_paths: tuple[str, ...] = ()
    # Imported module names (from both `import x` and `from x import ...`).
    imports: tuple[str, ...] = ()
    # Names of top-level and nested class definitions.
    class_names: tuple[str, ...] = ()
    # Names of function and async-function definitions.
    function_names: tuple[str, ...] = ()
    # Decorator names applied to classes and functions.
    decorators: tuple[str, ...] = ()
    # Dependency names parsed from pyproject.toml, when present.
    dependencies: tuple[str, ...] = ()
    # Framework identifiers inferred from the import graph (e.g. "fastapi").
    framework_hints: tuple[str, ...] = ()

MetadataSanitizer

Strips raw values from audit events and extracts safe metadata.

Source code in safeai/intelligence/sanitizer.py
class MetadataSanitizer:
    """Strips raw values from audit events and extracts safe metadata."""

    def __init__(self, *, metadata_only: bool = True) -> None:
        # Kept for API compatibility; banned keys are always dropped regardless.
        self._metadata_only = metadata_only

    def sanitize_event(self, event: dict[str, Any]) -> SanitizedAuditEvent:
        """Reduce a raw audit event dict to a SanitizedAuditEvent.

        Only explicitly allow-listed metadata keys survive; banned keys are
        dropped unconditionally. Pass-through fields are coerced to strings
        (``data_tags`` to a tuple) so the result never carries raw payloads.
        """
        safe_meta: dict[str, Any] = {}
        raw_meta = event.get("metadata") or {}
        for k, v in raw_meta.items():
            if k in BANNED_METADATA_KEYS:
                continue
            if k in SAFE_METADATA_KEYS:
                safe_meta[k] = v

        kwargs: dict[str, Any] = {}
        for key in _EVENT_PASSTHROUGH_KEYS:
            val = event.get(key, "")
            if key == "data_tags":
                # Tags stay a tuple; anything non-sequence becomes empty.
                kwargs[key] = tuple(val) if isinstance(val, (list, tuple)) else ()
            else:
                kwargs[key] = str(val) if val is not None else ""

        return SanitizedAuditEvent(**kwargs, safe_metadata=safe_meta)

    def aggregate_events(self, events: list[dict[str, Any]]) -> SanitizedAuditAggregate:
        """Fold raw audit events into per-dimension count aggregates.

        Counts by action, boundary, policy, agent, tool, and data tag; empty
        policy/agent/tool values are skipped rather than counted as "".
        """
        by_action: Counter[str] = Counter()
        by_boundary: Counter[str] = Counter()
        by_policy: Counter[str] = Counter()
        by_agent: Counter[str] = Counter()
        by_tool: Counter[str] = Counter()
        by_tag: Counter[str] = Counter()

        for ev in events:
            by_action[ev.get("action", "unknown")] += 1
            by_boundary[ev.get("boundary", "unknown")] += 1
            policy = ev.get("policy_name", "unknown")
            if policy:
                by_policy[policy] += 1
            agent = ev.get("agent_id", "unknown")
            if agent:
                by_agent[agent] += 1
            tool = ev.get("tool_name", "")
            if tool:
                by_tool[tool] += 1
            for tag in ev.get("data_tags", []):
                by_tag[tag] += 1

        return SanitizedAuditAggregate(
            total_events=len(events),
            events_by_action=dict(by_action),
            events_by_boundary=dict(by_boundary),
            events_by_policy=dict(by_policy),
            events_by_agent=dict(by_agent),
            events_by_tool=dict(by_tool),
            events_by_tag=dict(by_tag),
        )

    def extract_codebase_structure(self, project_path: str | Path) -> CodebaseStructure:
        """Extract structural metadata (names only, no file bodies) from a project.

        Walks Python files with ``ast`` for imports/classes/functions/decorators,
        lists config-file paths, reads dependencies from pyproject.toml, and
        infers framework hints from the import graph.

        Raises:
            FileNotFoundError: if *project_path* is not an existing directory.
        """
        root = Path(project_path).resolve()
        if not root.exists() or not root.is_dir():
            raise FileNotFoundError(
                f"Project path not found: {root}\n"
                f"Fix: Provide an absolute path to an existing directory with source files."
            )
        file_paths: list[str] = []
        all_imports: list[str] = []
        all_classes: list[str] = []
        all_functions: list[str] = []
        all_decorators: list[str] = []
        dependencies: list[str] = []
        framework_hints: list[str] = []

        # Collect Python files. BUG FIX: hidden-directory skipping must use
        # path components *relative to root* — checking the absolute parts
        # skipped every file whenever the project's own resolved path
        # contained a dot-directory (e.g. /home/user/.work/project).
        for py_file in sorted(root.rglob("*.py")):
            rel_parts = py_file.relative_to(root).parts
            rel = str(py_file.relative_to(root))
            if any(part.startswith(".") for part in rel_parts):
                continue
            if "node_modules" in rel or "__pycache__" in rel:
                continue
            file_paths.append(rel)
            try:
                tree = ast.parse(py_file.read_text(encoding="utf-8", errors="replace"))
            except (SyntaxError, UnicodeDecodeError):
                # Unparseable file: keep its path, skip symbol extraction.
                continue
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        all_imports.append(alias.name)
                elif isinstance(node, ast.ImportFrom):
                    if node.module:
                        all_imports.append(node.module)
                elif isinstance(node, ast.ClassDef):
                    all_classes.append(node.name)
                    for dec in node.decorator_list:
                        all_decorators.append(_decorator_name(dec))
                elif isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                    all_functions.append(node.name)
                    for dec in node.decorator_list:
                        all_decorators.append(_decorator_name(dec))

        # Collect non-Python file names (same relative-parts fix as above).
        for ext in ("*.yaml", "*.yml", "*.toml", "*.json", "*.cfg"):
            for f in sorted(root.rglob(ext)):
                rel = str(f.relative_to(root))
                if not any(part.startswith(".") for part in f.relative_to(root).parts):
                    file_paths.append(rel)

        # Parse pyproject.toml for deps (best effort — parse failures ignored).
        pyproject = root / "pyproject.toml"
        if pyproject.exists():
            try:
                text = pyproject.read_text(encoding="utf-8")
                dependencies = _extract_toml_deps(text)
            except Exception:
                pass

        # Detect frameworks from the root package of each import.
        framework_map = {
            "langchain": "langchain",
            "crewai": "crewai",
            "autogen": "autogen",
            "openai": "openai",
            "anthropic": "anthropic",
            "fastapi": "fastapi",
            "flask": "flask",
            "django": "django",
        }
        unique_imports = set(all_imports)
        for imp in unique_imports:
            root_pkg = imp.split(".")[0].lower()
            if root_pkg in framework_map:
                framework_hints.append(framework_map[root_pkg])

        return CodebaseStructure(
            file_paths=tuple(sorted(set(file_paths))),
            imports=tuple(sorted(set(all_imports))),
            class_names=tuple(sorted(set(all_classes))),
            function_names=tuple(sorted(set(all_functions))),
            decorators=tuple(sorted(set(all_decorators))),
            dependencies=tuple(sorted(set(dependencies))),
            framework_hints=tuple(sorted(set(framework_hints))),
        )

Advisor

advisor

Base advisor abstraction for all intelligence agents.

BaseAdvisor

Bases: ABC

Abstract base class for all intelligence advisory agents.

Source code in safeai/intelligence/advisor.py
class BaseAdvisor(ABC):
    """Abstract base class for all intelligence advisory agents.

    Subclasses provide a ``name`` and an ``advise`` implementation; this base
    wires in the inference backend and a metadata sanitizer.
    """

    def __init__(
        self,
        backend: AIBackend,
        sanitizer: MetadataSanitizer | None = None,
    ) -> None:
        self._backend = backend
        # Default sanitizer keeps raw protected data away from the model.
        self._sanitizer = sanitizer or MetadataSanitizer()

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier for this advisor (used in results and CLI output)."""
        ...

    @abstractmethod
    def advise(self, **kwargs: Any) -> AdvisorResult:
        """Run the advisory analysis and return an AdvisorResult."""
        ...

    def _error_result(self, message: str) -> AdvisorResult:
        """Build a uniform error-status AdvisorResult carrying *message*."""
        return AdvisorResult(advisor_name=self.name, status="error", summary=message)

Auto-Config

auto_config

Auto-Config advisory agent — generates SafeAI configuration from codebase structure.

AutoConfigAdvisor

Bases: BaseAdvisor

Reads codebase structure and generates SafeAI configuration files.

Source code in safeai/intelligence/auto_config.py
class AutoConfigAdvisor(BaseAdvisor):
    """Reads codebase structure and generates SafeAI configuration files."""

    # BaseAdvisor.__init__(backend, sanitizer) is inherited unchanged; the
    # former override here merely forwarded identical arguments to super().

    @property
    def name(self) -> str:
        return "auto-config"

    def advise(self, **kwargs: Any) -> AdvisorResult:
        """Generate SafeAI configuration artifacts for a project.

        Keyword Args:
            project_path: Directory to analyze (defaults to ".").
            framework_hint: Optional user-specified framework name to pass
                along to the model.
        """
        project_path = kwargs.get("project_path", ".")
        framework_hint = kwargs.get("framework_hint")

        # Only sanitized structural metadata is extracted — no file bodies.
        try:
            structure = self._sanitizer.extract_codebase_structure(project_path)
        except Exception as exc:
            return self._error_result(f"Failed to analyze project: {exc}")

        hint_extra = f"User-specified framework: {framework_hint}" if framework_hint else ""

        # Cap each list so the prompt stays within a reasonable size.
        user_prompt = USER_PROMPT_TEMPLATE.format(
            file_paths=", ".join(structure.file_paths[:100]),
            imports=", ".join(structure.imports[:100]),
            class_names=", ".join(structure.class_names[:100]),
            function_names=", ".join(structure.function_names[:100]),
            decorators=", ".join(structure.decorators[:50]),
            dependencies=", ".join(structure.dependencies[:50]),
            framework_hints=", ".join(structure.framework_hints) or "none detected",
            framework_hint_extra=hint_extra,
        )

        messages = [
            AIMessage(role="system", content=SYSTEM_PROMPT),
            AIMessage(role="user", content=user_prompt),
        ]

        try:
            response = self._backend.complete(messages)
        except Exception as exc:
            return self._error_result(f"AI backend error: {exc}")

        artifacts = _parse_file_artifacts(response.content)

        return AdvisorResult(
            advisor_name=self.name,
            status="success",
            summary=f"Generated {len(artifacts)} configuration file(s) for project at {project_path}",
            artifacts=artifacts,
            raw_response=response.content,
            model_used=response.model,
            metadata={
                "project_path": str(project_path),
                "framework_hints": list(structure.framework_hints),
                "file_count": len(structure.file_paths),
                "class_count": len(structure.class_names),
                "function_count": len(structure.function_names),
            },
        )

Recommender

recommender

Policy Recommender advisory agent — suggests improvements from audit aggregates.

RecommenderAdvisor

Bases: BaseAdvisor

Reads audit aggregates and suggests policy improvements.

Source code in safeai/intelligence/recommender.py
class RecommenderAdvisor(BaseAdvisor):
    """Reads audit aggregates and suggests policy improvements."""

    # BaseAdvisor.__init__(backend, sanitizer) is inherited unchanged; the
    # former override here merely forwarded identical arguments to super().

    @property
    def name(self) -> str:
        return "recommender"

    def advise(self, **kwargs: Any) -> AdvisorResult:
        """Analyze recent audit activity and propose policy improvements.

        Keyword Args:
            events: Pre-loaded audit event dicts; when omitted, loaded via the
                SafeAI API from *config_path*.
            since: Look-back window for the audit query (defaults to "7d").
            config_path: Optional SafeAI config used for event loading and the
                policy/contract/identity file summary.
        """
        events = kwargs.get("events")
        since = kwargs.get("since", "7d")
        config_path = kwargs.get("config_path")

        # If no events passed directly, load from config
        if events is None and config_path:
            try:
                from safeai.api import SafeAI

                sai = SafeAI.from_config(config_path)
                events = sai.query_audit(last=since)
            except Exception as exc:
                return self._error_result(f"Failed to load audit data: {exc}")

        if events is None:
            events = []

        # Only sanitized aggregate counts reach the model — never raw events.
        aggregate = self._sanitizer.aggregate_events(events)

        def _format_counts(d: dict[str, int]) -> str:
            """Render a count map as indented 'key: value' lines, busiest first."""
            if not d:
                return "(none)"
            return "\n".join(f"  {k}: {v}" for k, v in sorted(d.items(), key=lambda x: -x[1]))

        config_summary = "(no config loaded)"
        if config_path:
            try:
                from safeai.config.loader import load_config

                cfg = load_config(config_path)
                config_summary = (
                    f"Policy files: {cfg.paths.policy_files}\n"
                    f"Contract files: {cfg.paths.contract_files}\n"
                    f"Identity files: {cfg.paths.identity_files}"
                )
            except Exception:
                # Best effort: recommendations still work without config context.
                pass

        user_prompt = USER_PROMPT_TEMPLATE.format(
            since=since,
            total_events=aggregate.total_events,
            events_by_action=_format_counts(aggregate.events_by_action),
            events_by_boundary=_format_counts(aggregate.events_by_boundary),
            events_by_policy=_format_counts(aggregate.events_by_policy),
            events_by_agent=_format_counts(aggregate.events_by_agent),
            events_by_tool=_format_counts(aggregate.events_by_tool),
            events_by_tag=_format_counts(aggregate.events_by_tag),
            config_summary=config_summary,
        )

        messages = [
            AIMessage(role="system", content=SYSTEM_PROMPT),
            AIMessage(role="user", content=user_prompt),
        ]

        try:
            response = self._backend.complete(messages)
        except Exception as exc:
            return self._error_result(f"AI backend error: {exc}")

        artifacts = _parse_file_artifacts(response.content)

        return AdvisorResult(
            advisor_name=self.name,
            status="success",
            summary=(
                f"Analyzed {aggregate.total_events} events over {since}. "
                f"Generated {len(artifacts)} recommendation file(s)."
            ),
            artifacts=artifacts,
            raw_response=response.content,
            model_used=response.model,
            metadata={
                "since": since,
                "total_events": aggregate.total_events,
                "action_counts": aggregate.events_by_action,
                "boundary_counts": aggregate.events_by_boundary,
            },
        )

Incident Response

incident

Incident Response advisory agent — classifies and explains security events.

IncidentAdvisor

Bases: BaseAdvisor

Reads a sanitized audit event and classifies/explains the incident.

Source code in safeai/intelligence/incident.py
class IncidentAdvisor(BaseAdvisor):
    """Reads a sanitized audit event and classifies/explains the incident."""

    # BaseAdvisor.__init__(backend, sanitizer) is inherited unchanged; the
    # former override here merely forwarded identical arguments to super().

    @property
    def name(self) -> str:
        return "incident"

    def advise(self, **kwargs: Any) -> AdvisorResult:
        """Classify and explain a single audit event.

        Keyword Args:
            event: Raw audit event dict to analyze.
            context_events: Nearby events giving the model surrounding context
                (at most 5 are used).
            event_id: Looked up via the SafeAI API when *event* is omitted.
            config_path: SafeAI config used for the event_id lookup.
        """
        event = kwargs.get("event")
        context_events = kwargs.get("context_events", [])
        event_id = kwargs.get("event_id")
        config_path = kwargs.get("config_path")

        # If given raw event dict, use it directly; else look up by event_id
        if event is None and event_id and config_path:
            try:
                from safeai.api import SafeAI

                sai = SafeAI.from_config(config_path)
                events = sai.query_audit(event_id=event_id)
                if not events:
                    return self._error_result(f"Event '{event_id}' not found.")
                event = events[0]
                context_events = sai.query_audit(last="1h", limit=5)
            except Exception as exc:
                return self._error_result(f"Failed to load event: {exc}")

        if event is None:
            return self._error_result("No event provided.")

        # Strip raw values before anything is serialized into the prompt.
        sanitized = self._sanitizer.sanitize_event(event)

        metadata_lines = [f"- {k}: {v}" for k, v in sanitized.safe_metadata.items()]
        metadata_section = "\n".join(metadata_lines) if metadata_lines else "(none)"

        context_lines = []
        for ctx_event in context_events[:5]:
            ctx = self._sanitizer.sanitize_event(ctx_event)
            context_lines.append(
                f"- [{ctx.timestamp}] {ctx.boundary}/{ctx.action} "
                f"policy={ctx.policy_name} agent={ctx.agent_id} "
                f"tags={','.join(ctx.data_tags)}"
            )
        context_section = "\n".join(context_lines) if context_lines else "(no context events)"

        user_prompt = USER_PROMPT_TEMPLATE.format(
            event_id=sanitized.event_id,
            timestamp=sanitized.timestamp,
            boundary=sanitized.boundary,
            action=sanitized.action,
            policy_name=sanitized.policy_name,
            reason=sanitized.reason,
            data_tags=", ".join(sanitized.data_tags),
            agent_id=sanitized.agent_id,
            tool_name=sanitized.tool_name,
            session_id=sanitized.session_id,
            metadata_section=metadata_section,
            context_section=context_section,
        )

        messages = [
            AIMessage(role="system", content=SYSTEM_PROMPT),
            AIMessage(role="user", content=user_prompt),
        ]

        try:
            response = self._backend.complete(messages)
        except Exception as exc:
            return self._error_result(f"AI backend error: {exc}")

        return AdvisorResult(
            advisor_name=self.name,
            status="success",
            summary=f"Incident analysis for event {sanitized.event_id}",
            raw_response=response.content,
            model_used=response.model,
            metadata={
                "event_id": sanitized.event_id,
                "boundary": sanitized.boundary,
                "action": sanitized.action,
                "policy_name": sanitized.policy_name,
                "context_event_count": len(context_events),
            },
        )

Compliance

compliance

Compliance advisory agent — generates compliance policy sets.

ComplianceAdvisor

Bases: BaseAdvisor

Maps compliance frameworks to SafeAI policy sets.

Source code in safeai/intelligence/compliance.py
class ComplianceAdvisor(BaseAdvisor):
    """Maps compliance frameworks to SafeAI policy sets."""

    # BaseAdvisor.__init__(backend, sanitizer) is inherited unchanged; the
    # former override here merely forwarded identical arguments to super().

    @property
    def name(self) -> str:
        return "compliance"

    def advise(self, **kwargs: Any) -> AdvisorResult:
        """Generate a policy set for a named compliance framework.

        Keyword Args:
            framework: Framework key, case-insensitive (defaults to "hipaa";
                must appear in COMPLIANCE_REQUIREMENTS).
            config_path: Optional SafeAI config used to summarize existing files.
        """
        # Guard against framework=None/"" — the old .lower() on a plain
        # .get(..., "hipaa") raised AttributeError when None was passed.
        framework = (kwargs.get("framework") or "hipaa").lower()
        config_path = kwargs.get("config_path")

        requirements = COMPLIANCE_REQUIREMENTS.get(framework)
        if not requirements:
            return self._error_result(
                f"Unknown compliance framework: {framework}. "
                f"Supported: {', '.join(COMPLIANCE_REQUIREMENTS.keys())}"
            )

        config_summary = "(no config loaded)"
        if config_path:
            try:
                from safeai.config.loader import load_config

                cfg = load_config(config_path)
                config_summary = (
                    f"Policy files: {cfg.paths.policy_files}\n"
                    f"Contract files: {cfg.paths.contract_files}\n"
                    f"Identity files: {cfg.paths.identity_files}"
                )
            except Exception:
                # Best effort: policies can be generated without config context.
                pass

        user_prompt = USER_PROMPT_TEMPLATE.format(
            framework=framework.upper(),
            requirements=requirements,
            config_summary=config_summary,
            framework_lower=framework,
        )

        messages = [
            AIMessage(role="system", content=SYSTEM_PROMPT),
            AIMessage(role="user", content=user_prompt),
        ]

        try:
            response = self._backend.complete(messages)
        except Exception as exc:
            return self._error_result(f"AI backend error: {exc}")

        artifacts = _parse_file_artifacts(response.content)

        return AdvisorResult(
            advisor_name=self.name,
            status="success",
            summary=f"Generated {framework.upper()} compliance policies ({len(artifacts)} file(s))",
            artifacts=artifacts,
            raw_response=response.content,
            model_used=response.model,
            metadata={"framework": framework},
        )

Integration

integration

Integration advisory agent — generates framework-specific integration code.

IntegrationAdvisor

Bases: BaseAdvisor

Reads project structure and generates integration code for target frameworks.

Source code in safeai/intelligence/integration.py
class IntegrationAdvisor(BaseAdvisor):
    """Reads project structure and generates integration code for target frameworks."""

    # BaseAdvisor.__init__(backend, sanitizer) is inherited unchanged; the
    # former override here merely forwarded identical arguments to super().

    @property
    def name(self) -> str:
        return "integration"

    def advise(self, **kwargs: Any) -> AdvisorResult:
        """Generate framework-specific SafeAI integration code.

        Keyword Args:
            target: Target framework key, case-insensitive (defaults to
                "generic", which is also the fallback description).
            project_path: Directory to analyze (defaults to ".").
        """
        # Guard against target=None/"" — the old .lower() on a plain
        # .get(..., "generic") raised AttributeError when None was passed.
        target = (kwargs.get("target") or "generic").lower()
        project_path = kwargs.get("project_path", ".")

        # Only sanitized structural metadata is extracted — no file bodies.
        try:
            structure = self._sanitizer.extract_codebase_structure(project_path)
        except Exception as exc:
            return self._error_result(f"Failed to analyze project: {exc}")

        framework_desc = FRAMEWORK_DESCRIPTIONS.get(target, FRAMEWORK_DESCRIPTIONS["generic"])

        user_prompt = USER_PROMPT_TEMPLATE.format(
            target=target,
            file_paths=", ".join(structure.file_paths[:80]),
            dependencies=", ".join(structure.dependencies[:40]),
            framework_hints=", ".join(structure.framework_hints) or "none detected",
            framework_description=framework_desc,
            target_lower=target.replace("-", "_"),
        )

        messages = [
            AIMessage(role="system", content=SYSTEM_PROMPT),
            AIMessage(role="user", content=user_prompt),
        ]

        try:
            response = self._backend.complete(messages)
        except Exception as exc:
            return self._error_result(f"AI backend error: {exc}")

        artifacts = _parse_file_artifacts(response.content)

        return AdvisorResult(
            advisor_name=self.name,
            status="success",
            summary=f"Generated {target} integration code ({len(artifacts)} file(s))",
            artifacts=artifacts,
            raw_response=response.content,
            model_used=response.model,
            metadata={
                "target": target,
                "project_path": str(project_path),
                "framework_hints": list(structure.framework_hints),
            },
        )