Skip to content

Resolver SDK

The amplifier-resolver-sdk is how you build resolvers. It handles the entire communication layer between your code and the Resolve platform — JSON-RPC transport, event emission, state management, input requests, reality checks — so you only write run().

from amplifier_resolver_sdk import Resolver, ResolverResult


class MyResolver(Resolver):
    """Smallest useful resolver: announce a phase, do the work, report the result."""

    async def run(self, instance) -> ResolverResult:
        # Tell consumers which phase the instance is in before starting work.
        await self.emit("resolver:phase", {"name": "working"})
        # ... your logic here ...
        return ResolverResult(status="completed", summary="done")


if __name__ == "__main__":
    # serve() is the process entry point.
    MyResolver().serve()

That’s the whole surface. Everything else is handled for you.


The SDK is installed into your resolver’s worker container via manifest.json. It is not published to PyPI — install directly from GitHub:

Terminal window
# With uv (recommended — what the platform uses)
uv pip install 'amplifier-resolver-sdk @ git+https://github.com/microsoft/amplifier-resolver-sdk@main'
# With pip
pip install 'amplifier-resolver-sdk @ git+https://github.com/microsoft/amplifier-resolver-sdk@main'

In manifest.json setup commands (the standard pattern):

{
  "container": {
    "setup_commands": [
      "if [ -d /opt/amplifier-resolver-sdk ]; then uv pip install --reinstall --no-cache --python /opt/uv-tools/amplifier/bin/python /opt/amplifier-resolver-sdk; else GIT_TERMINAL_PROMPT=0 uv pip install --python /opt/uv-tools/amplifier/bin/python 'amplifier-resolver-sdk @ git+https://github.com/microsoft/amplifier-resolver-sdk@main'; fi"
    ]
  }
}

The if [ -d /opt/amplifier-resolver-sdk ] guard enables local development overrides — the platform mounts a local SDK copy there during dev. In production it falls through to the GitHub install.


Import and subclass Resolver. Override the methods you need. At minimum, implement run().

from amplifier_resolver_sdk import Resolver, ResolverResult


class MyResolver(Resolver):
    """Skeleton resolver listing every hook a subclass can provide."""

    # ── Required properties ─────────────────────────────────────────
    name = "my-resolver"
    version = "1.0.0"
    description = "Does the thing"
    supports_resume = False

    # ── Required: implement run() ───────────────────────────────────
    # NOTE(review): later examples on this page declare
    # run(self, config, session_factory, emitter, *, resume=False) — confirm
    # which signature the installed SDK version expects.
    async def run(self, instance) -> ResolverResult:
        ...

    # ── Optional overrides ──────────────────────────────────────────
    def instantiation_schema(self, params=None):  # A2UI form schema
        ...

    def workspace_spec(self, params, available_credentials=None):  # repos to clone
        ...

    def container_setup(self, params):  # extra setup commands/bundles
        ...

    def message_types(self, instance_state):  # available message types
        ...

    async def on_input_response(self, config, request_id, payload):
        ...

    async def on_message(self, config, message_type, payload):
        ...

    async def stop(self, reason):
        ...

The base class implements all the JSON-RPC dispatch. You never deal with the transport directly.


Always call serve() as the entry point. The SDK:

  1. Reserves the process's real stdout for JSON-RPC and redirects sys.stdout to stderr, so your print() calls go to stderr instead of corrupting the protocol stream — see below
  2. Sets up the NDJSON transport on stdin/stdout
  3. Dispatches incoming JSON-RPC calls to your methods
  4. Blocks until the platform signals shutdown
if __name__ == "__main__":
    # serve() blocks until the platform signals shutdown.
    MyResolver().serve()

Emit structured events to the instance’s event stream. This is the primary way your resolver communicates progress, phase transitions, and artifacts to consumers.

# Phase announcement
await self.emit("resolver:phase", {"name": "planning"})
# Progress update
await self.emit("resolver:progress", {
"milestone": "implementing 3 of 5 endpoints",
"fraction": 0.6,
})
# Artifact (PR, branch, file, summary, verdict...)
await self.emit("resolver:artifact", {
"kind": "pr",
"value": "https://github.com/myorg/myrepo/pull/42",
"metadata": {"branch": "resolve/add-ping", "commits": 3},
})
# Completion (REQUIRED on every exit path)
await self.emit("resolver:completed", {
"outcome": "success",
"summary": "PR created: https://github.com/myorg/myrepo/pull/42",
"duration_s": 847.2,
"artifact_count": 1,
})

