cubepi
Health Warn
- License — License: MIT
- Description — Repository has a description
- Active repo — Last push 0 days ago
- Low visibility — Only 8 GitHub stars
Code Pass
- Code scan — Scanned 12 files during light audit, no dangerous patterns found
Permissions Pass
- Permissions — No dangerous permissions requested
No AI report is available for this listing yet.
CubePi — An async-first Python agent kernel inspired by Pi, built for coding agents, tools, and continuous sessions.
CubePi is a Pythonic, async-native agent framework designed for high performance, readability, and production-grade persistence. It provides a leaner alternative to graph-based agent runtimes by modeling agent logic as a linear while loop that developers can easily trace and debug.
Why CubePi
| langgraph | CubePi | |
|---|---|---|
| Abstraction | Graph nodes + edges + channels — you model your agent as a state machine | Plain async functions — run_agent_loop is a while loop you can read in 5 minutes |
| Streaming | Callback-based, multiple handler types | async for event in stream — one pattern everywhere |
| Checkpointing | Full snapshot per step — serializes entire message list on every channel change | Append-only — writes only new messages, O(1) DB I/O regardless of conversation length |
| Dependencies | Pulls in langchain-core, langgraph-sdk, and transitive deps | 3 core deps: pydantic, anthropic, openai |
| Tool execution | Tools are graph nodes with manual wiring | Declare tools as functions, framework handles routing and parallel execution |
| Multi-provider | Via langchain chat model adapters | Native Provider protocol — Anthropic, OpenAI built in, add your own with one class |
| Middleware | Graph-level middleware on node entry/exit | Agent-level middleware with 8 typed hooks and declarative composition rules |
| Observability | LangSmith / Langfuse integration, full trace visualization | Native OpenTelemetry — Tracer, Meter, GenAI semconv, OTLP / JSONL exporters built in |
Install
pip install cubepi
# Optional extras
pip install cubepi[sqlite] # SQLite checkpointer
pip install cubepi[postgres] # Postgres checkpointer
pip install cubepi[mysql] # MySQL checkpointer
pip install cubepi[mcp] # MCP tool loaders
pip install cubepi[tracing] # OpenTelemetry tracing + metrics
pip install cubepi[tracing-otlp] # Adds the OTLP/HTTP span exporter
pip install cubepi[trace-cli] # `cubepi trace` terminal viewer
Or with uv:
uv add cubepi
uv add cubepi[sqlite,postgres,mysql,mcp,tracing]
Quick Start
import asyncio
from cubepi import Agent, tool
from cubepi.providers.anthropic import AnthropicProvider
provider = AnthropicProvider(provider_id="anthropic", api_key="sk-...")
@tool
async def get_weather(city: str) -> str:
"Get current weather for a city."
return f"72°F and sunny in {city}"
agent = Agent(
model=provider.model("claude-sonnet-4-5-20250929"),
tools=[get_weather],
system_prompt="You are a helpful weather assistant.",
)
def on_event(event, signal=None):
if event.type == "text_delta":
print(event.delta, end="", flush=True)
agent.subscribe(on_event)
asyncio.run(agent.prompt("What's the weather in Tokyo?"))
For a guided tour of the architecture, browse the
DeepWiki for this repo or the
Core Concepts guide.
Core Concepts
Providers
Abstract LLM interaction behind a Provider protocol. All providers return MessageStream — an async iterator of StreamEvents.
from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.openai import OpenAIProvider
from cubepi.providers import FauxProvider
# Real providers
anthropic = AnthropicProvider(provider_id="anthropic", api_key="...")
openai = OpenAIProvider(provider_id="openai", api_key="...")
# Test provider — no API calls, fully deterministic
faux = FauxProvider(provider_id="faux")
faux.set_responses(["Hello!", "How can I help?"])
Tools
Decorate an async function with @tool: the input schema is derived from the
typed parameters, the docstring becomes the description, and the framework
handles argument parsing, parallel execution, and error wrapping.
from cubepi import tool
@tool
async def search(query: str) -> str:
"Search the web."
return f"Results for: {query}"
Need a shared params model, dynamic construction, or execution_mode? The
longhand AgentTool(...) is equivalent and fully supported:
from pydantic import BaseModel
from cubepi import AgentTool
from cubepi.agent.types import AgentToolResult
from cubepi.providers.base import TextContent
class SearchParams(BaseModel):
query: str
async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
return AgentToolResult(content=[TextContent(text=f"Results for: {params.query}")])
search = AgentTool(
name="search",
description="Search the web",
parameters=SearchParams,
execute=execute,
execution_mode="parallel", # or "sequential"
)
Middleware
Composable hooks that modify behavior without touching the core loop:
from cubepi import Middleware, compose_middleware
from cubepi.agent.types import BeforeToolCallResult
class LoggingMiddleware(Middleware):
async def transform_context(self, messages, *, ctx, signal=None):
print(f"Context has {len(messages)} messages")
return messages
class SafetyMiddleware(Middleware):
async def before_tool_call(self, ctx, *, signal=None):
if ctx.tool_call.name == "dangerous_tool":
return BeforeToolCallResult(block=True, reason="Blocked by policy")
return None
hooks = compose_middleware([LoggingMiddleware(), SafetyMiddleware()])
Composition rules:
| Hook | Rule |
|---|---|
transform_context |
Chained — each receives previous result |
convert_to_llm |
Last implementation wins |
before_tool_call |
Any block stops execution |
after_tool_call |
Later overrides earlier |
transform_system_prompt |
Chained — each receives previous result |
after_model_response |
Returns TurnAction; last decision wins, messages concatenate |
should_stop_after_turn |
Any true stops |
on_run_end |
Messages concatenate; non-empty result triggers one extra model turn |
Checkpointer
Persist conversation state with append-only semantics:
from cubepi.checkpointer import (
MemoryCheckpointer,
SQLiteCheckpointer,
PostgresCheckpointer,
MySQLCheckpointer,
)
# In-memory for dev/test
cp = MemoryCheckpointer()
# SQLite for lightweight persistence
async with SQLiteCheckpointer("agent.db") as cp:
agent = Agent(model=model, checkpointer=cp, thread_id="conv-1")
# Postgres for production
async with PostgresCheckpointer("postgresql://...") as cp:
agent = Agent(model=model, checkpointer=cp, thread_id="conv-1")
# MySQL for production
async with MySQLCheckpointer("mysql://...") as cp:
agent = Agent(model=model, checkpointer=cp, thread_id="conv-1")
Postgres and MySQL never issue DDL at runtime — your app owns the schema via
Alembic. See the host-integration runbooks
(Postgres ·
MySQL) and the runnableexamples/.
FauxProvider for Testing
Ship your agent tests without API keys:
from cubepi.providers import FauxProvider, faux_text, faux_tool_call, faux_assistant_message
provider = FauxProvider(provider_id="faux")
provider.set_responses([
faux_assistant_message([
faux_tool_call("search", {"query": "python"}),
]),
faux_assistant_message("Here are the results..."),
])
agent = Agent(model=provider.model("test"), tools=[search_tool])
agent.subscribe(lambda event, signal=None: None) # subscribe before prompt to receive events
await agent.prompt("Search for python")
# Streams realistic deltas — content_block_start, text_delta, etc.
Tracing
Attach a Tracer and every agent run produces OpenTelemetry spans
aligned with the GenAI Semantic Conventions —
ingestible by Jaeger, Tempo, Honeycomb, Datadog, AWS X-Ray, or any
OTLP-compatible backend without custom instrumentation:
from cubepi.tracing import Tracer, tracing_context
from cubepi.tracing.exporters import JsonlSpanExporter
async with (
Tracer(
service_name="my-bot",
agent_name="assistant",
exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
) as tracer,
tracer.attached(agent),
):
with tracing_context(tags=["beta-arm"], metadata={"user_id": "u-42"}):
await agent.prompt("Hello.")
# On exit: detach (closes any cancelled-run spans + flush) + tracer shutdown.
Span tree per run:
trace
└── invoke_agent 14425.8ms [0x1cd97cdb] ← one per agent.prompt()
├── cubepi.turn 1283.1ms [0x5cfda93e] ← one per LLM round-trip
│ ├── chat deepseek-v4-flash 1208.7ms tok 6845/68 [0x0d130229]
│ └── execute_tool subagent 9610.2ms subagent [0x38bdd10a]
│ └── invoke_agent 9601.0ms [0x8094f99b] ← subagent run, nested
│ └── cubepi.turn 9598.4ms [0x57c5cfc7]
│ ├── chat deepseek-v4-flash 1190.3ms [0x8205ca6b]
│ └── execute_tool web_search 6500.2ms web_search [0xca4e59fc]
└── cubepi.turn 491.9ms ERROR [0xce25f242]
└── chat deepseek-v4-flash 427.2ms ERROR [0x0bff68ec]
└── error: Error code: 400 - ... `tool_use` ids were found without
`tool_result` blocks immediately after: call_01_...
No prompts / model outputs are recorded by default. Opt in withTracer(record_content=True) plus a redact callback for PII. Pair
with Meter(...) for gen_ai.client.operation.duration / TTFC /
token-usage histograms. Full guide: https://cubepi.ai/docs/guides/tracing/overview
Inspecting traces from the terminal
With JsonlSpanExporter writing to ./cubepi-traces, inspect runs with thecubepi trace CLI (install the extra: pip install cubepi[trace-cli]). All
subcommands take --dir (default ./cubepi-traces):
cubepi trace ls # recent runs, newest first; the `input`
# column shows the user message + `status`
cubepi trace view <run_id> # render a run as a tree; errors print inline
# under the failing span (no flag needed).
# A unique run-id PREFIX is enough.
cubepi trace view <run> --content # also expand prompts / tool args / results
cubepi trace view <run> -v # expand ALL span attributes (verbose)
cubepi trace follow <run_id> # stream spans live as they complete
cubepi trace stats --by model # token / latency / error aggregates
cubepi trace stats --by tool --since 2026-01-01
Typical debugging flow: ls (find the run by its input), thenview <prefix> and read the inline error: line under any ERROR span. Need
content only recorded with Tracer(record_content=True).
Token / cache fields. The recorder reconciles to the GenAI semconv, sogen_ai.usage.input_tokens is the inclusive total prompt
(input + cache_read + cache_creation) and gen_ai.usage.cache_read.input_tokens
is a subset of it. From trace fields, cache hit rate iscache_read / input_tokens (≤ 100%) — do not add cache_read to the
denominator.
Coding agents debugging cubepi/consumer apps can install thecubepi-trace skill:
npx skills add cubeplexai/cubepi@cubepi-trace -a claude-code
AI Agents
Two skills are available for coding agents (Claude Code, Cursor, Codex, …) working
with this repo:
| Skill | Install | Purpose |
|---|---|---|
cubepi |
npx skills add cubeplexai/cubepi@cubepi -a claude-code |
Build agents — API reference, tools, middleware, checkpointing, MCP, HITL |
cubepi-trace |
npx skills add cubeplexai/cubepi@cubepi-trace -a claude-code |
Debug runs — inspect OTel spans, token counts, tool results, streaming failures |
Requirements
- Python >= 3.11
- Core:
pydantic,anthropic,openai - Optional:
aiosqlite([sqlite]),asyncpg+sqlalchemy+msgpack([postgres]),aiomysql+sqlalchemy+msgpack+cryptography([mysql]),mcp([mcp]),opentelemetry-sdk([tracing]),opentelemetry-exporter-otlp-proto-http([tracing-otlp]),rich([trace-cli])
Credits
Architecture inspired by pi-agent-core (TypeScript); CubePi is an independent Python reimplementation with Pydantic v2, asyncio-native primitives, and built-in checkpointing.
License
MIT
Reviews (0)
Sign in to leave a review.
Leave a reviewNo results found