Skip to content

Dashboard

service

Phase 5 dashboard and enterprise service layer.

TenantPolicySetManager

File-backed tenant policy set registry.

Source code in safeai/dashboard/service.py
class TenantPolicySetManager:
    """Registry of tenant policy sets, optionally mirrored to a YAML file."""

    def __init__(self, *, file_path: Path | None, default_tenant_id: str = "default") -> None:
        """Load the registry and seed a default tenant when nothing was loaded.

        Args:
            file_path: YAML file backing the registry; ``None`` keeps the
                registry in memory only.
            default_tenant_id: Tenant returned when no other tenant applies.
                Falls back to ``"default"`` when ``_token`` yields nothing.
        """
        self.file_path = file_path
        normalized_default = _token(default_tenant_id)
        self.default_tenant_id = normalized_default if normalized_default else "default"
        self._sets: dict[str, TenantPolicySet] = {}
        self._load()
        if self._sets:
            return
        # Nothing was loaded: register (and persist) a single default entry.
        seed = TenantPolicySet(
            tenant_id=self.default_tenant_id,
            name="Default Tenant",
            policy_files=("policies/default.yaml",),
            agents=("default-agent",),
        )
        self.upsert(seed)

    def list_sets(self) -> list[TenantPolicySet]:
        """Return every registered policy set ordered by ``tenant_id``."""
        ordered = list(self._sets.values())
        ordered.sort(key=lambda entry: entry.tenant_id)
        return ordered

    def get(self, tenant_id: str) -> TenantPolicySet | None:
        """Look up one policy set; ``None`` when the id is empty or unknown."""
        key = _token(tenant_id)
        return self._sets.get(key) if key else None

    def upsert(self, policy_set: TenantPolicySet) -> None:
        """Insert or replace ``policy_set`` under its ``tenant_id`` and persist."""
        self._sets.update({policy_set.tenant_id: policy_set})
        self._persist()

    def resolve_agent_tenant(self, agent_id: str | None) -> str:
        """Return the tenant whose ``agents`` contain ``agent_id``.

        The default tenant id is returned when ``agent_id`` normalizes to
        nothing or no registered set lists the agent.
        """
        agent = _token(agent_id)
        if not agent:
            return self.default_tenant_id
        owners = (entry.tenant_id for entry in self._sets.values() if agent in entry.agents)
        return next(owners, self.default_tenant_id)

    def _load(self) -> None:
        """Replace the in-memory registry with the contents of ``file_path``.

        The registry is left untouched when no file is configured, the file
        does not exist, or the document is not a mapping with a ``tenants``
        list. Entries that are not mappings or lack a ``tenant_id`` are skipped.
        """
        target = self.file_path
        if target is None:
            return
        target.parent.mkdir(parents=True, exist_ok=True)
        if not target.exists():
            return
        document = yaml.safe_load(target.read_text(encoding="utf-8")) or {}
        entries = document.get("tenants", []) if isinstance(document, dict) else None
        if not isinstance(entries, list):
            return
        parsed: dict[str, TenantPolicySet] = {}
        for entry in entries:
            tenant_id = _token(entry.get("tenant_id")) if isinstance(entry, dict) else None
            if not tenant_id:
                continue
            parsed[tenant_id] = TenantPolicySet(
                tenant_id=tenant_id,
                name=_token(entry.get("name")) or tenant_id,
                policy_files=_normalize_tokens(entry.get("policy_files", [])),
                agents=_normalize_tokens(entry.get("agents", [])),
            )
        self._sets = parsed

    def _persist(self) -> None:
        """Write the registry to ``file_path`` as YAML; no-op without a file."""
        target = self.file_path
        if target is None:
            return
        target.parent.mkdir(parents=True, exist_ok=True)
        tenants = []
        for entry in self.list_sets():
            tenants.append(
                {
                    "tenant_id": entry.tenant_id,
                    "name": entry.name,
                    "policy_files": list(entry.policy_files),
                    "agents": list(entry.agents),
                }
            )
        document = {"version": "v1alpha1", "tenants": tenants}
        target.write_text(yaml.safe_dump(document, sort_keys=False), encoding="utf-8")

AlertRuleManager

File-backed alert rules and alert event sink.

