SafeAI — the main SDK entry point.

SafeAI: runtime orchestration for boundary components.

Source code in safeai/api.py
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
class SafeAI:
    """Runtime orchestration for boundary components.

    Central facade that wires together the boundary-enforcement pieces:
    input/structured scanning, output guarding, action interception,
    policy evaluation, audit logging, agent memory, tool contracts,
    agent identities, capability tokens, secrets, approvals, and plugins.
    """

    def __init__(
        self,
        policy_engine: PolicyEngine,
        classifier: Classifier,
        audit_logger: AuditLogger,
        memory_controller: MemoryController | None = None,
        contract_registry: ToolContractRegistry | None = None,
        identity_registry: AgentIdentityRegistry | None = None,
        capability_manager: CapabilityTokenManager | None = None,
        secret_manager: SecretManager | None = None,
        approval_manager: ApprovalManager | None = None,
        plugin_manager: PluginManager | None = None,
        memory_auto_purge_expired: bool = True,
    ) -> None:
        """Wire together the SafeAI boundary-enforcement runtime.

        SafeAI composes every enforcement component behind a single facade:
        scanning, guarding, interception, policy evaluation, auditing,
        memory, contracts, identities, capabilities, secrets, approvals,
        and plugins.

        Args:
            policy_engine: Evaluates policy rules against data-tag contexts.
            classifier: Detector-backed classifier that tags data crossing boundaries.
            audit_logger: Persists audit events to a JSONL file.
            memory_controller: Optional schema-enforced agent memory store.
            contract_registry: Optional registry of tool-level data-tag contracts.
            identity_registry: Optional registry of agent identity declarations.
            capability_manager: Optional manager for scoped capability tokens.
            secret_manager: Optional secret resolution manager.
            approval_manager: Optional human-in-the-loop approval gate.
            plugin_manager: Optional plugin manager for third-party extensions.
            memory_auto_purge_expired: When True, expired memory entries are
                purged automatically on every read/write operation.
        """
        # Required collaborators.
        self.policy_engine = policy_engine
        self.classifier = classifier
        self.audit = audit_logger
        self.memory = memory_controller
        self.memory_auto_purge_expired = memory_auto_purge_expired

        # Optional collaborators fall back to fresh defaults.  Ordering
        # matters here: the default secret manager is bound to the
        # capability manager, and the template catalog to the plugin manager.
        self.contracts = contract_registry or ToolContractRegistry()
        self.identities = identity_registry or AgentIdentityRegistry()
        self.capabilities = capability_manager or CapabilityTokenManager()
        self.secrets = secret_manager or SecretManager(capability_manager=self.capabilities)
        self.approvals = approval_manager or ApprovalManager()
        self.plugins = plugin_manager or PluginManager()
        self.templates = PolicyTemplateCatalog(plugin_manager=self.plugins)

        self._ai_backends: Any = None  # Lazy: AIBackendRegistry

        # Boundary enforcers share the same classifier/policy/audit trio.
        self._input = InputScanner(
            classifier=classifier,
            policy_engine=policy_engine,
            audit_logger=audit_logger,
        )
        self._structured = StructuredScanner(
            classifier=classifier,
            policy_engine=policy_engine,
            audit_logger=audit_logger,
        )
        self._output = OutputGuard(
            classifier=classifier,
            policy_engine=policy_engine,
            audit_logger=audit_logger,
        )
        self._action = ActionInterceptor(
            policy_engine=policy_engine,
            audit_logger=audit_logger,
            contract_registry=self.contracts,
            identity_registry=self.identities,
            capability_manager=self.capabilities,
            approval_manager=self.approvals,
            classifier=classifier,
        )

    @property
    def advanced(self) -> "AdvancedAPI":
        """Lazily-built facade over advanced API methods.

        Exposes contracts, identities, capabilities, secrets, and other
        lower-level operations; constructed on first access and cached on
        the instance thereafter.
        """
        try:
            return self._advanced
        except AttributeError:
            # Deferred import so safeai.advanced is only loaded on first use.
            from safeai.advanced import AdvancedAPI

            self._advanced = AdvancedAPI(self)
            return self._advanced

    @classmethod
    def quickstart(
        cls,
        *,
        block_secrets: bool = True,
        redact_pii: bool = True,
        block_pii: bool = False,
        custom_rules: list[dict] | None = None,
        audit_path: str | None = None,
    ) -> "SafeAI":
        """Build a ready-to-use SafeAI instance with sensible defaults — no config files needed.

        Basic usage::

            from safeai import SafeAI
            ai = SafeAI.quickstart()

        Customise what gets enforced::

            # Block PII instead of redacting it
            ai = SafeAI.quickstart(block_pii=True, redact_pii=False)

            # Secrets only, ignore PII
            ai = SafeAI.quickstart(redact_pii=False)

            # Everything off except your own rules
            ai = SafeAI.quickstart(block_secrets=False, redact_pii=False, custom_rules=[
                {"name": "my-rule", "boundary": ["input"], "priority": 10,
                 "condition": {"data_tags": ["secret.credential"]},
                 "action": "block", "reason": "No creds allowed."},
            ])

        Args:
            block_secrets: Block API keys, tokens, and credentials (default True).
            redact_pii: Redact emails, phone numbers, SSNs in outputs (default True).
            block_pii: Block PII entirely instead of redacting (default False).
                If both redact_pii and block_pii are True, block wins.
            custom_rules: Extra policy rules (list of dicts) inserted after the
                built-in restrictive rules and before the default-allow rules.
                Same format as policy YAML.
            audit_path: File path for the audit log. Defaults to a temp file.

        Returns:
            A configured SafeAI instance.
        """
        rules: list[dict] = []

        if block_secrets:
            rules.append({
                "name": "block-secrets-everywhere",
                "boundary": ["input", "action", "output"],
                "priority": 10,
                "condition": {"data_tags": ["secret.credential", "secret.token", "secret"]},
                "action": "block",
                "reason": "Secrets must never cross any boundary.",
            })

        # PII handling: blocking takes precedence over redaction.
        pii_tags = ["personal", "personal.pii", "personal.phi", "personal.financial"]
        if block_pii:
            rules.append({
                "name": "block-personal-data",
                "boundary": ["input", "action", "output"],
                "priority": 20,
                "condition": {"data_tags": list(pii_tags)},
                "action": "block",
                "reason": "Personal data must not cross any boundary.",
            })
        elif redact_pii:
            rules.append({
                "name": "redact-personal-data-in-output",
                "boundary": ["output"],
                "priority": 20,
                "condition": {"data_tags": list(pii_tags)},
                "action": "redact",
                "reason": "Personal data must not appear in outbound responses.",
            })

        if custom_rules:
            rules.extend(custom_rules)

        # Default-allow fallbacks (always last).
        rules.extend(
            {
                "name": f"allow-{boundary}-by-default",
                "boundary": [boundary],
                "priority": 1000,
                "action": "allow",
                "reason": "Allow when no restrictive policy matched.",
            }
            for boundary in ("input", "action", "output")
        )

        resolved_audit_path = audit_path or str(Path(tempfile.gettempdir()) / "safeai-audit.jsonl")
        return cls(
            policy_engine=PolicyEngine(normalize_rules(rules)),
            classifier=Classifier(patterns=list(all_detectors())),
            audit_logger=AuditLogger(resolved_audit_path),
        )

    @classmethod
    def from_config(cls, path: str | Path) -> "SafeAI":
        """Create a SafeAI instance from a YAML/JSON configuration file.

        Loads policy rules, memory schemas, tool contracts, agent identities,
        plugins, audit settings, and approval configuration from the paths
        declared in the config file.

        Args:
            path: Path to the SafeAI configuration file (YAML or JSON).

        Returns:
            A fully configured SafeAI instance.
        """
        cfg = load_config(path)
        # Resolve the config file's own location so bundle paths can be
        # loaded relative to it.
        config_path = Path(path).expanduser().resolve()
        policy_files, raw_rules = load_policy_bundle(config_path, cfg.paths.policy_files, version=cfg.version)
        policy_engine = PolicyEngine(normalize_rules(raw_rules))

        def _reload_rules():
            # Re-read the policy bundle from disk; used by the reload hook below.
            _, fresh_rules = load_policy_bundle(config_path, cfg.paths.policy_files, version=cfg.version)
            return normalize_rules(fresh_rules)

        # Register the policy files for hot reload (see reload_policies()).
        policy_engine.register_reload(policy_files, _reload_rules)
        _, memory_docs = load_memory_bundle(config_path, cfg.paths.memory_schema_files, version=cfg.version)
        memory = MemoryController.from_documents(memory_docs) if memory_docs else None
        _, contract_docs = load_contract_bundle(config_path, cfg.paths.contract_files, version=cfg.version)
        contracts = ToolContractRegistry(normalize_contracts(contract_docs)) if contract_docs else ToolContractRegistry()
        _, identity_docs = load_identity_bundle(config_path, cfg.paths.identity_files, version=cfg.version)
        identities = (
            AgentIdentityRegistry(normalize_agent_identities(identity_docs))
            if identity_docs
            else AgentIdentityRegistry()
        )
        plugin_manager = (
            PluginManager.from_patterns(config_path=config_path, patterns=cfg.plugins.plugin_files)
            if cfg.plugins.enabled
            else PluginManager()
        )
        # Plugin-supplied detector patterns extend the built-in detector set.
        classifier = Classifier(patterns=[*all_detectors(), *plugin_manager.detector_patterns()])
        audit = AuditLogger(_resolve_optional_path(config_path, cfg.audit.file_path))
        capabilities = CapabilityTokenManager()
        approvals = ApprovalManager(
            file_path=_resolve_optional_path(config_path, cfg.approvals.file_path),
            default_ttl=cfg.approvals.default_ttl,
        )
        instance = cls(
            policy_engine=policy_engine,
            classifier=classifier,
            audit_logger=audit,
            memory_controller=memory,
            contract_registry=contracts,
            identity_registry=identities,
            capability_manager=capabilities,
            secret_manager=SecretManager(capability_manager=capabilities),
            approval_manager=approvals,
            plugin_manager=plugin_manager,
            memory_auto_purge_expired=cfg.memory_runtime.auto_purge_expired,
        )

        # Auto-register secret backends from config.  Failures are logged
        # and skipped so one bad backend does not abort startup.
        if cfg.secrets.enabled:
            for backend_cfg in cfg.secrets.backends:
                try:
                    instance._register_secret_backend_from_config(backend_cfg)
                except Exception as exc:
                    import logging
                    logging.getLogger(__name__).warning(
                        "Failed to register secret backend '%s': %s", backend_cfg.name, exc
                    )

        return instance

    def scan_input(self, data: str, agent_id: str = "unknown") -> ScanResult:
        """Run text through the input boundary.

        The input scanner classifies the text, evaluates input-boundary
        policy rules, and reports the resulting decision (allow, block,
        or redact) together with any detections.

        Args:
            data: Raw text to scan.
            agent_id: Identifier of the agent submitting the input.

        Returns:
            ScanResult with the policy decision, detections, and filtered text.
        """
        scanner = self._input
        return scanner.scan(data, agent_id=agent_id)

    def guard_output(self, data: str, agent_id: str = "unknown") -> GuardResult:
        """Run text through the output boundary.

        The output guard classifies the outbound text, evaluates
        output-boundary policy rules, and reports the resulting decision
        (allow, block, or redact) together with any detections.

        Args:
            data: Outbound text to guard.
            agent_id: Identifier of the agent producing the output.

        Returns:
            GuardResult with the policy decision, detections, and filtered text.
        """
        guard = self._output
        return guard.guard(data, agent_id=agent_id)

    def scan_structured_input(self, payload: Any, *, agent_id: str = "unknown") -> StructuredScanResult:
        """Scan a structured payload (dict, list, or nested object) at the input boundary.

        The structured scanner walks the payload recursively, classifies
        string values, evaluates policy rules, and reports detections with
        JSON-path locations.

        Args:
            payload: Structured data (typically a dict or list) to scan.
            agent_id: Identifier of the agent submitting the payload.

        Returns:
            StructuredScanResult with the policy decision, path-level
            detections, and a filtered copy of the payload.
        """
        scanner = self._structured
        return scanner.scan(payload, agent_id=agent_id)

    def scan_file_input(self, file_path: str | Path, *, agent_id: str = "unknown") -> FileScanResult:
        """Scan a file through the input boundary.

        ``.json`` files are parsed and scanned structurally; every other
        file is decoded as UTF-8 text and scanned as plain text.

        Args:
            file_path: Path to the file to scan.
            agent_id: Agent requesting the scan.

        Returns:
            FileScanResult with mode, decision, detections, and filtered content.
            Supports dict-style access for backward compatibility.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        resolved = Path(file_path).expanduser().resolve()
        if not (resolved.exists() and resolved.is_file()):
            raise FileNotFoundError(f"file not found: {resolved}")
        raw = resolved.read_bytes()
        size_bytes = len(raw)

        def _decision_dict(decision: Any) -> dict[str, Any]:
            # Flatten a decision object into the serializable result shape.
            return {
                "action": decision.action,
                "policy_name": decision.policy_name,
                "reason": decision.reason,
            }

        if resolved.suffix.strip().lower() == ".json":
            payload = json.loads(raw.decode("utf-8", errors="strict"))
            structured = self.scan_structured_input(payload, agent_id=agent_id)
            return FileScanResult(
                mode="structured",
                file_path=str(resolved),
                size_bytes=size_bytes,
                decision=_decision_dict(structured.decision),
                detections=[
                    {
                        "path": item.path,
                        "detector": item.detector,
                        "tag": item.tag,
                        "start": item.start,
                        "end": item.end,
                    }
                    for item in structured.detections
                ],
                filtered=structured.filtered,
            )

        scan = self.scan_input(raw.decode("utf-8", errors="replace"), agent_id=agent_id)
        return FileScanResult(
            mode="text",
            file_path=str(resolved),
            size_bytes=size_bytes,
            decision=_decision_dict(scan.decision),
            detections=[
                {
                    "detector": item.detector,
                    "tag": item.tag,
                    "start": item.start,
                    "end": item.end,
                }
                for item in scan.detections
            ],
            filtered=scan.filtered,
        )

    def reload_policies(self) -> bool:
        """Reload policies, but only when a watched policy file has changed."""
        engine = self.policy_engine
        return engine.reload_if_changed()

    def force_reload_policies(self) -> bool:
        """Unconditionally reload policies from the configured files."""
        engine = self.policy_engine
        return engine.reload()

    def memory_write(
        self, key: str, value: Any, *, agent_id: str = "unknown", strict: bool = False
    ) -> MemoryWriteResult:
        """Store a value in schema-enforced agent memory.

        Runs the auto-purge hook before delegating the write to the
        memory controller.

        Args:
            key: Field name defined in the memory schema.
            value: Value to store. Must match the field's declared type.
            agent_id: Agent performing the write.
            strict: If True, raise MemoryValidationError on failure instead of returning False.

        Returns:
            A :class:`MemoryWriteResult` with ``success`` and ``reason``
            fields; supports ``bool()`` conversion for backward compatibility.
        """
        controller = self.memory
        if not controller:
            return MemoryWriteResult(success=False, reason="no memory configured")
        self._auto_purge_memory(trigger="memory_write", agent_id=agent_id)
        return controller.write(key=key, value=value, agent_id=agent_id, strict=strict)

    def memory_read(self, key: str, *, agent_id: str = "unknown") -> MemoryReadResult:
        """Fetch a value from agent memory.

        Runs the auto-purge hook before delegating the read to the
        memory controller.

        Args:
            key: Field name to read.
            agent_id: Agent performing the read.

        Returns:
            A :class:`MemoryReadResult` with ``found``, ``value``, and ``reason`` fields.
        """
        controller = self.memory
        if not controller:
            return MemoryReadResult(found=False, reason="no memory configured")
        self._auto_purge_memory(trigger="memory_read", agent_id=agent_id)
        return controller.read(key=key, agent_id=agent_id)

    def memory_purge_expired(self) -> int:
        """Purge every expired entry from agent memory on demand.

        Emits an audit event whenever at least one entry was purged.

        Returns:
            The number of memory entries that were purged.
        """
        controller = self.memory
        if not controller:
            return 0
        purged = controller.purge_expired()
        if not purged:
            return purged
        noun = f"entr{'y' if purged == 1 else 'ies'}"
        self._emit_memory_retention_event(
            agent_id="system",
            reason=f"Purged {purged} expired memory {noun}",
            metadata={"phase": "retention_purge", "trigger": "manual", "purged_count": purged},
        )
        return purged

    def resolve_memory_handle(
        self,
        handle_id: str,
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
    ) -> Any:
        """Resolve an encrypted memory handle, subject to policy gating.

        Looks up the handle metadata, evaluates action-boundary policy rules
        against the handle's data tag, and — if allowed — decrypts and returns
        the stored value.  Returns None when the handle is missing, policy
        blocks access, or decryption fails.  Every outcome (missing, blocked,
        failed, allowed) emits a retention audit event.

        Args:
            handle_id: Opaque identifier returned by a previous memory write.
            agent_id: Agent requesting the resolution.
            session_id: Optional session scope for audit context.
            source_agent_id: Optional originating agent for multi-agent flows.
            destination_agent_id: Optional target agent for multi-agent flows.

        Returns:
            The decrypted value, or None if resolution is denied or fails.
        """
        if not self.memory:
            return None
        # Outcome 1: unknown handle — audit as blocked and bail out.
        metadata = self.memory.handle_metadata(handle_id)
        if metadata is None:
            self._emit_memory_retention_event(
                agent_id=agent_id,
                reason=f"Memory handle '{handle_id}' not found",
                metadata={
                    "phase": "handle_resolve",
                    "handle_id": handle_id,
                    "resolution": "missing",
                },
                action="block",
                policy_name="memory-handle",
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
            )
            return None

        # Gate the resolution through the action boundary using the
        # handle's stored data tag.
        decision = self.policy_engine.evaluate(
            PolicyContext(
                boundary="action",
                data_tags=[metadata["tag"]],
                agent_id=agent_id,
                tool_name="memory.resolve_handle",
            )
        )
        # Outcome 2: any non-allow decision denies access (approval
        # requirements are treated as a denial here).
        if decision.action in {"block", "redact", "require_approval"}:
            self._emit_memory_retention_event(
                agent_id=agent_id,
                reason=decision.reason,
                metadata={
                    "phase": "handle_resolve",
                    "handle_id": handle_id,
                    "resolution": "policy_blocked",
                    "handle_tag": metadata["tag"],
                },
                action=decision.action,
                policy_name=decision.policy_name,
                data_tags=[metadata["tag"]],
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
            )
            return None

        # Outcome 3: decryption/lookup failure — audit and return None
        # rather than propagating the error to the caller.
        try:
            resolved = self.memory.resolve_handle(handle_id, agent_id=agent_id)
        except Exception as exc:
            self._emit_memory_retention_event(
                agent_id=agent_id,
                reason=str(exc),
                metadata={
                    "phase": "handle_resolve",
                    "handle_id": handle_id,
                    "resolution": "resolve_failed",
                    "handle_tag": metadata["tag"],
                },
                action="block",
                policy_name="memory-handle",
                data_tags=[metadata["tag"]],
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
            )
            return None

        # Outcome 4: success — audit the allow and hand back the value.
        self._emit_memory_retention_event(
            agent_id=agent_id,
            reason="Resolved encrypted memory handle",
            metadata={
                "phase": "handle_resolve",
                "handle_id": handle_id,
                "resolution": "allow",
                "handle_tag": metadata["tag"],
                "encrypted": True,
            },
            action="allow",
            policy_name=decision.policy_name or "memory-handle",
            data_tags=[metadata["tag"]],
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
        )
        return resolved

    def query_audit(self, **filters: Any) -> list[dict[str, Any]]:
        """Return audit events matching the given filters.

        Args:
            **filters: Keyword filters forwarded verbatim to the audit
                logger's query method (e.g., ``event_id``, ``agent_id``,
                ``boundary``, ``last``, ``limit``).

        Returns:
            A list of audit event dictionaries that matched the filters.
        """
        logger = self.audit
        return logger.query(**filters)

    def validate_tool_request(self, tool_name: str, data_tags: list[str]) -> ContractValidationResult:
        """Check a tool invocation against its registered data-tag contract.

        Args:
            tool_name: Name of the tool being invoked.
            data_tags: Data tags present in the request payload.

        Returns:
            ContractValidationResult indicating whether the contract permits
            the given data tags.
        """
        registry = self.contracts
        return registry.validate_request(tool_name=tool_name, data_tags=data_tags)

    def validate_agent_identity(
        self,
        agent_id: str,
        *,
        tool_name: str | None = None,
        data_tags: list[str] | None = None,
    ) -> AgentIdentityValidationResult:
        """Check an agent's declared identity and permissions.

        Verifies the agent is registered and, when provided, that the tool
        name and data tags fall within the agent's declared allowances.

        Args:
            agent_id: Identifier of the agent to validate.
            tool_name: Optional tool name to check against the agent's allowed tools.
            data_tags: Optional data tags to check against the agent's allowed tags.

        Returns:
            AgentIdentityValidationResult with a valid flag and reason.
        """
        registry = self.identities
        return registry.validate(agent_id=agent_id, tool_name=tool_name, data_tags=data_tags)

    def issue_capability_token(
        self,
        *,
        agent_id: str,
        tool_name: str,
        actions: list[str],
        ttl: str = "10m",
        secret_keys: list[str] | None = None,
        session_id: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> Any:
        """Issue a scoped, time-limited capability token to an agent.

        A capability token grants one agent permission to perform specific
        actions on one tool, optionally scoped to a session and to a set
        of secret keys.

        Args:
            agent_id: Agent the token is issued to.
            tool_name: Tool the token grants access to.
            actions: Permitted actions (e.g., ``["invoke", "read"]``).
            ttl: Time-to-live string (e.g., ``"10m"``, ``"1h"``).
            secret_keys: Optional secret keys the token may resolve.
            session_id: Optional session scope for the token.
            metadata: Optional extra metadata stored with the token.

        Returns:
            The issued capability token object.

        Example::

            token = ai.issue_capability_token(
                agent_id="data-agent",
                tool_name="db_query",
                actions=["invoke"],
                ttl="5m",
                secret_keys=["DB_PASSWORD"],
            )
            result = ai.validate_capability_token(
                token.token_id,
                agent_id="data-agent",
                tool_name="db_query",
            )
        """
        grant = dict(
            agent_id=agent_id,
            tool_name=tool_name,
            actions=actions,
            ttl=ttl,
            secret_keys=secret_keys,
            session_id=session_id,
            metadata=metadata,
        )
        return self.capabilities.issue(**grant)

    def validate_capability_token(
        self,
        token_id: str,
        *,
        agent_id: str,
        tool_name: str,
        action: str = "invoke",
        session_id: str | None = None,
    ) -> CapabilityValidationResult:
        """Check a capability token against an agent/tool/action triple.

        Args:
            token_id: Token being presented.
            agent_id: Agent presenting the token.
            tool_name: Tool the agent intends to use.
            action: Requested action (defaults to ``"invoke"``).
            session_id: Session scope to check, if any.

        Returns:
            CapabilityValidationResult describing whether the token is valid.
        """
        scope = {
            "agent_id": agent_id,
            "tool_name": tool_name,
            "action": action,
            "session_id": session_id,
        }
        return self.capabilities.validate(token_id, **scope)

    def revoke_capability_token(self, token_id: str) -> bool:
        """Invalidate a previously issued capability token.

        Args:
            token_id: Token to revoke.

        Returns:
            True when the token existed and is now revoked, else False.
        """
        revoked = self.capabilities.revoke(token_id)
        return revoked

    def purge_expired_capability_tokens(self) -> int:
        """Drop every expired capability token from the token store.

        Returns:
            How many tokens were removed.
        """
        removed = self.capabilities.purge_expired()
        return removed

    def list_approval_requests(
        self,
        *,
        status: str | None = None,
        agent_id: str | None = None,
        tool_name: str | None = None,
        newest_first: bool = True,
        limit: int = 100,
    ) -> list[ApprovalRequest]:
        """Enumerate human-in-the-loop approval requests.

        Args:
            status: Restrict to one status (``"pending"``, ``"approved"``,
                ``"denied"``, or ``"expired"``); None means all statuses.
            agent_id: Restrict to requests raised by this agent.
            tool_name: Restrict to requests targeting this tool.
            newest_first: Order results newest-first when True.
            limit: Cap on the number of results.

        Returns:
            ApprovalRequest objects matching every filter.
        """
        # An unrecognized status string is treated as "no status filter".
        known_statuses = ("pending", "approved", "denied", "expired")
        typed_status = status if status in known_statuses else None
        return self.approvals.list_requests(
            status=typed_status,  # type: ignore[arg-type]
            agent_id=agent_id,
            tool_name=tool_name,
            newest_first=newest_first,
            limit=limit,
        )

    def approve_request(self, request_id: str, *, approver_id: str, note: str | None = None) -> bool:
        """Mark a pending approval request as approved.

        Args:
            request_id: Request being acted on.
            approver_id: Human or system granting the approval.
            note: Optional free-text note recorded with the decision.

        Returns:
            True when the approval succeeded, else False.
        """
        outcome = self.approvals.approve(request_id, approver_id=approver_id, note=note)
        return outcome

    def deny_request(self, request_id: str, *, approver_id: str, note: str | None = None) -> bool:
        """Mark a pending approval request as denied.

        Args:
            request_id: Request being acted on.
            approver_id: Human or system issuing the denial.
            note: Optional free-text note recorded with the decision.

        Returns:
            True when the denial succeeded, else False.
        """
        outcome = self.approvals.deny(request_id, approver_id=approver_id, note=note)
        return outcome

    def register_secret_backend(
        self,
        name: str,
        backend: SecretBackend,
        *,
        replace: bool = False,
    ) -> None:
        """Make a secret backend available for resolution under a unique name.

        Args:
            name: Backend name, e.g. ``"vault"`` or ``"env"``.
            backend: SecretBackend implementation able to resolve secret keys.
            replace: Overwrite any backend already registered under ``name``.
        """
        self.secrets.register_backend(name, backend, replace=replace)

    def _register_secret_backend_from_config(self, cfg) -> None:
        """Register a secret backend from YAML config."""
        import os
        if cfg.type == "vault":
            from safeai.secrets.vault import VaultSecretBackend
            url = os.environ.get(cfg.url_env or "VAULT_ADDR", "")
            token = os.environ.get(cfg.token_env or "VAULT_TOKEN", "")
            self.register_secret_backend(cfg.name, VaultSecretBackend(url=url, token=token))
        elif cfg.type == "aws":
            from safeai.secrets.aws import AWSSecretBackend
            region = os.environ.get(cfg.region_env or "AWS_REGION", "us-east-1")
            self.register_secret_backend(cfg.name, AWSSecretBackend(region_name=region))
        elif cfg.type == "env":
            from safeai.secrets.env import EnvSecretBackend
            self.register_secret_backend(cfg.name, EnvSecretBackend())
        else:
            raise ValueError(
                f"Unknown secret backend type: '{cfg.type}'\n"
                f"Fix: Use one of: vault, aws, env"
            )

    def list_secret_backends(self) -> list[str]:
        """Name every registered secret backend.

        Returns:
            Backend name strings.
        """
        names = self.secrets.list_backends()
        return names

    def resolve_secret(
        self,
        *,
        token_id: str,
        secret_key: str,
        agent_id: str,
        tool_name: str,
        action: str = "invoke",
        session_id: str | None = None,
        backend: str = "env",
    ) -> ResolvedSecret:
        """Resolve one secret key via a capability token, auditing the outcome.

        The capability token is validated, the secret is fetched from the
        chosen backend, and an audit event is emitted whether the attempt
        succeeds or fails.

        Args:
            token_id: Capability token that authorizes the access.
            secret_key: Secret to resolve, e.g. ``"DB_PASSWORD"``.
            agent_id: Agent asking for the secret.
            tool_name: Tool on whose behalf the secret is resolved.
            action: Capability action to validate (defaults to ``"invoke"``).
            session_id: Session scope for token validation, if any.
            backend: Secret backend to query (defaults to ``"env"``).

        Returns:
            ResolvedSecret holding the secret value and its metadata.

        Raises:
            SecretAccessDeniedError: If the capability token is invalid or
                does not authorize the requested secret.
            SecretNotFoundError: If the secret key does not exist in the backend.

        Example::

            token = ai.issue_capability_token(
                agent_id="worker",
                tool_name="api_call",
                actions=["invoke"],
                secret_keys=["API_KEY"],
            )
            secret = ai.resolve_secret(
                token_id=token.token_id,
                secret_key="API_KEY",
                agent_id="worker",
                tool_name="api_call",
            )
        """

        def _audit(event_action: str, reason: str, result: str) -> None:
            # Success and failure alike must leave an audit trail.
            self.audit.emit(
                AuditEvent(
                    boundary="action",
                    action=event_action,
                    policy_name="secret-manager",
                    reason=reason,
                    data_tags=["secret"],
                    agent_id=agent_id,
                    tool_name=tool_name,
                    session_id=session_id,
                    metadata={
                        "phase": "secret_resolve",
                        "secret_backend": backend,
                        "secret_key": secret_key,
                        "capability_token_id": token_id,
                        "result": result,
                    },
                )
            )

        try:
            resolved = self.secrets.resolve_secret(
                token_id=token_id,
                secret_key=secret_key,
                agent_id=agent_id,
                tool_name=tool_name,
                action=action,
                session_id=session_id,
                backend=backend,
            )
        except SecretError as exc:
            # Access-denied failures are recorded as "block"; other secret
            # errors as "deny".  The original exception is re-raised intact.
            _audit("block" if isinstance(exc, SecretAccessDeniedError) else "deny", str(exc), "error")
            raise
        _audit("allow", "secret resolved by scoped capability", "allow")
        return resolved

    def resolve_secrets(
        self,
        *,
        token_id: str,
        secret_keys: list[str],
        agent_id: str,
        tool_name: str,
        action: str = "invoke",
        session_id: str | None = None,
        backend: str = "env",
    ) -> dict[str, ResolvedSecret]:
        """Resolve several secret keys in a single call.

        Each key is resolved through ``resolve_secret``.  Keys that cannot be
        found are collected and reported together after every key has been
        attempted.

        Args:
            token_id: Capability token that authorizes the access.
            secret_keys: Keys to resolve.
            agent_id: Agent asking for the secrets.
            tool_name: Tool on whose behalf the secrets are resolved.
            action: Capability action to validate (defaults to ``"invoke"``).
            session_id: Session scope for token validation, if any.
            backend: Secret backend to query (defaults to ``"env"``).

        Returns:
            Mapping of each secret key to its ResolvedSecret.

        Raises:
            SecretNotFoundError: If one or more keys could not be found.
        """
        resolved_by_key: dict[str, ResolvedSecret] = {}
        not_found: list[str] = []
        for secret_key in secret_keys:
            try:
                resolved_by_key[secret_key] = self.resolve_secret(
                    token_id=token_id,
                    secret_key=secret_key,
                    agent_id=agent_id,
                    tool_name=tool_name,
                    action=action,
                    session_id=session_id,
                    backend=backend,
                )
            except SecretNotFoundError:
                # Keep going so the error can name every missing key at once.
                not_found.append(secret_key)
        if not_found:
            missing_list = ",".join(sorted(set(not_found)))
            raise SecretNotFoundError(
                f"Unable to resolve secret key(s) from backend '{backend}': {missing_list}"
            )
        return resolved_by_key

    def intercept_tool_request(
        self,
        tool_name: str,
        parameters: dict[str, Any],
        data_tags: list[str],
        *,
        agent_id: str = "unknown",
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        action_type: str | None = None,
        capability_token_id: str | None = None,
        capability_action: str = "invoke",
        approval_request_id: str | None = None,
    ) -> InterceptResult:
        """Run a tool invocation through the action-boundary pipeline.

        The pipeline covers policy evaluation, contract validation, identity
        checks, capability-token verification, and approval gating, and emits
        audit events.  The outcome is one of allow, block, redact, or
        require_approval.

        Args:
            tool_name: Name of the tool being invoked.
            parameters: Parameters the agent is passing to the tool.
            data_tags: Data tags present in the request payload.
            agent_id: Identifier of the invoking agent.
            session_id: Session scope for the request, if any.
            source_agent_id: Originating agent in multi-agent flows, if any.
            destination_agent_id: Target agent in multi-agent flows, if any.
            action_type: Label for the kind of action (e.g. ``"tool_call"``).
            capability_token_id: Capability token authorizing the call, if any.
            capability_action: Action to validate on the token (default ``"invoke"``).
            approval_request_id: Pre-existing approval request to validate, if any.

        Returns:
            InterceptResult with the decision, detections, and filtered parameters.

        Example::

            result = ai.intercept_tool_request(
                tool_name="send_email",
                parameters={"to": "user@example.com", "body": "Hello"},
                data_tags=["personal.pii"],
                agent_id="assistant",
            )
            if result.decision.action == "allow":
                send_email(**result.filtered_parameters)
        """
        # Defensive copies keep the caller's containers untouched.
        call = ToolCall(
            tool_name=tool_name,
            agent_id=agent_id,
            parameters=dict(parameters),
            data_tags=list(data_tags),
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type=action_type,
            capability_token_id=capability_token_id,
            capability_action=capability_action,
            approval_request_id=approval_request_id,
        )
        return self._action.intercept_request(call)

    def intercept_tool_response(
        self,
        tool_name: str,
        response: dict[str, Any],
        *,
        agent_id: str = "unknown",
        request_data_tags: list[str] | None = None,
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
        action_type: str | None = None,
    ) -> ResponseInterceptResult:
        """Run a tool's response through the action-boundary pipeline.

        The response payload is classified and policy rules are evaluated;
        the result carries the decision plus a possibly redacted copy of the
        response.

        Args:
            tool_name: Tool that produced the response.
            response: The tool's response payload as a dict.
            agent_id: Agent receiving the response.
            request_data_tags: Tags from the original request, for context.
            session_id: Session scope, if any.
            source_agent_id: Originating agent in multi-agent flows, if any.
            destination_agent_id: Target agent in multi-agent flows, if any.
            action_type: Label for the kind of action, if any.

        Returns:
            ResponseInterceptResult with the decision and filtered response.
        """
        call = ToolCall(
            tool_name=tool_name,
            agent_id=agent_id,
            parameters={},
            data_tags=list(request_data_tags or []),
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            action_type=action_type,
        )
        # The response dict is copied so interception cannot mutate the original.
        return self._action.intercept_response(call, dict(response))

    def wrap(self, fn: Any) -> Any:
        """Wrap a function for use with framework adapters.

        The wrapper preserves the original callable's metadata (``__name__``,
        ``__doc__``, ``__wrapped__``, ...) via ``functools.wraps`` so adapters,
        debuggers, and introspection see the underlying function's identity.

        Args:
            fn: The callable to wrap.

        Returns:
            A wrapped callable that delegates to the original function.
        """
        import functools

        @functools.wraps(fn)
        def _wrapped(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)

        return _wrapped

    def langchain_adapter(self):
        """Build a LangChain adapter wired to this SafeAI runtime."""
        from safeai.middleware.langchain import SafeAILangChainAdapter as adapter_cls

        return adapter_cls(self)

    def claude_adk_adapter(self):
        """Build a Claude ADK adapter wired to this SafeAI runtime."""
        from safeai.middleware.claude_adk import SafeAIClaudeADKAdapter as adapter_cls

        return adapter_cls(self)

    def google_adk_adapter(self):
        """Build a Google ADK adapter wired to this SafeAI runtime."""
        from safeai.middleware.google_adk import SafeAIGoogleADKAdapter as adapter_cls

        return adapter_cls(self)

    def crewai_adapter(self):
        """Build a CrewAI adapter wired to this SafeAI runtime."""
        from safeai.middleware.crewai import SafeAICrewAIAdapter as adapter_cls

        return adapter_cls(self)

    def autogen_adapter(self):
        """Build an AutoGen adapter wired to this SafeAI runtime."""
        from safeai.middleware.autogen import SafeAIAutoGenAdapter as adapter_cls

        return adapter_cls(self)

    def list_plugins(self) -> list[dict[str, Any]]:
        """Describe every loaded plugin.

        Returns:
            One metadata dict per loaded plugin.
        """
        entries = self.plugins.list_plugins()
        return entries

    def list_plugin_adapters(self) -> list[str]:
        """Name every adapter class contributed by loaded plugins.

        Returns:
            Adapter name strings.
        """
        names = self.plugins.adapter_names()
        return names

    def plugin_adapter(self, name: str) -> Any:
        """Instantiate a plugin-provided adapter by name.

        Args:
            name: Adapter to construct.

        Returns:
            An adapter instance bound to this SafeAI runtime.
        """
        adapter = self.plugins.build_adapter(name, self)
        return adapter

    def list_policy_templates(self) -> list[dict[str, Any]]:
        """Enumerate policy templates from the built-in catalog and plugins.

        Returns:
            One dict per template, carrying its name, description, and tags.
        """
        catalog = self.templates.list_templates()
        return catalog

    def load_policy_template(self, name: str) -> dict[str, Any]:
        """Load a policy template's full content by name.

        Args:
            name: Template to load.

        Returns:
            Dict containing the template's rules, metadata, and description.
        """
        template = self.templates.load(name)
        return template

    def search_policy_templates(self, **kwargs: Any) -> list[dict[str, Any]]:
        """Find policy templates by tags, keywords, or other criteria.

        Args:
            **kwargs: Search filters forwarded verbatim to the template
                catalog's search method (e.g. ``tags``, ``keyword``).

        Returns:
            Metadata dicts for every matching template.
        """
        hits = self.templates.search(**kwargs)
        return hits

    def install_policy_template(self, name: str) -> str:
        """Install a policy template into the current project.

        The template's policy YAML is written into the project's policy
        directory so it is picked up on the next initialization.

        Args:
            name: Template to install.

        Returns:
            Path of the file the template was written to.
        """
        written_path = self.templates.install(name)
        return written_path

    # --- Intelligence layer (lazy imports) ---

    def _ensure_ai_registry(self) -> Any:
        if self._ai_backends is None:
            from safeai.intelligence.backend import AIBackendRegistry

            self._ai_backends = AIBackendRegistry()
        return self._ai_backends

    def register_ai_backend(self, name: str, backend: Any, *, default: bool = True) -> None:
        """Add an AI backend to the intelligence layer's registry.

        Args:
            name: Backend name, e.g. ``"openai"`` or ``"anthropic"``.
            backend: Object implementing the AI backend protocol.
            default: Make this backend the default for intelligence calls.
        """
        self._ensure_ai_registry().register(name, backend, default=default)

    def list_ai_backends(self) -> list[str]:
        """Name every registered AI backend.

        Returns:
            Backend name strings.
        """
        registry = self._ensure_ai_registry()
        return registry.list_backends()

    def intelligence_auto_config(
        self, project_path: str = ".", framework_hint: str | None = None
    ) -> Any:
        """Derive a recommended SafeAI configuration for a project via AI.

        The project layout is scanned and, together with an optional framework
        hint, turned into suggested policy rules, contracts, and identity
        declarations.

        Args:
            project_path: Directory of the project to analyze.
            framework_hint: Framework name (e.g. ``"langchain"``) used to
                tailor the advice, if given.

        Returns:
            An AdvisorResult containing the generated configuration advice.
        """
        from safeai.intelligence.auto_config import AutoConfigAdvisor
        from safeai.intelligence.sanitizer import MetadataSanitizer

        return AutoConfigAdvisor(
            backend=self._ensure_ai_registry().get(),
            sanitizer=MetadataSanitizer(),
        ).advise(project_path=project_path, framework_hint=framework_hint)

    def intelligence_recommend(self, since: str = "7d") -> Any:
        """Suggest policy improvements based on recent audit activity.

        Audit events from the requested window are handed to the AI backend,
        which proposes policy changes.

        Args:
            since: Audit lookback window, e.g. ``"7d"`` or ``"24h"``.

        Returns:
            An AdvisorResult containing recommended policy changes.
        """
        from safeai.intelligence.recommender import RecommenderAdvisor
        from safeai.intelligence.sanitizer import MetadataSanitizer

        backend = self._ensure_ai_registry().get()
        cleaner = MetadataSanitizer()
        recent_events = self.query_audit(last=since)
        return RecommenderAdvisor(backend=backend, sanitizer=cleaner).advise(events=recent_events)

    def intelligence_explain(self, event_id: str) -> Any:
        """Produce an AI-written explanation of a specific audit event.

        The event, plus a small window of surrounding activity, is handed to
        the incident advisor for a human-readable account of what happened
        and why.

        Args:
            event_id: Audit event to explain.

        Returns:
            An AdvisorResult with the incident explanation, or an error-status
            result when the event cannot be found.
        """
        from safeai.intelligence.incident import IncidentAdvisor
        from safeai.intelligence.sanitizer import MetadataSanitizer

        backend = self._ensure_ai_registry().get()
        cleaner = MetadataSanitizer()
        matches = self.query_audit(event_id=event_id)
        if not matches:
            from safeai.intelligence.advisor import AdvisorResult

            return AdvisorResult(
                advisor_name="incident",
                status="error",
                summary=f"Event '{event_id}' not found.",
            )
        # Recent events give the advisor context around the incident.
        surrounding = self.query_audit(last="1h", limit=5)
        return IncidentAdvisor(backend=backend, sanitizer=cleaner).advise(
            event=matches[0], context_events=surrounding
        )

    def intelligence_compliance(
        self, framework: str = "hipaa", config_path: str | None = None
    ) -> Any:
        """Assess the current SafeAI configuration against a compliance framework.

        The AI backend judges whether the loaded policies meet the named
        framework's requirements.

        Args:
            framework: Compliance framework to check (e.g. ``"hipaa"``, ``"gdpr"``).
            config_path: SafeAI config file to analyze, if any.

        Returns:
            An AdvisorResult with compliance findings and gaps.
        """
        from safeai.intelligence.compliance import ComplianceAdvisor
        from safeai.intelligence.sanitizer import MetadataSanitizer

        return ComplianceAdvisor(
            backend=self._ensure_ai_registry().get(),
            sanitizer=MetadataSanitizer(),
        ).advise(framework=framework, config_path=config_path)

    def intelligence_integrate(self, target: str = "langchain", project_path: str = ".") -> Any:
        """Produce AI-generated guidance for integrating SafeAI with a framework.

        The project is analyzed and step-by-step integration advice is
        tailored to the requested framework.

        Args:
            target: Framework to integrate with (e.g. ``"langchain"``, ``"crewai"``).
            project_path: Directory of the project.

        Returns:
            An AdvisorResult with integration instructions and code snippets.
        """
        from safeai.intelligence.integration import IntegrationAdvisor
        from safeai.intelligence.sanitizer import MetadataSanitizer

        return IntegrationAdvisor(
            backend=self._ensure_ai_registry().get(),
            sanitizer=MetadataSanitizer(),
        ).advise(target=target, project_path=project_path)

    def intercept_agent_message(
        self,
        *,
        message: str,
        source_agent_id: str,
        destination_agent_id: str,
        data_tags: list[str] | None = None,
        session_id: str | None = None,
        approval_request_id: str | None = None,
    ) -> dict[str, Any]:
        """Intercept an agent-to-agent message at the action boundary.

        Classifies the message body, merges detected tags with any explicitly
        provided tags, evaluates policy rules, handles approval gating, and
        emits an audit event.  The message may be allowed, redacted, or blocked.

        Args:
            message: The text message being sent between agents.
            source_agent_id: Identifier of the sending agent.
            destination_agent_id: Identifier of the receiving agent.
            data_tags: Optional explicit data tags to include alongside
                auto-detected tags.
            session_id: Optional session scope for policy and approval context.
            approval_request_id: Optional pre-existing approval request ID to
                validate instead of creating a new one.

        Returns:
            A dict with keys ``"decision"`` (action, policy_name, reason),
            ``"data_tags"``, ``"filtered_message"``, and ``"approval_request_id"``.

        Example::

            result = ai.intercept_agent_message(
                message="Patient SSN is 123-45-6789",
                source_agent_id="triage-agent",
                destination_agent_id="billing-agent",
            )
            if result["decision"]["action"] == "allow":
                send_to_agent(result["filtered_message"])
        """
        import hashlib

        body = str(message)
        # Merge classifier-detected tags with normalized caller-supplied tags.
        detected_tags = {item.tag for item in self.classifier.classify_text(body)}
        explicit_tags = {str(tag).strip().lower() for tag in (data_tags or []) if str(tag).strip()}
        tags = sorted(explicit_tags.union(detected_tags))
        decision = self.policy_engine.evaluate(
            PolicyContext(
                boundary="action",
                data_tags=tags,
                agent_id=source_agent_id,
                tool_name="agent_to_agent",
                action_type="agent_to_agent",
            )
        )
        approval_id: str | None = None
        if decision.action == "require_approval":
            if approval_request_id:
                # A pre-existing request was supplied: honor its current status.
                validation = self.approvals.validate(
                    approval_request_id,
                    agent_id=source_agent_id,
                    tool_name="agent_to_agent",
                    session_id=session_id,
                )
                approval_id = approval_request_id
                if validation.allowed:
                    decision = decision.__class__(
                        action="allow",
                        policy_name=decision.policy_name or "approval-gate",
                        reason=f"approval request '{approval_request_id}' approved",
                    )
                elif validation.request and validation.request.status == "denied":
                    decision = decision.__class__(
                        action="block",
                        policy_name="approval-gate",
                        reason=validation.reason,
                    )
                # Otherwise (e.g. still pending) the require_approval decision stands.
            else:
                # Use a stable content digest for deduplication.  The builtin
                # hash() is randomized per process (PYTHONHASHSEED), so it
                # would fail to deduplicate identical messages across restarts.
                body_digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
                created = self.approvals.create_request(
                    reason=decision.reason,
                    policy_name=decision.policy_name or "approval-gate",
                    agent_id=source_agent_id,
                    tool_name="agent_to_agent",
                    session_id=session_id,
                    action_type="agent_to_agent",
                    data_tags=tags,
                    metadata={"destination_agent_id": destination_agent_id},
                    dedupe_key="|".join(
                        [
                            source_agent_id,
                            destination_agent_id,
                            session_id or "-",
                            ",".join(tags),
                            body_digest,
                        ]
                    ),
                )
                approval_id = created.request_id

        # Apply the final decision to the message body.
        if decision.action == "allow":
            filtered_message = body
        elif decision.action == "redact":
            filtered_message = "[REDACTED]"
        else:
            filtered_message = ""

        self.audit.emit(
            AuditEvent(
                boundary="action",
                action=decision.action,
                policy_name=decision.policy_name,
                reason=decision.reason,
                data_tags=tags,
                agent_id=source_agent_id,
                session_id=session_id,
                source_agent_id=source_agent_id,
                destination_agent_id=destination_agent_id,
                metadata={
                    "phase": "agent_message",
                    "action_type": "agent_to_agent",
                    "message_length": len(body),
                    "filtered_length": len(filtered_message),
                    "approval_request_id": approval_id,
                    "destination_agent_id": destination_agent_id,
                },
            )
        )
        return {
            "decision": {
                "action": decision.action,
                "policy_name": decision.policy_name,
                "reason": decision.reason,
            },
            "data_tags": tags,
            "filtered_message": filtered_message,
            "approval_request_id": approval_id,
        }

    def _auto_purge_memory(self, *, trigger: str, agent_id: str) -> int:
        """Drop expired memory entries when auto-purge is enabled.

        Args:
            trigger: Operation that prompted the purge (e.g. "memory_read").
            agent_id: Agent recorded on the retention audit event.

        Returns:
            Number of entries purged; 0 when memory is unconfigured,
            auto-purge is disabled, or nothing had expired.
        """
        if not (self.memory and self.memory_auto_purge_expired):
            return 0
        count = self.memory.purge_expired()
        if not count:
            return 0
        noun = "entry" if count == 1 else "entries"
        self._emit_memory_retention_event(
            agent_id=agent_id,
            reason=f"Purged {count} expired memory {noun}",
            metadata={"phase": "retention_purge", "trigger": trigger, "purged_count": count},
        )
        return count

    def _emit_memory_retention_event(
        self,
        *,
        agent_id: str,
        reason: str,
        metadata: dict[str, Any],
        action: str = "allow",
        policy_name: str | None = "memory-retention",
        data_tags: list[str] | None = None,
        session_id: str | None = None,
        source_agent_id: str | None = None,
        destination_agent_id: str | None = None,
    ) -> None:
        """Emit a memory-boundary audit event.

        ``metadata`` and ``data_tags`` are copied defensively so the emitted
        event cannot be mutated through the caller's references afterwards.
        """
        event = AuditEvent(
            boundary="memory",
            action=action,
            policy_name=policy_name,
            reason=reason,
            data_tags=list(data_tags or []),
            agent_id=agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            metadata=dict(metadata),
        )
        self.audit.emit(event)

advanced property

advanced: 'AdvancedAPI'

Access advanced API methods (contracts, identities, capabilities, secrets, etc.).

__init__

__init__(policy_engine: PolicyEngine, classifier: Classifier, audit_logger: AuditLogger, memory_controller: MemoryController | None = None, contract_registry: ToolContractRegistry | None = None, identity_registry: AgentIdentityRegistry | None = None, capability_manager: CapabilityTokenManager | None = None, secret_manager: SecretManager | None = None, approval_manager: ApprovalManager | None = None, plugin_manager: PluginManager | None = None, memory_auto_purge_expired: bool = True) -> None

Initialize the SafeAI runtime orchestrator.

SafeAI is the central facade that wires together all boundary-enforcement components (scanning, guarding, interception, policy evaluation, auditing, memory, contracts, identities, capabilities, secrets, approvals, and plugins).

Parameters:

Name Type Description Default
policy_engine PolicyEngine

Engine that evaluates policy rules against data-tag contexts.

required
classifier Classifier

Detector-backed classifier used to tag data flowing through boundaries.

required
audit_logger AuditLogger

Logger that persists audit events to a JSONL file.

required
memory_controller MemoryController | None

Optional schema-enforced agent memory store.

None
contract_registry ToolContractRegistry | None

Optional registry of tool-level data-tag contracts.

None
identity_registry AgentIdentityRegistry | None

Optional registry of agent identity declarations.

None
capability_manager CapabilityTokenManager | None

Optional manager for scoped capability tokens.

None
secret_manager SecretManager | None

Optional secret resolution manager.

None
approval_manager ApprovalManager | None

Optional human-in-the-loop approval gate.

None
plugin_manager PluginManager | None

Optional plugin manager for third-party extensions.

None
memory_auto_purge_expired bool

If True, automatically purge expired memory entries on every read/write operation.

True
Source code in safeai/api.py
def __init__(
    self,
    policy_engine: PolicyEngine,
    classifier: Classifier,
    audit_logger: AuditLogger,
    memory_controller: MemoryController | None = None,
    contract_registry: ToolContractRegistry | None = None,
    identity_registry: AgentIdentityRegistry | None = None,
    capability_manager: CapabilityTokenManager | None = None,
    secret_manager: SecretManager | None = None,
    approval_manager: ApprovalManager | None = None,
    plugin_manager: PluginManager | None = None,
    memory_auto_purge_expired: bool = True,
) -> None:
    """Initialize the SafeAI runtime orchestrator.

    SafeAI is the central facade that wires together all boundary-enforcement
    components (scanning, guarding, interception, policy evaluation, auditing,
    memory, contracts, identities, capabilities, secrets, approvals, and plugins).

    Args:
        policy_engine: Engine that evaluates policy rules against data-tag contexts.
        classifier: Detector-backed classifier used to tag data flowing through boundaries.
        audit_logger: Logger that persists audit events to a JSONL file.
        memory_controller: Optional schema-enforced agent memory store.
        contract_registry: Optional registry of tool-level data-tag contracts.
        identity_registry: Optional registry of agent identity declarations.
        capability_manager: Optional manager for scoped capability tokens.
        secret_manager: Optional secret resolution manager.
        approval_manager: Optional human-in-the-loop approval gate.
        plugin_manager: Optional plugin manager for third-party extensions.
        memory_auto_purge_expired: If True, automatically purge expired memory
            entries on every read/write operation.
    """
    # Core components (always provided by the caller).
    self.policy_engine = policy_engine
    self.classifier = classifier
    self.audit = audit_logger
    self.memory = memory_controller
    # Optional components fall back to empty/in-memory defaults.
    self.contracts = contract_registry or ToolContractRegistry()
    self.identities = identity_registry or AgentIdentityRegistry()
    self.capabilities = capability_manager or CapabilityTokenManager()
    # The default secret manager is bound to the (possibly defaulted) capability manager.
    self.secrets = secret_manager or SecretManager(capability_manager=self.capabilities)
    self.approvals = approval_manager or ApprovalManager()
    self.plugins = plugin_manager or PluginManager()
    self.templates = PolicyTemplateCatalog(plugin_manager=self.plugins)
    self.memory_auto_purge_expired = memory_auto_purge_expired
    self._ai_backends: Any = None  # Lazy: AIBackendRegistry
    # Boundary enforcers all share the same classifier/policy/audit trio.
    self._input = InputScanner(
        classifier=classifier, policy_engine=policy_engine, audit_logger=audit_logger
    )
    self._structured = StructuredScanner(
        classifier=classifier,
        policy_engine=policy_engine,
        audit_logger=audit_logger,
    )
    self._output = OutputGuard(
        classifier=classifier, policy_engine=policy_engine, audit_logger=audit_logger
    )
    self._action = ActionInterceptor(
        policy_engine=policy_engine,
        audit_logger=audit_logger,
        contract_registry=self.contracts,
        identity_registry=self.identities,
        capability_manager=self.capabilities,
        approval_manager=self.approvals,
        classifier=classifier,
    )

quickstart classmethod

quickstart(*, block_secrets: bool = True, redact_pii: bool = True, block_pii: bool = False, custom_rules: list[dict] | None = None, audit_path: str | None = None) -> 'SafeAI'

Create a ready-to-use SafeAI instance with sensible defaults — no config files needed.

Basic usage::

from safeai import SafeAI
ai = SafeAI.quickstart()

Customise what gets enforced::

# Block PII instead of redacting it
ai = SafeAI.quickstart(block_pii=True, redact_pii=False)

# Secrets only, ignore PII
ai = SafeAI.quickstart(redact_pii=False)

# Everything off except your own rules
ai = SafeAI.quickstart(block_secrets=False, redact_pii=False, custom_rules=[
    {"name": "my-rule", "boundary": ["input"], "priority": 10,
     "condition": {"data_tags": ["secret.credential"]},
     "action": "block", "reason": "No creds allowed."},
])

Parameters:

Name Type Description Default
block_secrets bool

Block API keys, tokens, and credentials (default True).

True
redact_pii bool

Redact emails, phone numbers, SSNs in outputs (default True).

True
block_pii bool

Block PII entirely instead of redacting (default False). If both redact_pii and block_pii are True, block wins.

False
custom_rules list[dict] | None

Extra policy rules (list of dicts) added before the default-allow rules. Same format as policy YAML.

None
audit_path str | None

File path for audit log. Defaults to a temp file.

None
Source code in safeai/api.py
@classmethod
def quickstart(
    cls,
    *,
    block_secrets: bool = True,
    redact_pii: bool = True,
    block_pii: bool = False,
    custom_rules: list[dict] | None = None,
    audit_path: str | None = None,
) -> "SafeAI":
    """Create a ready-to-use SafeAI instance with sensible defaults — no config files needed.

    Basic usage::

        from safeai import SafeAI
        ai = SafeAI.quickstart()

    Customise what gets enforced::

        # Block PII instead of redacting it
        ai = SafeAI.quickstart(block_pii=True, redact_pii=False)

        # Secrets only, ignore PII
        ai = SafeAI.quickstart(redact_pii=False)

        # Everything off except your own rules
        ai = SafeAI.quickstart(block_secrets=False, redact_pii=False, custom_rules=[
            {"name": "my-rule", "boundary": ["input"], "priority": 10,
             "condition": {"data_tags": ["secret.credential"]},
             "action": "block", "reason": "No creds allowed."},
        ])

    Args:
        block_secrets: Block API keys, tokens, and credentials (default True).
        redact_pii: Redact emails, phone numbers, SSNs in outputs (default True).
        block_pii: Block PII entirely instead of redacting (default False).
                   If both redact_pii and block_pii are True, block wins.
        custom_rules: Extra policy rules (list of dicts) added before the
                      default-allow rules. Same format as policy YAML.
        audit_path: File path for audit log. Defaults to a temp file.
    """
    personal_tags = ["personal", "personal.pii", "personal.phi", "personal.financial"]
    rules: list[dict] = []

    if block_secrets:
        rules.append(
            {
                "name": "block-secrets-everywhere",
                "boundary": ["input", "action", "output"],
                "priority": 10,
                "condition": {"data_tags": ["secret.credential", "secret.token", "secret"]},
                "action": "block",
                "reason": "Secrets must never cross any boundary.",
            }
        )

    if block_pii:
        # Blocking takes precedence over redaction when both flags are set.
        rules.append(
            {
                "name": "block-personal-data",
                "boundary": ["input", "action", "output"],
                "priority": 20,
                "condition": {"data_tags": personal_tags},
                "action": "block",
                "reason": "Personal data must not cross any boundary.",
            }
        )
    elif redact_pii:
        rules.append(
            {
                "name": "redact-personal-data-in-output",
                "boundary": ["output"],
                "priority": 20,
                "condition": {"data_tags": personal_tags},
                "action": "redact",
                "reason": "Personal data must not appear in outbound responses.",
            }
        )

    rules.extend(custom_rules or [])

    # Default-allow fallbacks always come last so any restrictive rule wins.
    rules.extend(
        {
            "name": f"allow-{boundary}-by-default",
            "boundary": [boundary],
            "priority": 1000,
            "action": "allow",
            "reason": "Allow when no restrictive policy matched.",
        }
        for boundary in ("input", "action", "output")
    )

    fallback_audit = str(Path(tempfile.gettempdir()) / "safeai-audit.jsonl")
    return cls(
        policy_engine=PolicyEngine(normalize_rules(rules)),
        classifier=Classifier(patterns=list(all_detectors())),
        audit_logger=AuditLogger(audit_path or fallback_audit),
    )

from_config classmethod

from_config(path: str | Path) -> 'SafeAI'

Create a SafeAI instance from a YAML/JSON configuration file.

Loads policy rules, memory schemas, tool contracts, agent identities, plugins, audit settings, and approval configuration from the paths declared in the config file.

Parameters:

Name Type Description Default
path str | Path

Path to the SafeAI configuration file (YAML or JSON).

required

Returns:

Type Description
'SafeAI'

A fully configured SafeAI instance.

Source code in safeai/api.py
@classmethod
def from_config(cls, path: str | Path) -> "SafeAI":
    """Create a SafeAI instance from a YAML/JSON configuration file.

    Loads policy rules, memory schemas, tool contracts, agent identities,
    plugins, audit settings, and approval configuration from the paths
    declared in the config file.

    Args:
        path: Path to the SafeAI configuration file (YAML or JSON).

    Returns:
        A fully configured SafeAI instance.
    """
    cfg = load_config(path)
    config_path = Path(path).expanduser().resolve()
    policy_files, raw_rules = load_policy_bundle(config_path, cfg.paths.policy_files, version=cfg.version)
    policy_engine = PolicyEngine(normalize_rules(raw_rules))

    def _reload_rules():
        # Re-read the policy files on demand so hot-reload picks up edits.
        _, fresh_rules = load_policy_bundle(config_path, cfg.paths.policy_files, version=cfg.version)
        return normalize_rules(fresh_rules)

    policy_engine.register_reload(policy_files, _reload_rules)
    _, memory_docs = load_memory_bundle(config_path, cfg.paths.memory_schema_files, version=cfg.version)
    memory = MemoryController.from_documents(memory_docs) if memory_docs else None
    _, contract_docs = load_contract_bundle(config_path, cfg.paths.contract_files, version=cfg.version)
    contracts = ToolContractRegistry(normalize_contracts(contract_docs)) if contract_docs else ToolContractRegistry()
    _, identity_docs = load_identity_bundle(config_path, cfg.paths.identity_files, version=cfg.version)
    identities = (
        AgentIdentityRegistry(normalize_agent_identities(identity_docs))
        if identity_docs
        else AgentIdentityRegistry()
    )
    plugin_manager = (
        PluginManager.from_patterns(config_path=config_path, patterns=cfg.plugins.plugin_files)
        if cfg.plugins.enabled
        else PluginManager()
    )
    # Plugins may contribute extra detector patterns on top of the built-ins.
    classifier = Classifier(patterns=[*all_detectors(), *plugin_manager.detector_patterns()])
    audit = AuditLogger(_resolve_optional_path(config_path, cfg.audit.file_path))
    capabilities = CapabilityTokenManager()
    approvals = ApprovalManager(
        file_path=_resolve_optional_path(config_path, cfg.approvals.file_path),
        default_ttl=cfg.approvals.default_ttl,
    )
    instance = cls(
        policy_engine=policy_engine,
        classifier=classifier,
        audit_logger=audit,
        memory_controller=memory,
        contract_registry=contracts,
        identity_registry=identities,
        capability_manager=capabilities,
        secret_manager=SecretManager(capability_manager=capabilities),
        approval_manager=approvals,
        plugin_manager=plugin_manager,
        memory_auto_purge_expired=cfg.memory_runtime.auto_purge_expired,
    )

    # Auto-register secret backends from config. Registration is best-effort:
    # a misconfigured backend logs a warning instead of aborting construction.
    if cfg.secrets.enabled:
        import logging  # hoisted out of the except clause: import/bind once, not per failure

        logger = logging.getLogger(__name__)
        for backend_cfg in cfg.secrets.backends:
            try:
                instance._register_secret_backend_from_config(backend_cfg)
            except Exception as exc:
                logger.warning(
                    "Failed to register secret backend '%s': %s", backend_cfg.name, exc
                )

    return instance

scan_input

scan_input(data: str, agent_id: str = 'unknown') -> ScanResult

Scan text data through the input boundary.

Classifies the input, evaluates policy rules, and returns a decision (allow, block, or redact) along with any detections.

Parameters:

Name Type Description Default
data str

Raw text to scan.

required
agent_id str

Identifier of the agent submitting the input.

'unknown'

Returns:

Type Description
ScanResult

ScanResult containing the policy decision, detections, and filtered text.

Source code in safeai/api.py
def scan_input(self, data: str, agent_id: str = "unknown") -> ScanResult:
    """Run text through the input boundary.

    The input is classified into data tags, input-boundary policy rules are
    evaluated, and the resulting decision (allow, block, or redact) is
    returned together with any detections.

    Args:
        data: Raw text to scan.
        agent_id: Identifier of the agent submitting the input.

    Returns:
        ScanResult containing the policy decision, detections, and filtered text.
    """
    result = self._input.scan(data, agent_id=agent_id)
    return result

guard_output

guard_output(data: str, agent_id: str = 'unknown') -> GuardResult

Guard text data at the output boundary.

Classifies the outbound text, evaluates policy rules, and returns a decision (allow, block, or redact) with any detections.

Parameters:

Name Type Description Default
data str

Outbound text to guard.

required
agent_id str

Identifier of the agent producing the output.

'unknown'

Returns:

Type Description
GuardResult

GuardResult containing the policy decision, detections, and filtered text.

Source code in safeai/api.py
def guard_output(self, data: str, agent_id: str = "unknown") -> GuardResult:
    """Guard text at the output boundary.

    The outbound text is classified into data tags, output-boundary policy
    rules are evaluated, and the resulting decision (allow, block, or
    redact) is returned together with any detections.

    Args:
        data: Outbound text to guard.
        agent_id: Identifier of the agent producing the output.

    Returns:
        GuardResult containing the policy decision, detections, and filtered text.
    """
    guarded = self._output.guard(data, agent_id=agent_id)
    return guarded

scan_structured_input

scan_structured_input(payload: Any, *, agent_id: str = 'unknown') -> StructuredScanResult

Scan a structured payload (dict, list, or nested object) through the input boundary.

Recursively walks the payload, classifies string values, evaluates policy rules, and returns detections with JSON-path locations.

Parameters:

Name Type Description Default
payload Any

Structured data (typically a dict or list) to scan.

required
agent_id str

Identifier of the agent submitting the payload.

'unknown'

Returns:

Type Description
StructuredScanResult

StructuredScanResult with the policy decision, path-level detections, and a filtered copy of the payload.

Source code in safeai/api.py
def scan_structured_input(self, payload: Any, *, agent_id: str = "unknown") -> StructuredScanResult:
    """Scan a structured payload (dict, list, or nested object) at the input boundary.

    The payload is walked recursively; string values are classified and
    policy rules evaluated, yielding detections annotated with JSON-path
    locations.

    Args:
        payload: Structured data (typically a dict or list) to scan.
        agent_id: Identifier of the agent submitting the payload.

    Returns:
        StructuredScanResult with the policy decision, path-level
        detections, and a filtered copy of the payload.
    """
    outcome = self._structured.scan(payload, agent_id=agent_id)
    return outcome

scan_file_input

scan_file_input(file_path: str | Path, *, agent_id: str = 'unknown') -> FileScanResult

Scan a file through the input boundary.

Supports JSON files (structured scan) and all other text files (text scan).

Parameters:

Name Type Description Default
file_path str | Path

Path to the file to scan.

required
agent_id str

Agent requesting the scan.

'unknown'

Returns:

Type Description
FileScanResult

FileScanResult with mode, decision, detections, and filtered content. Supports dict-style access for backward compatibility.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Source code in safeai/api.py
def scan_file_input(self, file_path: str | Path, *, agent_id: str = "unknown") -> FileScanResult:
    """Scan a file through the input boundary.

    Files with a ``.json`` suffix get a structured scan (detections carry
    JSON-path locations); every other file is decoded as UTF-8 text —
    undecodable bytes replaced — and scanned as plain text.

    Args:
        file_path: Path to the file to scan.
        agent_id: Agent requesting the scan.

    Returns:
        FileScanResult with mode, decision, detections, and filtered content.
        Supports dict-style access for backward compatibility.

    Raises:
        FileNotFoundError: If the file does not exist.
        json.JSONDecodeError: If a ``.json`` file contains malformed JSON.
        UnicodeDecodeError: If a ``.json`` file is not valid UTF-8 (JSON is
            decoded strictly; non-JSON files use replacement characters).
    """
    resolved = Path(file_path).expanduser().resolve()
    if not resolved.exists() or not resolved.is_file():
        raise FileNotFoundError(f"file not found: {resolved}")
    raw = resolved.read_bytes()
    suffix = resolved.suffix.strip().lower()
    size_bytes = len(raw)

    def _decision_dict(decision: Any) -> dict[str, Any]:
        # Flatten a policy decision into the plain-dict shape FileScanResult stores.
        return {
            "action": decision.action,
            "policy_name": decision.policy_name,
            "reason": decision.reason,
        }

    if suffix == ".json":
        payload = json.loads(raw.decode("utf-8", errors="strict"))
        structured = self.scan_structured_input(payload, agent_id=agent_id)
        return FileScanResult(
            mode="structured",
            file_path=str(resolved),
            size_bytes=size_bytes,
            decision=_decision_dict(structured.decision),
            detections=[
                {
                    "path": item.path,
                    "detector": item.detector,
                    "tag": item.tag,
                    "start": item.start,
                    "end": item.end,
                }
                for item in structured.detections
            ],
            filtered=structured.filtered,
        )

    # Non-JSON: decode leniently so arbitrary text files can still be scanned.
    text = raw.decode("utf-8", errors="replace")
    scan = self.scan_input(text, agent_id=agent_id)
    return FileScanResult(
        mode="text",
        file_path=str(resolved),
        size_bytes=size_bytes,
        decision=_decision_dict(scan.decision),
        detections=[
            {
                "detector": item.detector,
                "tag": item.tag,
                "start": item.start,
                "end": item.end,
            }
            for item in scan.detections
        ],
        filtered=scan.filtered,
    )

reload_policies

reload_policies() -> bool

Reload policies only when watched files changed.

Source code in safeai/api.py
def reload_policies(self) -> bool:
    """Reload policies only when watched files changed.

    Returns:
        The boolean result of ``PolicyEngine.reload_if_changed`` —
        whether a reload actually occurred.
    """
    return self.policy_engine.reload_if_changed()

force_reload_policies

force_reload_policies() -> bool

Always reload policies from configured files.

Source code in safeai/api.py
def force_reload_policies(self) -> bool:
    """Always reload policies from configured files.

    Unlike :meth:`reload_policies`, this reloads unconditionally rather
    than only when watched files changed.

    Returns:
        The boolean result of ``PolicyEngine.reload``.
    """
    return self.policy_engine.reload()

memory_write

memory_write(key: str, value: Any, *, agent_id: str = 'unknown', strict: bool = False) -> MemoryWriteResult

Write a value to schema-enforced agent memory.

Parameters:

Name Type Description Default
key str

Field name defined in the memory schema.

required
value Any

Value to store. Must match the field's declared type.

required
agent_id str

Agent performing the write.

'unknown'
strict bool

If True, raise MemoryValidationError on failure instead of returning False.

False

Returns:

Type Description
MemoryWriteResult

A MemoryWriteResult with success and reason fields. Supports bool() conversion for backward compatibility.

Source code in safeai/api.py
def memory_write(
    self, key: str, value: Any, *, agent_id: str = "unknown", strict: bool = False
) -> MemoryWriteResult:
    """Write a value to schema-enforced agent memory.

    Expired entries are opportunistically purged before the write when
    auto-purge is enabled.

    Args:
        key: Field name defined in the memory schema.
        value: Value to store. Must match the field's declared type.
        agent_id: Agent performing the write.
        strict: If True, raise MemoryValidationError on failure instead of returning False.

    Returns:
        A :class:`MemoryWriteResult` with ``success`` and ``reason`` fields.
        Supports ``bool()`` conversion for backward compatibility.
    """
    if not self.memory:
        # No memory controller was configured on this instance.
        return MemoryWriteResult(success=False, reason="no memory configured")
    self._auto_purge_memory(trigger="memory_write", agent_id=agent_id)
    return self.memory.write(key=key, value=value, agent_id=agent_id, strict=strict)

memory_read

memory_read(key: str, *, agent_id: str = 'unknown') -> MemoryReadResult

Read a value from agent memory.

Parameters:

Name Type Description Default
key str

Field name to read.

required
agent_id str

Agent performing the read.

'unknown'

Returns:

Type Description
MemoryReadResult

A MemoryReadResult with found, value, and reason fields.

Source code in safeai/api.py
def memory_read(self, key: str, *, agent_id: str = "unknown") -> MemoryReadResult:
    """Read a value from agent memory.

    Expired entries are opportunistically purged before the lookup when
    auto-purge is enabled.

    Args:
        key: Field name to read.
        agent_id: Agent performing the read.

    Returns:
        A :class:`MemoryReadResult` with ``found``, ``value``, and ``reason`` fields.
    """
    if not self.memory:
        # No memory controller was configured on this instance.
        return MemoryReadResult(found=False, reason="no memory configured")
    self._auto_purge_memory(trigger="memory_read", agent_id=agent_id)
    return self.memory.read(key=key, agent_id=agent_id)

memory_purge_expired

memory_purge_expired() -> int

Manually purge all expired entries from agent memory.

Emits an audit event when entries are purged.

Returns:

Type Description
int

The number of memory entries that were purged.

Source code in safeai/api.py
def memory_purge_expired(self) -> int:
    """Manually purge all expired entries from agent memory.

    Emits a retention audit event (attributed to ``"system"``) when at
    least one entry was purged.

    Returns:
        The number of memory entries that were purged.
    """
    if not self.memory:
        return 0
    count = self.memory.purge_expired()
    if count:
        noun = "entry" if count == 1 else "entries"
        self._emit_memory_retention_event(
            agent_id="system",
            reason=f"Purged {count} expired memory {noun}",
            metadata={"phase": "retention_purge", "trigger": "manual", "purged_count": count},
        )
    return count

resolve_memory_handle

resolve_memory_handle(handle_id: str, *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None) -> Any

Resolve an encrypted memory handle, subject to policy gating.

Looks up the handle metadata, evaluates action-boundary policy rules against the handle's data tag, and — if allowed — decrypts and returns the stored value. Returns None when the handle is missing, policy blocks access, or decryption fails.

Parameters:

Name Type Description Default
handle_id str

Opaque identifier returned by a previous memory write.

required
agent_id str

Agent requesting the resolution.

'unknown'
session_id str | None

Optional session scope for audit context.

None
source_agent_id str | None

Optional originating agent for multi-agent flows.

None
destination_agent_id str | None

Optional target agent for multi-agent flows.

None

Returns:

Type Description
Any

The decrypted value, or None if resolution is denied or fails.

Source code in safeai/api.py
def resolve_memory_handle(
    self,
    handle_id: str,
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
) -> Any:
    """Resolve an encrypted memory handle, subject to policy gating.

    Looks up the handle metadata, evaluates action-boundary policy rules
    against the handle's data tag, and — if allowed — decrypts and returns
    the stored value.  Returns None when the handle is missing, policy
    blocks access, or decryption fails.

    Args:
        handle_id: Opaque identifier returned by a previous memory write.
        agent_id: Agent requesting the resolution.
        session_id: Optional session scope for audit context.
        source_agent_id: Optional originating agent for multi-agent flows.
        destination_agent_id: Optional target agent for multi-agent flows.

    Returns:
        The decrypted value, or None if resolution is denied or fails.
    """
    if not self.memory:
        # No memory controller configured; nothing to resolve.
        return None
    metadata = self.memory.handle_metadata(handle_id)
    if metadata is None:
        # Unknown handle: audit the miss as a block, then give up.
        self._emit_memory_retention_event(
            agent_id=agent_id,
            reason=f"Memory handle '{handle_id}' not found",
            metadata={
                "phase": "handle_resolve",
                "handle_id": handle_id,
                "resolution": "missing",
            },
            action="block",
            policy_name="memory-handle",
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
        )
        return None

    # Gate resolution on action-boundary policy, keyed by the handle's stored data tag.
    decision = self.policy_engine.evaluate(
        PolicyContext(
            boundary="action",
            data_tags=[metadata["tag"]],
            agent_id=agent_id,
            tool_name="memory.resolve_handle",
        )
    )
    if decision.action in {"block", "redact", "require_approval"}:
        # Any non-allow outcome denies resolution; audit and return None.
        self._emit_memory_retention_event(
            agent_id=agent_id,
            reason=decision.reason,
            metadata={
                "phase": "handle_resolve",
                "handle_id": handle_id,
                "resolution": "policy_blocked",
                "handle_tag": metadata["tag"],
            },
            action=decision.action,
            policy_name=decision.policy_name,
            data_tags=[metadata["tag"]],
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
        )
        return None

    try:
        resolved = self.memory.resolve_handle(handle_id, agent_id=agent_id)
    except Exception as exc:
        # Resolution/decryption failure: audit as a block rather than raising.
        self._emit_memory_retention_event(
            agent_id=agent_id,
            reason=str(exc),
            metadata={
                "phase": "handle_resolve",
                "handle_id": handle_id,
                "resolution": "resolve_failed",
                "handle_tag": metadata["tag"],
            },
            action="block",
            policy_name="memory-handle",
            data_tags=[metadata["tag"]],
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
        )
        return None

    # Success path: audit the allowed resolution before returning the value.
    self._emit_memory_retention_event(
        agent_id=agent_id,
        reason="Resolved encrypted memory handle",
        metadata={
            "phase": "handle_resolve",
            "handle_id": handle_id,
            "resolution": "allow",
            "handle_tag": metadata["tag"],
            "encrypted": True,
        },
        action="allow",
        policy_name=decision.policy_name or "memory-handle",
        data_tags=[metadata["tag"]],
        session_id=session_id,
        source_agent_id=source_agent_id,
        destination_agent_id=destination_agent_id,
    )
    return resolved

query_audit

query_audit(**filters: Any) -> list[dict[str, Any]]

Query the audit log with optional filters.

Parameters:

Name Type Description Default
**filters Any

Keyword arguments forwarded to the audit logger's query method (e.g., event_id, agent_id, boundary, last, limit).

{}

Returns:

Type Description
list[dict[str, Any]]

A list of audit event dictionaries matching the filters.

Source code in safeai/api.py
def query_audit(self, **filters: Any) -> list[dict[str, Any]]:
    """Return audit events that match the supplied filters.

    Args:
        **filters: Filter keywords handed straight to the audit logger's
            ``query`` method (for example ``event_id``, ``agent_id``,
            ``boundary``, ``last``, ``limit``).

    Returns:
        The matching audit events, each as a dictionary.
    """
    audit_log = self.audit
    return audit_log.query(**filters)

validate_tool_request

validate_tool_request(tool_name: str, data_tags: list[str]) -> ContractValidationResult

Validate a tool invocation against its registered data-tag contract.

Parameters:

Name Type Description Default
tool_name str

Name of the tool being invoked.

required
data_tags list[str]

Data tags present in the request payload.

required

Returns:

Type Description
ContractValidationResult

ContractValidationResult indicating whether the contract allows the given data tags.

Source code in safeai/api.py
def validate_tool_request(self, tool_name: str, data_tags: list[str]) -> ContractValidationResult:
    """Check a tool invocation against that tool's registered data-tag contract.

    Args:
        tool_name: The tool the agent is invoking.
        data_tags: Data tags carried by the request payload.

    Returns:
        A ContractValidationResult describing whether the contract permits
        the supplied tags.
    """
    registry = self.contracts
    return registry.validate_request(tool_name=tool_name, data_tags=data_tags)

validate_agent_identity

validate_agent_identity(agent_id: str, *, tool_name: str | None = None, data_tags: list[str] | None = None) -> AgentIdentityValidationResult

Validate an agent's declared identity and permissions.

Checks that the agent is registered and, optionally, that it is permitted to invoke the specified tool or handle the given data tags.

Parameters:

Name Type Description Default
agent_id str

Identifier of the agent to validate.

required
tool_name str | None

Optional tool name to check against the agent's allowed tools.

None
data_tags list[str] | None

Optional data tags to check against the agent's allowed tags.

None

Returns:

Type Description
AgentIdentityValidationResult

AgentIdentityValidationResult with a valid flag and reason.

Source code in safeai/api.py
def validate_agent_identity(
    self,
    agent_id: str,
    *,
    tool_name: str | None = None,
    data_tags: list[str] | None = None,
) -> AgentIdentityValidationResult:
    """Check that an agent is registered and authorized for the given scope.

    Verifies that ``agent_id`` is a registered agent and, when the optional
    arguments are supplied, that it is allowed to invoke ``tool_name`` and
    to handle ``data_tags``.

    Args:
        agent_id: Agent whose identity should be checked.
        tool_name: Tool to check against the agent's allowed tools, if any.
        data_tags: Tags to check against the agent's allowed tags, if any.

    Returns:
        An AgentIdentityValidationResult carrying a valid flag and a reason.
    """
    return self.identities.validate(
        agent_id=agent_id,
        tool_name=tool_name,
        data_tags=data_tags,
    )

issue_capability_token

issue_capability_token(*, agent_id: str, tool_name: str, actions: list[str], ttl: str = '10m', secret_keys: list[str] | None = None, session_id: str | None = None, metadata: dict[str, Any] | None = None) -> Any

Issue a scoped, time-limited capability token to an agent.

Capability tokens grant an agent permission to perform specific actions on a specific tool, optionally scoped to a session and a set of secret keys.

Parameters:

Name Type Description Default
agent_id str

Agent the token is issued to.

required
tool_name str

Tool the token grants access to.

required
actions list[str]

List of permitted actions (e.g., ["invoke", "read"]).

required
ttl str

Time-to-live string (e.g., "10m", "1h").

'10m'
secret_keys list[str] | None

Optional list of secret keys the token may resolve.

None
session_id str | None

Optional session scope for the token.

None
metadata dict[str, Any] | None

Optional extra metadata stored with the token.

None

Returns:

Type Description
Any

The issued capability token object.

Example::

token = ai.issue_capability_token(
    agent_id="data-agent",
    tool_name="db_query",
    actions=["invoke"],
    ttl="5m",
    secret_keys=["DB_PASSWORD"],
)
result = ai.validate_capability_token(
    token.token_id,
    agent_id="data-agent",
    tool_name="db_query",
)
Source code in safeai/api.py
def issue_capability_token(
    self,
    *,
    agent_id: str,
    tool_name: str,
    actions: list[str],
    ttl: str = "10m",
    secret_keys: list[str] | None = None,
    session_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Any:
    """Issue a scoped, time-limited capability token to an agent.

    Capability tokens grant an agent permission to perform specific actions
    on a specific tool, optionally scoped to a session and a set of secret
    keys.

    Args:
        agent_id: Agent the token is issued to.
        tool_name: Tool the token grants access to.
        actions: List of permitted actions (e.g., ``["invoke", "read"]``).
        ttl: Time-to-live string (e.g., ``"10m"``, ``"1h"``).
        secret_keys: Optional list of secret keys the token may resolve.
        session_id: Optional session scope for the token.
        metadata: Optional extra metadata stored with the token.

    Returns:
        The issued capability token object.

    Example::

        token = ai.issue_capability_token(
            agent_id="data-agent",
            tool_name="db_query",
            actions=["invoke"],
            ttl="5m",
            secret_keys=["DB_PASSWORD"],
        )
        result = ai.validate_capability_token(
            token.token_id,
            agent_id="data-agent",
            tool_name="db_query",
        )
    """
    return self.capabilities.issue(
        agent_id=agent_id,
        tool_name=tool_name,
        actions=actions,
        ttl=ttl,
        secret_keys=secret_keys,
        session_id=session_id,
        metadata=metadata,
    )

validate_capability_token

validate_capability_token(token_id: str, *, agent_id: str, tool_name: str, action: str = 'invoke', session_id: str | None = None) -> CapabilityValidationResult

Validate a capability token for a specific agent, tool, and action.

Parameters:

Name Type Description Default
token_id str

Identifier of the capability token to validate.

required
agent_id str

Agent presenting the token.

required
tool_name str

Tool the agent wants to use.

required
action str

Action the agent wants to perform (default "invoke").

'invoke'
session_id str | None

Optional session scope to validate against.

None

Returns:

Type Description
CapabilityValidationResult

CapabilityValidationResult indicating whether the token is valid.

Source code in safeai/api.py
def validate_capability_token(
    self,
    token_id: str,
    *,
    agent_id: str,
    tool_name: str,
    action: str = "invoke",
    session_id: str | None = None,
) -> CapabilityValidationResult:
    """Check a capability token against an agent, tool, and action.

    Args:
        token_id: The capability token being presented.
        agent_id: Agent presenting the token.
        tool_name: Tool the agent wants to use.
        action: Action the agent wants to perform (default ``"invoke"``).
        session_id: Session scope to validate against, if any.

    Returns:
        A CapabilityValidationResult stating whether the token is valid.
    """
    store = self.capabilities
    return store.validate(
        token_id,
        agent_id=agent_id,
        tool_name=tool_name,
        action=action,
        session_id=session_id,
    )

revoke_capability_token

revoke_capability_token(token_id: str) -> bool

Revoke a previously issued capability token.

Parameters:

Name Type Description Default
token_id str

Identifier of the token to revoke.

required

Returns:

Type Description
bool

True if the token was found and revoked, False otherwise.

Source code in safeai/api.py
def revoke_capability_token(self, token_id: str) -> bool:
    """Invalidate a previously issued capability token.

    Args:
        token_id: The token to revoke.

    Returns:
        True when the token existed and was revoked, False otherwise.
    """
    store = self.capabilities
    return store.revoke(token_id)

purge_expired_capability_tokens

purge_expired_capability_tokens() -> int

Remove all expired capability tokens from the token store.

Returns:

Type Description
int

The number of tokens that were purged.

Source code in safeai/api.py
def purge_expired_capability_tokens(self) -> int:
    """Drop every expired capability token from the token store.

    Returns:
        How many tokens were removed.
    """
    store = self.capabilities
    return store.purge_expired()

list_approval_requests

list_approval_requests(*, status: str | None = None, agent_id: str | None = None, tool_name: str | None = None, newest_first: bool = True, limit: int = 100) -> list[ApprovalRequest]

List human-in-the-loop approval requests.

Parameters:

Name Type Description Default
status str | None

Filter by status ("pending", "approved", "denied", or "expired"). None returns all statuses.

None
agent_id str | None

Filter by the requesting agent.

None
tool_name str | None

Filter by the tool the request targets.

None
newest_first bool

If True, return newest requests first.

True
limit int

Maximum number of requests to return.

100

Returns:

Type Description
list[ApprovalRequest]

A list of ApprovalRequest objects matching the filters.

Source code in safeai/api.py
def list_approval_requests(
    self,
    *,
    status: str | None = None,
    agent_id: str | None = None,
    tool_name: str | None = None,
    newest_first: bool = True,
    limit: int = 100,
) -> list[ApprovalRequest]:
    """Return human-in-the-loop approval requests matching the filters.

    Args:
        status: Restrict to one status (``"pending"``, ``"approved"``,
            ``"denied"``, or ``"expired"``). None — or any unrecognized
            value — returns all statuses.
        agent_id: Restrict to requests made by this agent.
        tool_name: Restrict to requests targeting this tool.
        newest_first: If True, order newest requests first.
        limit: Cap on the number of requests returned.

    Returns:
        A list of matching ApprovalRequest objects.
    """
    # Unknown status strings are normalized to None (no status filter)
    # rather than raising, matching the store's lenient contract.
    known_statuses = ("pending", "approved", "denied", "expired")
    typed_status = status if status in known_statuses else None
    return self.approvals.list_requests(
        status=typed_status,  # type: ignore[arg-type]
        agent_id=agent_id,
        tool_name=tool_name,
        newest_first=newest_first,
        limit=limit,
    )

approve_request

approve_request(request_id: str, *, approver_id: str, note: str | None = None) -> bool

Approve a pending approval request.

Parameters:

Name Type Description Default
request_id str

Identifier of the approval request.

required
approver_id str

Identifier of the human or system approving the request.

required
note str | None

Optional free-text note attached to the approval.

None

Returns:

Type Description
bool

True if the request was successfully approved, False otherwise.

Source code in safeai/api.py
def approve_request(self, request_id: str, *, approver_id: str, note: str | None = None) -> bool:
    """Approve a pending approval request.

    Args:
        request_id: Identifier of the approval request.
        approver_id: Identifier of the human or system approving the request.
        note: Optional free-text note attached to the approval.

    Returns:
        True if the request was successfully approved, False otherwise.
    """
    return self.approvals.approve(request_id, approver_id=approver_id, note=note)

deny_request

deny_request(request_id: str, *, approver_id: str, note: str | None = None) -> bool

Deny a pending approval request.

Parameters:

Name Type Description Default
request_id str

Identifier of the approval request.

required
approver_id str

Identifier of the human or system denying the request.

required
note str | None

Optional free-text note attached to the denial.

None

Returns:

Type Description
bool

True if the request was successfully denied, False otherwise.

Source code in safeai/api.py
def deny_request(self, request_id: str, *, approver_id: str, note: str | None = None) -> bool:
    """Deny a pending approval request.

    Args:
        request_id: Identifier of the approval request.
        approver_id: Identifier of the human or system denying the request.
        note: Optional free-text note attached to the denial.

    Returns:
        True if the request was successfully denied, False otherwise.
    """
    return self.approvals.deny(request_id, approver_id=approver_id, note=note)

register_secret_backend

register_secret_backend(name: str, backend: SecretBackend, *, replace: bool = False) -> None

Register a named secret backend for secret resolution.

Parameters:

Name Type Description Default
name str

Unique name for the backend (e.g., "vault", "env").

required
backend SecretBackend

A SecretBackend implementation that can resolve secret keys.

required
replace bool

If True, replace an existing backend with the same name.

False
Source code in safeai/api.py
def register_secret_backend(
    self,
    name: str,
    backend: SecretBackend,
    *,
    replace: bool = False,
) -> None:
    """Register a named backend that can resolve secret keys.

    Args:
        name: Unique backend name (e.g. ``"vault"``, ``"env"``).
        backend: A SecretBackend implementation used to look up secrets.
        replace: If True, an existing backend with the same name is replaced.
    """
    manager = self.secrets
    manager.register_backend(name, backend, replace=replace)

list_secret_backends

list_secret_backends() -> list[str]

List the names of all registered secret backends.

Returns:

Type Description
list[str]

A list of backend name strings.

Source code in safeai/api.py
def list_secret_backends(self) -> list[str]:
    """Return the names of every registered secret backend.

    Returns:
        Backend name strings, as reported by the secret manager.
    """
    manager = self.secrets
    return manager.list_backends()

resolve_secret

resolve_secret(*, token_id: str, secret_key: str, agent_id: str, tool_name: str, action: str = 'invoke', session_id: str | None = None, backend: str = 'env') -> ResolvedSecret

Resolve a single secret key using a capability token, with full audit logging.

Validates the capability token, retrieves the secret from the specified backend, and emits an audit event recording the outcome.

Parameters:

Name Type Description Default
token_id str

Capability token authorizing the secret access.

required
secret_key str

Key of the secret to resolve (e.g., "DB_PASSWORD").

required
agent_id str

Agent requesting the secret.

required
tool_name str

Tool the secret is being resolved for.

required
action str

Capability action to validate (default "invoke").

'invoke'
session_id str | None

Optional session scope for token validation.

None
backend str

Name of the secret backend to use (default "env").

'env'

Returns:

Type Description
ResolvedSecret

ResolvedSecret containing the secret value and metadata.

Raises:

Type Description
SecretAccessDeniedError

If the capability token is invalid or does not authorize the requested secret.

SecretNotFoundError

If the secret key does not exist in the backend.

Example::

token = ai.issue_capability_token(
    agent_id="worker",
    tool_name="api_call",
    actions=["invoke"],
    secret_keys=["API_KEY"],
)
secret = ai.resolve_secret(
    token_id=token.token_id,
    secret_key="API_KEY",
    agent_id="worker",
    tool_name="api_call",
)
Source code in safeai/api.py
def resolve_secret(
    self,
    *,
    token_id: str,
    secret_key: str,
    agent_id: str,
    tool_name: str,
    action: str = "invoke",
    session_id: str | None = None,
    backend: str = "env",
) -> ResolvedSecret:
    """Resolve a single secret key using a capability token, with full audit logging.

    Validates the capability token, retrieves the secret from the specified
    backend, and emits an audit event recording the outcome.

    Args:
        token_id: Capability token authorizing the secret access.
        secret_key: Key of the secret to resolve (e.g., ``"DB_PASSWORD"``).
        agent_id: Agent requesting the secret.
        tool_name: Tool the secret is being resolved for.
        action: Capability action to validate (default ``"invoke"``).
        session_id: Optional session scope for token validation.
        backend: Name of the secret backend to use (default ``"env"``).

    Returns:
        ResolvedSecret containing the secret value and metadata.

    Raises:
        SecretAccessDeniedError: If the capability token is invalid or
            does not authorize the requested secret.
        SecretNotFoundError: If the secret key does not exist in the backend.

    Example::

        token = ai.issue_capability_token(
            agent_id="worker",
            tool_name="api_call",
            actions=["invoke"],
            secret_keys=["API_KEY"],
        )
        secret = ai.resolve_secret(
            token_id=token.token_id,
            secret_key="API_KEY",
            agent_id="worker",
            tool_name="api_call",
        )
    """
    # Delegate token validation and retrieval to the secret manager; both
    # the failure and the success path are mirrored into the audit log.
    try:
        resolved = self.secrets.resolve_secret(
            token_id=token_id,
            secret_key=secret_key,
            agent_id=agent_id,
            tool_name=tool_name,
            action=action,
            session_id=session_id,
            backend=backend,
        )
    except SecretError as exc:
        # An explicit authorization failure is audited as "block"; any other
        # SecretError (e.g. a missing key) is audited as "deny".
        event_action = "block" if isinstance(exc, SecretAccessDeniedError) else "deny"
        self.audit.emit(
            AuditEvent(
                boundary="action",
                action=event_action,
                policy_name="secret-manager",
                reason=str(exc),
                data_tags=["secret"],
                agent_id=agent_id,
                tool_name=tool_name,
                session_id=session_id,
                metadata={
                    "phase": "secret_resolve",
                    "secret_backend": backend,
                    "secret_key": secret_key,
                    "capability_token_id": token_id,
                    "result": "error",
                },
            )
        )
        # Re-raise the original SecretError after recording it.
        raise
    # Success path: record the allow decision before handing back the secret.
    self.audit.emit(
        AuditEvent(
            boundary="action",
            action="allow",
            policy_name="secret-manager",
            reason="secret resolved by scoped capability",
            data_tags=["secret"],
            agent_id=agent_id,
            tool_name=tool_name,
            session_id=session_id,
            metadata={
                "phase": "secret_resolve",
                "secret_backend": backend,
                "secret_key": secret_key,
                "capability_token_id": token_id,
                "result": "allow",
            },
        )
    )
    return resolved

resolve_secrets

resolve_secrets(*, token_id: str, secret_keys: list[str], agent_id: str, tool_name: str, action: str = 'invoke', session_id: str | None = None, backend: str = 'env') -> dict[str, ResolvedSecret]

Resolve multiple secret keys in a single call.

Iterates over the requested keys, resolving each via resolve_secret. If any key is not found, raises SecretNotFoundError after attempting all.

Parameters:

Name Type Description Default
token_id str

Capability token authorizing the secret access.

required
secret_keys list[str]

List of secret keys to resolve.

required
agent_id str

Agent requesting the secrets.

required
tool_name str

Tool the secrets are being resolved for.

required
action str

Capability action to validate (default "invoke").

'invoke'
session_id str | None

Optional session scope for token validation.

None
backend str

Name of the secret backend to use (default "env").

'env'

Returns:

Type Description
dict[str, ResolvedSecret]

A dict mapping each secret key to its ResolvedSecret.

Raises:

Type Description
SecretNotFoundError

If one or more keys could not be found.

Source code in safeai/api.py
def resolve_secrets(
    self,
    *,
    token_id: str,
    secret_keys: list[str],
    agent_id: str,
    tool_name: str,
    action: str = "invoke",
    session_id: str | None = None,
    backend: str = "env",
) -> dict[str, ResolvedSecret]:
    """Resolve several secret keys in one call.

    Each key is resolved individually via ``resolve_secret``. Keys that are
    missing from the backend are collected, and a single SecretNotFoundError
    is raised at the end if any were missing; other secret errors (such as
    an access denial) propagate immediately.

    Args:
        token_id: Capability token authorizing the secret access.
        secret_keys: Secret keys to resolve.
        agent_id: Agent requesting the secrets.
        tool_name: Tool the secrets are being resolved for.
        action: Capability action to validate (default ``"invoke"``).
        session_id: Session scope for token validation, if any.
        backend: Secret backend to query (default ``"env"``).

    Returns:
        A dict mapping each resolved key to its ResolvedSecret.

    Raises:
        SecretNotFoundError: If one or more keys could not be found.
    """
    resolved: dict[str, ResolvedSecret] = {}
    not_found: list[str] = []
    for secret_key in secret_keys:
        try:
            resolved[secret_key] = self.resolve_secret(
                token_id=token_id,
                secret_key=secret_key,
                agent_id=agent_id,
                tool_name=tool_name,
                action=action,
                session_id=session_id,
                backend=backend,
            )
        except SecretNotFoundError:
            # Keep going so the error names every missing key at once.
            not_found.append(secret_key)
    if not_found:
        raise SecretNotFoundError(
            f"Unable to resolve secret key(s) from backend '{backend}': {','.join(sorted(set(not_found)))}"
        )
    return resolved

intercept_tool_request

intercept_tool_request(tool_name: str, parameters: dict[str, Any], data_tags: list[str], *, agent_id: str = 'unknown', session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, action_type: str | None = None, capability_token_id: str | None = None, capability_action: str = 'invoke', approval_request_id: str | None = None) -> InterceptResult

Intercept a tool invocation at the action boundary.

Runs the full interception pipeline: policy evaluation, contract validation, identity checks, capability-token verification, and approval gating. Returns a decision (allow, block, redact, or require_approval) with audit logging.

Parameters:

Name Type Description Default
tool_name str

Name of the tool being invoked.

required
parameters dict[str, Any]

Parameters the agent is passing to the tool.

required
data_tags list[str]

Data tags present in the request payload.

required
agent_id str

Identifier of the invoking agent.

'unknown'
session_id str | None

Optional session scope for the request.

None
source_agent_id str | None

Optional originating agent in multi-agent flows.

None
destination_agent_id str | None

Optional target agent in multi-agent flows.

None
action_type str | None

Optional label for the kind of action (e.g., "tool_call").

None
capability_token_id str | None

Optional capability token authorizing the call.

None
capability_action str

Action to validate on the token (default "invoke").

'invoke'
approval_request_id str | None

Optional pre-existing approval request to validate.

None

Returns:

Type Description
InterceptResult

InterceptResult with the decision, detections, and filtered parameters.

Example::

result = ai.intercept_tool_request(
    tool_name="send_email",
    parameters={"to": "user@example.com", "body": "Hello"},
    data_tags=["personal.pii"],
    agent_id="assistant",
)
if result.decision.action == "allow":
    send_email(**result.filtered_parameters)
Source code in safeai/api.py
def intercept_tool_request(
    self,
    tool_name: str,
    parameters: dict[str, Any],
    data_tags: list[str],
    *,
    agent_id: str = "unknown",
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    action_type: str | None = None,
    capability_token_id: str | None = None,
    capability_action: str = "invoke",
    approval_request_id: str | None = None,
) -> InterceptResult:
    """Run a tool invocation through the action-boundary pipeline.

    The pipeline performs policy evaluation, contract validation, identity
    checks, capability-token verification, and approval gating, then emits
    audit events and returns a decision (allow, block, redact, or
    require_approval).

    Args:
        tool_name: Name of the tool being invoked.
        parameters: Parameters the agent is passing to the tool.
        data_tags: Data tags present in the request payload.
        agent_id: Identifier of the invoking agent.
        session_id: Session scope for the request, if any.
        source_agent_id: Originating agent in multi-agent flows, if any.
        destination_agent_id: Target agent in multi-agent flows, if any.
        action_type: Label for the kind of action (e.g. ``"tool_call"``).
        capability_token_id: Capability token authorizing the call, if any.
        capability_action: Action to validate on the token (default ``"invoke"``).
        approval_request_id: Pre-existing approval request to validate, if any.

    Returns:
        InterceptResult with the decision, detections, and filtered parameters.

    Example::

        result = ai.intercept_tool_request(
            tool_name="send_email",
            parameters={"to": "user@example.com", "body": "Hello"},
            data_tags=["personal.pii"],
            agent_id="assistant",
        )
        if result.decision.action == "allow":
            send_email(**result.filtered_parameters)
    """
    # Copy the caller's mutable inputs so the pipeline cannot alias them.
    call = ToolCall(
        tool_name=tool_name,
        agent_id=agent_id,
        parameters=dict(parameters),
        data_tags=list(data_tags),
        session_id=session_id,
        source_agent_id=source_agent_id,
        destination_agent_id=destination_agent_id,
        action_type=action_type,
        capability_token_id=capability_token_id,
        capability_action=capability_action,
        approval_request_id=approval_request_id,
    )
    return self._action.intercept_request(call)

intercept_tool_response

intercept_tool_response(tool_name: str, response: dict[str, Any], *, agent_id: str = 'unknown', request_data_tags: list[str] | None = None, session_id: str | None = None, source_agent_id: str | None = None, destination_agent_id: str | None = None, action_type: str | None = None) -> ResponseInterceptResult

Intercept a tool's response at the action boundary.

Classifies the response payload, evaluates policy rules, and returns a decision with optional redaction of sensitive fields.

Parameters:

Name Type Description Default
tool_name str

Name of the tool that produced the response.

required
response dict[str, Any]

The tool's response payload as a dict.

required
agent_id str

Identifier of the agent receiving the response.

'unknown'
request_data_tags list[str] | None

Data tags from the original request, for context.

None
session_id str | None

Optional session scope.

None
source_agent_id str | None

Optional originating agent in multi-agent flows.

None
destination_agent_id str | None

Optional target agent in multi-agent flows.

None
action_type str | None

Optional label for the kind of action.

None

Returns:

Type Description
ResponseInterceptResult

ResponseInterceptResult with the decision and filtered response.

Source code in safeai/api.py
def intercept_tool_response(
    self,
    tool_name: str,
    response: dict[str, Any],
    *,
    agent_id: str = "unknown",
    request_data_tags: list[str] | None = None,
    session_id: str | None = None,
    source_agent_id: str | None = None,
    destination_agent_id: str | None = None,
    action_type: str | None = None,
) -> ResponseInterceptResult:
    """Run a tool's response through the action-boundary pipeline.

    The response payload is classified, policy rules are evaluated, and a
    decision is returned, possibly with sensitive fields redacted.

    Args:
        tool_name: Name of the tool that produced the response.
        response: The tool's response payload as a dict.
        agent_id: Identifier of the agent receiving the response.
        request_data_tags: Data tags from the original request, for context.
        session_id: Session scope, if any.
        source_agent_id: Originating agent in multi-agent flows, if any.
        destination_agent_id: Target agent in multi-agent flows, if any.
        action_type: Label for the kind of action, if any.

    Returns:
        ResponseInterceptResult with the decision and filtered response.
    """
    # Parameters are empty here: only the response side is being inspected.
    call = ToolCall(
        tool_name=tool_name,
        agent_id=agent_id,
        parameters={},
        data_tags=list(request_data_tags or []),
        session_id=session_id,
        source_agent_id=source_agent_id,
        destination_agent_id=destination_agent_id,
        action_type=action_type,
    )
    payload = dict(response)
    return self._action.intercept_response(call, payload)

wrap

wrap(fn: Any) -> Any

Wrap a function for use with framework adapters.

Parameters:

Name Type Description Default
fn Any

The callable to wrap.

required

Returns:

Type Description
Any

A wrapped callable that delegates to the original function.

Source code in safeai/api.py
def wrap(self, fn: Any) -> Any:
    """Wrap a function for use with framework adapters.

    The wrapper forwards all positional and keyword arguments to ``fn``
    unchanged. ``functools.wraps`` copies the original function's metadata
    (``__name__``, ``__doc__``, ``__wrapped__``) onto the wrapper so that
    adapters, debuggers, and introspection see the wrapped callable as the
    original rather than an anonymous ``_wrapped``.

    Args:
        fn: The callable to wrap.

    Returns:
        A wrapped callable that delegates to the original function.
    """
    # Local import: stdlib-only, keeps this fix self-contained without
    # touching the module's import block.
    import functools

    @functools.wraps(fn)
    def _wrapped(*args: Any, **kwargs: Any) -> Any:
        return fn(*args, **kwargs)

    return _wrapped

langchain_adapter

langchain_adapter()

Return a LangChain adapter bound to this SafeAI instance.

Source code in safeai/api.py
def langchain_adapter(self):
    """Construct and return a LangChain adapter wired to this SafeAI instance."""
    # Imported lazily so LangChain is only required when actually used.
    from safeai.middleware.langchain import SafeAILangChainAdapter as _Adapter

    return _Adapter(self)

claude_adk_adapter

claude_adk_adapter()

Return a Claude ADK adapter bound to this SafeAI instance.

Source code in safeai/api.py
def claude_adk_adapter(self):
    """Construct and return a Claude ADK adapter wired to this SafeAI instance."""
    # Imported lazily so the Claude ADK is only required when actually used.
    from safeai.middleware.claude_adk import SafeAIClaudeADKAdapter as _Adapter

    return _Adapter(self)

google_adk_adapter

google_adk_adapter()

Return a Google ADK adapter bound to this SafeAI instance.

Source code in safeai/api.py
def google_adk_adapter(self):
    """Construct and return a Google ADK adapter wired to this SafeAI instance."""
    # Imported lazily so the Google ADK is only required when actually used.
    from safeai.middleware.google_adk import SafeAIGoogleADKAdapter as _Adapter

    return _Adapter(self)

crewai_adapter

crewai_adapter()

Return a CrewAI adapter bound to this SafeAI instance.

Source code in safeai/api.py
def crewai_adapter(self):
    """Construct and return a CrewAI adapter wired to this SafeAI instance."""
    # Imported lazily so CrewAI is only required when actually used.
    from safeai.middleware.crewai import SafeAICrewAIAdapter as _Adapter

    return _Adapter(self)

autogen_adapter

autogen_adapter()

Return an AutoGen adapter bound to this SafeAI instance.

Source code in safeai/api.py
def autogen_adapter(self):
    """Construct and return an AutoGen adapter wired to this SafeAI instance."""
    # Imported lazily so AutoGen is only required when actually used.
    from safeai.middleware.autogen import SafeAIAutoGenAdapter as _Adapter

    return _Adapter(self)

list_plugins

list_plugins() -> list[dict[str, Any]]

List all loaded plugins and their metadata.

Returns:

Type Description
list[dict[str, Any]]

A list of dicts, each describing a loaded plugin.

Source code in safeai/api.py
def list_plugins(self) -> list[dict[str, Any]]:
    """Describe every plugin currently loaded into this runtime.

    Returns:
        A list of dicts, each describing a loaded plugin.
    """
    # The plugin registry owns the metadata; just pass it through.
    loaded = self.plugins.list_plugins()
    return loaded

list_plugin_adapters

list_plugin_adapters() -> list[str]

List the names of all adapter classes provided by loaded plugins.

Returns:

Type Description
list[str]

A list of adapter name strings.

Source code in safeai/api.py
def list_plugin_adapters(self) -> list[str]:
    """Name every adapter class contributed by loaded plugins.

    Returns:
        A list of adapter name strings.
    """
    names = self.plugins.adapter_names()
    return names

plugin_adapter

plugin_adapter(name: str) -> Any

Build and return a plugin adapter instance by name.

Parameters:

Name Type Description Default
name str

Name of the adapter to instantiate.

required

Returns:

Type Description
Any

An adapter instance bound to this SafeAI runtime.

Source code in safeai/api.py
def plugin_adapter(self, name: str) -> Any:
    """Instantiate a plugin-provided adapter by name.

    Args:
        name: Name of the adapter to instantiate.

    Returns:
        An adapter instance bound to this SafeAI runtime.
    """
    # The registry does the lookup and construction; we pass ourselves
    # in so the adapter is bound to this runtime instance.
    registry = self.plugins
    return registry.build_adapter(name, self)

list_policy_templates

list_policy_templates() -> list[dict[str, Any]]

List all available policy templates from the built-in catalog and plugins.

Returns:

Type Description
list[dict[str, Any]]

A list of dicts, each describing a policy template with its name,
description, and tags.

Source code in safeai/api.py
def list_policy_templates(self) -> list[dict[str, Any]]:
    """Enumerate every available policy template (built-in and plugin-provided).

    Returns:
        A list of dicts, each describing a policy template with its name,
        description, and tags.
    """
    catalog = self.templates
    return catalog.list_templates()

load_policy_template

load_policy_template(name: str) -> dict[str, Any]

Load the full content of a policy template by name.

Parameters:

Name Type Description Default
name str

Name of the template to load.

required

Returns:

Type Description
dict[str, Any]

A dict containing the template's rules, metadata, and description.

Source code in safeai/api.py
def load_policy_template(self, name: str) -> dict[str, Any]:
    """Fetch the full content of a named policy template.

    Args:
        name: Name of the template to load.

    Returns:
        A dict containing the template's rules, metadata, and description.
    """
    catalog = self.templates
    return catalog.load(name)

search_policy_templates

search_policy_templates(**kwargs: Any) -> list[dict[str, Any]]

Search policy templates by tags, keywords, or other criteria.

Parameters:

Name Type Description Default
**kwargs Any

Search filters forwarded to the template catalog's search method (e.g., tags, keyword).

{}

Returns:

Type Description
list[dict[str, Any]]

A list of matching template metadata dicts.

Source code in safeai/api.py
def search_policy_templates(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Find policy templates matching the given filters.

    Args:
        **kwargs: Search filters forwarded to the template catalog's search
            method (e.g., ``tags``, ``keyword``).

    Returns:
        A list of matching template metadata dicts.
    """
    # All filters are forwarded untouched; the catalog defines the schema.
    catalog = self.templates
    return catalog.search(**kwargs)