Events are written to events.jsonl immediately (flushed on write) and mirrored to all SSE consumers by the platform monitor.

See Event Vocabulary for the full resolver:* taxonomy.


Two methods for writing state.json — the point-in-time snapshot of the instance:

self.update_state(**kwargs) — full overwrite

Overwrites state.json entirely. Every key not in kwargs is lost.

# Safe: every key the snapshot should contain is passed in this one call
self.update_state(
    status="running",
    phase="implementing",
    plan_steps=5,
    completed_steps=2,
)

self.merge_state(**kwargs) — read-modify-write

Reads the current state.json, overlays kwargs, writes back. Safe for partial updates.

# Safe: updating one field without losing the rest
self.merge_state(phase="implementing")  # preserves plan_steps
self.merge_state(completed_steps=3)     # preserves phase

# Pattern: use update_state for initial state, merge_state for incremental updates
self.update_state(
    status="running",
    phase="planning",
    plan_steps=0,
)
for step in plan:
    # work...
    self.merge_state(completed_steps=step.index)

state.json is written atomically via os.replace() — consumers never see a partial write.


Pause the instance and request human input with a structured A2UI form:

# Inside run() — request a decision at a gate
decision_future = await self.request_input(
request_id="build-gate",
prompt="Build failed after 3 attempts. How should we proceed?",
schema={
"type": "form",
"components": [
{
"type": "select",
"id": "decision",
"label": "Decision",
"options": [
{"value": "retry", "label": "Retry"},
{"value": "abort", "label": "Abort"},
{"value": "skip_tests", "label": "Skip tests and deliver"},
],
"checks": [
{
"condition": {"call": "required", "args": [{"path": "/decision"}]},
"message": "Please select a decision"
}
]
}
]
}
)
# The instance transitions to awaiting_input
# Block until the consumer responds via POST /instances/{id}/input-requests/build-gate
payload = await decision_future
decision = payload["decision"]
if decision == "retry":
# retry logic
elif decision == "abort":
return ResolverResult(status="failed", error="User aborted at build gate")

request_input() returns an asyncio.Future that resolves when the consumer responds. The platform also calls your on_input_response() method — you can use either the future or the callback, but not both for the same request.

ID uniqueness: request_id must be unique within the instance. Use a counter or descriptive slug like "build-gate-attempt-3".


Invoke a clean-room verification of your software from within run():

# Start a reality check
handle = await self.start_reality_check(
acceptance_criteria={
"spec": "GET /api/ping returns 200 with body {\"pong\": true}",
"checks": [
{
"method": "GET",
"path": "/api/ping",
"expected_status": 200,
"expected_body_contains": '"pong"',
}
],
},
software_path="https://github.com/myorg/myrepo",
environment={"language": "python", "type": "fastapi", "port": 8000},
dtu_lifecycle="auto", # destroy on pass; preserve on fail
timeout_seconds=1200,
)
# Optionally stream progress while waiting
async for event in self.reality_check_events(handle):
await self.emit("resolver:phase", {
"name": f"rc:{event['event_type'].split('.')[-1]}"
})
# Block until verdict
result = await self.wait_for_reality_check(handle)
# result.verdict: "pass" | "fail" | "partial" | None (infrastructure failure)
# result.failure_mode: None | "software" | "infrastructure" | "config" | "timeout" | "cancelled"
# result.report: dict with criterion-by-criterion results
if result.verdict == "pass":
await self.emit("resolver:artifact", {"kind": "verdict", "value": "pass"})
# proceed to delivery
else:
# handle failure

See Reality Check for the full reference.


The session_factory parameter in run() provides LLM sessions. Each session is an Amplifier agent session with the configured bundle, tools, and provider.

async def run(self, config, session_factory, emitter, *, resume=False):
    """Create one named LLM session and ask it for an implementation plan.

    The prompt embeds ``config.params['spec']``; the session's reply is read
    from ``response.output``.
    """
    # Create a named session
    session = await session_factory.create(name="plan-the-work")
    # Run a prompt and get the response
    response = await session.run(
        prompt=f"Analyze this spec and produce an implementation plan:\n\n{config.params['spec']}"
    )
    plan_text = response.output

Sessions are independent — each has its own context window. Create a session per phase or logical unit of work. Sessions are cheap to create; the bundle setup cost is paid once and amortized across all sessions created from the same factory.


The SDK communicates with the platform host over JSON-RPC 2.0 on stdio (NDJSON-framed — one JSON object per line). You never interact with this directly; it’s documented here for transparency and debugging.

