Skip to content

Resolver Protocol

The Resolver Protocol is the contract between the Resolve platform and every resolver plugin. Implement all 12 members and your resolver integrates with the full platform — instance creation, live events, input requests, user messages, pause/resume, and graceful stop.

from amplifier_resolve.core.resolver import Resolver, ResolverResult

The SDK provides a base class that implements the boilerplate. You override the methods that matter for your resolver strategy.


class Resolver(Protocol):
# ── Properties (4) ────────────────────────────────────────────
@property
def name(self) -> str: ...
"""Unique resolver identifier, must match manifest.json 'name'."""
@property
def version(self) -> str: ...
"""Semver version string, must match manifest.json 'version'."""
@property
def description(self) -> str: ...
"""Human-readable description shown in GET /resolvers."""
@property
def supports_resume(self) -> bool: ...
"""True if this resolver can resume a paused run via POST /resume."""
# ── Methods (8) ───────────────────────────────────────────────
def instantiation_schema(self, params: dict | None = None) -> dict: ...
"""A2UI surface describing inputs for POST /instances."""
def workspace_spec(self, params, available_credentials) -> WorkspaceSpec: ...
"""Repos to clone and git setup for the workspace."""
def container_setup(self, params) -> ContainerSetup: ...
"""Extra setup commands, mounts, bundles, environment."""
def message_types(self, instance_state: dict) -> list[dict]: ...
"""Available unsolicited message types — dynamic by state."""
async def run(self, config, session_factory, emitter, *, resume=False) -> ResolverResult: ...
"""Execute the resolver. Called on start or resume."""
async def on_input_response(self, config, request_id, payload) -> None: ...
"""Called when a consumer responds to an input request."""
async def on_message(self, config, message_type, payload) -> None: ...
"""Called when a consumer sends an unsolicited message."""
async def stop(self, reason: str) -> None: ...
"""Called when the platform requests graceful stop."""

Must be kebab-case and match the name field in manifest.json. Used for resolver discovery, instance routing, and event prefixes.

@property
def name(self) -> str:
return "my-resolver"

Must match manifest.json. Exposed in GET /resolvers for consumers to detect resolver versions. Use standard semver (MAJOR.MINOR.PATCH).

@property
def version(self) -> str:
return "1.2.0"

Shown in the GET /resolvers listing and the platform UI. One sentence, no trailing period.

@property
def description(self) -> str:
return "Builds REST endpoints from a natural-language spec using a TDD loop"

Set True if your run() can restart from a checkpoint after POST /instances/{id}/resume. The platform exposes the resume endpoint only when this is True.

@property
def supports_resume(self) -> bool:
return False # or True if you implement checkpoint/restore in run()

instantiation_schema() — the creation form

Section titled “instantiation_schema() — the creation form”

Returns an A2UI v0.9 schema tree that the frontend renders as the instance creation form. The params argument enables progressive disclosure — the schema can return different fields based on what the user has already filled in.

def instantiation_schema(self, params: dict | None = None) -> dict:
return {
"type": "form",
"components": [
{
"type": "textarea",
"id": "spec",
"label": "Feature Specification",
"placeholder": "Describe what you want to build...",
"checks": [
{"condition": {"call": "required", "args": [{"path": "/spec"}]},
"message": "Specification is required"}
]
},
{
"type": "text",
"id": "repo",
"label": "Repository (owner/name)",
"placeholder": "myorg/myrepo",
"checks": [
{"condition": {"call": "required", "args": [{"path": "/repo"}]},
"message": "Repository is required"},
{"condition": {"call": "regex",
"args": {"value": {"path": "/repo"}, "pattern": "^[\\w.-]+/[\\w.-]+$"}},
"message": "Must be 'owner/repo' format"}
]
}
]
}

Declares which repos the platform should clone into /project/workspace/ before calling run(). Returns a WorkspaceSpec.

def workspace_spec(self, params: dict, available_credentials=None) -> WorkspaceSpec:
return WorkspaceSpec(
repos=[
RepoSpec(
url=f"https://github.com/{params['repo']}",
branch="main",
)
],
gitea=True, # spin up a Gitea sidecar for local git operations
working_branch=f"resolve/{params.get('name', 'feature')}",
)
Field Type Description
repos list[RepoSpec] Repos to clone into /project/workspace/
gitea bool Whether to start a Gitea sidecar container
working_branch str Branch to create/checkout before resolver starts

container_setup() — container configuration

Section titled “container_setup() — container configuration”

Returns extra container setup beyond the base image. Runs before run().

def container_setup(self, params: dict) -> ContainerSetup:
return ContainerSetup(
setup_commands=[
"pip install my-extra-package",
],
bundles=[
"git+https://github.com/obra/superpowers@main",
],
environment={
"MY_RESOLVER_MODE": "production",
},
)
Field Type Description
setup_commands list[str] Shell commands run before run()
bundles list[str] Amplifier bundle URLs to activate in the container
environment dict Extra env vars injected into the container
volume_mounts list Extra volumes to mount

