lynx

mcp
Security Audit
Fail
Health Warn
  • License — License: NOASSERTION
  • Description — Repository has a description
  • Active repo — Last push 0 days ago
  • Low visibility — Only 6 GitHub stars
Code Fail
  • rm -rf — Recursive force deletion command in examples/02_block_dangerous.py
  • rm -rf — Recursive force deletion command in examples/05_real_llm_blocked.py
Permissions Pass
  • Permissions — No dangerous permissions requested


SUMMARY

Framework-agnostic policy-gated durable execution for AI agents. Every tool call gets a YAML policy check, a checkpoint, and a hash-chained audit event.

README.md

Lynx

A stateless, type-safe policy kernel for AI agent tool calls.

Pure functions over immutable values. No database. No globals. No leaks. Five verdicts. Streaming events to user-owned sinks.

import asyncio
from lynx import (
    ToolSet, tool, load_policy_file, run_agent,
    stdout_sink, auto_deny,
)

@tool(reversible=False, scope=("filesystem:write",))
async def shell(cmd: str) -> str:
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate()
    return out.decode()

async def main() -> None:
    # my_agent: any object implementing `async step(conv) -> ToolCall | FinalAnswer`
    result = await run_agent(
        my_agent,
        task="clean up old logs",
        tools=ToolSet.from_functions(shell),
        policy=load_policy_file("policy.yaml"),
        sinks=(stdout_sink(),),
        on_approval=auto_deny("no approvals configured"),
        environment="prod",          # policy can match on context.environment
        # principal=Principal(kind="user", id="hadi"),  # optional
        # workspace=".",                                 # optional
        # budget=Budget(steps=50, duration_seconds=600), # optional
        # correlation_id=None,                           # auto-generated if None
    )
    # result: { correlation_id, bundle_id, final_answer, error, steps_taken }
    # Lynx holds NOTHING. No DB. No state. No leaks.

asyncio.run(main())

What v2 does

  • Policy-gated execution at the tool-call boundary. Five verdicts: allow / deny / dry_run / approve_required / transform.
  • Streaming events to your sinks. We never store events — your sink can buffer, write to disk, ship to OTel, post to a webhook, whatever you choose.
  • Pure functions everywhere. The kernel is one function: run_agent(agent, task, *, tools, policy, sinks, on_approval, ...). No Runtime class. No singleton.
  • Immutable values. Every public type is frozen=True, slots=True. Mutation raises at runtime; mypy catches it at write time.
  • No globals. No tool registry, no broker, no module-level state. ToolSet is built explicitly at call site.
  • Hot-swappable policy. Pass a different PolicyBundle on the next run_agent call — the bundle is an immutable value; the kernel holds nothing between calls. (Mid-run reload is not supported; build a new bundle and use it on the next run.)
  • Durable runs, no double side effects (opt-in). Pass a RunStore you implement over your own storage and a stable run_id: a crashed run resumes at the first incomplete step — the model is not re-called for completed steps (no re-burned tokens) and journaled actions are not re-executed (no double charges). Two racing workers resolve to one winner; the loser exits superseded before executing anything.

What v2 does NOT do

  • No storage — durability journals to a RunStore you implement on your Redis/Postgres/Dynamo (the contract is two methods and one sentence); audit events stream to your sinks. Lynx never opens a file or a connection.
  • No process supervision — Lynx does not restart dead workers; your supervisor (systemd, k8s, a queue) does. Lynx makes the restart cheap and safe.
  • No prompt filtering — that's NeMo Guardrails or Guardrails AI.
  • No cluster orchestration — that's Temporal or Inngest.
  • No agent framework — that's LangGraph / CrewAI; we wrap them via adapters.

Install

pip install lynx-agent                    # core (3 deps)
pip install lynx-agent[anthropic]         # Claude adapter
pip install lynx-agent[openai]            # GPT adapter
pip install lynx-agent[langgraph]
pip install lynx-agent[crewai]
pip install lynx-agent[mcp]

Quickstart

pip install lynx-agent
lynx init           # writes one file: policy.yaml
python examples/01_hello_allow.py

How it works

                ┌────────────────────────────────────────────┐
                │  Agent (any framework)                     │
                └──────────────────┬─────────────────────────┘
                                   │  ToolCall
                                   ▼
              ╔═════════════════════════════════════════════╗
              ║  run_agent (pure function)                  ║
              ║   1. PDP evaluates → Decision               ║
              ║   2. Mediator dispatches by verdict         ║
              ║   3. Sinks called with each AuditEvent      ║
              ║   4. Approval handler called sync if needed ║
              ╚═════════════════════════════════════════════╝
                                   │ side effect
                                   ▼
                ┌────────────────────────────────────────────┐
                │  Real world                                │
                └────────────────────────────────────────────┘

Each agent step:

  1. Build ActionRequest from the agent's ToolCall
  2. evaluate(policy, request, context) returns a Decision (pure function)
  3. mediate(request, decision, tools, on_approval) dispatches
  4. Each step emits a few events; sinks consume them
  5. Result is appended to a new conversation tuple; old tuple is freed

Tools — @tool and ToolSet

Every tool is an async def decorated with @tool. The decorator attaches an
immutable ToolDef to the function (no global registry); you bundle decorated
functions into a ToolSet explicitly at the call site.

from pathlib import Path

from lynx import tool

@tool(
    cost="low",                     # "low" | "medium" | "high" (default "low")
    reversible=False,               # if False, dry_run requires a .shadow
    scope=("filesystem:write",),    # free-form tags policy can match on
    blast_radius_hint=None,         # int | None — opaque to the kernel; readable by your rules via declared.blast_radius_hint
    name=None,                      # override; default = fn.__name__
    description=None,               # override; default = first line of docstring
)
async def write_file(path: str, content: str) -> str:
    """Save text to a file."""
    Path(path).write_text(content)
    return f"wrote {len(content)} bytes to {path}"

Shadows — pure previews for dry_run

If a tool is irreversible and policy chooses dry_run, the kernel calls the
shadow instead of the real function. Shadows must be pure (no I/O, no side
effects) and return a JSON-serializable preview.

@write_file.shadow
async def _write_file_shadow(path: str, content: str) -> dict:
    p = Path(path)
    return {
        "would_write": path,
        "bytes": len(content.encode()),
        "would_overwrite": p.exists(),
        "preview": content[:120],
    }

If no shadow is registered and the policy's defaults set
on_missing_shadow: approve_required (the default), an irreversible tool with
no rule match falls through to approval rather than running blind.

Alternative attachment form:

from lynx import shadow

@shadow(write_file)
async def _write_file_shadow(path, content): ...

ToolSet — immutable, built at call site

from lynx import ToolSet

tools = ToolSet.from_functions(write_file, shell, get_customer)

tools.names()                        # ("get_customer", "shell", "write_file")
tools.get("write_file")              # ToolDef
tools.with_tool(other_def)           # returns NEW ToolSet
tools.without_tool("shell")          # returns NEW ToolSet
tools.union(other_toolset)           # returns NEW ToolSet
len(tools)                           # 3

Every operation returns a new ToolSet; the original is untouched.

Policy — full reference

A policy is a frozen PolicyBundle produced by compile_policy(yaml_str) or
load_policy_file(path). Bundles are content-addressed by bundle.id and safe
to swap between run_agent calls, because the kernel holds no policy state
between calls. Mid-run reload is not supported.
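Content addressing means the id is derived from the policy's content alone. This README does not describe how bundle.id is actually computed, so the sketch below is only an assumption about the general technique: `bundle_id_sketch`, the canonical-JSON step, and the 16-character truncation are all hypothetical.

```python
import hashlib
import json


def bundle_id_sketch(policy: dict) -> str:
    """Hash a canonical serialization so equal content yields an equal id."""
    canonical = json.dumps(policy, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


a = {"version": 1, "defaults": {"on_no_match": "deny"}}
b = {"defaults": {"on_no_match": "deny"}, "version": 1}    # same content, other key order
c = {"version": 1, "defaults": {"on_no_match": "allow"}}   # one value changed

same = bundle_id_sketch(a) == bundle_id_sketch(b)
different = bundle_id_sketch(a) != bundle_id_sketch(c)
```

An id of this kind lets an audit event record exactly which policy text produced a decision, without storing the policy itself in the event.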

YAML schema

version: 1                        # int; currently only 1 is defined

defaults:
  on_no_match: deny               # verdict when no rule matches a request
  on_missing_shadow: approve_required
                                  # verdict when no rule matches AND the tool
                                  # is irreversible AND has no .shadow

predicates:                       # named, reusable matchers
  in_prod: { context.environment: prod }
  is_kubectl: { tool: kubectl }
  is_destructive_sql:
    tool: sql_exec
    args.sql.matches: '(?i)\b(UPDATE|DELETE)\b'

rules:
  - id: hard-block-rm-rf-root     # str; defaults to "rule_<index>"
    priority: 100                 # int; higher runs first (default 0)
    description: "..."            # optional, free-form
    match: { ... }                # see "Match expressions" below
    decision: deny                # one of the five verdicts
    reason: "rm -rf / is hard-blocked"
    approvers: ["approver@example.com"]  # only used by approve_required
    timeout_seconds: 1800                # only used by approve_required
    transform: { ... }                   # only used by transform

Rules are sorted by (-priority, file order). The first matching rule wins.
Python rules (see below) are interleaved with YAML rules by priority — a
higher-priority YAML rule beats a lower-priority Python rule, and vice versa.
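The ordering rule can be sketched in a few lines. This is a toy model under stated assumptions: `Rule` and `first_match` are hypothetical names, and each rule is reduced to a callable that returns a verdict string or None.

```python
from typing import Callable, NamedTuple, Optional


class Rule(NamedTuple):
    id: str
    priority: int
    check: Callable[[dict], Optional[str]]   # a verdict, or None to defer


def first_match(rules: list[Rule], request: dict) -> Optional[tuple[str, str]]:
    # sorted() is stable, so rules with equal priority keep their file order.
    ordered = sorted(rules, key=lambda r: -r.priority)
    for rule in ordered:
        verdict = rule.check(request)
        if verdict is not None:
            return rule.id, verdict
    return None


rules = [
    Rule("allow-shell", 0, lambda req: "allow" if req["tool"] == "shell" else None),
    Rule("block-delete", 100, lambda req: "deny" if "rm -rf" in req["args"].get("cmd", "") else None),
    Rule("late-allow", 0, lambda req: "allow"),
]

hit_dangerous = first_match(rules, {"tool": "shell", "args": {"cmd": "rm -rf /tmp/x"}})
hit_benign = first_match(rules, {"tool": "shell", "args": {"cmd": "ls"}})
hit_other = first_match(rules, {"tool": "sql_exec", "args": {}})
```

Although "block-delete" is listed second, its priority of 100 puts it ahead of both priority-0 rules, which then fall back to file order.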

The five verdicts

| Verdict | What the mediator does |
| --- | --- |
| allow | Call tool.fn(**args) normally. |
| deny | Skip execution. Inject a [denied] tool message into the conversation. |
| dry_run | Call tool.shadow_fn(**args) instead of fn. Real side effects suppressed. |
| approve_required | Call on_approval(...) synchronously. On grant, proceed as allow; on deny, behave as deny. |
| transform | Rewrite args per the transform: block, then call fn(**rewritten_args). |

Match expressions

Match expressions read fields off the live ActionRequest and ExecutionContext.

Paths (the part before the operator):

| Path prefix | Reads from |
| --- | --- |
| tool | The tool name (string) |
| args.<name>... | The arguments the agent proposed |
| declared.<name> | Tool metadata: cost, reversible, scope, blast_radius_hint, has_shadow |
| context.<name> | principal, environment, workspace, correlation_id, step_seq, timestamp, extra |

Operators (suffix the path with .<op>):

| Operator | Meaning | Example |
| --- | --- | --- |
| (none) / .eq | Equality | tool: kubectl |
| .matches | Regex re.search (RE2-style guards reject catastrophic backtracking) | args.cmd.matches: '^rm\s+-rf' |
| .in | Value is in the listed sequence | args.customer_id.in: ["C-789"] |
| .contains | Container contains the value | declared.scope.contains: filesystem:write |
| .contains_any | Container contains any listed value | declared.scope.contains_any: [a, b] |
| .contains_all | Container contains all listed values | declared.scope.contains_all: [a, b] |
| .gt .ge .lt .le | Numeric comparison | args.amount_usd.gt: 500 |
| .between | lo <= v <= hi | args.amount_usd.between: [50, 500] |
| .not_between | Inverse of between | |

Composition at any level:

match:
  all_of:
    - is_kubectl                       # named predicate
    - in_prod
    - args.command.matches: '^(apply|delete|patch)\b'
  # any_of: [ ... ]
  # not: { tool: shell }
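To make the path-plus-operator idea concrete, here is a toy evaluator for a handful of the operators. It is a sketch under assumptions, not Lynx's matcher: `OPERATORS`, `resolve`, and `matches` are hypothetical names, requests are plain nested dicts, and several keys in one mapping are assumed to combine as a conjunction.

```python
import re
from typing import Any

OPERATORS = {
    "eq": lambda value, expected: value == expected,
    "matches": lambda value, pattern: re.search(pattern, str(value)) is not None,
    "in": lambda value, options: value in options,
    "contains": lambda value, item: item in value,
    "gt": lambda value, bound: value > bound,
    "between": lambda value, bounds: bounds[0] <= value <= bounds[1],
}


def resolve(path: str, request: dict) -> Any:
    """Walk a dotted path such as 'args.amount_usd' through nested dicts."""
    current: Any = request
    for part in path.split("."):
        current = current[part]
    return current


def matches(condition: dict, request: dict) -> bool:
    """Every key/value pair in the condition must hold."""
    for key, expected in condition.items():
        path, _, op = key.rpartition(".")
        if op not in OPERATORS or not path:
            path, op = key, "eq"        # no operator suffix means equality
        if not OPERATORS[op](resolve(path, request), expected):
            return False
    return True


request = {
    "tool": "refund",
    "args": {"amount_usd": 120, "customer_id": "C-789"},
    "declared": {"scope": ["payments:write"]},
    "context": {"environment": "prod"},
}

small_refund = matches({"tool": "refund", "args.amount_usd.between": [50, 500]}, request)
large_refund = matches({"tool": "refund", "args.amount_usd.gt": 500}, request)
in_prod = matches({"context.environment": "prod", "declared.scope.contains": "payments:write"}, request)
```

The sketch raises KeyError on a missing path and cannot tell an argument literally named `in` from the `.in` operator; a real matcher has to settle both cases explicitly.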

transform: block

decision: transform
transform:
  jsonpath: "$.args.sql"               # default "$.args"; the target arg key
  append: " AND tenant_id = 'TENANT-A'" # one of: set | append | delete

  • set: <value> — replace the value at jsonpath
  • append: <value> — string-concatenate to the existing value
  • delete: true — remove the key from args
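The three operations can be sketched as a pure function over an args dict. This is an illustration only: `apply_transform` is a hypothetical helper, and it handles just the `$.args.<key>` path form rather than general JSONPath.

```python
def apply_transform(args: dict, spec: dict) -> dict:
    """Return rewritten args; the input dict is left unchanged."""
    prefix = "$.args."
    path = spec.get("jsonpath", "$.args")
    if not path.startswith(prefix):
        raise ValueError("this sketch only handles paths of the form $.args.<key>")
    key = path[len(prefix):]

    rewritten = dict(args)                      # shallow copy, never mutate the input
    if "set" in spec:
        rewritten[key] = spec["set"]
    elif "append" in spec:
        rewritten[key] = str(rewritten[key]) + spec["append"]
    elif spec.get("delete"):
        rewritten.pop(key, None)
    return rewritten


original = {"sql": "SELECT * FROM orders WHERE total > 10"}
scoped = apply_transform(
    original, {"jsonpath": "$.args.sql", "append": " AND tenant_id = 'TENANT-A'"}
)
replaced = apply_transform(original, {"jsonpath": "$.args.sql", "set": "SELECT 1"})
removed = apply_transform(original, {"jsonpath": "$.args.sql", "delete": True})
```

Appending text to SQL is only safe when the incoming statement has a shape the suffix fits, so a rule like this is usually paired with a match expression that constrains the query first.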

Python rules

Anything you can't express in YAML, write as a Python predicate. Rules are
explicit arguments to compile_policy; there is no decorator and no registry.

from lynx import compile_policy
from lynx.policy import allow, deny, dry_run, approve_required, transform

def block_paths_outside_workspace(req, ctx):
    if req.tool != "shell":
        return None                                   # skip — let YAML decide
    if path_escapes(req.args["cmd"], ctx.workspace):
        return deny("path escapes workspace")
    return None

bundle = compile_policy(
    yaml_source,
    python_rules=(block_paths_outside_workspace,),
    python_rule_priorities=(("block_paths_outside_workspace", 100),),
)

Each Python rule is (ActionRequest, ExecutionContext) -> Decision | None.
Return None to defer; the first non-None result wins. Python and YAML
rules are interleaved in a single priority-sorted evaluation order (default
priority 0). If a rule raises during evaluation, it is recorded as a
diagnostic marker in Decision.matched_rules (e.g. <rule_error:my_rule:TypeError>)
and evaluation continues — buggy rules never silently fail-open.
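The rule above calls a `path_escapes` helper that Lynx does not appear to provide; you would write it yourself. One possible sketch, assuming POSIX-style commands, is shown below. It treats every non-flag token after the program name as a candidate path and does not understand shell features such as globs, redirects, or command substitution.

```python
import shlex
from pathlib import Path


def path_escapes(cmd: str, workspace: str) -> bool:
    """Return True if any path-like token in cmd resolves outside workspace."""
    root = Path(workspace).resolve()
    for token in shlex.split(cmd)[1:]:           # skip the program name
        if token.startswith("-"):
            continue                             # treat as a flag, not a path
        candidate = (root / token).resolve()     # an absolute token replaces root
        if candidate != root and root not in candidate.parents:
            return True
    return False


inside = path_escapes("cat notes/todo.txt", "/tmp/ws")
parent = path_escapes("cat ../secrets.txt", "/tmp/ws")
absolute = path_escapes("cat /etc/passwd", "/tmp/ws")
```

`shlex.split` raises ValueError on unbalanced quotes; per the paragraph above, a rule that raises is recorded as a diagnostic marker rather than being silently skipped.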

Decision constructors

For Python rules and tests:

from lynx.policy import allow, deny, dry_run, approve_required, transform

allow(reason="", matched_rules=())
deny(reason, matched_rules=())
dry_run(reason="", matched_rules=())
approve_required(approvers=(), timeout_seconds=1800, reason="", matched_rules=())
transform(transform_args={"sql": "..."}, reason="", matched_rules=())

Default behavior when no rule matches

  1. If the tool is irreversible AND has no shadow → defaults.on_missing_shadow
    (default approve_required).
  2. Otherwise → defaults.on_no_match (default deny).
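The two-step fall-through can be written as a small pure function. `default_verdict` is a hypothetical name used for illustration; the default values and the marker ids are the ones this section gives.

```python
def default_verdict(reversible: bool, has_shadow: bool, defaults: dict) -> tuple[str, str]:
    """Return (verdict, matched_rule_id) for a request that no rule matched."""
    if not reversible and not has_shadow:
        verdict = defaults.get("on_missing_shadow", "approve_required")
        return verdict, "<default:on_missing_shadow>"
    return defaults.get("on_no_match", "deny"), "<default:on_no_match>"


blind_write = default_verdict(reversible=False, has_shadow=False, defaults={})
with_shadow = default_verdict(reversible=False, has_shadow=True, defaults={})
permissive = default_verdict(reversible=True, has_shadow=False, defaults={"on_no_match": "allow"})
```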

The matched rule id will be "<default:on_missing_shadow>" or
"<default:on_no_match>" so you can see the fall-through in audit events.

run_agent — all kwargs

result = await run_agent(
    agent,                              # implements async step(conv) -> ToolCall | FinalAnswer
    task,                               # str — becomes the first user Message
    *,
    tools,                              # ToolSet
    policy,                             # PolicyBundle
    sinks=(),                           # Iterable[Sink]
    on_approval=None,                   # ApprovalHandler; defaults to auto_deny
    budget=Budget(steps=50, duration_seconds=600),
    principal=Principal(kind="user", id="anonymous"),
    environment="dev",                  # policy reads this via context.environment
    workspace=".",                      # policy reads this via context.workspace
    correlation_id=None,                # auto-generated UUID4 if None
)

Sinks — the audit replacement

from lynx import stdout_sink, jsonl_sink, multi_sink

# Pretty-print + persist to jsonl in one go
with open("audit.jsonl", "a") as f:
    sink = multi_sink(stdout_sink(), jsonl_sink(f))
    await run_agent(..., sinks=(sink,))
# File is yours. You close it. You rotate it. You ship it where you want.

Built-in sinks:

| Sink | What it does |
| --- | --- |
| stdout_sink(stream=...) | Pretty-print events |
| jsonl_sink(handle) | One JSON line per event |
| noop_sink() | Discard (for tests) |
| multi_sink(*sinks) | Fan out concurrently |
| callback_sink(fn) | Wrap any async callable |

Write your own — it's just async def __call__(event: AuditEvent) -> None.
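As an illustration of that shape, here is a sink that buffers events and hands them off in batches. It is a sketch under assumptions: `BufferingSink` is a hypothetical class, events are modelled as plain JSON-serializable dicts because AuditEvent's fields are not listed here, and the "flush" just moves the batch to a list where a real sink would write to disk or the network.

```python
import asyncio
import json
from typing import Any


class BufferingSink:
    """Collect events in memory and hand them off in batches."""

    def __init__(self, batch_size: int = 2) -> None:
        self.batch_size = batch_size
        self.pending: list[str] = []
        self.flushed: list[list[str]] = []

    async def __call__(self, event: Any) -> None:
        # Serialize immediately so later changes to the source cannot alter the record.
        self.pending.append(json.dumps(event, sort_keys=True))
        if len(self.pending) >= self.batch_size:
            await self.flush()

    async def flush(self) -> None:
        if self.pending:
            self.flushed.append(self.pending)    # stand-in for a disk or network write
            self.pending = []


async def demo() -> BufferingSink:
    sink = BufferingSink(batch_size=2)
    for seq in range(3):
        await sink({"seq": seq, "type": "step.example"})
    return sink


sink = asyncio.run(demo())
```

A buffering sink should also be flushed once after the run returns; otherwise the last partial batch (one event in this demo) is never written.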

Approvals — synchronous handlers

from lynx import cli_prompt_approval, callback_approval, ApprovalDecision

# Built-in: prompt on stdin
await run_agent(..., on_approval=cli_prompt_approval())

# Or bring your own (`slack` and the click object's fields are placeholders for your client)
async def slack_approval(req):
    msg = await slack.post(f"Approve {req.request.tool}?")
    click = await slack.wait_for_click(msg, timeout=3600)
    return ApprovalDecision(granted=click.action == "approve", approver=click.user)

await run_agent(..., on_approval=callback_approval(slack_approval))

The run_agent call blocks on the handler. No queue. No broker. No cross-process resume. Your handler decides how to wait.
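Because the handler owns the wait, it is also the natural place to bound it. The sketch below shows one way to fail closed on a timeout with `asyncio.wait_for`. `ApprovalDecisionSketch` is a local stand-in for ApprovalDecision, `approve_with_timeout` is a hypothetical helper, and treating a timeout as a denial is a design assumption rather than documented Lynx behaviour.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass(frozen=True)
class ApprovalDecisionSketch:          # stand-in for lynx.ApprovalDecision
    granted: bool
    approver: Optional[str] = None


async def approve_with_timeout(
    ask: Callable[[], Awaitable[str]], timeout_seconds: float
) -> ApprovalDecisionSketch:
    """Wait for an approver's name; no answer in time counts as a denial."""
    try:
        approver = await asyncio.wait_for(ask(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return ApprovalDecisionSketch(granted=False)
    return ApprovalDecisionSketch(granted=True, approver=approver)


async def fast_human() -> str:
    return "alice"


async def slow_human() -> str:
    await asyncio.sleep(1.0)           # longer than the timeout used below
    return "bob"


async def demo() -> tuple[ApprovalDecisionSketch, ApprovalDecisionSketch]:
    granted = await approve_with_timeout(fast_human, timeout_seconds=0.5)
    timed_out = await approve_with_timeout(slow_human, timeout_seconds=0.01)
    return granted, timed_out


granted, timed_out = asyncio.run(demo())
```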

Durability — crash-resume without double side effects

Opt in by passing a RunStore (your storage, your dependency) and a stable run_id:

result = await run_agent(
    agent, task,
    tools=tools, policy=policy,
    store=my_store,                 # you implement two methods (below)
    run_id="invoice-2026-0611",     # stable across retries
)
# Process dies mid-run? Your supervisor retries the same call.
# Completed steps replay from the journal: the model is NOT re-called,
# journaled actions are NOT re-executed. A finished run returns the same
# answer forever. Two racing workers resolve to one; the loser returns
# error="superseded: ..." having executed nothing.

The whole RunStore contract:

class MyStore:                       # Redis / Postgres / Dynamo / a dict
    async def append(self, record: StepRecord) -> None:
        # MUST atomically raise DuplicateRecord if (run_id, seq) exists.
        # Postgres: PRIMARY KEY (run_id, seq). Redis: HSETNX. That's it.
        ...
    async def load(self, run_id: str) -> Sequence[StepRecord]:
        ...                          # ordered by seq

That one uniqueness rule is the concurrency story: the write-ahead intent
journaled before every action is the claim — no leases, no TTLs, nothing
to clean up when a worker dies. See
examples/24_durable_resume.py for a
complete ~15-line store plus crash, resume, and supersede in action, and
docs/integration-cookbook.md for Redis /
Postgres / file-backed recipes.
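For orientation, a dict-backed store that satisfies the uniqueness rule might look like the following. It is a sketch, not the store from the examples directory: `StepRecord` and `DuplicateRecord` are redefined locally as stand-ins with only the fields the contract mentions, and `DictStore` survives neither a process crash nor a second process.

```python
import asyncio
from dataclasses import dataclass
from typing import Sequence


class DuplicateRecord(Exception):
    """Local stand-in for the exception the contract requires."""


@dataclass(frozen=True)
class StepRecord:                      # stand-in with only the fields used here
    run_id: str
    seq: int
    payload: str = ""


class DictStore:
    def __init__(self) -> None:
        self._records: dict[tuple[str, int], StepRecord] = {}

    async def append(self, record: StepRecord) -> None:
        key = (record.run_id, record.seq)
        # No await between the check and the write, so within one event loop
        # another append cannot slip in between them.
        if key in self._records:
            raise DuplicateRecord(f"{key} already journaled")
        self._records[key] = record

    async def load(self, run_id: str) -> Sequence[StepRecord]:
        found = [r for r in self._records.values() if r.run_id == run_id]
        return sorted(found, key=lambda r: r.seq)


async def race() -> tuple[list[str], Sequence[StepRecord]]:
    store = DictStore()

    async def worker(name: str) -> str:
        try:
            await store.append(StepRecord("invoice-2026-0611", 0, payload=name))
            return "won"
        except DuplicateRecord:
            return "superseded"

    outcomes = await asyncio.gather(worker("a"), worker("b"))
    return list(outcomes), await store.load("invoice-2026-0611")


outcomes, journal = asyncio.run(race())
```

Exactly one worker claims step 0 and the other sees DuplicateRecord. A database-backed store gets the same property from a primary key on (run_id, seq) rather than from single-threaded execution.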

The crash window, handled honestly. If a worker dies between executing
an action and journaling its result, the action may have run. On resume,
Lynx re-proposes it to policy with context.extra.uncertain_retry: true
so your policy decides: re-run it (idempotent tools), deny it, or escalate
to a human:

- id: never-rerun-uncertain-payments
  match: { context.extra.uncertain_retry: true, declared.reversible: false }
  decision: approve_required

Inspect any journal with replay(records) (pure function) or lynx trace records.jsonl (for file-backed stores).

Scope, honestly: Lynx does not restart dead processes (your supervisor does);
durability needs no database, but distributed durability — runs surviving
machine loss, multiple workers — needs your database. Budgets count
replayed steps (resume a budget-exhausted run by passing a larger budget);
duration_seconds is per-attempt. Tool args/results should be
JSON-serializable (LLM tool calls always are). Resuming under a different
policy emits a run.bundle_changed warning; resuming with a different
ToolSet, or with an agent that isn't a pure function of the conversation
(e.g. the single-shot CrewAI adapter), is out of contract.

Examples

| # | File | What it shows |
| --- | --- | --- |
| 01 | 01_hello_allow.py | Smallest possible run |
| 02 | 02_block_dangerous.py | DENY for rm -rf / |
| 03 | 03_preview_writes.py | DRY_RUN with file shadow |
| 04 | 04_human_approval.py | Sync approval via stdin |
| 05 | 05_real_llm_blocked.py | Real Claude / GPT |
| 06 | 06_streaming_to_jsonl.py | Audit replacement: jsonl sink |
| 07 | 07_refund_workflow.py | Multi-tier refund rules |
| 08 | 08_sql_transform.py | TRANSFORM verdict |
| 09 | 09_fastapi_service.py | FastAPI integration |
| 10 | 10_devops_assistant.py | All five verdicts (one policy, run in staging + prod) |
| 11 | 11_flask_service.py | Flask integration |
| 12 | 12_django_service.py | Django integration |

CLI — six commands

lynx --version
lynx init                        # writes policy.yaml (only)
lynx run <script>                # runs an async main()
lynx trace <records.jsonl>       # reconstruct a journaled run
lynx policy lint                 # validates a YAML
lynx policy bundle-id            # content-addressed ID

Migrating from v1.x

v1's Runtime, runtime.run/resume/approve/deny, SQLite store, audit chain, and approval broker are all gone. Replace:

| v1 | v2 |
| --- | --- |
| runtime.run(agent, task=...) | run_agent(agent, task, tools=..., policy=..., sinks=..., on_approval=...) |
| runtime.resume(run_id) | Re-call run_agent with the same store= and run_id=; completed steps replay from your journal |
| runtime.approve(approval_id) | Doesn't exist; the handler returns ApprovalDecision synchronously |
| runtime.audit_chain(run_id) | Doesn't exist; wire jsonl_sink or your own sink |
| get_registry() | Doesn't exist; use ToolSet.from_functions(*decorated_fns) |
| enable_otel() | Will land as otel_sink(tracer) in v2.1 |
| lynx ps / trace / audit / resume / approvals | The v1 commands are all gone; your sink owns the story (v2's lynx trace reads a journal file instead) |

If you need any of those primitives, pin v1.0.x:

pip install "lynx-agent<2.0"

v1 will keep getting security fixes per the SECURITY.md policy.

Status

v2.0 — public API committed. SemVer from here. Production-ready for the documented scope.

Design

License

Apache 2.0.