Source code in safeai/dashboard/service.py
class AlertRuleManager:
    """File-backed alert rules and alert event sink.

    Rules are held in memory keyed by ``rule_id`` and mirrored to
    ``rules_file`` (YAML) when one is configured. Triggered alerts whose
    channels include ``"file"`` are appended to ``alert_log_file`` as one JSON
    object per line.
    """

    def __init__(
        self,
        *,
        rules_file: Path | None,
        alert_log_file: Path | None,
        cooldown_seconds: int = 60,
    ) -> None:
        """Initialise state and load any persisted rules.

        Args:
            rules_file: YAML file holding the rules; ``None`` disables
                loading and persisting.
            alert_log_file: JSON-lines file receiving ``"file"`` channel
                alerts; ``None`` disables file logging.
            cooldown_seconds: Minimum spacing between two push-based alerts
                of the same rule. Negative values are clamped to ``0``.
        """
        self.rules_file = rules_file
        self.alert_log_file = alert_log_file
        self.cooldown_seconds = max(cooldown_seconds, 0)
        self._rules: dict[str, AlertRule] = {}
        # Per-rule timestamps of matching events seen by evaluate_single_event.
        self._sliding_windows: dict[str, deque[datetime]] = {}
        # Per-rule time of the last push-based alert, used for the cooldown.
        self._last_alert_time: dict[str, datetime] = {}
        self._alert_channels: list[Any] = []
        self._load()

    def list_rules(self) -> list[AlertRule]:
        """Return all rules ordered by ``rule_id``."""
        return sorted(self._rules.values(), key=lambda row: row.rule_id)

    def upsert(self, rule: AlertRule) -> None:
        """Insert or replace ``rule`` under its ``rule_id`` and persist."""
        self._rules[rule.rule_id] = rule
        self._persist_rules()

    def recent_alerts(self, *, limit: int = 20) -> list[dict[str, Any]]:
        """Return logged alerts, newest ``timestamp`` first.

        Args:
            limit: Maximum number of alerts; ``<= 0`` returns all of them.

        Returns:
            The decoded JSON objects from the alert log. Blank lines, lines
            that are not valid JSON and JSON values that are not objects are
            skipped. An empty list when no log file is configured or present.
        """
        if self.alert_log_file is None or not self.alert_log_file.exists():
            return []
        rows: list[dict[str, Any]] = []
        for line in self.alert_log_file.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except ValueError:
                # json.JSONDecodeError subclasses ValueError; a corrupt line
                # must not hide the readable ones.
                continue
            if isinstance(payload, dict):
                rows.append(payload)
        rows.sort(key=lambda item: str(item.get("timestamp", "")), reverse=True)
        if limit <= 0:
            return rows
        return rows[:limit]

    def set_alert_channels(self, channels: list[Any]) -> None:
        """Set external alert channels for multi-channel dispatch."""
        self._alert_channels = list(channels)

    def evaluate_single_event(self, event: dict[str, Any]) -> list[dict[str, Any]]:
        """Push-based alert evaluation for a single incoming event.

        For every rule the event matches, the current time is recorded in
        that rule's sliding window and entries older than the rule window are
        dropped. An alert fires once the window holds at least
        ``rule.threshold`` entries and the rule is not within its cooldown.

        Returns:
            The alerts triggered by this event (possibly empty).
        """
        now = datetime.now(timezone.utc)
        triggered: list[dict[str, Any]] = []
        for rule in self.list_rules():
            if not _matches_rule(event, rule):
                continue
            window_duration = _parse_duration(rule.window)
            window = self._sliding_windows.setdefault(rule.rule_id, deque())
            window.append(now)
            cutoff = now - window_duration
            while window and window[0] < cutoff:
                window.popleft()
            if len(window) < rule.threshold:
                continue
            last_fired = self._last_alert_time.get(rule.rule_id)
            if last_fired and self.cooldown_seconds > 0:
                if (now - last_fired).total_seconds() < self.cooldown_seconds:
                    continue
            self._last_alert_time[rule.rule_id] = now
            alert = {
                "alert_id": f"alr_{now.strftime('%Y%m%d%H%M%S')}_{rule.rule_id}",
                "rule_id": rule.rule_id,
                "rule_name": rule.name,
                "threshold": rule.threshold,
                "window": rule.window,
                "count": len(window),
                "channels": list(rule.channels),
                "sample_event_ids": [str(event.get("event_id", ""))],
                "timestamp": now.isoformat(),
            }
            self._dispatch_to_channels(alert)
            self._notify(alert)
            triggered.append(alert)
        return triggered

    def _dispatch_to_channels(self, alert: dict[str, Any]) -> None:
        """Dispatch an alert to registered external channels.

        Dispatch is best-effort: a failure (including a failed import of the
        alerting package) never propagates to the caller, but it is logged so
        that dropped notifications can be diagnosed.
        """
        if not self._alert_channels:
            return
        try:
            from safeai.alerting.channels import dispatch_alert

            dispatch_alert(alert, self._alert_channels)
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "alert channel dispatch failed for alert %s",
                alert.get("alert_id"),
                exc_info=True,
            )

    def evaluate(self, *, events: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Pull-based evaluation of every rule against a batch of events.

        For each rule, events that fall inside the rule window (per
        ``_event_within``) and match the rule are counted; an alert is
        produced when the count reaches ``rule.threshold``. No cooldown is
        applied on this path.

        Returns:
            The triggered alerts, each carrying up to 20 sample event ids and
            the non-empty tenant ids derived from the matched events.
        """
        now = datetime.now(timezone.utc)
        triggered: list[dict[str, Any]] = []
        for rule in self.list_rules():
            cutoff = now - _parse_duration(rule.window)
            matched = [event for event in events if _event_within(event, cutoff) and _matches_rule(event, rule)]
            if len(matched) < rule.threshold:
                continue
            tenant_ids = list(_normalize_tokens(_tenant_from_event_metadata(item) for item in matched))
            alert = {
                "alert_id": f"alr_{now.strftime('%Y%m%d%H%M%S')}_{rule.rule_id}",
                "rule_id": rule.rule_id,
                "rule_name": rule.name,
                "threshold": rule.threshold,
                "window": rule.window,
                "count": len(matched),
                "channels": list(rule.channels),
                "tenant_ids": [tenant for tenant in tenant_ids if tenant],
                "sample_event_ids": [str(item.get("event_id", "")) for item in matched[:20]],
                "timestamp": now.isoformat(),
            }
            self._notify(alert)
            triggered.append(alert)
        return triggered

    def _load(self) -> None:
        """Replace the in-memory rules with the contents of ``rules_file``.

        Nothing changes when no file is configured, the file does not exist,
        or the document is not a mapping with an ``alert_rules`` list. Entries
        that are not mappings or that ``_parse_alert_rule`` rejects are skipped.
        """
        if self.rules_file is None:
            return
        self.rules_file.parent.mkdir(parents=True, exist_ok=True)
        if not self.rules_file.exists():
            return
        raw = yaml.safe_load(self.rules_file.read_text(encoding="utf-8")) or {}
        if not isinstance(raw, dict):
            return
        rows = raw.get("alert_rules", [])
        if not isinstance(rows, list):
            return
        loaded: dict[str, AlertRule] = {}
        for item in rows:
            if not isinstance(item, dict):
                continue
            parsed = _parse_alert_rule(item)
            if parsed is None:
                continue
            loaded[parsed.rule_id] = parsed
        self._rules = loaded

    def _persist_rules(self) -> None:
        """Write all rules to ``rules_file`` as YAML; no-op without a file."""
        if self.rules_file is None:
            return
        self.rules_file.parent.mkdir(parents=True, exist_ok=True)
        payload = {
            "version": "v1alpha1",
            "alert_rules": [
                {
                    "rule_id": rule.rule_id,
                    "name": rule.name,
                    "threshold": rule.threshold,
                    "window": rule.window,
                    "filters": dict(rule.filters),
                    "channels": list(rule.channels),
                }
                for rule in self.list_rules()
            ],
        }
        self.rules_file.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8")

    def _notify(self, alert: dict[str, Any]) -> None:
        """Append ``alert`` to the alert log when it targets the file channel.

        Channel names are compared after stripping and lower-casing. Nothing
        is written unless ``"file"`` is among them and a log file is set.
        """
        channels = {str(item).strip().lower() for item in alert.get("channels", []) if str(item).strip()}
        if "file" not in channels or self.alert_log_file is None:
            return
        self.alert_log_file.parent.mkdir(parents=True, exist_ok=True)
        with self.alert_log_file.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(alert, separators=(",", ":"), ensure_ascii=True) + "\n")

set_alert_channels

set_alert_channels(channels: list[Any]) -> None

Set external alert channels for multi-channel dispatch.

Source code in safeai/dashboard/service.py
def set_alert_channels(self, channels: list[Any]) -> None:
    """Replace the external alert channels with a copy of ``channels``."""
    self._alert_channels = [*channels]

evaluate_single_event

evaluate_single_event(event: dict[str, Any]) -> list[dict[str, Any]]

Push-based alert evaluation for a single incoming event.

Source code in safeai/dashboard/service.py
def evaluate_single_event(self, event: dict[str, Any]) -> list[dict[str, Any]]:
    """Evaluate every rule against one freshly received event.

    A matching event is stamped into the rule's sliding window, stale stamps
    are evicted, and an alert is emitted once the window holds at least
    ``rule.threshold`` stamps and the rule's cooldown has elapsed.

    Returns:
        The alerts triggered by this event (possibly empty).
    """
    fired: list[dict[str, Any]] = []
    stamp = datetime.now(timezone.utc)
    for rule in self.list_rules():
        if not _matches_rule(event, rule):
            continue
        span = _parse_duration(rule.window)
        hits = self._sliding_windows.setdefault(rule.rule_id, deque())
        hits.append(stamp)
        oldest_allowed = stamp - span
        # Evict stamps that have slid out of the rule window.
        while hits and hits[0] < oldest_allowed:
            hits.popleft()
        if len(hits) < rule.threshold:
            continue
        previous = self._last_alert_time.get(rule.rule_id)
        cooling_down = (
            bool(previous)
            and self.cooldown_seconds > 0
            and (stamp - previous).total_seconds() < self.cooldown_seconds
        )
        if cooling_down:
            continue
        self._last_alert_time[rule.rule_id] = stamp
        compact_stamp = stamp.strftime('%Y%m%d%H%M%S')
        alert = {
            "alert_id": f"alr_{compact_stamp}_{rule.rule_id}",
            "rule_id": rule.rule_id,
            "rule_name": rule.name,
            "threshold": rule.threshold,
            "window": rule.window,
            "count": len(hits),
            "channels": list(rule.channels),
            "sample_event_ids": [str(event.get("event_id", ""))],
            "timestamp": stamp.isoformat(),
        }
        self._dispatch_to_channels(alert)
        self._notify(alert)
        fired.append(alert)
    return fired

DashboardService

Security operations dashboard and enterprise controls.

Source code in safeai/dashboard/service.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
class DashboardService:
    """Security operations dashboard and enterprise controls.

    Combines the SDK's audit and approval data with tenant policy sets and
    alert rules, and restricts what a caller sees to the tenants listed in
    the caller's ``tenant_scope`` (``"*"`` meaning every tenant).
    """

    def __init__(self, *, sdk: SafeAI, config_path: str | Path, config: DashboardConfig) -> None:
        """Index configured users and build the tenant and alert managers.

        Args:
            sdk: SDK instance used for audit queries and approvals.
            config_path: Location of the configuration file; optional file
                settings are resolved against it via ``_resolve_optional_path``.
            config: Dashboard configuration (users, headers, file locations).
        """
        self.sdk = sdk
        self.config = config
        path = Path(config_path).expanduser().resolve()
        users: dict[str, DashboardUserConfig] = {}
        for row in config.users:
            token = _token(row.user_id)
            if token:
                users[token] = row
        self._users = users
        self._tenant_sets = TenantPolicySetManager(
            file_path=_resolve_optional_path(path, config.tenant_policy_file),
            default_tenant_id="default",
        )
        self._alerts = AlertRuleManager(
            rules_file=_resolve_optional_path(path, config.alert_rules_file),
            alert_log_file=_resolve_optional_path(path, config.alert_log_file),
        )
        self._cost_tracker: CostTracker | None = None

    def authorize_request(self, headers: Mapping[str, str], *, permission: str) -> DashboardPrincipal:
        """Authenticate the request and check ``permission`` for its role.

        Raises:
            HTTPException: 404 when the dashboard is disabled; otherwise
                whatever ``_authenticate`` / ``_authorize`` raise.
        """
        if not self.config.enabled:
            raise HTTPException(status_code=404, detail="dashboard is disabled")
        principal = self._authenticate(headers)
        self._authorize(principal, permission=permission)
        return principal

    def render_dashboard_page(self) -> str:
        """Return the dashboard HTML page."""
        return _DASHBOARD_HTML

    def overview(self, principal: DashboardPrincipal, *, last: str = "24h") -> dict[str, Any]:
        """Return headline numbers for the dashboard landing page.

        Events, approvals, incidents and tenants are limited to the
        principal's tenant scope.
        """
        events = self.query_events(principal, filters={"last": last, "limit": 5000, "newest_first": True})
        approvals = self.list_approvals(principal, status="pending", limit=200, newest_first=True)
        incidents = self.list_incidents(principal, last=last, limit=20)
        # NOTE(review): recent alerts are read straight from the alert log and
        # are not filtered by tenant scope here - confirm this is intended.
        recent_alerts = self._alerts.recent_alerts(limit=20)
        visible_tenants = self.list_tenant_policy_sets(principal)
        return {
            "window": last,
            "events_total": len(events),
            "action_counts": _count_by(events, key="action"),
            "boundary_counts": _count_by(events, key="boundary"),
            "pending_approvals": len(approvals),
            "recent_incidents": incidents[:8],
            "recent_alerts": recent_alerts[:8],
            "tenants": visible_tenants,
        }

    def query_events(self, principal: DashboardPrincipal, *, filters: dict[str, Any]) -> list[dict[str, Any]]:
        """Query the audit log with ``filters`` and apply tenant scoping."""
        rows = self.sdk.query_audit(**filters)
        return self._filter_events_by_tenant(rows, principal)

    def list_incidents(self, principal: DashboardPrincipal, *, last: str = "24h", limit: int = 100) -> list[dict[str, Any]]:
        """Return recent events whose action is block, redact or require_approval.

        More rows than ``limit`` are fetched because the action filter is
        applied after the query. At least one row is allowed through even
        when ``limit`` is not positive.
        """
        rows = self.query_events(principal, filters={"last": last, "limit": max(limit * 5, 200), "newest_first": True})
        incident_rows = [row for row in rows if str(row.get("action")) in {"block", "redact", "require_approval"}]
        return incident_rows[: max(limit, 1)]

    def list_approvals(
        self,
        principal: DashboardPrincipal,
        *,
        status: str | None = None,
        limit: int = 100,
        newest_first: bool = True,
    ) -> list[dict[str, Any]]:
        """Return approval requests visible to the principal as dicts.

        Args:
            status: Optional status filter passed to the SDK.
            limit: Maximum rows returned; ``<= 0`` returns every visible row.
            newest_first: Sort direction on ``requested_at``.
        """
        # Over-fetch: tenant filtering happens after the SDK query.
        rows = self.sdk.list_approval_requests(
            status=status,
            newest_first=newest_first,
            limit=max(limit * 5, 500),
        )
        visible = [row for row in rows if self._is_tenant_allowed(self._approval_tenant(row), principal)]
        visible.sort(key=lambda item: item.requested_at, reverse=newest_first)
        if limit > 0:
            visible = visible[:limit]
        return [self._approval_to_dict(item) for item in visible]

    def decide_approval(
        self,
        principal: DashboardPrincipal,
        *,
        request_id: str,
        decision: str,
        note: str | None = None,
    ) -> dict[str, Any]:
        """Approve or deny an approval request on behalf of the principal.

        Args:
            request_id: Identifier of the approval request.
            decision: ``"approve"`` or ``"deny"`` (case and surrounding
                whitespace are ignored).
            note: Optional note recorded with the decision.

        Returns:
            The updated approval request as a dict.

        Raises:
            HTTPException: 404 when the request is missing or belongs to a
                tenant outside the principal's scope, 400 for an unknown
                decision, 409 when the SDK refuses the decision, 500 when the
                request cannot be re-read afterwards.
        """
        row = self.sdk.approvals.get(request_id)
        if row is None:
            raise HTTPException(status_code=404, detail="approval request not found")
        # Out-of-scope requests are reported exactly like missing ones.
        if not self._is_tenant_allowed(self._approval_tenant(row), principal):
            raise HTTPException(status_code=404, detail="approval request not found")
        token = str(decision).strip().lower()
        if token == "approve":
            ok = self.sdk.approve_request(request_id, approver_id=principal.user_id, note=note)
        elif token == "deny":
            ok = self.sdk.deny_request(request_id, approver_id=principal.user_id, note=note)
        else:
            raise HTTPException(status_code=400, detail="decision must be 'approve' or 'deny'")
        if not ok:
            raise HTTPException(status_code=409, detail="approval request can no longer be decided")
        updated = self.sdk.approvals.get(request_id)
        if updated is None:
            raise HTTPException(status_code=500, detail="approval request disappeared after decision")
        return self._approval_to_dict(updated)

    def compliance_report(
        self,
        principal: DashboardPrincipal,
        *,
        since: str | None = None,
        until: str | None = None,
        last: str | None = "24h",
        limit: int = 20000,
    ) -> dict[str, Any]:
        """Build a compliance summary over the selected time window.

        Args:
            since: Optional window start passed to the audit query.
            until: Optional window end passed to the audit query.
            last: Optional relative window (for example ``"24h"``).
            limit: Maximum number of audit events considered (at least 1).

        Returns:
            A dict with ``generated_at``, the requested ``window``, a
            ``summary`` (counts, approval statistics, anomaly flags) and
            ``evidence`` (up to 20 sample event and approval ids).
        """
        events = self.query_events(
            principal,
            filters={"since": since, "until": until, "last": last, "limit": max(limit, 1), "newest_first": False},
        )
        approvals = self.sdk.list_approval_requests(limit=100000, newest_first=False)
        visible_approvals = [row for row in approvals if self._is_tenant_allowed(self._approval_tenant(row), principal)]
        if since or until or last:
            start_at, end_at = _window_bounds(since=since, until=until, last=last)
            visible_approvals = [
                row
                for row in visible_approvals
                if _approval_within_window(row, start_at=start_at, end_at=end_at)
            ]
        action_counts = _count_by(events, key="action")
        boundary_counts = _count_by(events, key="boundary")
        policy_counts = _count_by(events, key="policy_name")
        agent_counts = _count_by(events, key="agent_id")
        approval_counts = _count_by_approval_status(visible_approvals)
        memory_retention_events = [
            row
            for row in events
            if str(row.get("boundary")) == "memory" and str(_metadata(row).get("phase")) == "retention_purge"
        ]
        violation_count = sum(action_counts.get(key, 0) for key in ("block", "redact", "require_approval"))
        # Latency is only defined for approvals that have been decided.
        approval_latencies = [
            (row.decided_at - row.requested_at).total_seconds()
            for row in visible_approvals
            if row.decided_at is not None
        ]
        avg_latency = sum(approval_latencies) / len(approval_latencies) if approval_latencies else 0.0
        anomaly_flags: list[str] = []
        total_events = len(events)
        blocked = action_counts.get("block", 0)
        if total_events and blocked / total_events >= 0.1:
            anomaly_flags.append("blocked-rate>=10%")
        if approval_counts.get("pending", 0) >= 10:
            anomaly_flags.append("pending-approvals>=10")
        return {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "window": {"since": since, "until": until, "last": last},
            "summary": {
                "total_events": total_events,
                "boundary_counts": boundary_counts,
                "action_counts": action_counts,
                "policy_violation_count": violation_count,
                "top_policies": sorted(policy_counts.items(), key=lambda item: item[1], reverse=True)[:10],
                "data_access_by_agent": sorted(agent_counts.items(), key=lambda item: item[1], reverse=True)[:10],
                "approval_stats": {
                    "counts": approval_counts,
                    "average_latency_seconds": round(avg_latency, 3),
                },
                "memory_retention_events": len(memory_retention_events),
                "anomaly_flags": anomaly_flags,
            },
            "evidence": {
                "sample_event_ids": [str(row.get("event_id", "")) for row in events[:20]],
                "sample_approval_ids": [row.request_id for row in visible_approvals[:20]],
            },
        }

    def list_tenant_policy_sets(self, principal: DashboardPrincipal) -> list[dict[str, Any]]:
        """Return the tenant policy sets inside the principal's scope."""
        rows = self._tenant_sets.list_sets()
        if "*" not in principal.tenant_scope:
            rows = [row for row in rows if row.tenant_id in principal.tenant_scope]
        return [self._tenant_policy_to_dict(row) for row in rows]

    def get_tenant_policy_set(self, principal: DashboardPrincipal, tenant_id: str) -> dict[str, Any]:
        """Return one tenant policy set.

        Raises:
            HTTPException: 400 for an empty ``tenant_id``, 403 when the
                tenant is outside the principal's scope, 404 when unknown.
        """
        token = _token(tenant_id)
        if not token:
            raise HTTPException(status_code=400, detail="tenant_id is required")
        if "*" not in principal.tenant_scope and token not in principal.tenant_scope:
            raise HTTPException(status_code=403, detail="tenant not in scope")
        row = self._tenant_sets.get(token)
        if row is None:
            raise HTTPException(status_code=404, detail="tenant policy set not found")
        return self._tenant_policy_to_dict(row)

    def update_tenant_policy_set(
        self,
        principal: DashboardPrincipal,
        *,
        tenant_id: str,
        name: str | None = None,
        policy_files: list[str] | None = None,
        agents: list[str] | None = None,
    ) -> dict[str, Any]:
        """Create or update a tenant policy set and persist it.

        Omitted (or empty) ``name``, ``policy_files`` and ``agents`` keep the
        current values; for a new tenant they default to the tenant id and
        empty collections.

        Raises:
            HTTPException: 400 for an empty ``tenant_id``, 403 when the
                tenant is outside the principal's scope.
        """
        token = _token(tenant_id)
        if not token:
            raise HTTPException(status_code=400, detail="tenant_id is required")
        if "*" not in principal.tenant_scope and token not in principal.tenant_scope:
            raise HTTPException(status_code=403, detail="tenant not in scope")
        current = self._tenant_sets.get(token)
        resolved_name = _token(name) or (current.name if current else token)
        # NOTE(review): because of ``or``, passing an empty list cannot clear
        # the stored files/agents - confirm whether callers rely on that.
        resolved_files = _normalize_tokens(policy_files or (current.policy_files if current else []))
        resolved_agents = _normalize_tokens(agents or (current.agents if current else []))
        updated = TenantPolicySet(
            tenant_id=token,
            name=resolved_name,
            policy_files=resolved_files,
            agents=resolved_agents,
        )
        self._tenant_sets.upsert(updated)
        return self._tenant_policy_to_dict(updated)

    def agent_timeline(
        self,
        principal: DashboardPrincipal,
        *,
        agent_id: str | None = None,
        last: str = "24h",
        limit: int = 200,
    ) -> list[dict[str, Any]]:
        """Query views over the existing audit log, grouped by agent.

        Args:
            agent_id: Optional agent to restrict the query to.
            last: Relative time window for the query.
            limit: Maximum number of events kept per agent.

        Returns:
            One entry per agent with ``agent_id``, ``event_count``, up to
            ``limit`` ``events`` and ``last_seen``, ordered by ``last_seen``
            descending. Events with a missing or ``None`` agent id are
            grouped under ``"unknown"``.
        """
        filters: dict[str, Any] = {"last": last, "limit": max(limit * 5, 500), "newest_first": True}
        if agent_id:
            filters["agent_id"] = agent_id
        events = self.query_events(principal, filters=filters)
        agents: dict[str, dict[str, Any]] = {}
        for event in events:
            raw_agent = event.get("agent_id")
            # An explicit None must not become the literal group "None".
            aid = "unknown" if raw_agent is None else str(raw_agent)
            bucket = agents.get(aid)
            if bucket is None:
                bucket = agents[aid] = {"agent_id": aid, "event_count": 0, "events": [], "last_seen": None}
            bucket["event_count"] += 1
            # Events arrive newest first, so the first timestamp seen wins.
            if bucket["last_seen"] is None:
                bucket["last_seen"] = event.get("timestamp")
            if len(bucket["events"]) < limit:
                bucket["events"].append(event)
        return sorted(agents.values(), key=lambda x: x.get("last_seen") or "", reverse=True)

    def session_trace(
        self,
        principal: DashboardPrincipal,
        *,
        session_id: str,
        limit: int = 200,
    ) -> list[dict[str, Any]]:
        """Return chronological event trace for a given session.

        ``limit`` is clamped to at least 1 for both the query and the
        returned slice, so a non-positive value yields at most one event.
        """
        effective_limit = max(limit, 1)
        events = self.query_events(
            principal, filters={"session_id": session_id, "limit": effective_limit, "newest_first": False}
        )
        return events[:effective_limit]

    def list_alert_rules(self) -> list[dict[str, Any]]:
        """Return every alert rule as a dict."""
        return [self._alert_rule_to_dict(row) for row in self._alerts.list_rules()]

    def upsert_alert_rule(
        self,
        *,
        rule_id: str,
        name: str,
        threshold: int,
        window: str,
        filters: dict[str, Any] | None = None,
        channels: list[str] | None = None,
    ) -> dict[str, Any]:
        """Validate and store an alert rule.

        ``filters`` defaults to no filters and ``channels`` to ``["file"]``.

        Raises:
            HTTPException: 400 when ``_parse_alert_rule`` rejects the payload.
        """
        parsed = _parse_alert_rule(
            {
                "rule_id": rule_id,
                "name": name,
                "threshold": threshold,
                "window": window,
                "filters": filters or {},
                "channels": channels or ["file"],
            }
        )
        if parsed is None:
            raise HTTPException(status_code=400, detail="invalid alert rule payload")
        self._alerts.upsert(parsed)
        return self._alert_rule_to_dict(parsed)

    def set_cost_tracker(self, tracker: CostTracker) -> None:
        """Attach an external CostTracker for the cost dashboard."""
        self._cost_tracker = tracker

    def cost_summary(self, principal: DashboardPrincipal) -> dict[str, Any]:
        """Return aggregated cost data with budget status.

        An all-zero summary is returned when no cost tracker is attached.
        ``principal`` is currently not used to filter the figures.
        """
        tracker = self._cost_tracker
        if tracker is None:
            return {
                "total_cost": 0.0,
                "total_input_tokens": 0,
                "total_output_tokens": 0,
                "record_count": 0,
                "by_model": {},
                "by_provider": {},
                "by_agent": {},
                "budgets": [],
            }
        summary = tracker.summary()
        budgets = tracker.check_budget()
        return {
            "total_cost": summary.total_cost,
            "total_input_tokens": summary.total_input_tokens,
            "total_output_tokens": summary.total_output_tokens,
            "record_count": summary.record_count,
            "by_model": summary.by_model,
            "by_provider": summary.by_provider,
            "by_agent": summary.by_agent,
            "budgets": [
                {
                    "scope": b.scope,
                    "scope_id": b.scope_id,
                    "spent": b.spent,
                    "limit": b.limit,
                    "utilization_pct": b.utilization_pct,
                    "action": b.action,
                    "exceeded": b.exceeded,
                }
                for b in budgets
            ],
        }

    def evaluate_alerts(self, principal: DashboardPrincipal, *, last: str = "15m") -> dict[str, Any]:
        """Evaluate all alert rules against the principal's recent events.

        For a tenant-scoped principal, alerts that name tenants are kept only
        when at least one of them is in scope; alerts without tenant ids are
        always kept.
        """
        rows = self.query_events(principal, filters={"last": last, "limit": 20000, "newest_first": True})
        triggered = self._alerts.evaluate(events=rows)
        if "*" not in principal.tenant_scope:
            allowed = set(principal.tenant_scope)
            triggered = [
                row
                for row in triggered
                if not row.get("tenant_ids") or bool(allowed.intersection(set(row.get("tenant_ids", []))))
            ]
        return {"window": last, "triggered_count": len(triggered), "alerts": triggered}

    def _authenticate(self, headers: Mapping[str, str]) -> DashboardPrincipal:
        """Resolve the calling user and tenant scope from request headers.

        With RBAC disabled, an admin principal with scope ``("*",)`` is
        returned. Otherwise the user header must name a registered user; an
        optional tenant header narrows the scope to that single tenant.

        Raises:
            HTTPException: 401 when the user header is missing, 403 when the
                user is not registered or the requested tenant is out of scope.
        """
        if not self.config.rbac_enabled:
            return DashboardPrincipal(user_id="rbac-disabled", role="admin", tenant_scope=("*",))
        user_id = _header_value(headers, self.config.user_header)
        if not user_id:
            raise HTTPException(status_code=401, detail=f"missing '{self.config.user_header}' header")
        user = self._users.get(user_id)
        if user is None:
            raise HTTPException(status_code=403, detail="dashboard user is not registered")
        # Users without explicit tenants are scoped to the default tenant.
        tenant_scope = tuple(user.tenants or [self._tenant_sets.default_tenant_id])
        selected_tenant = _header_value(headers, self.config.tenant_header)
        if selected_tenant:
            if "*" not in tenant_scope and selected_tenant not in tenant_scope:
                raise HTTPException(status_code=403, detail="requested tenant is outside user scope")
            tenant_scope = (selected_tenant,)
        return DashboardPrincipal(
            user_id=_token(user.user_id) or "unknown",
            role=_token(user.role) or "viewer",
            tenant_scope=tuple(tenant_scope),
        )

    def _authorize(self, principal: DashboardPrincipal, *, permission: str) -> None:
        """Raise a 403 ``HTTPException`` unless the role grants ``permission``.

        A role whose permission set contains ``"*"`` is granted everything;
        unknown roles have no permissions.
        """
        allowed = _ROLE_PERMISSIONS.get(principal.role, set())
        if "*" in allowed or permission in allowed:
            return
        raise HTTPException(status_code=403, detail=f"role '{principal.role}' lacks '{permission}' permission")

    def _filter_events_by_tenant(
        self, rows: list[dict[str, Any]], principal: DashboardPrincipal
    ) -> list[dict[str, Any]]:
        """Keep only events whose tenant is in the principal's scope."""
        if "*" in principal.tenant_scope:
            return rows
        allowed = set(principal.tenant_scope)
        return [row for row in rows if self._event_tenant(row) in allowed]

    def _event_tenant(self, event: dict[str, Any]) -> str:
        """Determine the tenant an audit event belongs to.

        An explicit ``tenant_id`` in the event metadata wins; otherwise the
        first non-empty agent field is mapped through the tenant registry,
        and the default tenant is used as the last resort.
        """
        metadata = _metadata(event)
        direct = _token(metadata.get("tenant_id"))
        if direct:
            return direct
        for key in ("agent_id", "source_agent_id", "destination_agent_id"):
            agent = _token(event.get(key))
            if agent:
                return self._tenant_sets.resolve_agent_tenant(agent)
        return self._tenant_sets.default_tenant_id

    def _approval_tenant(self, row: ApprovalRequest) -> str:
        """Determine the tenant an approval request belongs to.

        Uses ``tenant_id`` from the request metadata when present, otherwise
        the tenant that owns the requesting agent.
        """
        metadata = row.metadata if isinstance(row.metadata, dict) else {}
        direct = _token(metadata.get("tenant_id"))
        if direct:
            return direct
        return self._tenant_sets.resolve_agent_tenant(row.agent_id)

    @staticmethod
    def _is_tenant_allowed(tenant_id: str, principal: DashboardPrincipal) -> bool:
        """Return whether ``tenant_id`` is within the principal's scope."""
        return "*" in principal.tenant_scope or tenant_id in principal.tenant_scope

    @staticmethod
    def _approval_to_dict(row: ApprovalRequest) -> dict[str, Any]:
        """Serialise an approval request; datetimes become ISO-8601 strings."""
        return {
            "request_id": row.request_id,
            "status": row.status,
            "reason": row.reason,
            "policy_name": row.policy_name,
            "agent_id": row.agent_id,
            "tool_name": row.tool_name,
            "session_id": row.session_id,
            "action_type": row.action_type,
            "data_tags": list(row.data_tags),
            "requested_at": row.requested_at.isoformat(),
            "expires_at": row.expires_at.isoformat(),
            "decided_at": row.decided_at.isoformat() if row.decided_at else None,
            "approver_id": row.approver_id,
            "decision_note": row.decision_note,
            "metadata": dict(row.metadata or {}),
        }

    @staticmethod
    def _tenant_policy_to_dict(row: TenantPolicySet) -> dict[str, Any]:
        """Serialise a tenant policy set to plain lists and strings."""
        return {
            "tenant_id": row.tenant_id,
            "name": row.name,
            "policy_files": list(row.policy_files),
            "agents": list(row.agents),
        }

    @staticmethod
    def _alert_rule_to_dict(row: AlertRule) -> dict[str, Any]:
        """Serialise an alert rule to plain dicts and lists."""
        return {
            "rule_id": row.rule_id,
            "name": row.name,
            "threshold": row.threshold,
            "window": row.window,
            "filters": dict(row.filters),
            "channels": list(row.channels),
        }

agent_timeline

agent_timeline(principal: DashboardPrincipal, *, agent_id: str | None = None, last: str = '24h', limit: int = 200) -> list[dict[str, Any]]

Query views over the existing audit log, grouped by agent.

Source code in safeai/dashboard/service.py
def agent_timeline(
    self,
    principal: DashboardPrincipal,
    *,
    agent_id: str | None = None,
    last: str = "24h",
    limit: int = 200,
) -> list[dict[str, Any]]:
    """Group recent audit events by agent.

    Args:
        principal: Caller identity, forwarded to ``query_events``.
        agent_id: When truthy, added to the query filters as ``agent_id``.
        last: Passed through as the ``last`` query filter.
        limit: Maximum number of events stored per agent in the result.

    Returns:
        One dict per agent with the keys ``agent_id``, ``event_count``,
        ``events`` and ``last_seen``, sorted by ``last_seen`` in descending
        order. ``event_count`` counts every fetched event for the agent,
        while ``events`` holds at most ``limit`` of them.
    """
    # The per-agent cap is applied after grouping, so the query asks for more
    # rows than ``limit`` (five times as many, and never fewer than 500).
    filters: dict[str, Any] = {"last": last, "limit": max(limit * 5, 500), "newest_first": True}
    if agent_id:
        filters["agent_id"] = agent_id

    agents: dict[str, dict[str, Any]] = {}
    for event in self.query_events(principal, filters=filters):
        raw_agent_id = event.get("agent_id")
        # A key that is present but null must land in the "unknown" bucket
        # rather than being stringified to the literal "None".
        aid = "unknown" if raw_agent_id is None else str(raw_agent_id)
        bucket = agents.get(aid)
        if bucket is None:
            bucket = agents[aid] = {
                "agent_id": aid,
                "event_count": 0,
                "events": [],
                "last_seen": None,
            }
        bucket["event_count"] += 1
        # Events are requested newest-first, so the first timestamp recorded
        # for an agent is kept as its ``last_seen`` value.
        if bucket["last_seen"] is None:
            bucket["last_seen"] = event.get("timestamp")
        if len(bucket["events"]) < limit:
            bucket["events"].append(event)
    return sorted(agents.values(), key=lambda x: x.get("last_seen") or "", reverse=True)

session_trace

session_trace(principal: DashboardPrincipal, *, session_id: str, limit: int = 200) -> list[dict[str, Any]]

Return the chronological event trace for a given session.

Source code in safeai/dashboard/service.py
def session_trace(
    self,
    principal: DashboardPrincipal,
    *,
    session_id: str,
    limit: int = 200,
) -> list[dict[str, Any]]:
    """Fetch the events recorded for one session.

    The query is issued through ``query_events`` with ``newest_first`` set to
    ``False`` and a row limit of at least 1; the result is then sliced to
    ``limit`` entries.
    """
    query: dict[str, Any] = {
        "session_id": session_id,
        "limit": max(limit, 1),
        "newest_first": False,
    }
    trace = self.query_events(principal, filters=query)
    return trace[:limit]

set_cost_tracker

set_cost_tracker(tracker: CostTracker) -> None

Attach an external CostTracker for the cost dashboard.

Source code in safeai/dashboard/service.py
def set_cost_tracker(self, tracker: CostTracker) -> None:
    """Store ``tracker`` on this instance as ``_cost_tracker``.

    Any previously stored tracker is replaced.
    """
    self._cost_tracker = tracker

cost_summary

cost_summary(principal: DashboardPrincipal) -> dict[str, Any]

Return aggregated cost data with budget status.

Source code in safeai/dashboard/service.py
def cost_summary(self, principal: DashboardPrincipal) -> dict[str, Any]:
    """Build the cost report from the attached tracker.

    When no tracker is attached, a zeroed report with empty breakdowns and
    no budgets is returned. Otherwise the totals come from the tracker's
    ``summary()`` and each entry of its ``check_budget()`` result is turned
    into a plain dict under ``"budgets"``.
    """
    tracker = self._cost_tracker
    if tracker is not None:
        totals = tracker.summary()
        statuses = tracker.check_budget()
        # Summary fields keep their names in the report, in this order.
        report: dict[str, Any] = {
            field: getattr(totals, field)
            for field in (
                "total_cost",
                "total_input_tokens",
                "total_output_tokens",
                "record_count",
                "by_model",
                "by_provider",
                "by_agent",
            )
        }
        budget_fields = (
            "scope",
            "scope_id",
            "spent",
            "limit",
            "utilization_pct",
            "action",
            "exceeded",
        )
        report["budgets"] = [
            {field: getattr(status, field) for field in budget_fields}
            for status in statuses
        ]
        return report

    # No tracker attached: report zeros and empty collections.
    return {
        "total_cost": 0.0,
        "total_input_tokens": 0,
        "total_output_tokens": 0,
        "record_count": 0,
        "by_model": {},
        "by_provider": {},
        "by_agent": {},
        "budgets": [],
    }

routes

Phase 5 dashboard HTTP routes.