message_types() — available message catalog

Section titled “message_types() — available message catalog”

Declares what unsolicited messages the resolver accepts right now. The platform calls this to validate POST /instances/{id}/messages requests. The list is dynamic — you can return different types based on the instance’s current phase.

def message_types(self, instance_state: dict) -> list[dict]:
types = [
{
"name": "user_feedback",
"label": "Send Feedback",
"description": "Steer the resolver with free-text feedback",
"schema": {
"type": "form",
"components": [
{"type": "textarea", "id": "text", "label": "Feedback"}
]
}
}
]
# Only expose abort during active work
if instance_state.get("phase") not in ("completed", "failed"):
types.append({
"name": "abort",
"label": "Abort",
"description": "Stop after the current step",
"schema": {"type": "form", "components": []}
})
return types

The most important method. Called once when the instance starts (and again with resume=True if the resolver supports resume). Must emit the required event chain and return a ResolverResult.

async def run(
self,
config: InstanceConfig,
session_factory: SessionFactory,
emitter: EventEmitter,
*,
resume: bool = False,
) -> ResolverResult:
...

Parameters:

Parameter Type Description
config InstanceConfig Instance config: instance_id, params, workspace_path, credentials
session_factory SessionFactory Factory for creating Amplifier LLM sessions
emitter EventEmitter Event emitter — use await emitter.emit(...) to emit events
resume bool True when called from POST /instances/{id}/resume

Full example:

async def run(self, config, session_factory, emitter, *, resume=False):
try:
await emitter.emit("resolver:created", {
"resolver_name": self.name,
"resolver_version": self.version,
"instance_id": config.instance_id,
"params": config.params,
})
await emitter.emit("resolver:phase", {"name": "planning"})
# Dispatch an LLM session
session = await session_factory.create(name="plan")
plan = await session.run(prompt=f"Plan: {config.params['spec']}")
await emitter.emit("resolver:phase", {"name": "implementing"})
# ... do the work ...
await emitter.emit("resolver:artifact", {
"kind": "pr",
"value": "https://github.com/myorg/myrepo/pull/42",
})
await emitter.emit("resolver:completed", {
"outcome": "success",
"summary": "PR created",
"duration_s": 42.0,
"artifact_count": 1,
})
return ResolverResult(status="completed", output="PR created")
except Exception as e:
await emitter.emit("resolver:completed", {
"outcome": "failed",
"summary": str(e),
"duration_s": 0.0,
"artifact_count": 0,
"error": {"code": "unexpected_error", "detail": str(e)},
})
return ResolverResult(status="failed", error=str(e))

on_input_response() — handle user responses

Section titled “on_input_response() — handle user responses”

Called when a consumer submits POST /instances/{id}/input-requests/{rid}. The payload is validated against the A2UI schema you declared when creating the request.

async def on_input_response(self, config, request_id, payload):
# payload is already validated against your A2UI schema
decision = payload.get("decision")
if decision == "retry":
self._retry_gate.set() # signal your waiting run() to continue
elif decision == "abort":
self._abort_gate.set()

on_message() — handle unsolicited messages

Section titled “on_message() — handle unsolicited messages”

Called when a consumer submits POST /instances/{id}/messages. The message_type is always one you declared in message_types() for the current state.

async def on_message(self, config, message_type, payload):
if message_type == "user_feedback":
self._feedback_queue.put_nowait(payload["text"])
elif message_type == "abort":
self._abort_requested = True

Called when POST /instances/{id}/stop is received. Your resolver should checkpoint state and return promptly. The platform will wait briefly, then force-terminate the container.

async def stop(self, reason: str) -> None:
self._stop_requested = True
# Checkpoint any state you want to persist
await self._checkpoint()

The return value of run(). The platform uses this to set the final instance status.

@dataclass
class ResolverResult:
status: Literal["completed", "paused", "failed"]
output: str | None = None # brief result summary (shown in GET /instances/{id})
error: str | None = None # error description (when status="failed")
sessions_used: int = 0 # number of LLM sessions consumed
status Instance status When to use
"completed" completed Work finished successfully
"paused" paused Intentional checkpoint — will resume later
"failed" failed Unrecoverable error

Concern Platform Resolver
Container lifecycle ✓ Create, start, stop, destroy
Event transport ✓ Mirror, SSE, persistence
Input request routing ✓ Store, expose, validate, deliver ✓ Schema + handling
Message routing ✓ Validate, deliver ✓ Catalog + handling
A2UI validation ✓ Structural validation ✓ Semantic meaning
LLM sessions ✓ Factory (infrastructure) ✓ What to ask
Workspace git setup ✓ Clone, branch ✓ What repos, what branch
Business logic ✓ Everything inside run()