install_policy_template

install_policy_template(name: str) -> str

Install a policy template into the current project.

Writes the template's policy YAML file into the project's policy directory so it is loaded on next initialization.

Parameters:

Name Type Description Default
name str

Name of the template to install.

required

Returns:

Type Description
str

The file path where the template was written.

Source code in safeai/api.py
def install_policy_template(self, name: str) -> str:
    """Copy a named policy template into the current project.

    The template's policy YAML file is written into the project's policy
    directory, so it takes effect on the next initialization.

    Args:
        name: Name of the template to install.

    Returns:
        The file path where the template was written.
    """
    catalog = self.templates
    return catalog.install(name)

register_ai_backend

register_ai_backend(name: str, backend: Any, *, default: bool = True) -> None

Register an AI backend for the intelligence layer.

Parameters:

Name Type Description Default
name str

Unique name for the backend (e.g., "openai", "anthropic").

required
backend Any

An AI backend instance implementing the backend protocol.

required
default bool

If True, set this backend as the default for intelligence calls.

True
Source code in safeai/api.py
def register_ai_backend(self, name: str, backend: Any, *, default: bool = True) -> None:
    """Make an AI backend available to the intelligence layer.

    Args:
        name: Unique name for the backend (e.g., ``"openai"``, ``"anthropic"``).
        backend: An AI backend instance implementing the backend protocol.
        default: If True, set this backend as the default for intelligence calls.
    """
    # _ensure_ai_registry lazily creates the registry on first use.
    self._ensure_ai_registry().register(name, backend, default=default)

