Python SDK reference
Complete reference for the ax parameter in Python nodes: structured logging with ax.log, secrets with ax.secrets, agent memory with ax.agent.memory, flow reflection with ax.reflection.flow, and flow mutation with ax.mutation.flow.
Every Python node handler receives the platform through a single parameter:
ax, typed as AxiomContext. This page is the complete reference for that
surface — structured logging, secrets, agent memory, flow reflection, and
flow mutation. There is no pip package to install: the Axiom CLI generates
the AxiomContext interface into your package at gen/axiom_context.py
(regenerated by the CLI; do not edit), and every capability reaches the
platform through the sidecar — node code never calls platform services
directly (see sandboxing and tenancy).
Prerequisites. Examples on this page assume a Python package scaffolded
with axiom init demo/chat-tools --language python, two messages created
with axiom create message ChatRequest --fields "session_id:string; text:string" and axiom create message ChatReply --fields "text:string",
and a node created with axiom create node AnswerQuestion --input ChatRequest --output ChatReply (see
create a node in Python).
Handler signatures
A unary Python node is one function in nodes/. The function name is the
snake_case of the node name, the handler file is nodes/<snake_name>.py,
and the test file is nodes/<snake_name>_test.py. Local message types come
from the generated gen.messages_pb2 module:
```python
# nodes/answer_question.py (generated shape of a unary Python node)
from gen.messages_pb2 import ChatRequest, ChatReply
from gen.axiom_context import AxiomContext


def answer_question(ax: AxiomContext, input: ChatRequest) -> ChatReply:
    """Answers a chat message."""
    return ChatReply()
```
The handler may also be declared async def; the platform detects an
awaitable result and runs it to completion. Declare your handler async def
whenever you await the ax.agent.memory APIs.
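How one runner can accept both handler styles is easy to sketch. The snippet below is an illustration only, not the platform's implementation: invoke is a hypothetical helper that calls a handler and, when the result is a coroutine, runs it to completion.

```python
import asyncio
import inspect


def invoke(handler, ax, message):
    """Call a handler; if it returned a coroutine, run it to completion."""
    result = handler(ax, message)
    if inspect.isawaitable(result):
        # Calling an async def function returns a coroutine object.
        result = asyncio.run(result)
    return result


def sync_handler(ax, message):
    return message.upper()


async def async_handler(ax, message):
    await asyncio.sleep(0)  # stands in for awaiting ax.agent.memory
    return message.upper()


sync_result = invoke(sync_handler, None, "hi")
async_result = invoke(async_handler, None, "hi")
print(sync_result, async_result)
```

Both calls produce the same value, which is why switching a handler from def to async def does not change its callers.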
A pipeline node (created with --type pipeline) is a generator: it takes an
iterator of input frames and yields output frames. For the entry node of a
flow running in pipeline mode, the iterator yields exactly one item:
```python
# nodes/stream_replies.py (generated shape of a Python pipeline node)
from typing import Iterator

from gen.messages_pb2 import ChatRequest, ChatReply
from gen.axiom_context import AxiomContext


def stream_replies(ax: AxiomContext, inputs: Iterator[ChatRequest]) -> Iterator[ChatReply]:
    """Streams replies for each incoming request frame."""
    for inp in inputs:
        yield ChatReply(text=inp.text)
```
The handler's full source, docstring included, is captured at publish
time and shown in the Axiom registry's View source panel, and it feeds
the AI-generated description displayed on the node's card. Replace the
generated placeholder docstring before pushing, and set the node's
description field in axiom.yaml (or pass --description to
axiom create node) to state the description directly.
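To make the generator shape of a pipeline handler concrete, the sketch below drives one locally. The two dataclasses are stand-ins for the generated protobuf messages so that the snippet runs on its own, and the multi-frame call assumes that a non-entry node may receive more than one frame.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class ChatRequest:  # stand-in for the generated protobuf message
    session_id: str = ""
    text: str = ""


@dataclass
class ChatReply:  # stand-in for the generated protobuf message
    text: str = ""


def stream_replies(ax, inputs: Iterator[ChatRequest]) -> Iterator[ChatReply]:
    """Echoes each incoming frame as a reply frame."""
    for inp in inputs:
        yield ChatReply(text=inp.text)


# Entry node in pipeline mode: the iterator carries exactly one item.
entry_frames = iter([ChatRequest(session_id="s1", text="hello")])
entry_out = list(stream_replies(None, entry_frames))

# Assumed for illustration: a non-entry node sees several frames.
many_frames = iter([ChatRequest(text="a"), ChatRequest(text="b")])
many_out = [reply.text for reply in stream_replies(None, many_frames)]
```

Because the handler is a plain generator, nothing runs until the caller iterates; list(...) here plays the role of whatever consumes the output frames.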
AxiomContext at a glance
| Attribute | Type | Purpose |
|---|---|---|
| ax.log | AxiomLogger | Structured logger for this invocation; use instead of print() |
| ax.secrets | AxiomSecrets | Read-only access to tenant secrets |
| ax.agent.memory | AxiomAgentMemory | Durable agent memory, scoped to flow and tenant |
| ax.reflection.flow | AxiomFlowReflection | Read-only view of the running flow graph and current position |
| ax.mutation.flow | AxiomMutationFlow | Append nodes and edges to the running flow (mutation-capable nodes only) |
| ax.execution_id | str | ID of the current execution, injected by the platform |
New platform capabilities are added to AxiomContext — node function
signatures never change to accommodate them.
Logging with ax.log
ax.log has four methods, each taking a message string plus arbitrary
keyword attributes:
```python
# nodes/answer_question.py (inside the handler body)
ax.log.debug("raw input", size=len(input.text))
ax.log.info("answering", session_id=input.session_id)
ax.log.warn("input truncated", limit=4096)
ax.log.error("model call failed", attempt=3)
```
In local development (axiom dev) the logger writes concise plain text to
the terminal. In production it writes JSON records carrying level, msg,
trace_id, span_id, execution_id, and the node name plus your keyword
attributes — making every log line searchable by execution and linkable to
its distributed trace. Use ax.log instead of print() for anything you
want visible in production.
Each invocation receives its own logger instance pre-configured with that invocation's trace context — never share a logger across calls, and never construct one yourself.
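The production record described above can be pictured with a small sketch. This is not the SDK's logger: format_record is a hypothetical function, the "node" key name is an assumption (the text only says the record carries the node name), and the IDs are placeholders.

```python
import json


def format_record(level, msg, *, trace_id, span_id, execution_id, node, **attrs):
    """Build one JSON log line: fixed context fields plus caller attributes."""
    record = {
        "level": level,
        "msg": msg,
        "trace_id": trace_id,
        "span_id": span_id,
        "execution_id": execution_id,
        "node": node,  # assumed key name for the node name
    }
    record.update(attrs)  # keyword attributes passed to the log call
    return json.dumps(record)


line = format_record(
    "info", "answering",
    trace_id="trace-123", span_id="span-456",  # placeholder IDs
    execution_id="exec-789", node="AnswerQuestion",
    session_id="s1",
)
print(line)
```

The keyword attribute session_id lands next to the fixed fields, which is what makes a line searchable by both execution and application-level values.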
Reading secrets with ax.secrets
ax.secrets.get(name) returns a (value, ok) tuple: (value, True) when
the named secret is present, ("", False) otherwise. Values are plaintext
strings; the platform handles encryption and decryption.
```python
# nodes/answer_question.py (inside the handler body)
api_key, ok = ax.secrets.get("OPENAI_KEY")
if not ok:
    raise ValueError("secret OPENAI_KEY is not registered for this tenant")
```
Declare every secret name the node reads under the node's
required_secrets list in axiom.yaml so users know what to register
before invoking. axiom validate scans handler source for
ax.secrets.get("NAME") calls and warns (without failing) when a
referenced name is missing from required_secrets; the scan is best-effort
and cannot see dynamically constructed names. Secrets are registered per
tenant in the console — see
manage secrets in a flow.
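Why a source scan cannot see dynamically constructed names becomes clear with a rough approximation. The sketch below is not how axiom validate is implemented; find_undeclared and the regular expression are hypothetical and only illustrate matching literal ax.secrets.get("NAME") calls against a required_secrets list.

```python
import re

# Matches ax.secrets.get("NAME") or ax.secrets.get('NAME') with a literal name.
SECRET_CALL = re.compile(r"""ax\.secrets\.get\(\s*["']([^"']+)["']\s*\)""")


def find_undeclared(source: str, required_secrets: list[str]) -> list[str]:
    """Return literal secret names read in source but missing from the list."""
    referenced = SECRET_CALL.findall(source)
    declared = set(required_secrets)
    return sorted({name for name in referenced if name not in declared})


handler_source = '''
api_key, ok = ax.secrets.get("OPENAI_KEY")
token, ok = ax.secrets.get("SLACK_TOKEN")
dynamic, ok = ax.secrets.get(prefix + "_KEY")  # not a literal: invisible
'''

missing = find_undeclared(handler_source, ["OPENAI_KEY"])
print(missing)
```

Only SLACK_TOKEN is reported. The third call reads a name built at runtime, so a scan of this kind has nothing to match, and such names need to be listed in required_secrets by hand.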
Agent memory with ax.agent.memory
All memory read and write methods are coroutines — await them, and
declare the handler async def. (session(...) itself is a plain call —
no await.) Memory is scoped to the flow and tenant; you never pass tenant
or flow identifiers, and any a node supplies are ignored by the sidecar.
For what the tiers mean and how consolidation works, see
agent memory.
Flow-scoped operations
- await ax.agent.memory.search(query, limit=5): semantic search over the flow's memories; returns a list of MemoryEntry.
- await ax.agent.memory.write(content, importance=0.5): store a flow-scoped fact; returns the new memory's ID.
Session operations
ax.agent.memory.session(session_id) addresses one session (the ID comes
from your input message — the platform never infers it). The session object
provides:
- await session.history.last(n): the most recent n conversation turns, as a list of ConversationTurn.
- await session.history.append(role=..., content=...): record a turn; both arguments are keyword-only, and role is one of user, assistant, tool, or system.
- await session.search(query, limit=5) / await session.write(content, importance=0.5): session-scoped memory entries.
- await session.end(): formally close the session and trigger consolidation.
Complete example
```python
# nodes/answer_question.py
from gen.messages_pb2 import ChatRequest, ChatReply
from gen.axiom_context import AxiomContext


async def answer_question(ax: AxiomContext, input: ChatRequest) -> ChatReply:
    """Answers a chat message using session history and flow memory."""
    session = ax.agent.memory.session(input.session_id)
    recent = await session.history.last(20)
    facts = await ax.agent.memory.search(input.text, limit=5)
    ax.log.info("context loaded", turns=len(recent), facts=len(facts))
    reply = f"You said: {input.text}"  # replace with your model call
    await session.history.append(role="user", content=input.text)
    await session.history.append(role="assistant", content=reply)
    return ChatReply(text=reply)
```
Failure behavior
Memory operations never raise. When the memory service is unavailable,
reads return empty lists, writes return an empty string, and append /
end are no-ops — your node keeps running without memory rather than
failing the execution.
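Because nothing is raised, a node that needs to know whether a write landed has to inspect the return value. The sketch below assumes a hypothetical stand-in, UnavailableMemory, that mimics the degraded behavior described above; the remember helper is also illustrative and not part of the SDK.

```python
import asyncio


class UnavailableMemory:
    """Stand-in that mimics the documented behavior during an outage."""

    async def search(self, query, limit=5):
        return []  # reads return empty lists

    async def write(self, content, importance=0.5):
        return ""  # writes return an empty string instead of an ID


async def remember(memory, fact):
    """Write a fact and report whether it was actually stored."""
    memory_id = await memory.write(fact, importance=0.8)
    stored = memory_id != ""
    return stored, memory_id


stored, memory_id = asyncio.run(
    remember(UnavailableMemory(), "user prefers metric units")
)
print(stored)
```

A node could log a warning through ax.log when stored is false. As far as this page describes, an empty search result does not distinguish an outage from a query with no matches, so the write ID is the clearer signal.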
Memory data types
search returns MemoryEntry objects; history.last returns
ConversationTurn objects.
MemoryEntry
| Field | Type | Meaning |
|---|---|---|
| id | str | Memory entry ID |
| content | str | The stored statement |
| scope_level | str | tenant, flow, or session |
| memory_type | str | episodic, semantic, or procedural |
| importance | float | Importance score |
| confidence | float | Confidence score |
| score | float | Relevance score; populated on retrieval only |
| created_at | int | Unix milliseconds |
ConversationTurn
| Field | Type | Meaning |
|---|---|---|
| id | str | Turn ID |
| session_id | str | Session the turn belongs to |
| role | str | user, assistant, tool, or system |
| content | str | Turn text |
| created_at | int | Unix milliseconds |
| tool_name | str | Tool name, for tool turns |
| tool_call_id | str | Tool call correlation ID |
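One way to put these fields to use is turning retrieved entries and turns into model context. The sketch below uses dataclass stand-ins that carry only a subset of the documented fields, and build_prompt is a hypothetical helper. It sorts turns by created_at rather than assuming the order in which history.last returns them.

```python
from dataclasses import dataclass


@dataclass
class ConversationTurn:  # stand-in with a subset of the documented fields
    role: str
    content: str
    created_at: int = 0
    tool_name: str = ""


@dataclass
class MemoryEntry:  # stand-in with a subset of the documented fields
    content: str
    score: float = 0.0


def build_prompt(turns, facts, max_facts=3):
    """Render the highest-scoring facts, then the turns in time order."""
    top = sorted(facts, key=lambda f: f.score, reverse=True)[:max_facts]
    lines = [f"fact: {f.content}" for f in top]
    for turn in sorted(turns, key=lambda t: t.created_at):
        speaker = f"tool:{turn.tool_name}" if turn.role == "tool" else turn.role
        lines.append(f"{speaker}: {turn.content}")
    return "\n".join(lines)


turns = [
    ConversationTurn("assistant", "Hello!", created_at=2000),
    ConversationTurn("user", "hi", created_at=1000),
]
facts = [MemoryEntry("likes tea", score=0.4), MemoryEntry("lives in Oslo", score=0.9)]
prompt = build_prompt(turns, facts, max_facts=1)
print(prompt)
```

The score field is only meaningful on entries that came back from a search, so ranking by it fits retrieval results and not entries obtained some other way.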
Inspecting the running flow with ax.reflection.flow
ax.reflection.flow is a read-only view of the flow's compiled graph and
the current invocation's position in it. Five properties:
- nodes: list[ReflectionNode], every node placement in the graph.
- edges: list[ReflectionEdge], the forward edges.
- loop_edges: list[ReflectionEdge], the loop-back edges.
- position: a FlowPosition for the current invocation.
- graph_id: the artifact ID of the graph this node runs in (the sub-flow's own ID when running inside a sub-flow).
```python
# nodes/answer_question.py (inside the handler body)
pos = ax.reflection.flow.position
downstream = [
    e for e in ax.reflection.flow.edges
    if e.src_instance == pos.current_instance
]
iteration = pos.loop_iterations.get(pos.current_instance, 0)
ax.log.info(
    "graph view",
    nodes=len(ax.reflection.flow.nodes),
    downstream=len(downstream),
    iteration=iteration,
    graph_id=ax.reflection.flow.graph_id,
)
```
If the platform does not supply reflection data for an invocation, the view
degrades gracefully: the lists are empty, graph_id is "", and
position is all zeros — reading reflection never raises.
Reflection data types
ReflectionNode — one node placement: instance_id (int), node_ulid
(str), name (str), package_name (str), package_version (str),
node_type (node, subflow, or pipeline), input_message_name and
output_message_name (fully-qualified Protocol Buffers message names), and
canvas_node_id (str).
ReflectionEdge — one edge: src_instance (int), dst_instance
(int), canvas_edge_id (str), has_condition (bool), has_adapter
(bool), max_iterations (int — meaningful only on entries from
loop_edges), and condition_summary (a ConditionSummary when the edge
is conditional, else None). has_condition and has_adapter are
structural flags only — the compiled adapter recipe is not exposed.
ConditionSummary — an agent-readable digest of a conditional edge's
dispatch predicate: field (str), op (str), and operands
(list[str]). For example field="tools", op="EQ",
operands=["ToolX"] means the edge fires when "ToolX" is in the source
output's tools field — enough for a node to make idempotent decisions
(such as skipping a tool that is already wired) without parsing the
compiled condition.
FlowPosition — where this invocation sits: current_instance (int),
depth (int, 0 at the root flow), loop_iterations (dict[int, int]
keyed by the loop's destination instance), and subflow_stack_graph_ids
(list[str], ordered root → immediate parent).
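The idempotent decision mentioned for ConditionSummary can be sketched as follows. The dataclasses are stand-ins carrying only the fields needed here, and tool_already_wired is a hypothetical helper. It assumes the EQ membership reading given in the example above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConditionSummary:  # stand-in mirroring the documented fields
    field: str
    op: str
    operands: list[str]


@dataclass
class ReflectionEdge:  # stand-in with a subset of the documented fields
    src_instance: int
    dst_instance: int
    condition_summary: Optional[ConditionSummary] = None


def tool_already_wired(edges, current_instance, tool_name, field_name="tools"):
    """True if an outgoing conditional edge already dispatches on tool_name."""
    for edge in edges:
        summary = edge.condition_summary
        if edge.src_instance != current_instance or summary is None:
            continue
        if (summary.field == field_name and summary.op == "EQ"
                and tool_name in summary.operands):
            return True
    return False


edges = [
    ReflectionEdge(1, 2),  # unconditional edge, no summary
    ReflectionEdge(1, 3, ConditionSummary("tools", "EQ", ["ToolX"])),
]
wired_x = tool_already_wired(edges, 1, "ToolX")
wired_y = tool_already_wired(edges, 1, "ToolY")
print(wired_x, wired_y)
```

In a real handler the edges would come from ax.reflection.flow.edges and current_instance from ax.reflection.flow.position.current_instance.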
Adding to the running flow with ax.mutation.flow
Nodes declared with mutation_capable: true in axiom.yaml (default
false) may append nodes and edges to the running flow during their
handler:
- ax.mutation.flow.add_node(package, version, canvas_position=None): buffer a new node placement; returns the int instance ID to use as the endpoint of subsequent add_edge calls in the same invocation.
- ax.mutation.flow.add_edge(src_instance, dst_instance, condition=None): buffer an edge between two instance IDs (existing or just added). Pass condition={"op": "EQ", "field": "tools", "value": "ToolX"} to make the edge fire only when the predicate holds on the source node's output (on a repeated field, EQ has membership semantics); omit condition for an unconditional edge.
```python
# nodes/plan_tools.py (inside a mutation-capable handler body)
search_iid = ax.mutation.flow.add_node(
    package="axiom/search-tools", version="0.4.0",
)
ax.mutation.flow.add_edge(
    src_instance=ax.reflection.flow.position.current_instance,
    dst_instance=search_iid,
)
```
Calls buffer locally and are applied by the platform after the handler
returns. If the platform rejects a buffered mutation, the SDK surfaces an
AxiomMutationError whose .message attribute holds the human-readable
reason.
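A conditional edge follows the same pattern as the unconditional one. The sketch below exercises it against RecordingMutationFlow, a hypothetical test double that only records calls. It is not part of the SDK, and its starting instance ID of 100 is arbitrary, since this page does not say how the platform allocates IDs.

```python
class RecordingMutationFlow:
    """Hypothetical test double: records calls instead of mutating a flow."""

    def __init__(self, first_new_instance=100):
        self._next = first_new_instance  # arbitrary starting ID for the sketch
        self.nodes = []
        self.edges = []

    def add_node(self, package, version, canvas_position=None):
        instance_id = self._next
        self._next += 1
        self.nodes.append((instance_id, package, version))
        return instance_id

    def add_edge(self, src_instance, dst_instance, condition=None):
        self.edges.append((src_instance, dst_instance, condition))


def wire_tool(mutation, current_instance, tool_name, package, version):
    """Add a tool node reached only when tool_name is in the 'tools' field."""
    iid = mutation.add_node(package=package, version=version)
    mutation.add_edge(
        src_instance=current_instance,
        dst_instance=iid,
        condition={"op": "EQ", "field": "tools", "value": tool_name},
    )
    return iid


flow = RecordingMutationFlow()
new_iid = wire_tool(flow, 7, "ToolX", "axiom/search-tools", "0.4.0")
print(flow.nodes, flow.edges)
```

Keeping the wiring in a helper that takes the mutation object as a parameter makes it testable without a running flow; in a handler the argument would be ax.mutation.flow.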
Testing nodes that use AxiomContext
axiom create node generates nodes/<snake_name>_test.py containing a
_TestContext class — a minimal AxiomContext with a silent logger,
secrets served from a dict, no-op memory, and execution_id set to a test
value. Supply secrets your node reads via
secrets_map:
```python
# nodes/answer_question_test.py (replace the generated test function with this)
import asyncio


def test_answer_question():
    ax = _TestContext(secrets_map={"OPENAI_KEY": "sk-test"})
    input_msg = ChatRequest(session_id="s1", text="hi")
    result = asyncio.run(answer_question(ax, input_msg))
    assert isinstance(result, ChatReply)
    assert result.text == "You said: hi"
```
A plain def handler is called directly instead: the generated test stub
already does that; asyncio.run is needed only for async def handlers.
The _TestContext memory operations are awaitable no-ops, so handlers that
use ax.agent.memory run unchanged in unit tests.
Run tests with axiom test, which uses pytest for Python packages. Tests
run again inside the publish build, and a failing test fails the push.
axiom validate warns (without blocking) when a node has no test — a node
with zero tests would otherwise pass silently. Assert output field values
meaningfully, not just the return type.