Resolver Protocol
The Resolver Protocol is the contract between the Resolve platform and every resolver plugin. Implement all 12 members and your resolver integrates with the full platform — instance creation, live events, input requests, user messages, pause/resume, and graceful stop.

    from amplifier_resolve.core.resolver import Resolver, ResolverResult

The SDK provides a base class that implements the boilerplate. You override the methods that matter for your resolver strategy.
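
Because the contract has 12 members (four properties and eight methods), a quick check that a class defines all of them can catch an incomplete resolver early. The following is a minimal sketch, not part of the SDK: `missing_members` and `IncompleteResolver` are hypothetical names, and the member names are the ones listed in the contract in this document.

```python
# Hypothetical helper, not part of amplifier_resolve: reports which of the
# 12 contract members a resolver class does not define.
REQUIRED_PROPERTIES = ("name", "version", "description", "supports_resume")
REQUIRED_METHODS = (
    "instantiation_schema", "workspace_spec", "container_setup", "message_types",
    "run", "on_input_response", "on_message", "stop",
)


def missing_members(cls: type) -> list:
    """Return the contract members that cls (or its bases) does not define."""
    missing = []
    for prop in REQUIRED_PROPERTIES:
        # Looked up on the class, a property shows up as a property object.
        if not isinstance(getattr(cls, prop, None), property):
            missing.append(prop)
    for method in REQUIRED_METHODS:
        if not callable(getattr(cls, method, None)):
            missing.append(method)
    return missing


class IncompleteResolver:
    """Toy class that implements only two of the twelve members."""

    @property
    def name(self) -> str:
        return "my-resolver"

    async def stop(self, reason: str) -> None:
        pass
```

Calling `missing_members(IncompleteResolver)` lists the ten members still to be written. The check only looks at names, not signatures, so it complements rather than replaces a type checker.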
The 12-member contract

    from typing import Protocol

    class Resolver(Protocol):
        # ── Properties (4) ──
        @property
        def name(self) -> str:
            """Unique resolver identifier, must match manifest.json 'name'."""
            ...

        @property
        def version(self) -> str:
            """Semver version string, must match manifest.json 'version'."""
            ...

        @property
        def description(self) -> str:
            """Human-readable description shown in GET /resolvers."""
            ...

        @property
        def supports_resume(self) -> bool:
            """True if this resolver can resume a paused run via POST /resume."""
            ...

        # ── Methods (8) ──
        def instantiation_schema(self, params: dict | None = None) -> dict:
            """A2UI surface describing inputs for POST /instances."""
            ...

        def workspace_spec(self, params, available_credentials) -> WorkspaceSpec:
            """Repos to clone and git setup for the workspace."""
            ...

        def container_setup(self, params) -> ContainerSetup:
            """Extra setup commands, mounts, bundles, environment."""
            ...

        def message_types(self, instance_state: dict) -> list[dict]:
            """Available unsolicited message types, dynamic by state."""
            ...

        async def run(self, config, session_factory, emitter, *, resume=False) -> ResolverResult:
            """Execute the resolver. Called on start or resume."""
            ...

        async def on_input_response(self, config, request_id, payload) -> None:
            """Called when a consumer responds to an input request."""
            ...

        async def on_message(self, config, message_type, payload) -> None:
            """Called when a consumer sends an unsolicited message."""
            ...

        async def stop(self, reason: str) -> None:
            """Called when the platform requests graceful stop."""
            ...

Properties
name: resolver identifier
Must be kebab-case and match the name field in manifest.json. Used for resolver
discovery, instance routing, and event prefixes.

    @property
    def name(self) -> str:
        return "my-resolver"

version: semver string
Must match manifest.json. Exposed in GET /resolvers for consumers to detect
resolver versions. Use standard semver (MAJOR.MINOR.PATCH).
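
Since both name and version must match manifest.json, a small startup check can catch drift between the code and the manifest. This is a minimal sketch under stated assumptions: manifest.json is assumed to be already loaded into a dict with 'name' and 'version' keys, `check_identity` is a hypothetical helper, and the two patterns are illustrative (they accept only lowercase kebab-case and a bare MAJOR.MINOR.PATCH without pre-release or build suffixes).

```python
import re

# Illustrative patterns, not taken from the platform.
KEBAB_CASE = re.compile(r"[a-z0-9]+(-[a-z0-9]+)*")
SEMVER_CORE = re.compile(r"[0-9]+\.[0-9]+\.[0-9]+")


def check_identity(name: str, version: str, manifest: dict) -> list:
    """Return a list of problems; an empty list means the values look consistent."""
    problems = []
    if not KEBAB_CASE.fullmatch(name):
        problems.append(f"name {name!r} is not kebab-case")
    if not SEMVER_CORE.fullmatch(version):
        problems.append(f"version {version!r} is not MAJOR.MINOR.PATCH")
    if manifest.get("name") != name:
        problems.append("name does not match manifest.json")
    if manifest.get("version") != version:
        problems.append("version does not match manifest.json")
    return problems
```

A resolver could run this once at import time or in a unit test and fail fast when the list is not empty.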

    @property
    def version(self) -> str:
        return "1.2.0"

description: human-readable summary
Shown in the GET /resolvers listing and the platform UI. One sentence, no
trailing period.

    @property
    def description(self) -> str:
        return "Builds REST endpoints from a natural-language spec using a TDD loop"

supports_resume: resumability flag
Set this to True if your run() can restart from a checkpoint after POST /instances/{id}/resume.
The platform exposes the resume endpoint only when this is True.

    @property
    def supports_resume(self) -> bool:
        return False  # or True if you implement checkpoint/restore in run()

Methods
instantiation_schema(): the creation form
Returns an A2UI v0.9 schema tree that the frontend renders as the instance
creation form. The params argument enables progressive disclosure: the schema can
return different fields based on what the user has already filled in.
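
Progressive disclosure can be illustrated with a form that grows once an earlier field has a value. This is a minimal sketch, written as a free function so it runs on its own; the component shape follows the example in this section, while the "base_branch" field is hypothetical and only serves to show the mechanism.

```python
from typing import Optional


def instantiation_schema(params: Optional[dict] = None) -> dict:
    """Return a form that adds a field once the user has entered a repository."""
    params = params or {}
    components = [
        {"type": "text", "id": "repo", "label": "Repository (owner/name)"},
    ]
    # Hypothetical follow-up field: only included after "repo" has a value.
    if params.get("repo"):
        components.append(
            {"type": "text", "id": "base_branch", "label": "Base branch"}
        )
    return {"type": "form", "components": components}
```

With no params the form has a single field; calling it again with `{"repo": "myorg/myrepo"}` returns the same form plus the follow-up field.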

    def instantiation_schema(self, params: dict | None = None) -> dict:
        return {
            "type": "form",
            "components": [
                {
                    "type": "textarea",
                    "id": "spec",
                    "label": "Feature Specification",
                    "placeholder": "Describe what you want to build...",
                    "checks": [
                        {"condition": {"call": "required", "args": [{"path": "/spec"}]},
                         "message": "Specification is required"}
                    ]
                },
                {
                    "type": "text",
                    "id": "repo",
                    "label": "Repository (owner/name)",
                    "placeholder": "myorg/myrepo",
                    "checks": [
                        {"condition": {"call": "required", "args": [{"path": "/repo"}]},
                         "message": "Repository is required"},
                        {"condition": {"call": "regex", "args": {"value": {"path": "/repo"}, "pattern": "^[\\w.-]+/[\\w.-]+$"}},
                         "message": "Must be 'owner/repo' format"}
                    ]
                }
            ]
        }

workspace_spec(): repository setup
Declares which repos the platform should clone into /project/workspace/ before
calling run(). Returns a WorkspaceSpec.

    def workspace_spec(self, params: dict, available_credentials=None) -> WorkspaceSpec:
        return WorkspaceSpec(
            repos=[
                RepoSpec(
                    url=f"https://github.com/{params['repo']}",
                    branch="main",
                )
            ],
            gitea=True,  # spin up a Gitea sidecar for local git operations
            working_branch=f"resolve/{params.get('name', 'feature')}",
        )

| Field | Type | Description |
|---|---|---|
| repos | list[RepoSpec] | Repos to clone into /project/workspace/ |
| gitea | bool | Whether to start a Gitea sidecar container |
| working_branch | str | Branch to create/checkout before resolver starts |
container_setup(): container configuration
Returns extra container setup beyond the base image. Runs before run().

    def container_setup(self, params: dict) -> ContainerSetup:
        return ContainerSetup(
            setup_commands=[
                "pip install my-extra-package",
            ],
            bundles=[
                "git+https://github.com/obra/superpowers@main",
            ],
            environment={
                "MY_RESOLVER_MODE": "production",
            },
        )

| Field | Type | Description |
|---|---|---|
| setup_commands | list[str] | Shell commands run before run() |
| bundles | list[str] | Amplifier bundle URLs to activate in the container |
| environment | dict | Extra env vars injected into the container |
| volume_mounts | list | Extra volumes to mount |
message_types(): available message catalog
Declares which unsolicited messages the resolver accepts right now. The platform calls
this to validate POST /instances/{id}/messages requests. The list is dynamic: you
can return different types based on the instance's current phase.
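
To make the validation role of the catalog concrete, the sketch below shows one way a caller could check an incoming message type against the current catalog. How the platform actually performs this check is not described here, so `is_accepted` is a hypothetical helper, and the catalog is a trimmed-down free-function version of the one in this section.

```python
def message_types(instance_state: dict) -> list:
    """Trimmed-down catalog: feedback always, abort only while work is active."""
    types = [{"name": "user_feedback", "label": "Send Feedback"}]
    if instance_state.get("phase") not in ("completed", "failed"):
        types.append({"name": "abort", "label": "Abort"})
    return types


def is_accepted(message_type: str, instance_state: dict) -> bool:
    """Hypothetical check: is this message type in the catalog for this state?"""
    return any(t["name"] == message_type for t in message_types(instance_state))
```

Because the catalog depends on state, the same "abort" message is accepted during the implementing phase and rejected once the phase is "completed".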

    def message_types(self, instance_state: dict) -> list[dict]:
        types = [
            {
                "name": "user_feedback",
                "label": "Send Feedback",
                "description": "Steer the resolver with free-text feedback",
                "schema": {
                    "type": "form",
                    "components": [
                        {"type": "textarea", "id": "text", "label": "Feedback"}
                    ]
                }
            }
        ]
        # Only expose abort during active work
        if instance_state.get("phase") not in ("completed", "failed"):
            types.append({
                "name": "abort",
                "label": "Abort",
                "description": "Stop after the current step",
                "schema": {"type": "form", "components": []}
            })
        return types

run(): the resolver entry point
The most important method. Called once when the instance starts (and again with
resume=True if the resolver supports resume). Must emit the required event chain
and return a ResolverResult.

    async def run(
        self,
        config: InstanceConfig,
        session_factory: SessionFactory,
        emitter: EventEmitter,
        *,
        resume: bool = False,
    ) -> ResolverResult: ...

Parameters:
| Parameter | Type | Description |
|---|---|---|
| config | InstanceConfig | Instance config: instance_id, params, workspace_path, credentials |
| session_factory | SessionFactory | Factory for creating Amplifier LLM sessions |
| emitter | EventEmitter | Event emitter; use await emitter.emit(...) to emit events |
| resume | bool | True when called from POST /instances/{id}/resume |
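
The resume flag only helps if run() can tell what was already done. One possible approach, sketched here under stated assumptions, is to record finished phases in a small JSON checkpoint and skip them on resume. The checkpoint file, its format, the "reviewing" phase name, and the `RecordingEmitter` stand-in are all hypothetical; only the "resolver:phase" event name and the `emit(...)` call shape come from this document.

```python
import asyncio
import json
from pathlib import Path

PHASES = ["planning", "implementing", "reviewing"]  # illustrative phase names


class RecordingEmitter:
    """Stand-in for the platform's EventEmitter; it just records events."""

    def __init__(self) -> None:
        self.events = []

    async def emit(self, event: str, data: dict) -> None:
        self.events.append((event, data))


async def run_phases(checkpoint: Path, emitter, *, resume: bool = False) -> list:
    """Run each phase once, skipping phases that a previous run already finished."""
    done = []
    if resume and checkpoint.exists():
        done = json.loads(checkpoint.read_text())["done"]
    for phase in PHASES:
        if phase in done:
            continue  # finished before the pause
        await emitter.emit("resolver:phase", {"name": phase})
        # ... the real work for this phase would go here ...
        done.append(phase)
        checkpoint.write_text(json.dumps({"done": done}))
    return done
```

Writing the checkpoint after every phase keeps the resume point at most one phase behind, at the cost of one small file write per phase.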
Full example:

    async def run(self, config, session_factory, emitter, *, resume=False):
        try:
            await emitter.emit("resolver:created", {
                "resolver_name": self.name,
                "resolver_version": self.version,
                "instance_id": config.instance_id,
                "params": config.params,
            })

            await emitter.emit("resolver:phase", {"name": "planning"})

            # Dispatch an LLM session
            session = await session_factory.create(name="plan")
            plan = await session.run(prompt=f"Plan: {config.params['spec']}")

            await emitter.emit("resolver:phase", {"name": "implementing"})

            # ... do the work ...

            await emitter.emit("resolver:artifact", {
                "kind": "pr",
                "value": "https://github.com/myorg/myrepo/pull/42",
            })

            await emitter.emit("resolver:completed", {
                "outcome": "success",
                "summary": "PR created",
                "duration_s": 42.0,
                "artifact_count": 1,
            })
            return ResolverResult(status="completed", output="PR created")

        except Exception as e:
            await emitter.emit("resolver:completed", {
                "outcome": "failed",
                "summary": str(e),
                "duration_s": 0.0,
                "artifact_count": 0,
                "error": {"code": "unexpected_error", "detail": str(e)},
            })
            return ResolverResult(status="failed", error=str(e))

on_input_response(): handle user responses
Called when a consumer submits POST /instances/{id}/input-requests/{rid}. The
payload is validated against the A2UI schema you declared when creating the request.
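
The handler example in this section signals a run() that is waiting on a gate. The sketch below shows both sides of that hand-off with a single `asyncio.Event` plus a stored decision, which is a simplification of the separate retry and abort gates used in that example. `GateSketch`, `wait_for_decision`, and `demo_gate` are hypothetical names, and how an input request is issued in the first place is not covered.

```python
import asyncio
from typing import Optional


class GateSketch:
    """Toy resolver fragment: run() waits on a gate that the handler opens."""

    def __init__(self) -> None:
        self._decided = asyncio.Event()
        self._decision = None

    async def wait_for_decision(self) -> Optional[str]:
        # Would be awaited inside run() after an input request has been issued.
        await self._decided.wait()
        return self._decision

    async def on_input_response(self, config, request_id, payload) -> None:
        self._decision = payload.get("decision")
        self._decided.set()  # wake the waiting coroutine


async def demo_gate() -> Optional[str]:
    resolver = GateSketch()
    waiter = asyncio.create_task(resolver.wait_for_decision())
    await resolver.on_input_response(None, "req-1", {"decision": "retry"})
    return await waiter
```

Keeping the handler to a couple of assignments means it returns immediately; the decision logic stays in run(), where the rest of the state lives.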

    async def on_input_response(self, config, request_id, payload):
        # payload is already validated against your A2UI schema
        decision = payload.get("decision")
        if decision == "retry":
            self._retry_gate.set()  # signal your waiting run() to continue
        elif decision == "abort":
            self._abort_gate.set()

on_message(): handle unsolicited messages
Called when a consumer submits POST /instances/{id}/messages. The message_type
is always one you declared in message_types() for the current state.
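
Unsolicited messages can arrive at any point, so a common pattern is to queue them in the handler and let run() pick them up between steps. The following is a minimal sketch of that pattern using `asyncio.Queue`; `FeedbackSketch`, `drain_feedback`, and `demo_feedback` are hypothetical names, and the "user_feedback" type and its "text" field follow the catalog example in this document.

```python
import asyncio


class FeedbackSketch:
    """Toy resolver fragment that buffers feedback until run() is ready for it."""

    def __init__(self) -> None:
        self._feedback_queue = asyncio.Queue()

    async def on_message(self, config, message_type, payload) -> None:
        if message_type == "user_feedback":
            self._feedback_queue.put_nowait(payload["text"])

    def drain_feedback(self) -> list:
        """Collect everything queued so far without blocking."""
        items = []
        while not self._feedback_queue.empty():
            items.append(self._feedback_queue.get_nowait())
        return items


async def demo_feedback() -> list:
    resolver = FeedbackSketch()
    await resolver.on_message(None, "user_feedback", {"text": "use pytest"})
    await resolver.on_message(None, "abort", {})  # ignored by this sketch
    await resolver.on_message(None, "user_feedback", {"text": "smaller commits"})
    return resolver.drain_feedback()
```

Inside run(), a call to `drain_feedback()` between steps would return the texts in arrival order, ready to be folded into the next prompt.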

    async def on_message(self, config, message_type, payload):
        if message_type == "user_feedback":
            self._feedback_queue.put_nowait(payload["text"])
        elif message_type == "abort":
            self._abort_requested = True

stop(): graceful shutdown
Called when POST /instances/{id}/stop is received. Your resolver should checkpoint
state and return promptly. The platform will wait briefly, then force-terminate the
container.
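
Returning promptly is easiest when the work loop checks a stop flag between steps. The sketch below illustrates that cooperative pattern under stated assumptions: `StoppableSketch`, `work_loop`, and `demo_stop` are hypothetical, and `asyncio.sleep(0)` stands in for one unit of real work.

```python
import asyncio


class StoppableSketch:
    """Toy resolver fragment with a work loop that honours a stop request."""

    def __init__(self) -> None:
        self._stop_requested = False
        self.completed_steps = 0

    async def work_loop(self, total_steps: int) -> bool:
        """Stand-in for the body of run(); returns True only if every step ran."""
        for _ in range(total_steps):
            if self._stop_requested:
                return False  # leave early so the caller can checkpoint
            await asyncio.sleep(0)  # placeholder for one unit of real work
            self.completed_steps += 1
        return True

    async def stop(self, reason: str) -> None:
        self._stop_requested = True


async def demo_stop():
    resolver = StoppableSketch()
    task = asyncio.create_task(resolver.work_loop(1000))
    await asyncio.sleep(0)  # give the loop a chance to start
    await resolver.stop("user requested")
    finished = await task
    return finished, resolver.completed_steps
```

The smaller each step is, the sooner the loop notices the flag, which matters because the platform only waits briefly before force-terminating the container.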

    async def stop(self, reason: str) -> None:
        self._stop_requested = True
        # Checkpoint any state you want to persist
        await self._checkpoint()

ResolverResult
The return value of run(). The platform uses this to set the final instance status.

    from dataclasses import dataclass
    from typing import Literal

    @dataclass
    class ResolverResult:
        status: Literal["completed", "paused", "failed"]
        output: str | None = None   # brief result summary (shown in GET /instances/{id})
        error: str | None = None    # error description (when status="failed")
        sessions_used: int = 0      # number of LLM sessions consumed

| status | Instance status | When to use |
|---|---|---|
| "completed" | completed | Work finished successfully |
| "paused" | paused | Intentional checkpoint; will resume later |
| "failed" | failed | Unrecoverable error |
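
The three statuses map naturally onto the ways a run can end. As a minimal sketch, the helper below picks the status from what happened; `build_result` is hypothetical, and the `ResolverResult` class here is a local stand-in that mirrors the documented fields so the example runs without the SDK.

```python
from dataclasses import dataclass
from typing import Literal, Optional


@dataclass
class ResolverResult:  # local stand-in mirroring the documented fields
    status: Literal["completed", "paused", "failed"]
    output: Optional[str] = None
    error: Optional[str] = None
    sessions_used: int = 0


def build_result(
    *,
    error: Optional[Exception] = None,
    paused: bool = False,
    summary: str = "",
    sessions_used: int = 0,
) -> ResolverResult:
    """Choose the status from how the run ended; an error takes precedence."""
    if error is not None:
        return ResolverResult(status="failed", error=str(error), sessions_used=sessions_used)
    status = "paused" if paused else "completed"
    return ResolverResult(status=status, output=summary or None, sessions_used=sessions_used)
```

For example, `build_result(summary="PR created", sessions_used=2)` yields a completed result, while passing `error=ValueError("boom")` yields a failed one with the message in `error`.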
Platform vs resolver responsibilities
| Concern | Platform | Resolver |
|---|---|---|
| Container lifecycle | ✓ Create, start, stop, destroy | |
| Event transport | ✓ Mirror, SSE, persistence | |
| Input request routing | ✓ Store, expose, validate, deliver | ✓ Schema + handling |
| Message routing | ✓ Validate, deliver | ✓ Catalog + handling |
| A2UI validation | ✓ Structural validation | ✓ Semantic meaning |
| LLM sessions | ✓ Factory (infrastructure) | ✓ What to ask |
| Workspace git setup | ✓ Clone, branch | ✓ What repos, what branch |
| Business logic | | ✓ Everything inside run() |
Next steps
- manifest.json Reference: declare your resolver to the platform
- Event Vocabulary: what events to emit and when
- A2UI Schema: build forms for instantiation and input requests