list_ai_backends

list_ai_backends() -> list[str]

List the names of all registered AI backends.

Returns:

Type Description
list[str]

A list of backend name strings.

Source code in safeai/api.py
def list_ai_backends(self) -> list[str]:
    """Name every AI backend currently registered.

    Returns:
        A list of backend name strings.
    """
    registry = self._ensure_ai_registry()
    return registry.list_backends()

intelligence_auto_config

intelligence_auto_config(project_path: str = '.', framework_hint: str | None = None) -> Any

Auto-generate SafeAI configuration for a project using AI analysis.

Scans the project structure and, optionally, uses a framework hint to produce recommended policy rules, contracts, and identity declarations.

Parameters:

Name Type Description Default
project_path str

Path to the project directory to analyze.

'.'
framework_hint str | None

Optional framework name (e.g., "langchain") to tailor the recommendations.

None

Returns:

Type Description
Any

An AdvisorResult containing the generated configuration advice.

Source code in safeai/api.py
def intelligence_auto_config(
    self, project_path: str = ".", framework_hint: str | None = None
) -> Any:
    """Produce AI-generated SafeAI configuration advice for a project.

    Scans the project structure and, optionally, uses a framework hint to
    produce recommended policy rules, contracts, and identity declarations.

    Args:
        project_path: Path to the project directory to analyze.
        framework_hint: Optional framework name (e.g., ``"langchain"``) to
            tailor the recommendations.

    Returns:
        An AdvisorResult containing the generated configuration advice.
    """
    # Imported lazily to keep the intelligence layer optional at import time.
    from safeai.intelligence.auto_config import AutoConfigAdvisor
    from safeai.intelligence.sanitizer import MetadataSanitizer

    backend = self._ensure_ai_registry().get()
    advisor = AutoConfigAdvisor(
        backend=backend,
        # The sanitizer scrubs project metadata before it is sent to the backend.
        sanitizer=MetadataSanitizer(),
    )
    return advisor.advise(project_path=project_path, framework_hint=framework_hint)