sequenceDiagram
    participant H as Platform Host
    participant R as Resolver Process (stdio)
    H->>R: resolver.initialize(config)
    Note right of R: loads InstanceConfig<br/>sets up SDK internals
    R-->>H: {status: "ready"}
    H->>R: resolver.run({resume: false})
    R-->>H: {status: "started"}
    activate R
    Note over R: run() executes…
    loop Notifications (fire-and-forget, no id)
        R-->>H: emit resolver:phase {name}
        R-->>H: emit resolver:artifact {kind, value}
        R-->>H: update state.json
    end
    H->>R: resolver.inputResponse({request_id, payload})
    R-->>H: null
    H->>R: resolver.getMessageTypes({instance_state})
    R-->>H: [{name, label, schema}]
    H->>R: resolver.stop({reason})
    R-->>H: {acknowledged: true}
    deactivate R
JSON-RPC 2.0 over stdio — requests have an id, notifications (events) do not

RPC methods the platform calls on the resolver:

| Method | When | Timeout |
| --- | --- | --- |
| resolver.initialize(config) | Before run — send instance config | 30s |
| resolver.run({resume}) | Start or resume | None (long-running) |
| resolver.stop({reason}) | Graceful stop requested | 30s |
| resolver.inputResponse({request_id, payload}) | User responded to input request | 5s |
| resolver.message({message_type, payload}) | User sent a message | 5s |
| resolver.getInstantiationSchema({params}) | Before instance creation | 10s |
| resolver.getWorkspaceSpec({params}) | Before container setup | 10s |
| resolver.getMessageTypes({instance_state}) | On GET /message-types | 5s |

Notifications the resolver sends to the platform (no response expected):

  • Events, state updates, input request signals — all sent as JSON-RPC notifications with no id field

"""minimal_resolver/__main__.py — complete working example."""
import logging
import sys
import time

from amplifier_resolver_sdk import Resolver, ResolverResult

logger = logging.getLogger(__name__)


class MinimalResolver(Resolver):
    """Resolver that hands one free-text task to a single LLM session."""

    name = "minimal"
    version = "0.1.0"
    description = "Minimal resolver — for documentation purposes"
    supports_resume = False

    def instantiation_schema(self, params=None):
        """Return the creation form: a single required ``task`` textarea."""
        return {
            "type": "form",
            "components": [
                {
                    "type": "textarea",
                    "id": "task",
                    "label": "Task",
                    "placeholder": "What should I do?",
                    "checks": [
                        {
                            "condition": {"call": "required", "args": [{"path": "/task"}]},
                            "message": "Task is required",
                        }
                    ],
                }
            ],
        }

    async def run(self, config, session_factory, emitter, *, resume=False):
        """Run the task in one LLM session and report the outcome.

        Emits ``resolver:created``, ``resolver:phase``, ``resolver:artifact``
        and, on every exit path, ``resolver:completed``.

        Returns:
            ``ResolverResult`` with ``status="completed"`` and the session
            output on success, or ``status="failed"`` and the error text if
            anything inside the ``try`` raises.
        """
        # Monotonic clock: unaffected by wall-clock adjustments, so the
        # reported duration_s is a real elapsed time rather than a fixed 0.0.
        started = time.monotonic()
        task = config.params.get("task", "no task given")
        logger.info("Running task: %s", task)  # safe: goes to stderr
        try:
            await self.emit("resolver:created", {
                "resolver_name": self.name,
                "resolver_version": self.version,
                "instance_id": config.instance_id,
                "params": {"task": task},
                "capabilities_granted": [],
            })
            await self.emit("resolver:phase", {"name": "working"})
            self.update_state(status="running", phase="working")

            # Spawn an LLM session
            session = await session_factory.create(name="main")
            response = await session.run(prompt=f"Complete this task: {task}")

            await self.emit("resolver:artifact", {
                "kind": "summary",
                "value": response.output,
            })
            await self.emit("resolver:completed", {
                "outcome": "success",
                "summary": "Task completed",
                "duration_s": time.monotonic() - started,
                "artifact_count": 1,
            })
            return ResolverResult(status="completed", output=response.output)
        except Exception as e:
            # Top-level boundary: log the traceback, then still emit the
            # mandatory completion event before returning a failed result.
            logger.exception("Resolver failed")
            await self.emit("resolver:completed", {
                "outcome": "failed",
                "summary": str(e),
                "duration_s": time.monotonic() - started,
                "artifact_count": 0,
                "error": {"code": "unexpected_error", "detail": str(e)},
            })
            return ResolverResult(status="failed", error=str(e))


if __name__ == "__main__":
    MinimalResolver().serve()