Enables AI coding agents to execute formal, stateful workflows with typed contracts, postcondition enforcement, and structured retry logic.
State machine dispatch server for AI agent workflows.
Stratum gives AI coding agents (Claude Code, Codex, etc.) a formal execution model. Instead of improvising a plan and retrying blindly, the agent writes a typed spec, the server tracks state, enforces postconditions, and returns structured failure context on retry. Every step produces an auditable trace record.
Two shipped components:
- stratum-mcp -- MCP server for Claude Code. Validates .stratum.yaml specs, manages flow execution state, enforces typed contracts and postconditions. Published on PyPI.
- stratum-py -- Python library with @infer, @contract, @compute, @flow decorators for building production LLM systems. Published on PyPI.

Governed workflows as auditable flows, on any agent, not just one vendor. Unlike a single-vendor in-context orchestrator, Stratum runs as an MCP server and a library under Claude Code, Codex, or any MCP host; enforces typed contracts and ensure postconditions on every flow execution; stops at real human gates; dispatches Claude and Codex agents in one flow (so an independent reviewer can be a different model from the implementer); and persists flow state across sessions. Where you want raw in-context fan-out, reach for an in-host workflow runtime; where you want the run governed, portable, and auditable, that's a Stratum workflow.
pip install stratum-mcp
stratum-mcp install
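install sets up the following:
- .claude/mcp.json, to register the MCP server
- CLAUDE.md, with the Stratum execution model block appended
- ~/.claude/skills/, with the Stratum skills
- ~/.stratum/hooks/, with the session hooks

Restart Claude Code to activate.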
To remove:
stratum-mcp uninstall # removes everything
stratum-mcp uninstall --keep-skills # keeps user-customized skills
pip install stratum-py
Requires Python 3.11+. Dependencies: litellm>=1.0, pydantic>=2.0.
Set an API key for your LLM provider (GROQ_API_KEY, ANTHROPIC_API_KEY, OPENAI_API_KEY, etc.) and specify the model in model=.
When Claude Code has Stratum installed, it uses it automatically for non-trivial tasks:
1. Writes a .stratum.yaml spec internally (never shown to you)
2. Calls stratum_plan to validate the spec and get the first step
3. Calls stratum_step_done after each step -- the server checks postconditions
4. Calls stratum_audit at the end for a full execution trace

You see plain English narration throughout. The spec, state management, and postcondition enforcement happen behind the scenes.
Stratum uses these two terms deliberately — they are not interchangeable.
A workflow is the authored definition: the named, registered, version-controlled artifact you write once and rerun (a .stratum.yaml spec with a workflow: block, discoverable via stratum_list_workflows). A flow is the executable DAG definition: a DAG of steps (flows:, @flow). Running a flow produces a flow execution — a single live instance tracked as a FlowState with a flow_id. A workflow composes flows; running a workflow produces flow executions.
The split mirrors Temporal (Workflow Definition vs Execution) and Airflow (DAG vs DAG Run). Rule of thumb: if you can git diff it, it's a workflow; if it has a flow_id and a state, it's a flow execution.
A flow is a directed acyclic graph of steps with typed inputs and an output contract. Flows are defined either in .stratum.yaml specs (executed by the MCP server) or with the @flow decorator in the Python library.
A step is a single unit of work within a flow. Each step has an ID, an execution mode (function, inline, or flow), inputs that can reference flow-level inputs or prior step outputs, and optional postconditions.
Functions are reusable definitions that steps reference. A function declares its mode (infer, compute, or gate), intent, input schema, output contract, postconditions, retry count, and optional model/budget.
Contracts define the shape of step outputs. They are named schemas with typed fields, validated against step results before postconditions run.
Ensures are postcondition expressions evaluated against step results. If any ensure fails, the step is retried with the specific violation. Example: result.confidence > 0.7.
Each step has a retry budget. When an ensure or schema validation fails, the step is retried up to the declared limit. The agent receives the specific violations, not a blank replay.
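The retry behaviour described above can be sketched in a few lines of Python. This is an illustration only, not Stratum's implementation: run_with_retries, fake_agent, check_confidence, and the max_attempts parameter are hypothetical names, and whether the spec's retries field counts total attempts or only the extra ones is an assumption left open here.

```python
from typing import Callable


def run_with_retries(
    attempt: Callable[[list[str]], dict],
    check: Callable[[dict], list[str]],
    max_attempts: int,
) -> dict:
    """Call attempt until check reports no violations or attempts run out."""
    violations: list[str] = []
    for _ in range(max_attempts):
        # The previous violations are handed back, so the retry is targeted
        # rather than a blank replay.
        result = attempt(violations)
        violations = check(result)
        if not violations:
            return {"status": "complete", "output": result}
    return {
        "status": "error",
        "error_type": "retries_exhausted",
        "violations": violations,
    }


def check_confidence(result: dict) -> list[str]:
    """Stand-in for the ensure 'result.confidence > 0.7'."""
    ok = result["confidence"] > 0.7
    return [] if ok else ["ensure 'result.confidence > 0.7' failed"]


def fake_agent(violations: list[str]) -> dict:
    """Hypothetical agent: does better once it has seen a violation."""
    return {"confidence": 0.9 if violations else 0.5}


outcome = run_with_retries(fake_agent, check_confidence, max_attempts=3)
```

Here the first attempt fails the check and the second succeeds because the violation text was fed back in.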
Gate steps pause execution and wait for external resolution (human approval, agent decision, or system timeout). Gates support approve, revise, and kill outcomes with configurable routing.
A .stratum.yaml spec has four top-level sections: version, contracts, functions, and flows. An optional workflow block declares the spec as a registered workflow.
version: "0.1"
contracts:
SentimentResult:
label: {type: string}
confidence: {type: number}
functions:
classify:
mode: infer
intent: "Classify the sentiment of this text"
input: {text: {type: string}}
output: SentimentResult
ensure:
- "result.label != ''"
- "result.confidence > 0.7"
retries: 2
flows:
run:
input: {text: {type: string}}
output: SentimentResult
steps:
- id: s1
function: classify
inputs: {text: "$.input.text"}
version: "0.2"
contracts:
WorkOutput:
result: {type: string}
quality_score: {type: number}
functions:
do_work:
mode: infer
intent: "Produce the deliverable"
input: {text: {type: string}}
output: WorkOutput
ensure:
- "result.quality_score >= 0.8"
retries: 3
review_gate:
mode: gate
timeout: 3600
flows:
reviewed_work:
input: {text: {type: string}}
output: WorkOutput
max_rounds: 3
steps:
- id: work
function: do_work
inputs: {text: "$.input.text"}
- id: review
function: review_gate
on_approve: ~
on_revise: work
on_kill: ~
version (required)

version: "0.1" # or "0.2"
Version "0.2" adds gates, inline steps, flow composition, policies, iterations, skip_if, routing, and workflows.
contracts

Named output schemas. Each contract is an object whose keys are field names and values are {type: <type>} objects.
contracts:
MyContract:
field_name: {type: string}
score: {type: number}
tags: {type: array}
Supported types: string, number, integer, boolean, array, object.
functions

Reusable step definitions referenced by function steps.
functions:
my_function:
mode: infer | compute | gate # required
intent: "What this function does" # required for infer/compute
input: # required for infer/compute
param_name: {type: string}
output: ContractName # required for infer/compute
ensure: # optional (forbidden on gate)
- "result.field > 0"
retries: 3 # optional, default 3 (forbidden on gate)
budget: # optional (forbidden on gate)
ms: 5000
usd: 0.01
model: "gpt-4o" # optional
timeout: 3600 # optional, gate only — seconds before auto-kill
Gate functions only require mode: gate. They must not have ensure, budget, or retries.
flows

Flow definitions with input schema, output contract, and ordered steps.
flows:
my_flow:
input:
param: {type: string}
output: ContractName # optional for gate-only flows
budget: # optional
ms: 30000
usd: 0.10
max_rounds: 5 # optional (v0.2) — max gate revise cycles
steps:
- id: step_id # required, unique within flow
# Execution mode — exactly one of:
function: my_function # references a function definition
intent: "Do something" # inline step (v0.2)
flow: sub_flow_name # sub-flow invocation (v0.2)
# Common fields:
inputs: # optional
param: "$.input.param"
prior: "$.steps.s1.output.field"
depends_on: [s1, s2] # optional — explicit dependencies
output_schema: # optional — JSON Schema for result validation
type: object
required: [done]
properties:
done: {type: boolean}
# Inline step fields (v0.2, only with intent):
agent: claude # optional — agent assignment
ensure: # optional
- "result.done == True"
retries: 2 # optional, default 1
output_contract: MyContract # optional
model: "gpt-4o" # optional
budget: # optional
ms: 5000
# Gate step fields (v0.2, only with function referencing a gate):
on_approve: next_step | ~ # required — step to route to, or null for completion
on_revise: earlier_step # required — must target topologically earlier step
on_kill: cleanup_step | ~ # required — step to route to, or null for termination
policy: gate | flag | skip # optional — auto-resolution policy
policy_fallback: gate # optional — requires policy
# Non-gate routing (v0.2):
on_fail: recovery_step # optional — route on retry exhaustion (requires ensure)
next: loop_back_step # optional — override linear advancement on success
# Conditional skip (v0.2):
skip_if: "$.steps.s1.output.skip == True" # optional (forbidden on gates)
skip_reason: "Already done" # optional
# Iteration (v0.2):
max_iterations: 10 # optional (forbidden on gates)
exit_criterion: "result.quality >= 0.9" # optional, requires max_iterations
accumulate: "result.findings" # optional; dedup items across rounds
accumulate_key: "item.id" # optional; requires accumulate
workflow (v0.2)

Self-registering workflow declaration. Allows stratum_list_workflows to discover specs.
workflow:
name: my-workflow # lowercase, hyphens only
description: "What this workflow does"
input:
param_name:
type: string
required: true
default: "value"
Workflow input keys must exactly match the entry flow's input keys.
Step inputs use $ references to chain data through the flow:
| Pattern | Resolves to |
|---|---|
| $.input.<field> | Flow-level input value |
| $.steps.<step_id>.output | Full output of a prior step |
| $.steps.<step_id>.output.<field> | Specific field from a prior step's output |
| literal_value | Passed through as-is |
References create implicit dependencies. The server also uses explicit depends_on for topological ordering (Kahn's algorithm).
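A minimal sketch of how $ references and Kahn's algorithm fit together, assuming dictionaries shaped like the YAML above. The function names resolve, referenced_steps, and topo_order are hypothetical, and the sketch assumes step IDs and field names contain no dots; the server's actual resolver may differ.

```python
from collections import deque


def resolve(value, flow_input: dict, step_outputs: dict):
    """Resolve one input value against flow inputs and prior step outputs."""
    if not (isinstance(value, str) and value.startswith("$.")):
        return value  # literal values pass through as-is
    parts = value[2:].split(".")
    if parts[0] == "input":
        node, path = flow_input, parts[1:]
    elif parts[0] == "steps" and len(parts) >= 3 and parts[2] == "output":
        node, path = step_outputs[parts[1]], parts[3:]
    else:
        raise ValueError(f"unsupported reference: {value}")
    for key in path:
        node = node[key]
    return node


def referenced_steps(inputs: dict) -> set[str]:
    """Step IDs named in $.steps references (the implicit dependencies)."""
    return {
        v.split(".")[2]
        for v in inputs.values()
        if isinstance(v, str) and v.startswith("$.steps.")
    }


def topo_order(steps: list[dict]) -> list[str]:
    """Kahn's algorithm over implicit and explicit (depends_on) edges."""
    deps = {
        s["id"]: referenced_steps(s.get("inputs", {})) | set(s.get("depends_on", []))
        for s in steps
    }
    indegree = {sid: len(d) for sid, d in deps.items()}
    ready = deque(sid for sid, n in indegree.items() if n == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for sid, d in deps.items():
            if current in d:
                indegree[sid] -= 1
                if indegree[sid] == 0:
                    ready.append(sid)
    if len(order) != len(steps):
        raise ValueError("cycle detected: flow is not a DAG")
    return order


steps = [
    {"id": "report", "inputs": {"label": "$.steps.s1.output.label"}, "depends_on": ["audit"]},
    {"id": "audit", "depends_on": ["s1"]},
    {"id": "s1", "inputs": {"text": "$.input.text"}},
]
order = topo_order(steps)
```

The steps are listed out of order on purpose; the sort places s1 first because both other steps depend on it, one implicitly and one explicitly.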
All tools are exposed via the MCP protocol. Claude Code calls them as tool invocations.
stratum_validate

Validate a .stratum.yaml spec without creating a flow.
Inputs: spec (str, inline YAML)
Returns: {valid: bool, errors: list}
stratum_plan

Validate a spec, create execution state, and return the first step to execute.
Inputs:
- spec (str) -- inline YAML
- flow (str) -- flow name
- inputs (dict) -- flow-level inputs

Returns: Step dispatch object with status: "execute_step" or status: "await_gate", including:
- flow_id -- unique identifier for this execution
- step_id, step_number, total_steps
- function, intent, inputs (resolved)
- output_contract, output_fields, ensure
- retries_remaining
- agent, step_mode

stratum_step_done

Report a completed step result. The server validates the result against output schemas and ensure expressions.
Inputs:
- flow_id (str)
- step_id (str)
- result (dict) -- step output matching the output contract

Returns one of:
- Next step (status: "execute_step")
- Ensure failure (status: "ensure_failed", violations, retries_remaining)
- Schema failure (status: "schema_failed", violations)
- Flow complete (status: "complete", output, trace, total_duration_ms)
- Retries exhausted (status: "error", error_type: "retries_exhausted")
- on_fail routing (routed_from, violations)

stratum_audit

Return the full execution trace for a flow.
Inputs: flow_id (str)
Returns:
- flow_id, flow_name, status (complete, in_progress, killed)
- steps_completed, total_steps
- trace -- array of step records (step_id, function_name, attempts, duration_ms, type, round)
- round, rounds -- round history for gate revise cycles
- iterations, archived_iterations -- iteration history
- child_audits -- audit snapshots from sub-flow executions
- total_duration_ms

stratum_gate_resolve

Resolve a gate step with a human/agent/system decision.
Inputs:
- flow_id (str)
- step_id (str) -- must be the current gate step
- outcome (str) -- "approve", "revise", or "kill"
- rationale (str) -- human-readable reason
- resolved_by (str) -- "human", "agent", or "system"

Returns: Next step, flow completion, or flow termination.
stratum_check_timeouts

Check whether a pending gate step has exceeded its configured timeout. Auto-kills with resolved_by: "system" if expired.
Inputs: flow_id (str)

stratum_skip_step

Explicitly skip the current step (cannot skip gate steps).
Inputs: flow_id (str), step_id (str), reason (str)

stratum_commit

Save a named checkpoint of the current flow state.
Inputs: flow_id (str), label (str)

stratum_revert

Roll back flow state to a previously committed checkpoint.
Inputs: flow_id (str), label (str)

stratum_iteration_start

Start an iteration loop on the current step (requires max_iterations in the spec).
Inputs: flow_id (str), step_id (str)

stratum_iteration_report

Report one iteration result. Evaluates exit_criterion, increments count, checks max_iterations.
Inputs: flow_id (str), step_id (str), result (dict)
Returns: iteration_continue or iteration_exit with outcome.

stratum_iteration_abort

Abort an active iteration loop before completion.
Inputs: flow_id (str), step_id (str), reason (str)

stratum_compile_speckit

Compile a spec-kit tasks directory into a .stratum.yaml flow.
Inputs: tasks_dir (str), flow_name (str, default "tasks")
Returns: {status, yaml, flow_name, steps} on success.

stratum_list_workflows

Scan a directory for *.stratum.yaml files with workflow: blocks. Returns registered workflows with name, description, input schema, and file path. Detects duplicate names.

stratum_draft_pipeline

Push a pipeline draft to the PipelineEditor UI via .stratum/pipeline-draft.json.
Reference a named function definition. The function provides the intent, input schema, output contract, ensures, retries, and model.
steps:
- id: classify
function: classify_sentiment
inputs: {text: "$.input.text"}
Self-contained steps with intent and optional agent. No function reference needed. Execution fields (ensure, retries, output_contract, model, budget) are declared directly on the step.
steps:
- id: analyze
intent: "Analyze the codebase for security vulnerabilities"
agent: claude
ensure:
- "result.vulnerabilities_checked == True"
retries: 2
output_contract: AnalysisResult
Invoke a sub-flow defined in the same spec. The child flow runs to completion, and its output is unwrapped into the parent step's result.
steps:
- id: run_tests
flow: test_suite
inputs: {project: "$.input.project"}
ensure:
- "result.all_passed == True"
on_fail: fix_tests
Flow steps must not have agent, retries, model, or budget. They may have ensure and on_fail.
Every step must have exactly one of function, intent, or flow. Having zero or more than one is a semantic error caught at parse time.
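The exactly-one-of rule is simple to express in code. The sketch below is a hypothetical check, not the server's parser; step_mode and the error wording are illustrative, and the mode names follow the function / inline / flow terminology used earlier.

```python
# Key in the step definition -> execution mode name
MODE_KEYS = {"function": "function", "intent": "inline", "flow": "flow"}


def step_mode(step: dict) -> str:
    """Return the execution mode, or raise if zero or several keys are set."""
    present = [key for key in MODE_KEYS if key in step]
    if len(present) != 1:
        raise ValueError(
            f"step {step.get('id')!r} must declare exactly one of "
            f"{list(MODE_KEYS)}, found {present}"
        )
    return MODE_KEYS[present[0]]


mode = step_mode({"id": "analyze", "intent": "Analyze the codebase", "agent": "claude"})
```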
Ensures are Python expressions evaluated against the step result. The result is available as result (dicts are wrapped in SimpleNamespace for attribute access).
ensure:
- "result.confidence > 0.7"
- "result.label != ''"
- "len(result.items) > 0"
- "result.status in ('success', 'partial')"
| Function | Signature | Description |
|---|---|---|
| file_exists | file_exists(path) | Returns True if the file exists on disk |
| file_contains | file_contains(path, substring) | Returns True if the file contains the substring (10 MB limit) |
| len | len(x) | Standard Python len |
| bool | bool(x) | Standard Python bool |
| int | int(x) | Standard Python int |
| str | str(x) | Standard Python str |
- __builtins__ is always empty -- no access to os, sys, import, etc.
- Names containing a double underscore (__) are blocked at compile time

When an ensure expression fails, the server returns:
{
"status": "ensure_failed",
"violations": ["ensure 'result.confidence > 0.7' failed"],
"retries_remaining": 2
}
The agent receives the specific violations and can target its fix accordingly.
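To make the evaluation model concrete, here is a rough sketch of how ensure expressions could be checked with an empty __builtins__, a small allowlist, and SimpleNamespace wrapping. The names check_ensures and to_namespace are hypothetical, file_exists and file_contains are left out, and treating an evaluation error as a failed ensure is an assumption. An empty __builtins__ on its own is not a complete sandbox, so this should not be read as the server's actual hardening.

```python
from types import SimpleNamespace

# Allowlisted helpers, mirroring the built-ins listed above.
SAFE_NAMES = {"len": len, "bool": bool, "int": int, "str": str}


def to_namespace(value):
    """Wrap dicts (recursively) so fields read as attributes."""
    if isinstance(value, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_namespace(v) for v in value]
    return value


def check_ensures(expressions: list[str], result: dict, retries_remaining: int):
    """Return an ensure_failed response, or None when every ensure passes."""
    wrapped = to_namespace(result)
    violations = []
    for expr in expressions:
        if "__" in expr:
            raise ValueError(f"double-underscore access is not allowed: {expr!r}")
        code = compile(expr, "<ensure>", "eval")
        scope = {"__builtins__": {}, **SAFE_NAMES, "result": wrapped}
        try:
            passed = bool(eval(code, scope))
        except Exception:
            passed = False  # assumption: an evaluation error counts as a failure
        if not passed:
            violations.append(f"ensure '{expr}' failed")
    if not violations:
        return None
    return {
        "status": "ensure_failed",
        "violations": violations,
        "retries_remaining": retries_remaining,
    }


response = check_ensures(
    ["result.confidence > 0.7", "result.label != ''"],
    {"confidence": 0.5, "label": "neutral"},
    retries_remaining=2,
)
```

Only the confidence check fails here, so the response carries a single violation, matching the shape shown above.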
Contracts define expected output shapes. Fields specify their type:
contracts:
AnalysisResult:
summary: {type: string}
score: {type: number}
issues: {type: array}
metadata: {type: object}
is_valid: {type: boolean}
When a function or step references an output contract, the server resolves the contract fields and includes them in the step dispatch so the agent knows what shape to produce.
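As a rough illustration of what validating a result against a contract involves, the sketch below maps the six supported type names to Python checks. It assumes every declared field is required and invents its own violation wording; contract_violations is a hypothetical helper, not part of Stratum.

```python
# Contract type name -> predicate on a Python value.
# bool is excluded from number/integer because bool subclasses int in Python.
TYPE_CHECKS = {
    "string": lambda v: isinstance(v, str),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "boolean": lambda v: isinstance(v, bool),
    "array": lambda v: isinstance(v, list),
    "object": lambda v: isinstance(v, dict),
}


def contract_violations(contract: dict, result: dict) -> list[str]:
    """List the fields of result that are missing or have the wrong type."""
    violations = []
    for field, spec in contract.items():
        if field not in result:
            violations.append(f"missing field '{field}'")
        elif not TYPE_CHECKS[spec["type"]](result[field]):
            violations.append(f"field '{field}' is not of type {spec['type']}")
    return violations


analysis_result = {
    "summary": {"type": "string"},
    "score": {"type": "number"},
    "issues": {"type": "array"},
    "is_valid": {"type": "boolean"},
}
problems = contract_violations(
    analysis_result,
    {"summary": "ok", "score": "high", "issues": []},
)
```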
Steps can declare a full JSON Schema for structural validation. This runs before ensure expressions:
steps:
- id: s1
function: do_work
inputs: {text: "$.input.text"}
output_schema:
type: object
required: [done, tests_pass]
properties:
done: {type: boolean}
tests_pass: {type: boolean}
Schema violations are returned as status: "schema_failed" with specific error messages.
Gates are approval checkpoints that pause flow execution until resolved externally.
functions:
approval:
mode: gate
timeout: 3600 # optional — auto-kill after 1 hour
flows:
my_flow:
max_rounds: 3 # optional — limit revise cycles
steps:
- id: work
function: do_work
inputs: {text: "$.input.text"}
- id: review
function: approval
on_approve: ~ # null = complete the flow
on_revise: work # must target a topologically earlier step
on_kill: ~ # null = terminate the flow
- Gate functions must not have ensure, budget, or retries
- Gate steps must not have skip_if or output_schema
- Gate steps must declare on_approve and on_kill (even if null)
- on_revise must be non-null and must target a topologically earlier step
- on_revise must not self-reference

Gates are resolved via stratum_gate_resolve or the CLI stratum-mcp gate command:
- approve -- routes to the on_approve target (or completes flow if null)
- revise -- routes to the on_revise target
- kill -- routes to the on_kill target (or terminates flow with status: "killed" if null)

resolved_by is one of "human", "agent", or "system".
Gate steps can have an auto-resolution policy:
- id: review
function: approval
policy: skip # auto-approve without pausing
policy_fallback: gate # fall back to manual gate if policy fails
on_approve: ~
on_revise: work
on_kill: ~
| Policy | Behavior |
|---|---|
| gate | Default -- pause and wait for resolution |
| flag | Auto-approve, but log a PolicyRecord in the trace |
| skip | Auto-approve silently |
policy_fallback requires policy to be set.
If a gate function has timeout set, stratum_check_timeouts will auto-kill the gate with resolved_by: "system" when the timeout expires.
When a gate is resolved with revise, the current round's trace is archived into state.rounds, the active trace is reset, and the round counter increments. max_rounds on the flow limits how many revise cycles are allowed.
Steps with flow: invoke a sub-flow defined in the same spec. The parent flow suspends until the child completes.
flows:
integration_tests:
input: {project: {type: string}}
output: TestResult
steps:
- id: run
intent: "Run integration tests"
ensure:
- "result.all_passed == True"
deploy:
input: {project: {type: string}}
output: DeployResult
steps:
- id: test
flow: integration_tests
inputs: {project: "$.input.project"}
on_fail: rollback
- id: release
intent: "Deploy to production"
- id: rollback
intent: "Roll back deployment"
- The child flow gets its own FlowState and flow_id
- The child's audit is stored in child_audits on the parent
- Flow steps must not have agent, retries, model, or budget

on_fail -- Recovery Routing

Routes to a named recovery step when retries are exhausted (requires ensure or output_schema):
- id: generate
function: generate_code
inputs: {spec: "$.input.spec"}
on_fail: manual_fix
- id: manual_fix
intent: "Fix the generated code manually"
The failed step's output is preserved so the recovery step can access it via $.steps.generate.output.
next -- Success Routing

Overrides linear step advancement on success. Enables review loops:
- id: write
intent: "Write the code"
next: review
- id: review
intent: "Review the code"
ensure:
- "result.approved == True"
on_fail: write
When next routes to a step, that step's attempts are cleared for fresh execution.
Steps can be conditionally skipped based on prior outputs:
- id: deploy
intent: "Deploy to staging"
skip_if: "$.steps.tests.output.all_passed == False"
skip_reason: "Tests failed, skipping deployment"
Skipped steps have their output set to None. Gate steps cannot have skip_if.
skip_if expressions support $ references, Python-style booleans (True, False, None), and YAML-style literals (true, false, null).
Steps with max_iterations support counted sub-loops. The agent iterates until exit_criterion is met or the maximum count is reached.
- id: refine
intent: "Improve the output quality"
max_iterations: 5
exit_criterion: "result.quality >= 0.95"
Iteration workflow:
- stratum_iteration_start(flow_id, step_id) -- begins the loop
- stratum_iteration_report(flow_id, step_id, result) -- reports each iteration; server evaluates exit_criterion
- The server answers each report with iteration_continue or iteration_exit
- stratum_iteration_abort(flow_id, step_id, reason) -- early exit

Gate steps cannot use iterations. exit_criterion requires max_iterations. Iteration history is preserved across gate revise cycles in archived_iterations.
An iteration step can accumulate deduped items across rounds and exit when it stops finding
new ones. Declare accumulate (an expression returning the round's item list) and optionally
accumulate_key (a per-item dedup key, binding item):
- id: hunt
intent: "Find issues; keep hunting until two dry rounds"
max_iterations: 20
accumulate: "result.findings" # list extracted from each result
accumulate_key: "item.id" # optional; default dedups on the whole item
exit_criterion: "dry_streak >= 2" # K consecutive zero-new rounds → exit
exit_criterion additionally sees accumulator (the deduped list), accumulated_count,
new_count (items added this round), and dry_streak (consecutive zero-new rounds) — compose
them freely, e.g. "dry_streak >= 2 or accumulated_count >= 50". On exit the deduped set is
merged into the step's authoritative output as accumulated/accumulated_count. A malformed
accumulate/accumulate_key is reported as accumulate_error and freezes dry_streak (a
broken extractor can't fake dryness); accumulator loops are governed solely by exit_criterion
and max_iterations (fingerprint-stagnation does not preempt them). Not valid on gate,
decompose, or parallel_dispatch steps.
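The accumulator counters described above can be sketched as follows. This is an illustration under assumptions: run_accumulating is a hypothetical helper, the exit criterion is passed as a Python callable rather than an expression string, and whole-item dedup is approximated by a sorted JSON dump.

```python
import json


def run_accumulating(rounds, max_iterations, exit_when, key=None):
    """Fold per-round item lists into a deduped accumulator.

    Returns the final counters and the number of rounds consumed.
    """
    state = {"accumulator": [], "accumulated_count": 0, "new_count": 0, "dry_streak": 0}
    seen = set()
    used = 0
    for items in rounds[:max_iterations]:
        used += 1
        new_count = 0
        for item in items:
            # Assumption: without a key, dedup on a canonical dump of the item.
            k = key(item) if key else json.dumps(item, sort_keys=True)
            if k not in seen:
                seen.add(k)
                state["accumulator"].append(item)
                new_count += 1
        state["new_count"] = new_count
        state["dry_streak"] = state["dry_streak"] + 1 if new_count == 0 else 0
        state["accumulated_count"] = len(state["accumulator"])
        if exit_when(state):
            break
    return state, used


findings_per_round = [
    [{"id": 1}, {"id": 2}],
    [{"id": 2}, {"id": 3}],
    [{"id": 1}],
    [{"id": 3}],
    [{"id": 4}],
]
final, used = run_accumulating(
    findings_per_round,
    max_iterations=20,
    exit_when=lambda s: s["dry_streak"] >= 2,
    key=lambda item: item["id"],
)
```

Rounds three and four add nothing new, so the loop exits after four rounds with three accumulated items and never reaches the fifth round.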
Named snapshots of flow state for rollback scenarios.
# During execution:
stratum_commit(flow_id, "after_analysis")
# ... later steps fail ...
stratum_revert(flow_id, "after_analysis")
Checkpoints capture: step_outputs, attempts, records, current_idx, round state, iteration state, and child flow state. They survive server restarts (persisted to disk).
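Conceptually, commit and revert behave like labelled deep copies of the flow state. The class below is a hypothetical in-memory sketch of that idea; it ignores disk persistence and is not how the server stores checkpoints.

```python
import copy


class Checkpoints:
    """Labelled snapshots of a flow state dictionary."""

    def __init__(self) -> None:
        self._saved: dict[str, dict] = {}

    def commit(self, state: dict, label: str) -> None:
        # Deep copy so later mutations of state do not leak into the snapshot.
        self._saved[label] = copy.deepcopy(state)

    def revert(self, label: str) -> dict:
        # Copy again so the snapshot can be reverted to more than once.
        return copy.deepcopy(self._saved[label])


checkpoints = Checkpoints()
state = {"current_idx": 1, "step_outputs": {"analysis": {"done": True}}, "attempts": {}}
checkpoints.commit(state, "after_analysis")

# Later steps mutate the state, then fail.
state["current_idx"] = 3
state["step_outputs"]["draft"] = {"done": False}

state = checkpoints.revert("after_analysis")
```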
Each function or inline step declares a retry count (default 3 for functions, 1 for inline steps). The server tracks attempts per step.
1. The agent reports a result via stratum_step_done
2. The server validates output_schema (if declared) -- structural errors first
3. The server evaluates ensure expressions
4. On failure, the server returns ensure_failed or schema_failed with violations and remaining retry count
5. On exhaustion, if on_fail is set: routes to recovery step
6. On exhaustion without on_fail: returns retries_exhausted error

Flow state is persisted to ~/.stratum/flows/{flow_id}.json after each state mutation. Flows survive MCP server restarts. Timing fields are reset on restore (step durations may be inaccurate for the resumed step).
On any tool call, if the flow is not in memory, the server attempts to restore it from disk. The spec is re-parsed, steps are re-sorted, and execution resumes from the persisted current_idx.
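The persist-then-restore pattern can be sketched with one JSON file per flow. The helpers save_flow and get_flow are hypothetical, the write-then-rename step is an assumption about how a safe write might be done, and the example uses a temporary directory rather than ~/.stratum/flows/.

```python
import json
import tempfile
from pathlib import Path


def save_flow(root: Path, state: dict) -> Path:
    """Write the state to {flow_id}.json via a temporary file and rename."""
    root.mkdir(parents=True, exist_ok=True)
    path = root / f"{state['flow_id']}.json"
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(state))
    tmp.replace(path)
    return path


def get_flow(memory: dict, root: Path, flow_id: str) -> dict:
    """Return the in-memory flow, restoring it from disk on a cache miss."""
    if flow_id not in memory:
        memory[flow_id] = json.loads((root / f"{flow_id}.json").read_text())
    return memory[flow_id]


with tempfile.TemporaryDirectory() as tmp_dir:
    root = Path(tmp_dir) / "flows"
    save_flow(root, {"flow_id": "demo", "current_idx": 2, "step_outputs": {"s1": {"ok": True}}})
    memory = {}  # simulates a freshly restarted server with nothing loaded
    restored = get_flow(memory, root, "demo")
```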
Workflow declarations make specs discoverable via stratum_list_workflows:
version: "0.2"
workflow:
name: code-review
description: "Three-pass code review: security, logic, performance"
input:
files:
type: array
required: true
depth:
type: string
required: false
default: "standard"
stratum_list_workflows scans a directory for *.stratum.yaml files with workflow: blocks and returns their metadata. Duplicate workflow names are reported as errors.
The task compiler converts spec-kit task files (tasks/*.md) into .stratum.yaml flows.
# Task: [P] Implement authentication
Add JWT-based authentication to the API.
## Acceptance Criteria
- [ ] file src/auth/middleware.ts exists
- [ ] file src/auth/middleware.ts contains "verifyToken"
- [ ] tests pass
- [ ] no lint errors
- [ ] Error messages are user-friendly
- [P] in the title marks the task as parallelizable
- file X exists compiles to file_exists("X")
- file X contains Y compiles to file_contains("X", "Y")
- tests pass compiles to result.tests_pass == True
- no lint errors compiles to result.lint_clean == True
- Criteria that match none of these patterns are carried into the step intent
- Parallel tasks ([P]) share the same predecessor with no edges between them

stratum-mcp compile tasks/ --output flow.stratum.yaml --flow my_flow
Or via MCP tool: stratum_compile_speckit(tasks_dir, flow_name).
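The criterion-to-ensure mapping can be approximated with a few regular expressions. The sketch below is not the task compiler itself: compile_criteria and the patterns are hypothetical, and criteria that match no pattern are simply returned separately rather than processed further.

```python
import re

# (pattern, builder) pairs for the recognised acceptance-criterion phrasings.
RULES = [
    (re.compile(r"^file (\S+) exists$"),
     lambda m: f'file_exists("{m.group(1)}")'),
    (re.compile(r'^file (\S+) contains "?(.+?)"?$'),
     lambda m: f'file_contains("{m.group(1)}", "{m.group(2)}")'),
    (re.compile(r"^tests pass$"),
     lambda m: "result.tests_pass == True"),
    (re.compile(r"^no lint errors$"),
     lambda m: "result.lint_clean == True"),
]
CHECKBOX = re.compile(r"^- \[[ xX]\] (.+)$")


def compile_criteria(markdown: str) -> tuple[list[str], list[str]]:
    """Split checklist items into ensure expressions and unmatched text."""
    ensures, unmatched = [], []
    for line in markdown.splitlines():
        item = CHECKBOX.match(line.strip())
        if not item:
            continue
        text = item.group(1).strip()
        for pattern, build in RULES:
            hit = pattern.match(text)
            if hit:
                ensures.append(build(hit))
                break
        else:
            unmatched.append(text)
    return ensures, unmatched


task = """\
# Task: [P] Implement authentication
## Acceptance Criteria
- [ ] file src/auth/middleware.ts exists
- [ ] file src/auth/middleware.ts contains "verifyToken"
- [ ] tests pass
- [ ] no lint errors
- [ ] Error messages are user-friendly
"""
ensures, unmatched = compile_criteria(task)
```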
Eleven skills are installed by stratum-mcp install:
| Skill | Purpose |
|---|---|
| /stratum-onboard | Read a new codebase cold, write project-specific MEMORY.md |
| /stratum-plan | Design a feature, present for review -- no implementation |
| /stratum-feature | Full feature build: read patterns, design, implement, test |
| /stratum-review | Three-pass code review: security, logic, performance |
| /stratum-debug | Hypothesis-driven debugging with elimination |
| /stratum-refactor | File splitting with planned extraction order |
| /stratum-migrate | Rewrite bare LLM calls as @infer + @contract |
| /stratum-test | Write test suite for untested code (golden flows, error-path harness) |
| /stratum-learn | Extract patterns from session transcripts into MEMORY.md |
| /stratum-build | Compile tasks, drive execution via stratum_plan loop |
| /stratum-speckit | Bridge skill for spec-kit phase execution |
Each skill reads project-specific patterns from MEMORY.md before writing its spec and writes new patterns after stratum_audit.
stratum-mcp # Start stdio MCP server (for Claude Code)
stratum-mcp install # Configure Claude Code project for Stratum
stratum-mcp uninstall # Remove Stratum configuration
stratum-mcp uninstall --keep-skills # Remove config but keep skill files
stratum-mcp validate <file> # Validate a .stratum.yaml spec
stratum-mcp compile <dir> # Compile tasks/*.md to .stratum.yaml
--output <file> # Write to file instead of stdout
--flow <name> # Flow name (default: "tasks")
stratum-mcp query flows # List all persisted flows (JSON)
stratum-mcp query flow <id> # Full state for a single flow (JSON)
stratum-mcp query gates # List all pending gate steps (JSON)
stratum-mcp gate approve <flow_id> <step_id> [--note "reason"]
stratum-mcp gate reject <flow_id> <step_id> [--note "reason"]
stratum-mcp gate revise <flow_id> <step_id> [--note "reason"]
--resolved-by human|agent|system # Who resolved (default: human)
stratum-mcp help # Show help
Persisted flows are stored in ~/.stratum/flows/{flow_id}.json. This directory is created automatically.
Hooks are installed to ~/.stratum/hooks/ and registered in .claude/settings.json:
| Hook Event | Script | Behavior |
|---|---|---|
| SessionStart | stratum-session-start.sh | Inject relevant MEMORY.md entries |
| Stop | stratum-session-stop.sh | Append session summary to MEMORY.md |
| PostToolUseFailure | stratum-post-tool-failure.sh | Record ensure failures and tool errors |
The MCP server is registered in .claude/mcp.json:
{
"mcpServers": {
"stratum": {
"command": "stratum-mcp"
}
}
}
The execution model block appended to CLAUDE.md instructs the agent to use Stratum for non-trivial tasks:
## Stratum Execution Model
For non-trivial tasks, use Stratum internally:
1. Write a .stratum.yaml spec -- never show it to the user
2. Call stratum_plan to validate and get the first step
3. Narrate progress in plain English as you execute each step
4. Call stratum_step_done after each step -- the server checks your work
5. If a step fails postconditions, fix it silently and retry
6. Call stratum_audit at the end and include the trace in the commit
The stratum-py library provides decorators for building production LLM systems directly in Python. This is independent of the MCP server.
from typing import Literal
from pydantic import BaseModel
from stratum import contract, infer, compute, flow, Budget
@contract
class SentimentResult(BaseModel):
label: Literal["positive", "negative", "neutral"]
confidence: float
reasoning: str
@infer(
intent="Classify the emotional tone of customer feedback",
ensure=lambda r: r.confidence > 0.7,
model="groq/llama-3.3-70b-versatile",
budget=Budget(ms=8000, usd=0.01),
retries=3,
)
def classify_sentiment(text: str) -> SentimentResult: ...
@compute
def format_result(result: SentimentResult) -> str:
return f"[{result.label}] {result.confidence:.0%}"
@flow(budget=Budget(ms=30000, usd=0.05))
async def analyse_batch(texts: list[str]) -> list[SentimentResult]:
return [await classify_sentiment(text=t) for t in texts]
| Feature | Description |
|---|---|
| @infer / @compute | Identical type signatures -- swap without downstream changes |
| @contract | Pydantic BaseModel compiled to JSON Schema with content hash |
| @flow | Async flow wrapper with Budget and ContextVar-scoped flow_id |
| @refine | Convergence loop -- iterates until until(result) passes |
| parallel(require=) | "all" / "any" / N / 0 modes via asyncio.TaskGroup |
| debate() | Multi-agent structured argumentation with synthesizer |
| await_human() | HITL gate -- suspends flow until ReviewSink resolves |
| quorum= | N parallel calls with agreement threshold |
| stable= | Probabilistic output wrapping (Probabilistic[T]) |
| opaque[T] | Prompt injection protection -- excluded from tool-call schema |
| Budget(ms=, usd=, tokens=) | Hard time + cost + token limits |
| OTLP trace export | Built-in emitter, no OTel SDK dependency |
| Exception | Trigger |
|---|---|
| PostconditionFailed | ensure violations after all retries |
| PreconditionFailed | given condition false before LLM call |
| ParseFailure | LLM output cannot be parsed against contract |
| BudgetExceeded | Time or cost budget exceeded |
| ConvergenceFailure | @refine exhausted max_iterations |
| ConsensusFailure | quorum could not reach threshold agreement |
| HITLTimeoutError | await_human wall-clock timeout |
| StabilityAssertionError | Probabilistic[T].assert_stable() below threshold |
| StratumCompileError | Static violations at decoration time |
Working examples in examples/:
| File | What it demonstrates |
|---|---|
| 01_sentiment.py | @infer + @contract + @flow + @compute end-to-end |
| 02_migrate.py | Migrating @infer to @compute without changing callers |
| 03_parallel.py | Three concurrent @infer calls with parallel(require="all") |
| 04_refine.py | @refine convergence loop until quality passes |
| 05_debate.py | debate() -- two agents argue, synthesizer resolves |
| 06_hitl.py | await_human -- human-in-the-loop approval gate |
git clone https://github.com/smartmemory/stratum
cd stratum
git config core.hooksPath .githooks
cd stratum-mcp
pip install -e ".[dev]"
pytest tests/
pip install -e ".[dev]"
pytest tests/
The MCP server has 418+ tests across contracts, invariants, integration, and end-to-end suites. The Python library has 321+ tests including real LLM integration tests.
PyPI publishing runs automatically via GitHub Actions when pyproject.toml changes on main. Required secrets:
- PYPI_TOKEN_MCP -- scoped to stratum-mcp
- PYPI_TOKEN_PY -- scoped to stratum-py