intelligence_recommend

intelligence_recommend(since: str = '7d') -> Any

Generate policy recommendations based on recent audit events.

Analyzes audit history from the specified time window and uses the AI backend to suggest policy improvements.

Parameters:

Name Type Description Default
since str

Time window for audit events (e.g., "7d", "24h").

'7d'

Returns:

Type Description
Any

An AdvisorResult containing recommended policy changes.

Source code in safeai/api.py
def intelligence_recommend(self, since: str = "7d") -> Any:
    """Produce AI-generated policy recommendations from recent audit history.

    Analyzes audit events from the specified time window and asks the AI
    backend to suggest policy improvements.

    Args:
        since: Time window for audit events (e.g., ``"7d"``, ``"24h"``).

    Returns:
        An AdvisorResult containing recommended policy changes.
    """
    # Imported lazily to keep the intelligence layer optional at import time.
    from safeai.intelligence.recommender import RecommenderAdvisor
    from safeai.intelligence.sanitizer import MetadataSanitizer

    backend = self._ensure_ai_registry().get()
    recent_events = self.query_audit(last=since)
    advisor = RecommenderAdvisor(backend=backend, sanitizer=MetadataSanitizer())
    return advisor.advise(events=recent_events)

intelligence_explain

intelligence_explain(event_id: str) -> Any

