Resolver SDK
The amplifier-resolver-sdk is how you build resolvers. It handles the
entire communication layer between your code and the Resolve platform — JSON-RPC
transport, event emission, state management, input requests, reality checks — so
you only write run().
    from amplifier_resolver_sdk import Resolver, ResolverResult

    class MyResolver(Resolver):
        async def run(self, instance) -> ResolverResult:
            await self.emit("resolver:phase", {"name": "working"})
            # ... your logic here ...
            return ResolverResult(status="completed", summary="done")

    if __name__ == "__main__":
        MyResolver().serve()

That’s the whole surface. Everything else is handled for you.
Installation
The SDK is installed into your resolver’s worker container via manifest.json.
It is not published to PyPI — install directly from GitHub:
    # With uv (recommended; this is what the platform uses)
    uv pip install 'amplifier-resolver-sdk @ git+https://github.com/microsoft/amplifier-resolver-sdk@main'

    # With pip
    pip install 'amplifier-resolver-sdk @ git+https://github.com/microsoft/amplifier-resolver-sdk@main'

In manifest.json setup commands (the standard pattern):
    {
      "container": {
        "setup_commands": [
          "if [ -d /opt/amplifier-resolver-sdk ]; then uv pip install --reinstall --no-cache --python /opt/uv-tools/amplifier/bin/python /opt/amplifier-resolver-sdk; else GIT_TERMINAL_PROMPT=0 uv pip install --python /opt/uv-tools/amplifier/bin/python 'amplifier-resolver-sdk @ git+https://github.com/microsoft/amplifier-resolver-sdk@main'; fi"
        ]
      }
    }

The if [ -d /opt/amplifier-resolver-sdk ] guard enables local development
overrides: the platform mounts a local SDK copy there during dev. In production it
falls through to the GitHub install.
Resolver base class
Import and subclass Resolver. Override the methods you need. At minimum, implement
run().
    from amplifier_resolver_sdk import Resolver, ResolverResult

    class MyResolver(Resolver):
        # ── Required properties ─────────────────────────────────────────
        name = "my-resolver"
        version = "1.0.0"
        description = "Does the thing"
        supports_resume = False

        # ── Required: implement run() ───────────────────────────────────
        async def run(self, instance) -> ResolverResult:
            ...

        # ── Optional overrides ──────────────────────────────────────────
        def instantiation_schema(self, params=None):  # A2UI form schema
            ...

        def workspace_spec(self, params, available_credentials=None):  # repos to clone
            ...

        def container_setup(self, params):  # extra setup commands/bundles
            ...

        def message_types(self, instance_state):  # available message types
            ...

        async def on_input_response(self, config, request_id, payload):
            ...

        async def on_message(self, config, message_type, payload):
            ...

        async def stop(self, reason):
            ...

The base class implements all the JSON-RPC dispatch. You never deal with the transport directly.
serve() — the entry point
Always call serve() as the entry point. The SDK:
- Redirects sys.stdout to JSON-RPC (your print() calls go to stderr; see below)
- Sets up the NDJSON transport on stdin/stdout
- Dispatches incoming JSON-RPC calls to your methods
- Blocks until the platform signals shutdown
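To make the first bullet concrete, here is a minimal standard-library sketch of how stdout can be reserved for protocol traffic while print() output is sent elsewhere. It illustrates the idea only and is not the SDK's actual implementation; reserve_stdout_for_protocol and demo are hypothetical names.

```python
import io
import sys


def reserve_stdout_for_protocol(log_stream=None):
    """Hypothetical helper: keep the current stdout for protocol frames and
    point sys.stdout at a log stream so print() cannot corrupt the channel."""
    protocol_out = sys.stdout  # the stream the host reads JSON-RPC from
    sys.stdout = log_stream if log_stream is not None else sys.stderr
    return protocol_out


def demo():
    original = sys.stdout
    fake_channel, fake_log = io.StringIO(), io.StringIO()
    sys.stdout = fake_channel  # stand-in for the real stdout pipe
    try:
        channel = reserve_stdout_for_protocol(log_stream=fake_log)
        print("debug: starting")  # lands in the log stream, not the channel
        channel.write('{"jsonrpc": "2.0", "method": "ping"}\n')
    finally:
        sys.stdout = original  # always restore the real stdout
    return fake_channel.getvalue(), fake_log.getvalue()
```

The point of the design is that stray print() calls in resolver code end up in logs instead of breaking the line-delimited JSON stream.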
    if __name__ == "__main__":
        MyResolver().serve()

self.emit() — event emission
Emit structured events to the instance’s event stream. This is the primary way your resolver communicates progress, phase transitions, and artifacts to consumers.
    # Phase announcement
    await self.emit("resolver:phase", {"name": "planning"})

    # Progress update
    await self.emit("resolver:progress", {
        "milestone": "implementing 3 of 5 endpoints",
        "fraction": 0.6,
    })

    # Artifact (PR, branch, file, summary, verdict...)
    await self.emit("resolver:artifact", {
        "kind": "pr",
        "value": "https://github.com/myorg/myrepo/pull/42",
        "metadata": {"branch": "resolve/add-ping", "commits": 3},
    })

    # Completion (REQUIRED on every exit path)
    await self.emit("resolver:completed", {
        "outcome": "success",
        "summary": "PR created: https://github.com/myorg/myrepo/pull/42",
        "duration_s": 847.2,
        "artifact_count": 1,
    })

Events are written to events.jsonl immediately (flushed on write) and mirrored to
all SSE consumers by the platform monitor.
See Event Vocabulary for the full resolver:* taxonomy.
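The "flushed on write" behaviour can be pictured with a small append-only JSONL writer. This is a sketch under assumptions, not the SDK's code: the record fields type, data, and ts are hypothetical, and the SDK's real on-disk event layout may differ.

```python
import json
import os
import tempfile
import time


def append_event(path, event_type, data):
    """Append one event as a single JSON line and flush it right away."""
    # Field names here are illustrative assumptions, not the SDK's schema.
    record = {"type": event_type, "data": data, "ts": time.time()}
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
        f.flush()  # push the line out of Python's buffer immediately
    return record


def read_events(path):
    """Read the log back, one JSON object per non-empty line."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "events.jsonl")
        append_event(path, "resolver:phase", {"name": "planning"})
        append_event(path, "resolver:progress", {"fraction": 0.6})
        return [event["type"] for event in read_events(path)]
```

Because each event is one complete line, a consumer tailing the file can parse every line it has read so far without waiting for the resolver to finish.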
State management
Two methods for writing state.json, the point-in-time snapshot of the instance:
self.update_state(**kwargs) — full overwrite
Overwrites state.json entirely. Every key not in kwargs is lost.

    # Safe: writing the complete state
    self.update_state(
        status="running",
        phase="implementing",
        plan_steps=5,
        completed_steps=2,
    )

self.merge_state(**kwargs) — read-modify-write
Reads the current state.json, overlays kwargs, writes back. Safe for partial
updates.

    # Safe: updating one field without losing the rest
    self.merge_state(phase="implementing")   # preserves plan_steps
    self.merge_state(completed_steps=3)      # preserves phase

    # Pattern: use update_state for initial state, merge_state for incremental updates
    self.update_state(
        status="running",
        phase="planning",
        plan_steps=0,
    )

    for step in plan:
        # work...
        self.merge_state(completed_steps=step.index)

state.json is written atomically via os.replace(), so consumers never see a
partial write.
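The difference between the two methods, and the atomic write, can be sketched with plain file helpers. The functions below take an explicit path and are hypothetical stand-ins for illustration; they are not the SDK's implementation, which is only known from the description above to use os.replace().

```python
import json
import os
import tempfile


def write_state(path, state):
    """Write atomically: temp file in the same directory, then os.replace()."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f)
        os.replace(tmp_path, path)  # readers see the old or the new file, never half
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def update_state(path, **kwargs):
    """Full overwrite: anything not passed in kwargs is dropped."""
    write_state(path, dict(kwargs))


def merge_state(path, **kwargs):
    """Read-modify-write: overlay kwargs on the current contents."""
    try:
        with open(path, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        state = {}
    state.update(kwargs)
    write_state(path, state)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "state.json")
        update_state(path, status="running", phase="planning", plan_steps=5)
        merge_state(path, phase="implementing")  # keeps status and plan_steps
        with open(path, encoding="utf-8") as f:
            merged = json.load(f)
        update_state(path, phase="done")  # drops everything else
        with open(path, encoding="utf-8") as f:
            overwritten = json.load(f)
        return merged, overwritten
```

The temp file is created in the same directory as the target so that the final rename stays on one filesystem.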
Input requests
Pause the instance and request human input with a structured A2UI form:
    # Inside run(): request a decision at a gate
    decision_future = await self.request_input(
        request_id="build-gate",
        prompt="Build failed after 3 attempts. How should we proceed?",
        schema={
            "type": "form",
            "components": [
                {
                    "type": "select",
                    "id": "decision",
                    "label": "Decision",
                    "options": [
                        {"value": "retry", "label": "Retry"},
                        {"value": "abort", "label": "Abort"},
                        {"value": "skip_tests", "label": "Skip tests and deliver"},
                    ],
                    "checks": [
                        {
                            "condition": {"call": "required", "args": [{"path": "/decision"}]},
                            "message": "Please select a decision"
                        }
                    ]
                }
            ]
        }
    )

    # The instance transitions to awaiting_input
    # Block until the consumer responds via POST /instances/{id}/input-requests/build-gate
    payload = await decision_future

    decision = payload["decision"]
    if decision == "retry":
        ...  # retry logic
    elif decision == "abort":
        return ResolverResult(status="failed", error="User aborted at build gate")

request_input() returns an asyncio.Future that resolves when the consumer
responds. The platform also calls your on_input_response() method. You can use
either the future or the callback, but not both for the same request.
ID uniqueness: request_id must be unique within the instance. Use a counter
or descriptive slug like "build-gate-attempt-3".
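One way to picture the future-per-request mechanism and the uniqueness rule is a small registry keyed by request_id. This is a hedged sketch with hypothetical names (PendingInputs, request, respond); the SDK's internals are not documented here and may work differently.

```python
import asyncio


class PendingInputs:
    """Hypothetical registry mapping each request_id to a waiting Future."""

    def __init__(self):
        self._pending = {}
        self._seen = set()  # every ID ever used in this instance

    def request(self, request_id):
        if request_id in self._seen:
            raise ValueError(f"duplicate request_id: {request_id}")
        self._seen.add(request_id)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return future

    def respond(self, request_id, payload):
        # pop() so each request can be answered exactly once
        self._pending.pop(request_id).set_result(payload)


async def demo():
    inputs = PendingInputs()
    future = inputs.request("build-gate-attempt-1")
    # Simulate the consumer answering shortly afterwards.
    asyncio.get_running_loop().call_later(
        0.01, inputs.respond, "build-gate-attempt-1", {"decision": "retry"}
    )
    payload = await future
    try:
        inputs.request("build-gate-attempt-1")  # reusing an answered ID
        duplicate_rejected = False
    except ValueError:
        duplicate_rejected = True
    return payload["decision"], duplicate_rejected
```

Tracking used IDs separately from pending ones is what makes a slug like "build-gate-attempt-3" safe to generate from a counter: an ID stays reserved even after its request has been answered.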
Reality checks
Invoke a clean-room verification of your software from within run():
    # Start a reality check
    handle = await self.start_reality_check(
        acceptance_criteria={
            "spec": "GET /api/ping returns 200 with body {\"pong\": true}",
            "checks": [
                {
                    "method": "GET",
                    "path": "/api/ping",
                    "expected_status": 200,
                    "expected_body_contains": '"pong"',
                }
            ],
        },
        software_path="https://github.com/myorg/myrepo",
        environment={"language": "python", "type": "fastapi", "port": 8000},
        dtu_lifecycle="auto",  # destroy on pass; preserve on fail
        timeout_seconds=1200,
    )

    # Optionally stream progress while waiting
    async for event in self.reality_check_events(handle):
        await self.emit("resolver:phase", {
            "name": f"rc:{event['event_type'].split('.')[-1]}"
        })

    # Block until verdict
    result = await self.wait_for_reality_check(handle)

    # result.verdict: "pass" | "fail" | "partial" | None (infrastructure failure)
    # result.failure_mode: None | "software" | "infrastructure" | "config" | "timeout" | "cancelled"
    # result.report: dict with criterion-by-criterion results

    if result.verdict == "pass":
        await self.emit("resolver:artifact", {"kind": "verdict", "value": "pass"})
        # proceed to delivery
    else:
        ...  # handle failure

See Reality Check for the full reference.
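How a resolver reacts to each verdict and failure_mode combination is up to you. The sketch below shows one possible policy as a pure function. FakeResult is a stand-in with the three fields listed above, not the SDK's result class, and the action names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeResult:
    """Stand-in carrying the documented fields; not the SDK's own class."""
    verdict: Optional[str]
    failure_mode: Optional[str] = None
    report: dict = field(default_factory=dict)


def next_action(result):
    """Hypothetical policy: decide what to do after a reality check."""
    if result.verdict == "pass":
        return "deliver"
    if result.failure_mode == "cancelled":
        return "stop"
    if result.verdict is None or result.failure_mode in ("infrastructure", "timeout"):
        return "retry_check"  # the check itself did not complete cleanly
    return "fix_and_recheck"  # "fail" or "partial" caused by software or config
```

Keeping the decision in a function that takes only the result makes it easy to unit test without starting a real check.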
Session factory — spawning LLM sessions
The session_factory parameter in run() provides LLM sessions. Each session is
an Amplifier agent session with the configured bundle, tools, and provider.
    async def run(self, config, session_factory, emitter, *, resume=False):
        # Create a named session
        session = await session_factory.create(name="plan-the-work")

        # Run a prompt and get the response
        response = await session.run(
            prompt=f"Analyze this spec and produce an implementation plan:\n\n{config.params['spec']}"
        )

        plan_text = response.output

Sessions are independent: each has its own context window. Create a session per phase or logical unit of work. Sessions are cheap to create; the bundle setup cost is paid once and amortized across all sessions created from the same factory.
The JSON-RPC protocol
The SDK communicates with the platform host over JSON-RPC 2.0 on stdio (NDJSON-framed, one JSON object per line). You never interact with this directly; it’s documented here for transparency and debugging.
RPC methods the platform calls on the resolver:
| Method | When | Timeout |
|---|---|---|
| resolver.initialize(config) | Before run; sends instance config | 30s |
| resolver.run({resume}) | Start or resume | None (long-running) |
| resolver.stop({reason}) | Graceful stop requested | 30s |
| resolver.inputResponse({request_id, payload}) | User responded to input request | 5s |
| resolver.message({message_type, payload}) | User sent a message | 5s |
| resolver.getInstantiationSchema({params}) | Before instance creation | 10s |
| resolver.getWorkspaceSpec({params}) | Before container setup | 10s |
| resolver.getMessageTypes({instance_state}) | On GET /message-types | 5s |
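For debugging it can help to picture how a caller might apply these limits. The following is a sketch of per-method timeouts using asyncio.wait_for; it is an assumption about how the host side could behave, not documented platform code, and call_with_timeout is a hypothetical name.

```python
import asyncio

# Timeouts in seconds, copied from the table above; None means no limit.
RPC_TIMEOUTS = {
    "resolver.initialize": 30,
    "resolver.run": None,
    "resolver.stop": 30,
    "resolver.inputResponse": 5,
    "resolver.message": 5,
    "resolver.getInstantiationSchema": 10,
    "resolver.getWorkspaceSpec": 10,
    "resolver.getMessageTypes": 5,
}


async def call_with_timeout(method, handler, timeout_override=None):
    """Await handler() but give up once the method's timeout has elapsed."""
    timeout = RPC_TIMEOUTS[method] if timeout_override is None else timeout_override
    try:
        return await asyncio.wait_for(handler(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"error": f"{method} timed out after {timeout}s"}


async def demo():
    async def fast():
        return {"ok": True}

    async def slow():
        await asyncio.sleep(1)
        return {"ok": True}

    quick = await call_with_timeout("resolver.message", fast)
    # The override keeps the demo short; the table value would be 5 seconds.
    late = await call_with_timeout("resolver.message", slow, timeout_override=0.01)
    return quick, late
```

The practical takeaway for resolver authors is that handlers with short limits, such as on_message(), should return quickly and hand long work off to run().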
Notifications the resolver sends to the platform (no response expected):
- Events, state updates, and input request signals are all sent as JSON-RPC notifications
with no id field
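The framing described in this section can be illustrated with a few lines of standard-library Python. This is a sketch of NDJSON-framed JSON-RPC 2.0 in general, not the SDK's transport; the notification method name "event" and its params are hypothetical.

```python
import io
import json


def encode_frame(method, params, request_id=None):
    """Serialize one JSON-RPC 2.0 message as a single NDJSON line.

    Leaving request_id as None produces a notification (no "id" member).
    """
    message = {"jsonrpc": "2.0", "method": method, "params": params}
    if request_id is not None:
        message["id"] = request_id
    # json.dumps escapes newlines inside strings, so the frame stays on one line
    return json.dumps(message) + "\n"


def read_frames(stream):
    """Yield (message, is_notification) for each non-empty line."""
    for line in stream:
        if line.strip():
            message = json.loads(line)
            yield message, "id" not in message


def demo():
    wire = io.StringIO()
    wire.write(encode_frame("resolver.stop", {"reason": "user"}, request_id=7))
    wire.write(encode_frame("event", {"type": "resolver:phase", "note": "a\nb"}))
    wire.seek(0)
    return [(msg["method"], is_note) for msg, is_note in read_frames(wire)]
```

A request carries an id and expects a response with the same id; a notification omits it, which is why events and state updates never block the resolver waiting for an acknowledgement.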
Complete minimal resolver
    """minimal_resolver/__main__.py: complete working example."""
    import logging
    import sys

    from amplifier_resolver_sdk import Resolver, ResolverResult

    logger = logging.getLogger(__name__)

    class MinimalResolver(Resolver):
        name = "minimal"
        version = "0.1.0"
        description = "Minimal resolver for documentation purposes"
        supports_resume = False

        def instantiation_schema(self, params=None):
            return {
                "type": "form",
                "components": [
                    {
                        "type": "textarea",
                        "id": "task",
                        "label": "Task",
                        "placeholder": "What should I do?",
                        "checks": [
                            {
                                "condition": {"call": "required", "args": [{"path": "/task"}]},
                                "message": "Task is required"
                            }
                        ]
                    }
                ]
            }

        async def run(self, config, session_factory, emitter, *, resume=False):
            task = config.params.get("task", "no task given")
            logger.info("Running task: %s", task)  # safe: goes to stderr

            try:
                await self.emit("resolver:created", {
                    "resolver_name": self.name,
                    "resolver_version": self.version,
                    "instance_id": config.instance_id,
                    "params": {"task": task},
                    "capabilities_granted": [],
                })

                await self.emit("resolver:phase", {"name": "working"})
                self.update_state(status="running", phase="working")

                # Spawn an LLM session
                session = await session_factory.create(name="main")
                response = await session.run(prompt=f"Complete this task: {task}")

                await self.emit("resolver:artifact", {
                    "kind": "summary",
                    "value": response.output,
                })

                await self.emit("resolver:completed", {
                    "outcome": "success",
                    "summary": "Task completed",
                    "duration_s": 0.0,
                    "artifact_count": 1,
                })
                return ResolverResult(status="completed", output=response.output)

            except Exception as e:
                logger.exception("Resolver failed")
                await self.emit("resolver:completed", {
                    "outcome": "failed",
                    "summary": str(e),
                    "duration_s": 0.0,
                    "artifact_count": 0,
                    "error": {"code": "unexpected_error", "detail": str(e)},
                })
                return ResolverResult(status="failed", error=str(e))

    if __name__ == "__main__":
        MinimalResolver().serve()
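The example hardcodes duration_s to 0.0 and repeats the completion event in two places. One way to cover every exit path with a measured duration is a small wrapper around the work. This is a sketch under assumptions: run_with_completion is a hypothetical helper, and emit is any async callable with the same shape as self.emit().

```python
import asyncio
import time


async def run_with_completion(emit, work):
    """Run work() and emit exactly one resolver:completed event, whatever happens."""
    started = time.monotonic()
    outcome, summary = "failed", "did not finish"
    try:
        summary = await work()
        outcome = "success"
    except Exception as exc:
        summary = str(exc)
    finally:
        # Runs on success and on failure, so no exit path skips the event.
        await emit("resolver:completed", {
            "outcome": outcome,
            "summary": summary,
            "duration_s": round(time.monotonic() - started, 3),
        })
    return outcome


async def demo():
    events = []

    async def emit(event_type, data):  # stand-in for self.emit()
        events.append((event_type, data))

    async def ok():
        return "Task completed"

    async def broken():
        raise RuntimeError("boom")

    outcomes = [
        await run_with_completion(emit, ok),
        await run_with_completion(emit, broken),
    ]
    return outcomes, events
```

time.monotonic() is used instead of time.time() so the measured duration cannot go negative if the system clock is adjusted mid-run.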