Skip to content

Audit

Append-only audit logging with rich query support.

audit

Audit event primitives and JSON logger.

AuditLogger

Append-only JSON logger for boundary decisions with log rotation.

Source code in safeai/core/audit.py
class AuditLogger:
    """Append-only JSON logger for boundary decisions with log rotation.

    Each event is validated and written as one compact JSON object per line.
    When no ``file_path`` is configured, events are printed to stdout instead,
    rotation is skipped and ``query()`` returns no results.

    Rotated files are named ``<file>.<n>`` (``<file>.<n>.gz`` when compressed);
    ``.1`` is always the most recently rotated file.
    """

    def __init__(
        self,
        file_path: str | None = None,
        *,
        max_size_mb: int = 100,
        max_age_days: int = 90,
        compress_rotated: bool = True,
        max_rotated_files: int = 10,
    ) -> None:
        """Configure the logger and make sure the log directory exists.

        Args:
            file_path: Destination log file (``~`` is expanded). ``None`` means
                events are printed to stdout.
            max_size_mb: Size, in MiB, at which the active file is rotated.
            max_age_days: Rotated files whose mtime is older than this are
                removed whenever a rotation happens.
            compress_rotated: Gzip a file right after it has been rotated.
            max_rotated_files: Number of rotated files kept after a rotation.
        """
        self.file_path = Path(file_path).expanduser() if file_path else None
        if self.file_path:
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
        self._on_emit_callbacks: list[Any] = []
        self._max_size_bytes = max_size_mb * 1024 * 1024
        self._max_age_days = max_age_days
        self._compress_rotated = compress_rotated
        self._max_rotated_files = max_rotated_files

    def register_on_emit(self, callback: Any) -> None:
        """Register a callback invoked after every emit(). Callbacks receive the event dict."""
        self._on_emit_callbacks.append(callback)

    def _rotation_index(self, name: str) -> int | None:
        """Return the rotation index encoded in *name*, or ``None`` if *name* is not a rotated log.

        Only ``<file>.<digits>`` and ``<file>.<digits>.gz`` are accepted, so
        unrelated neighbours (``<file>.bak``, ``<file>.-4``, ``<file>..gz3``)
        are never shifted or deleted by rotation.
        """
        if not self.file_path:
            return None
        prefix = self.file_path.name + "."
        if not name.startswith(prefix):
            return None
        idx_str = name[len(prefix):]
        if idx_str.endswith(".gz"):
            idx_str = idx_str[: -len(".gz")]
        # int() alone would also accept signs, underscores and whitespace.
        if not (idx_str.isascii() and idx_str.isdigit()):
            return None
        return int(idx_str)

    def _rotated_files(self) -> list[Path]:
        """Return existing rotated log files sorted by index (ascending)."""
        if not self.file_path:
            return []
        indexed: list[tuple[int, Path]] = []
        for candidate in self.file_path.parent.iterdir():
            idx = self._rotation_index(candidate.name)
            if idx is not None:
                indexed.append((idx, candidate))
        indexed.sort(key=lambda pair: pair[0])
        return [path for _, path in indexed]

    def _maybe_rotate(self) -> None:
        """Rotate the current log file if it exceeds the configured max size.

        Rotation shifts every existing rotated file up by one index, moves the
        active file to ``.1`` (gzipping it when configured), trims the set of
        rotated files to ``max_rotated_files`` and purges expired ones.
        """
        if not self.file_path:
            return
        try:
            size = self.file_path.stat().st_size
        except OSError:
            # Missing or unreadable file: nothing to rotate.
            return
        if size < self._max_size_bytes:
            return

        base_name = self.file_path.name
        # Walk from the highest index down so a rename never lands on a file
        # that still has to be shifted.
        for rotated in reversed(self._rotated_files()):
            idx = self._rotation_index(rotated.name)
            if idx is None:
                continue
            gz_suffix = ".gz" if rotated.name.endswith(".gz") else ""
            rotated.rename(rotated.parent / f"{base_name}.{idx + 1}{gz_suffix}")

        # The active file becomes rotation index 1.
        dest = self.file_path.parent / f"{base_name}.1"
        shutil.move(str(self.file_path), str(dest))

        if self._compress_rotated:
            gz_dest = dest.parent / (dest.name + ".gz")
            with open(dest, "rb") as f_in, gzip.open(gz_dest, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            dest.unlink()

        # Files are sorted by ascending index, so everything past the limit is
        # the oldest data.
        for excess in self._rotated_files()[self._max_rotated_files:]:
            excess.unlink(missing_ok=True)

        self._purge_expired()

    def _purge_expired(self) -> None:
        """Remove rotated files older than max_age_days (judged by mtime)."""
        if not self.file_path:
            return
        cutoff = datetime.now(timezone.utc) - timedelta(days=self._max_age_days)
        cutoff_ts = cutoff.timestamp()
        for rotated in self._rotated_files():
            try:
                mtime = rotated.stat().st_mtime
            except OSError:
                continue
            if mtime < cutoff_ts:
                rotated.unlink(missing_ok=True)

    def emit(self, event: AuditEvent) -> None:
        """Validate *event*, append it to the log and notify registered callbacks.

        A ``context_hash`` is computed from the event's identifying fields when
        the event does not already carry one. Validation and I/O errors
        propagate to the caller; exceptions raised by callbacks are logged as
        warnings and suppressed so one faulty callback cannot block the others.
        """
        self._maybe_rotate()
        event_payload = asdict(event)
        if not event_payload.get("context_hash"):
            event_payload["context_hash"] = context_hash(
                {
                    "event_id": event_payload.get("event_id"),
                    "boundary": event_payload.get("boundary"),
                    "action": event_payload.get("action"),
                    "policy_name": event_payload.get("policy_name"),
                    "reason": event_payload.get("reason"),
                    "data_tags": event_payload.get("data_tags", []),
                    "agent_id": event_payload.get("agent_id"),
                    "tool_name": event_payload.get("tool_name"),
                    "session_id": event_payload.get("session_id"),
                    "source_agent_id": event_payload.get("source_agent_id"),
                    "destination_agent_id": event_payload.get("destination_agent_id"),
                    "metadata": event_payload.get("metadata", {}),
                }
            )
        validated = AuditEventModel.model_validate(event_payload)
        # Serialize once; the same dict feeds both the log line and callbacks.
        event_dict = validated.model_dump(mode="json")
        encoded = json.dumps(event_dict, separators=(",", ":"), ensure_ascii=True)
        if self.file_path:
            with self.file_path.open("a", encoding="utf-8") as fh:
                fh.write(encoded + "\n")
        else:
            print(encoded)
        for callback in self._on_emit_callbacks:
            try:
                callback(event_dict)
            except Exception as exc:
                import logging as _logging

                _logging.getLogger(__name__).warning(
                    "Audit callback %s failed: %s. Fix: check your callback implementation.",
                    getattr(callback, "__name__", repr(callback)),
                    exc,
                )

    def query(
        self,
        *,
        boundary: str | None = None,
        action: str | None = None,
        policy_name: str | None = None,
        agent_id: str | None = None,
        tool_name: str | None = None,
        data_tag: str | None = None,
        phase: str | None = None,
        session_id: str | None = None,
        event_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        metadata_key: str | None = None,
        metadata_value: str | None = None,
        since: str | datetime | None = None,
        until: str | datetime | None = None,
        last: str | None = None,
        min_cost: float | None = None,
        max_cost: float | None = None,
        tenant_id: str | None = None,
        limit: int = 100,
        newest_first: bool = True,
    ) -> list[dict[str, Any]]:
        """Query audit events using in-process filters.

        ``last`` accepts compact durations like ``15m``, ``2h``, and ``7d``.
        ``since`` and ``until`` accept ISO-8601 text or ``datetime``.

        Only the active log file is read; rotated files are not searched.
        Lines that are not valid JSON or fail model validation are skipped.

        ``min_cost`` / ``max_cost`` bound the event's ``estimated_cost``; when
        either is given, events without an ``estimated_cost`` are excluded.
        ``tenant_id`` must equal ``metadata["tenant_id"]`` of the event.

        Returns:
            Matching events as JSON-compatible dicts, sorted by ``timestamp``
            (descending when ``newest_first``), truncated to ``limit`` entries.
            A ``limit`` of zero or less returns every match.
        """
        if not self.file_path or not self.file_path.exists():
            return []

        effective_since, effective_until = _normalize_range(since=since, until=until, last=last)
        parsed: list[dict[str, Any]] = []

        # Stream the file: avoids holding the whole log in memory, and unlike
        # str.splitlines() it does not split a record on Unicode line
        # separators (e.g. U+2028) that may legally appear inside JSON strings.
        with self.file_path.open("r", encoding="utf-8") as fh:
            for raw_line in fh:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    validated = AuditEventModel.model_validate(event).model_dump(mode="json")
                except Exception:
                    # Best-effort read: a corrupt or partially written line
                    # must not make the rest of the log unqueryable.
                    continue
                if not _matches_event(
                    event=validated,
                    boundary=boundary,
                    action=action,
                    policy_name=policy_name,
                    agent_id=agent_id,
                    tool_name=tool_name,
                    data_tag=data_tag,
                    phase=phase,
                    session_id=session_id,
                    event_id=event_id,
                    source_agent_id=source_agent_id,
                    destination_agent_id=destination_agent_id,
                    metadata_key=metadata_key,
                    metadata_value=metadata_value,
                    since=effective_since,
                    until=effective_until,
                ):
                    continue
                if min_cost is not None or max_cost is not None:
                    ec = validated.get("estimated_cost")
                    if ec is None:
                        continue
                    if min_cost is not None and ec < min_cost:
                        continue
                    if max_cost is not None and ec > max_cost:
                        continue
                if tenant_id is not None:
                    event_meta = validated.get("metadata") or {}
                    event_tenant = event_meta.get("tenant_id") if isinstance(event_meta, dict) else None
                    if event_tenant != tenant_id:
                        continue
                parsed.append(validated)

        parsed.sort(key=lambda item: item.get("timestamp", ""), reverse=newest_first)
        if limit <= 0:
            return parsed
        return parsed[:limit]

register_on_emit

register_on_emit(callback: Any) -> None

Register a callback invoked after every emit(). Callbacks receive the event dict.

Source code in safeai/core/audit.py
def register_on_emit(self, callback: Any) -> None:
    """Subscribe *callback* so it is called with the event dict after each emit()."""
    subscribers = self._on_emit_callbacks
    subscribers.append(callback)

query

query(*, boundary: str | None = None, action: str | None = None, policy_name: str | None = None, agent_id: str | None = None, tool_name: str | None = None, data_tag: str | None = None, phase: str | None = None, session_id: str | None = None, event_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, metadata_key: str | None = None, metadata_value: str | None = None, since: str | datetime | None = None, until: str | datetime | None = None, last: str | None = None, min_cost: float | None = None, max_cost: float | None = None, tenant_id: str | None = None, limit: int = 100, newest_first: bool = True) -> list[dict[str, Any]]

Query audit events using in-process filters.

`last` accepts compact durations like `15m`, `2h`, and `7d`. `since` and `until` accept ISO-8601 text or `datetime`.

Source code in safeai/core/audit.py
def query(
    self,
    *,
    boundary: str | None = None,
    action: str | None = None,
    policy_name: str | None = None,
    agent_id: str | None = None,
    tool_name: str | None = None,
    data_tag: str | None = None,
    phase: str | None = None,
    session_id: str | None = None,
    event_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    metadata_key: str | None = None,
    metadata_value: str | None = None,
    since: str | datetime | None = None,
    until: str | datetime | None = None,
    last: str | None = None,
    min_cost: float | None = None,
    max_cost: float | None = None,
    tenant_id: str | None = None,
    limit: int = 100,
    newest_first: bool = True,
) -> list[dict[str, Any]]:
    """Query audit events using in-process filters.

    ``last`` accepts compact durations like ``15m``, ``2h``, and ``7d``.
    ``since`` and ``until`` accept ISO-8601 text or ``datetime``.
    """
    if not self.file_path or not self.file_path.exists():
        return []

    effective_since, effective_until = _normalize_range(since=since, until=until, last=last)
    parsed: list[dict[str, Any]] = []

    for line in self.file_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
            validated = AuditEventModel.model_validate(event).model_dump(mode="json")
        except Exception:
            continue
        if not _matches_event(
            event=validated,
            boundary=boundary,
            action=action,
            policy_name=policy_name,
            agent_id=agent_id,
            tool_name=tool_name,
            data_tag=data_tag,
            phase=phase,
            session_id=session_id,
            event_id=event_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            metadata_key=metadata_key,
            metadata_value=metadata_value,
            since=effective_since,
            until=effective_until,
        ):
            continue
        if min_cost is not None or max_cost is not None:
            ec = validated.get("estimated_cost")
            if ec is None:
                continue
            if min_cost is not None and ec < min_cost:
                continue
            if max_cost is not None and ec > max_cost:
                continue
        if tenant_id is not None:
            event_meta = validated.get("metadata") or {}
            event_tenant = event_meta.get("tenant_id") if isinstance(event_meta, dict) else None
            if event_tenant != tenant_id:
                continue
        parsed.append(validated)

    parsed.sort(key=lambda item: item.get("timestamp", ""), reverse=newest_first)
    if limit <= 0:
        return parsed
    return parsed[:limit]

context_hash

context_hash(value: Any) -> str

Build a deterministic hash over structured context.

Source code in safeai/core/audit.py
def context_hash(value: Any) -> str:
    """Return a stable ``sha256:``-prefixed digest of *value*.

    The value is serialized to compact JSON with sorted keys, so equal
    mappings hash identically regardless of key order. Objects that JSON
    cannot encode natively are rendered with ``str()`` before hashing.
    """
    canonical = json.dumps(
        value,
        default=str,
        separators=(",", ":"),
        sort_keys=True,
    )
    digest = hashlib.sha256()
    digest.update(canonical.encode("utf-8"))
    return f"sha256:{digest.hexdigest()}"