class DashboardService:
"""Security operations dashboard and enterprise controls."""
def __init__(self, *, sdk: SafeAI, config_path: str | Path, config: DashboardConfig) -> None:
self.sdk = sdk
self.config = config
path = Path(config_path).expanduser().resolve()
users: dict[str, DashboardUserConfig] = {}
for row in config.users:
token = _token(row.user_id)
if token:
users[token] = row
self._users = users
self._tenant_sets = TenantPolicySetManager(
file_path=_resolve_optional_path(path, config.tenant_policy_file),
default_tenant_id="default",
)
self._alerts = AlertRuleManager(
rules_file=_resolve_optional_path(path, config.alert_rules_file),
alert_log_file=_resolve_optional_path(path, config.alert_log_file),
)
self._cost_tracker: CostTracker | None = None
def authorize_request(self, headers: Mapping[str, str], *, permission: str) -> DashboardPrincipal:
if not self.config.enabled:
raise HTTPException(status_code=404, detail="dashboard is disabled")
principal = self._authenticate(headers)
self._authorize(principal, permission=permission)
return principal
def render_dashboard_page(self) -> str:
return _DASHBOARD_HTML
def overview(self, principal: DashboardPrincipal, *, last: str = "24h") -> dict[str, Any]:
events = self.query_events(principal, filters={"last": last, "limit": 5000, "newest_first": True})
approvals = self.list_approvals(principal, status="pending", limit=200, newest_first=True)
incidents = self.list_incidents(principal, last=last, limit=20)
recent_alerts = self._alerts.recent_alerts(limit=20)
visible_tenants = self.list_tenant_policy_sets(principal)
return {
"window": last,
"events_total": len(events),
"action_counts": _count_by(events, key="action"),
"boundary_counts": _count_by(events, key="boundary"),
"pending_approvals": len(approvals),
"recent_incidents": incidents[:8],
"recent_alerts": recent_alerts[:8],
"tenants": visible_tenants,
}
def query_events(self, principal: DashboardPrincipal, *, filters: dict[str, Any]) -> list[dict[str, Any]]:
rows = self.sdk.query_audit(**filters)
return self._filter_events_by_tenant(rows, principal)
def list_incidents(self, principal: DashboardPrincipal, *, last: str = "24h", limit: int = 100) -> list[dict[str, Any]]:
rows = self.query_events(principal, filters={"last": last, "limit": max(limit * 5, 200), "newest_first": True})
incident_rows = [row for row in rows if str(row.get("action")) in {"block", "redact", "require_approval"}]
return incident_rows[: max(limit, 1)]
def list_approvals(
self,
principal: DashboardPrincipal,
*,
status: str | None = None,
limit: int = 100,
newest_first: bool = True,
) -> list[dict[str, Any]]:
rows = self.sdk.list_approval_requests(
status=status,
newest_first=newest_first,
limit=max(limit * 5, 500),
)
visible = [row for row in rows if self._is_tenant_allowed(self._approval_tenant(row), principal)]
visible.sort(key=lambda item: item.requested_at, reverse=newest_first)
if limit > 0:
visible = visible[:limit]
return [self._approval_to_dict(item) for item in visible]
def decide_approval(
self,
principal: DashboardPrincipal,
*,
request_id: str,
decision: str,
note: str | None = None,
) -> dict[str, Any]:
row = self.sdk.approvals.get(request_id)
if row is None:
raise HTTPException(status_code=404, detail="approval request not found")
if not self._is_tenant_allowed(self._approval_tenant(row), principal):
raise HTTPException(status_code=404, detail="approval request not found")
token = str(decision).strip().lower()
if token == "approve":
ok = self.sdk.approve_request(request_id, approver_id=principal.user_id, note=note)
elif token == "deny":
ok = self.sdk.deny_request(request_id, approver_id=principal.user_id, note=note)
else:
raise HTTPException(status_code=400, detail="decision must be 'approve' or 'deny'")
if not ok:
raise HTTPException(status_code=409, detail="approval request can no longer be decided")
updated = self.sdk.approvals.get(request_id)
if updated is None:
raise HTTPException(status_code=500, detail="approval request disappeared after decision")
return self._approval_to_dict(updated)
def compliance_report(
self,
principal: DashboardPrincipal,
*,
since: str | None = None,
until: str | None = None,
last: str | None = "24h",
limit: int = 20000,
) -> dict[str, Any]:
events = self.query_events(
principal,
filters={"since": since, "until": until, "last": last, "limit": max(limit, 1), "newest_first": False},
)
approvals = self.sdk.list_approval_requests(limit=100000, newest_first=False)
visible_approvals = [row for row in approvals if self._is_tenant_allowed(self._approval_tenant(row), principal)]
if since or until or last:
start_at, end_at = _window_bounds(since=since, until=until, last=last)
visible_approvals = [
row
for row in visible_approvals
if _approval_within_window(row, start_at=start_at, end_at=end_at)
]
action_counts = _count_by(events, key="action")
boundary_counts = _count_by(events, key="boundary")
policy_counts = _count_by(events, key="policy_name")
agent_counts = _count_by(events, key="agent_id")
approval_counts = _count_by_approval_status(visible_approvals)
memory_retention_events = [
row
for row in events
if str(row.get("boundary")) == "memory" and str(_metadata(row).get("phase")) == "retention_purge"
]
violation_count = sum(action_counts.get(key, 0) for key in ("block", "redact", "require_approval"))
approval_latencies = [
(row.decided_at - row.requested_at).total_seconds()
for row in visible_approvals
if row.decided_at is not None
]
avg_latency = sum(approval_latencies) / len(approval_latencies) if approval_latencies else 0.0
anomaly_flags: list[str] = []
total_events = len(events)
blocked = action_counts.get("block", 0)
if total_events and blocked / total_events >= 0.1:
anomaly_flags.append("blocked-rate>=10%")
if approval_counts.get("pending", 0) >= 10:
anomaly_flags.append("pending-approvals>=10")
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"window": {"since": since, "until": until, "last": last},
"summary": {
"total_events": total_events,
"boundary_counts": boundary_counts,
"action_counts": action_counts,
"policy_violation_count": violation_count,
"top_policies": sorted(policy_counts.items(), key=lambda item: item[1], reverse=True)[:10],
"data_access_by_agent": sorted(agent_counts.items(), key=lambda item: item[1], reverse=True)[:10],
"approval_stats": {
"counts": approval_counts,
"average_latency_seconds": round(avg_latency, 3),
},
"memory_retention_events": len(memory_retention_events),
"anomaly_flags": anomaly_flags,
},
"evidence": {
"sample_event_ids": [str(row.get("event_id", "")) for row in events[:20]],
"sample_approval_ids": [row.request_id for row in visible_approvals[:20]],
},
}
def list_tenant_policy_sets(self, principal: DashboardPrincipal) -> list[dict[str, Any]]:
rows = self._tenant_sets.list_sets()
if "*" not in principal.tenant_scope:
rows = [row for row in rows if row.tenant_id in principal.tenant_scope]
return [self._tenant_policy_to_dict(row) for row in rows]
def get_tenant_policy_set(self, principal: DashboardPrincipal, tenant_id: str) -> dict[str, Any]:
token = _token(tenant_id)
if not token:
raise HTTPException(status_code=400, detail="tenant_id is required")
if "*" not in principal.tenant_scope and token not in principal.tenant_scope:
raise HTTPException(status_code=403, detail="tenant not in scope")
row = self._tenant_sets.get(token)
if row is None:
raise HTTPException(status_code=404, detail="tenant policy set not found")
return self._tenant_policy_to_dict(row)
def update_tenant_policy_set(
self,
principal: DashboardPrincipal,
*,
tenant_id: str,
name: str | None = None,
policy_files: list[str] | None = None,
agents: list[str] | None = None,
) -> dict[str, Any]:
token = _token(tenant_id)
if not token:
raise HTTPException(status_code=400, detail="tenant_id is required")
if "*" not in principal.tenant_scope and token not in principal.tenant_scope:
raise HTTPException(status_code=403, detail="tenant not in scope")
current = self._tenant_sets.get(token)
resolved_name = _token(name) or (current.name if current else token)
resolved_files = _normalize_tokens(policy_files or (current.policy_files if current else []))
resolved_agents = _normalize_tokens(agents or (current.agents if current else []))
updated = TenantPolicySet(
tenant_id=token,
name=resolved_name,
policy_files=resolved_files,
agents=resolved_agents,
)
self._tenant_sets.upsert(updated)
return self._tenant_policy_to_dict(updated)
def agent_timeline(
self,
principal: DashboardPrincipal,
*,
agent_id: str | None = None,
last: str = "24h",
limit: int = 200,
) -> list[dict[str, Any]]:
"""Query views over existing audit log grouped by agent."""
filters: dict[str, Any] = {"last": last, "limit": max(limit * 5, 500), "newest_first": True}
if agent_id:
filters["agent_id"] = agent_id
events = self.query_events(principal, filters=filters)
agents: dict[str, dict[str, Any]] = {}
for event in events:
aid = str(event.get("agent_id", "unknown"))
if aid not in agents:
agents[aid] = {"agent_id": aid, "event_count": 0, "events": [], "last_seen": None}
agents[aid]["event_count"] += 1
if agents[aid]["last_seen"] is None:
agents[aid]["last_seen"] = event.get("timestamp")
if len(agents[aid]["events"]) < limit:
agents[aid]["events"].append(event)
return sorted(agents.values(), key=lambda x: x.get("last_seen") or "", reverse=True)
def session_trace(
self,
principal: DashboardPrincipal,
*,
session_id: str,
limit: int = 200,
) -> list[dict[str, Any]]:
"""Return chronological event trace for a given session."""
events = self.query_events(
principal, filters={"session_id": session_id, "limit": max(limit, 1), "newest_first": False}
)
return events[:limit]
def list_alert_rules(self) -> list[dict[str, Any]]:
return [self._alert_rule_to_dict(row) for row in self._alerts.list_rules()]
def upsert_alert_rule(
self,
*,
rule_id: str,
name: str,
threshold: int,
window: str,
filters: dict[str, Any] | None = None,
channels: list[str] | None = None,
) -> dict[str, Any]:
parsed = _parse_alert_rule(
{
"rule_id": rule_id,
"name": name,
"threshold": threshold,
"window": window,
"filters": filters or {},
"channels": channels or ["file"],
}
)
if parsed is None:
raise HTTPException(status_code=400, detail="invalid alert rule payload")
self._alerts.upsert(parsed)
return self._alert_rule_to_dict(parsed)
def set_cost_tracker(self, tracker: CostTracker) -> None:
"""Attach an external CostTracker for the cost dashboard."""
self._cost_tracker = tracker
def cost_summary(self, principal: DashboardPrincipal) -> dict[str, Any]:
"""Return aggregated cost data with budget status."""
tracker = self._cost_tracker
if tracker is None:
return {
"total_cost": 0.0,
"total_input_tokens": 0,
"total_output_tokens": 0,
"record_count": 0,
"by_model": {},
"by_provider": {},
"by_agent": {},
"budgets": [],
}
summary = tracker.summary()
budgets = tracker.check_budget()
return {
"total_cost": summary.total_cost,
"total_input_tokens": summary.total_input_tokens,
"total_output_tokens": summary.total_output_tokens,
"record_count": summary.record_count,
"by_model": summary.by_model,
"by_provider": summary.by_provider,
"by_agent": summary.by_agent,
"budgets": [
{
"scope": b.scope,
"scope_id": b.scope_id,
"spent": b.spent,
"limit": b.limit,
"utilization_pct": b.utilization_pct,
"action": b.action,
"exceeded": b.exceeded,
}
for b in budgets
],
}
def evaluate_alerts(self, principal: DashboardPrincipal, *, last: str = "15m") -> dict[str, Any]:
rows = self.query_events(principal, filters={"last": last, "limit": 20000, "newest_first": True})
triggered = self._alerts.evaluate(events=rows)
if "*" not in principal.tenant_scope:
allowed = set(principal.tenant_scope)
triggered = [
row
for row in triggered
if not row.get("tenant_ids") or bool(allowed.intersection(set(row.get("tenant_ids", []))))
]
return {"window": last, "triggered_count": len(triggered), "alerts": triggered}
def _authenticate(self, headers: Mapping[str, str]) -> DashboardPrincipal:
if not self.config.rbac_enabled:
return DashboardPrincipal(user_id="rbac-disabled", role="admin", tenant_scope=("*",))
user_id = _header_value(headers, self.config.user_header)
if not user_id:
raise HTTPException(status_code=401, detail=f"missing '{self.config.user_header}' header")
user = self._users.get(user_id)
if user is None:
raise HTTPException(status_code=403, detail="dashboard user is not registered")
tenant_scope = tuple(user.tenants or [self._tenant_sets.default_tenant_id])
selected_tenant = _header_value(headers, self.config.tenant_header)
if selected_tenant:
if "*" not in tenant_scope and selected_tenant not in tenant_scope:
raise HTTPException(status_code=403, detail="requested tenant is outside user scope")
tenant_scope = (selected_tenant,)
return DashboardPrincipal(
user_id=_token(user.user_id) or "unknown",
role=_token(user.role) or "viewer",
tenant_scope=tuple(tenant_scope),
)
def _authorize(self, principal: DashboardPrincipal, *, permission: str) -> None:
allowed = _ROLE_PERMISSIONS.get(principal.role, set())
if "*" in allowed or permission in allowed:
return
raise HTTPException(status_code=403, detail=f"role '{principal.role}' lacks '{permission}' permission")
def _filter_events_by_tenant(
self, rows: list[dict[str, Any]], principal: DashboardPrincipal
) -> list[dict[str, Any]]:
if "*" in principal.tenant_scope:
return rows
allowed = set(principal.tenant_scope)
return [row for row in rows if self._event_tenant(row) in allowed]
def _event_tenant(self, event: dict[str, Any]) -> str:
metadata = _metadata(event)
direct = _token(metadata.get("tenant_id"))
if direct:
return direct
for key in ("agent_id", "source_agent_id", "destination_agent_id"):
agent = _token(event.get(key))
if agent:
return self._tenant_sets.resolve_agent_tenant(agent)
return self._tenant_sets.default_tenant_id
def _approval_tenant(self, row: ApprovalRequest) -> str:
metadata = row.metadata if isinstance(row.metadata, dict) else {}
direct = _token(metadata.get("tenant_id"))
if direct:
return direct
return self._tenant_sets.resolve_agent_tenant(row.agent_id)
@staticmethod
def _is_tenant_allowed(tenant_id: str, principal: DashboardPrincipal) -> bool:
return "*" in principal.tenant_scope or tenant_id in principal.tenant_scope
@staticmethod
def _approval_to_dict(row: ApprovalRequest) -> dict[str, Any]:
return {
"request_id": row.request_id,
"status": row.status,
"reason": row.reason,
"policy_name": row.policy_name,
"agent_id": row.agent_id,
"tool_name": row.tool_name,
"session_id": row.session_id,
"action_type": row.action_type,
"data_tags": list(row.data_tags),
"requested_at": row.requested_at.isoformat(),
"expires_at": row.expires_at.isoformat(),
"decided_at": row.decided_at.isoformat() if row.decided_at else None,
"approver_id": row.approver_id,
"decision_note": row.decision_note,
"metadata": dict(row.metadata or {}),
}
@staticmethod
def _tenant_policy_to_dict(row: TenantPolicySet) -> dict[str, Any]:
return {
"tenant_id": row.tenant_id,
"name": row.name,
"policy_files": list(row.policy_files),
"agents": list(row.agents),
}
@staticmethod
def _alert_rule_to_dict(row: AlertRule) -> dict[str, Any]:
return {
"rule_id": row.rule_id,
"name": row.name,
"threshold": row.threshold,
"window": row.window,
"filters": dict(row.filters),
"channels": list(row.channels),
}