Explain a specific audit event using AI-powered incident analysis.

Retrieves the event and surrounding context, then asks the AI backend to produce a human-readable explanation of what happened and why.

Parameters:

Name Type Description Default
event_id str

Identifier of the audit event to explain.

required

Returns:

Type Description
Any

An AdvisorResult with the incident explanation, or an error result
if the event is not found.

Source code in safeai/api.py
def intelligence_explain(self, event_id: str) -> Any:
    """Produce an AI-generated explanation of a specific audit event.

    Retrieves the event and surrounding context, then asks the AI backend
    to produce a human-readable explanation of what happened and why.

    Args:
        event_id: Identifier of the audit event to explain.

    Returns:
        An AdvisorResult with the incident explanation, or an error result
        if the event is not found.
    """
    # Imported lazily to keep the intelligence layer optional at import time.
    from safeai.intelligence.incident import IncidentAdvisor
    from safeai.intelligence.sanitizer import MetadataSanitizer

    backend = self._ensure_ai_registry().get()
    matches = self.query_audit(event_id=event_id)
    target = matches[0] if matches else None
    if not target:
        from safeai.intelligence.advisor import AdvisorResult

        return AdvisorResult(
            advisor_name="incident",
            status="error",
            summary=f"Event '{event_id}' not found.",
        )
    # Pull a handful of recent events so the advisor sees surrounding context.
    recent = self.query_audit(last="1h", limit=5)
    advisor = IncidentAdvisor(backend=backend, sanitizer=MetadataSanitizer())
    return advisor.advise(event=target, context_events=recent)

intelligence_compliance

intelligence_compliance(framework: str = 'hipaa', config_path: str | None = None) -> Any

Check current SafeAI configuration against a compliance framework.

Uses the AI backend to evaluate whether the loaded policies satisfy the requirements of the specified compliance framework.

Parameters:

Name Type Description Default
framework str

Compliance framework to check (e.g., "hipaa", "gdpr").

'hipaa'
config_path str | None

Optional path to a SafeAI config file to analyze.

None

Returns:

Type Description
Any

An AdvisorResult with compliance findings and gaps.

Source code in safeai/api.py
def intelligence_compliance(
    self, framework: str = "hipaa", config_path: str | None = None
) -> Any:
    """Evaluate the current SafeAI configuration against a compliance framework.

    Uses the AI backend to assess whether the loaded policies satisfy the
    requirements of the specified compliance framework.

    Args:
        framework: Compliance framework to check (e.g., ``"hipaa"``, ``"gdpr"``).
        config_path: Optional path to a SafeAI config file to analyze.

    Returns:
        An AdvisorResult with compliance findings and gaps.
    """
    # Imported lazily to keep the intelligence layer optional at import time.
    from safeai.intelligence.compliance import ComplianceAdvisor
    from safeai.intelligence.sanitizer import MetadataSanitizer

    backend = self._ensure_ai_registry().get()
    advisor = ComplianceAdvisor(
        backend=backend,
        sanitizer=MetadataSanitizer(),
    )
    return advisor.advise(framework=framework, config_path=config_path)

intelligence_integrate

intelligence_integrate(target: str = 'langchain', project_path: str = '.') -> Any

Get AI-powered advice for integrating SafeAI with a target framework.

Analyzes the project and produces step-by-step integration guidance tailored to the specified framework.

Parameters:

Name Type Description Default
target str

Framework to integrate with (e.g., "langchain", "crewai").

'langchain'
project_path str

Path to the project directory.

'.'

Returns:

Type Description
Any

An AdvisorResult with integration instructions and code snippets.

Source code in safeai/api.py
def intelligence_integrate(self, target: str = "langchain", project_path: str = ".") -> Any:
    """Produce AI-generated guidance for integrating SafeAI with a framework.

    Analyzes the project and produces step-by-step integration guidance
    tailored to the specified framework.

    Args:
        target: Framework to integrate with (e.g., ``"langchain"``, ``"crewai"``).
        project_path: Path to the project directory.

    Returns:
        An AdvisorResult with integration instructions and code snippets.
    """
    # Imported lazily to keep the intelligence layer optional at import time.
    from safeai.intelligence.integration import IntegrationAdvisor
    from safeai.intelligence.sanitizer import MetadataSanitizer

    backend = self._ensure_ai_registry().get()
    advisor = IntegrationAdvisor(
        backend=backend,
        sanitizer=MetadataSanitizer(),
    )
    return advisor.advise(target=target, project_path=project_path)

intercept_agent_message

intercept_agent_message(*, message: str, source_agent_id: str, destination_agent_id: str, data_tags: list[str] | None = None, session_id: str | None = None, approval_request_id: str | None = None) -> dict[str, Any]

Intercept an agent-to-agent message at the action boundary.

Classifies the message body, merges detected tags with any explicitly provided tags, evaluates policy rules, handles approval gating, and emits an audit event. The message may be allowed, redacted, or blocked.

Parameters:

Name Type Description Default
message str

The text message being sent between agents.

required
source_agent_id str

Identifier of the sending agent.

required
destination_agent_id str

Identifier of the receiving agent.

required
data_tags list[str] | None

Optional explicit data tags to include alongside auto-detected tags.

None
session_id str | None

Optional session scope for policy and approval context.

None
approval_request_id str | None

Optional pre-existing approval request ID to validate instead of creating a new one.

None

Returns:

Type Description
dict[str, Any]

A dict with keys "decision" (action, policy_name, reason),
"data_tags", "filtered_message", and "approval_request_id".

Example::

result = ai.intercept_agent_message(
    message="Patient SSN is 123-45-6789",
    source_agent_id="triage-agent",
    destination_agent_id="billing-agent",
)
if result["decision"]["action"] == "allow":
    send_to_agent(result["filtered_message"])
Source code in safeai/api.py
def intercept_agent_message(
    self,
    *,
    message: str,
    source_agent_id: str,
    destination_agent_id: str,
    data_tags: list[str] | None = None,
    session_id: str | None = None,
    approval_request_id: str | None = None,
) -> dict[str, Any]:
    """Intercept an agent-to-agent message at the action boundary.

    Classifies the message body, merges detected tags with any explicitly
    provided tags, evaluates policy rules, handles approval gating, and
    emits an audit event.  The message may be allowed, redacted, or blocked.

    Args:
        message: The text message being sent between agents.
        source_agent_id: Identifier of the sending agent.
        destination_agent_id: Identifier of the receiving agent.
        data_tags: Optional explicit data tags to include alongside
            auto-detected tags.
        session_id: Optional session scope for policy and approval context.
        approval_request_id: Optional pre-existing approval request ID to
            validate instead of creating a new one.

    Returns:
        A dict with keys ``"decision"`` (action, policy_name, reason),
        ``"data_tags"``, ``"filtered_message"``, and ``"approval_request_id"``.

    Example::

        result = ai.intercept_agent_message(
            message="Patient SSN is 123-45-6789",
            source_agent_id="triage-agent",
            destination_agent_id="billing-agent",
        )
        if result["decision"]["action"] == "allow":
            send_to_agent(result["filtered_message"])
    """
    import hashlib

    body = str(message)
    # Merge classifier-detected tags with caller-supplied ones (normalized
    # to lowercase, blank entries dropped) so policy sees the full union.
    detected_tags = {item.tag for item in self.classifier.classify_text(body)}
    explicit_tags = {str(tag).strip().lower() for tag in (data_tags or []) if str(tag).strip()}
    tags = sorted(explicit_tags.union(detected_tags))
    decision = self.policy_engine.evaluate(
        PolicyContext(
            boundary="action",
            data_tags=tags,
            agent_id=source_agent_id,
            tool_name="agent_to_agent",
            action_type="agent_to_agent",
        )
    )
    approval_id: str | None = None
    if decision.action == "require_approval":
        if approval_request_id:
            # Caller supplied an existing request: validate it rather than
            # opening a new one.
            validation = self.approvals.validate(
                approval_request_id,
                agent_id=source_agent_id,
                tool_name="agent_to_agent",
                session_id=session_id,
            )
            approval_id = approval_request_id
            if validation.allowed:
                # Rebuild the decision with the same concrete type so callers
                # that inspect its class keep working.
                decision = decision.__class__(
                    action="allow",
                    policy_name=decision.policy_name or "approval-gate",
                    reason=f"approval request '{approval_request_id}' approved",
                )
            elif validation.request and validation.request.status == "denied":
                decision = decision.__class__(
                    action="block",
                    policy_name="approval-gate",
                    reason=validation.reason,
                )
            # Otherwise (still pending) the original require_approval
            # decision stands and the message stays blocked below.
        else:
            created = self.approvals.create_request(
                reason=decision.reason,
                policy_name=decision.policy_name or "approval-gate",
                agent_id=source_agent_id,
                tool_name="agent_to_agent",
                session_id=session_id,
                action_type="agent_to_agent",
                data_tags=tags,
                metadata={"destination_agent_id": destination_agent_id},
                dedupe_key="|".join(
                    [
                        source_agent_id,
                        destination_agent_id,
                        session_id or "-",
                        ",".join(tags),
                        # Fix: builtin hash() of a str is randomized per
                        # process (PYTHONHASHSEED), so str(hash(body)) did
                        # not deduplicate across restarts or workers.  A
                        # SHA-256 digest of the body is stable everywhere.
                        hashlib.sha256(body.encode("utf-8")).hexdigest(),
                    ]
                ),
            )
            approval_id = created.request_id

    # Apply the decision to the outgoing payload: pass through, redact,
    # or suppress entirely (block / pending approval).
    if decision.action == "allow":
        filtered_message = body
    elif decision.action == "redact":
        filtered_message = "[REDACTED]"
    else:
        filtered_message = ""

    # Audit every interception regardless of outcome.
    self.audit.emit(
        AuditEvent(
            boundary="action",
            action=decision.action,
            policy_name=decision.policy_name,
            reason=decision.reason,
            data_tags=tags,
            agent_id=source_agent_id,
            session_id=session_id,
            source_agent_id=source_agent_id,
            destination_agent_id=destination_agent_id,
            metadata={
                "phase": "agent_message",
                "action_type": "agent_to_agent",
                "message_length": len(body),
                "filtered_length": len(filtered_message),
                "approval_request_id": approval_id,
                "destination_agent_id": destination_agent_id,
            },
        )
    )
    return {
        "decision": {
            "action": decision.action,
            "policy_name": decision.policy_name,
            "reason": decision.reason,
        },
        "data_tags": tags,
        "filtered_message": filtered_message,
        "approval_request_id": approval_id,
    }