Creating Adapters

Adapters are the framework’s bridge to external AI providers. Each adapter wraps a provider SDK (or CLI, or protocol) and exposes it as a bus-integrated agent that participates in sessions, uses tools, and emits typed telemetry events.

This guide walks through the three-layer adapter contract, the bus subjects adapters must implement, and how to build a new adapter from scratch. The OpenAI adapter (../adapters/implementations/openai-node/) is the reference implementation throughout.

The Three-Layer Contract

Every adapter is composed of three classes, each with a distinct responsibility:

AIAdapter — lifecycle owner, bus integration, agent registry
|
+-- AIAgent — turn execution, tool orchestration, event fan-out
|
+-- AIAgentConnector — SDK bridge, streaming, protocol translation

| Layer | Base class | Owns |
| --- | --- | --- |
| Adapter | AIAdapter | Adapter lifecycle, adapter.* subject handlers, agent creation and registry |
| Agent | AIAgent | Turn execution, agent.* subject handlers (filtered by agentId), connector lifecycle |
| Connector | AIAgentConnector | Provider SDK calls, streaming, message normalization |

The split exists because these concerns change independently. Swapping from the OpenAI SDK to a custom HTTP client only changes the connector. Adding a new tool orchestration strategy only changes the agent. The adapter layer stays stable across both.
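
To make the separation concrete, here is a minimal, self-contained sketch of the layering. Every name in it (ConnectorSketch, AgentSketch, AdapterSketch, and so on) is a hypothetical stand-in rather than one of the framework's real base classes. It only illustrates that swapping the connector leaves the agent and adapter code untouched.

```typescript
// Hypothetical, simplified stand-ins for the three layers (not the real base classes).
interface ConnectorSketch {
  send(message: string): string;
}

// Two interchangeable connectors: only this layer knows how the provider is reached.
class SdkConnectorSketch implements ConnectorSketch {
  send(message: string): string {
    return `sdk:${message}`;
  }
}

class HttpConnectorSketch implements ConnectorSketch {
  send(message: string): string {
    return `http:${message}`;
  }
}

// The agent orchestrates a turn and never touches provider details.
class AgentSketch {
  private readonly connector: ConnectorSketch;

  constructor(connector: ConnectorSketch) {
    this.connector = connector;
  }

  runTurn(message: string): string {
    return this.connector.send(message.trim());
  }
}

// The adapter owns the agent registry and is configured with a factory.
class AdapterSketch {
  private readonly agents = new Map<string, AgentSketch>();
  private readonly connectorFactory: () => ConnectorSketch;

  constructor(connectorFactory: () => ConnectorSketch) {
    this.connectorFactory = connectorFactory;
  }

  startAgent(agentId: string): AgentSketch {
    const agent = new AgentSketch(this.connectorFactory());
    this.agents.set(agentId, agent);
    return agent;
  }

  listAgents(): string[] {
    return Array.from(this.agents.keys());
  }
}

// Same adapter and agent code, two different connectors.
const sdkAdapter = new AdapterSketch(() => new SdkConnectorSketch());
const httpAdapter = new AdapterSketch(() => new HttpConnectorSketch());
const sdkReply = sdkAdapter.startAgent('a1').runTurn(' hello ');
const httpReply = httpAdapter.startAgent('a1').runTurn(' hello ');
```

Only the factory passed to the adapter differs between the two setups, which mirrors the role connectorFactory plays in the real constructor config.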

Layer 1: AIAdapter

AIAdapter is an abstract class that handles bus registration and agent lifecycle. Your adapter subclass wires the three factories that tell the framework how to create agents and connectors:

import { AIAdapter, type AIAdapterConfig } from '@makaio/ai-adapters-core';
import { MyNamespace } from './namespace.js';
import { MyAgent } from './agent.js';
import { MyConnector } from './connector.js';
import { MyConfig } from './config.js';

// Bus type scoped to this adapter's namespace
type MyBus = Awaited<ReturnType<typeof MyNamespace.scopedBus>>;

export class MyAdapter extends AIAdapter<MyBus, MyConnector, MyAgent> {
  public constructor(config?: Partial<AIAdapterConfig>) {
    super({
      name: 'my-provider',
      capabilities: ['tools', 'systemPrompt:override'],
      namespace: MyNamespace,
      // The three factories the framework uses to build agents and connectors
      agentFactory: (agentConfig) => new MyAgent(agentConfig),
      configFactory: MyConfig.getConfig,
      connectorFactory: (config) => new MyConnector(config),
      // Caller-supplied overrides win over the defaults above
      ...config,
    });
  }
}

The constructor config (AIAdapterConstructorConfig in ../adapters/core/src/adapter/types.ts) requires:

| Field | Type | Purpose |
| --- | --- | --- |
| name | string | Unique adapter identifier (e.g., 'openai', 'anthropic-sdk') |
| capabilities | string[] | Declared capabilities (see Capabilities) |
| namespace | BusNamespace | The adapter’s registered bus namespace |
| agentFactory | (config) => AIAgent | Creates agent instances |
| configFactory | (input) => Promise<Config> | Resolves runtime configuration (credentials, model, etc.) |
| connectorFactory | (config) => AIAgentConnector | Creates connector instances |

Lifecycle

AIAdapter handles these automatically — you rarely need to override them:

  • init() — Registers adapter.startAgent, adapter.rehydrateAgent, adapter.stopAgent, adapter.listAgents, adapter.getAgent, adapter.getCapabilities, and adapter.infer handlers on the bus. Calls onInit() if overridden.
  • close() — Stops all running agents, unregisters bus handlers. Calls onClose() if overridden.

Override onInit() and onClose() for adapter-specific setup/teardown:

protected async onInit(): Promise<void> {
  // e.g., validate API key, warm caches
}

protected async onClose(): Promise<void> {
  // e.g., close persistent connections
}
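
The relationship between init()/close() and the two hooks follows the template-method pattern. The sketch below is a simplified, synchronous illustration of that pattern. LifecycleSketch and MyLifecycleSketch are hypothetical names, the real methods are async, and the exact order of steps inside the real AIAdapter is assumed from the descriptions above.

```typescript
// Hypothetical stand-in for the AIAdapter lifecycle; the step order is an assumption.
abstract class LifecycleSketch {
  public readonly trace: string[] = [];

  public init(): void {
    this.trace.push('register adapter.* handlers');
    this.onInit(); // hook runs after the base setup
  }

  public close(): void {
    this.trace.push('stop agents');
    this.trace.push('unregister handlers');
    this.onClose(); // hook runs after the base teardown
  }

  // Default hooks do nothing; subclasses override only what they need.
  protected onInit(): void {}
  protected onClose(): void {}
}

class MyLifecycleSketch extends LifecycleSketch {
  protected override onInit(): void {
    this.trace.push('validate API key');
  }
}

const lifecycle = new MyLifecycleSketch();
lifecycle.init();
lifecycle.close();
// lifecycle.trace now lists the base steps with the overridden hook in between.
```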

Layer 2: AIAgent

AIAgent is the middle layer. It creates a connector, registers per-agent bus handlers (filtered by agentId), and orchestrates turn execution.

Every agent subclass must implement one abstract method:

protected abstract wireEvents(connector: TConnector): void | Promise<void>;

wireEvents is called during init() and after every connector swap. It subscribes to connector-scoped bus events and fans them out to global agent.* subjects. This is where provider-specific SDK events are translated into the framework’s normalized telemetry.

What AIAgent Provides

The base class gives you these protected helpers for emitting standardized events:

| Method | Emits |
| --- | --- |
| emitStart(event?) | agent.started |
| emitCompletion(result) | agent.complete |
| emitError(result) | agent.complete (with error) |
| emitToolUse(toolName, args, nativeId) | agent.tool.use |
| emitToolOutput(output, hints) | agent.tool.output |
| emitStepStarted(stepType, blockData, content) | agent.step.started |
| emitStepFinished(stepType, content) | agent.step.finished |
| trackUsage(normalized) | agent.usage |
| emitContextWindowUpdate(input) | agent.contextWindow.updated |
| emitGlobal(subject, payload) | Any subject, enriched with AgentContext |

You call these from wireEvents to translate connector events into the framework’s event model.
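
As a rough illustration of the fan-out idea, the sketch below subscribes to connector-scoped subjects and re-emits them on global agent.* subjects. TinyBus and FanOutAgentSketch are hypothetical stand-ins (the real bus is typed and namespaced), and mapping chunk to agent.message_delta is an assumption made for the example, not the framework's documented wiring.

```typescript
type PayloadSketch = Record<string, unknown>;
type HandlerSketch = (payload: PayloadSketch) => void;

// Hypothetical in-memory bus; the real framework bus is typed and namespaced.
class TinyBus {
  private readonly handlers = new Map<string, HandlerSketch[]>();

  on(subject: string, handler: HandlerSketch): void {
    const list = this.handlers.get(subject) ?? [];
    list.push(handler);
    this.handlers.set(subject, list);
  }

  emit(subject: string, payload: PayloadSketch): void {
    for (const handler of this.handlers.get(subject) ?? []) {
      handler(payload);
    }
  }
}

class FanOutAgentSketch {
  private readonly agentId: string;
  private readonly globalBus: TinyBus;

  constructor(agentId: string, globalBus: TinyBus) {
    this.agentId = agentId;
    this.globalBus = globalBus;
  }

  // Stand-in for wireEvents: subscribe to connector-scoped subjects and
  // re-emit them on global agent.* subjects, tagged with this agent's id.
  wireEvents(connectorBus: TinyBus): void {
    connectorBus.on('chunk', (payload) => {
      this.globalBus.emit('agent.message_delta', { ...payload, agentId: this.agentId });
    });
    connectorBus.on('message_complete', (payload) => {
      this.globalBus.emit('agent.message', { ...payload, agentId: this.agentId });
    });
  }
}

const globalBus = new TinyBus();
const connectorBus = new TinyBus();
const received: PayloadSketch[] = [];
globalBus.on('agent.message_delta', (payload) => {
  received.push(payload);
});

new FanOutAgentSketch('agent-1', globalBus).wireEvents(connectorBus);
connectorBus.emit('chunk', { text: 'Hel' });
// received now holds one enriched payload: { text: 'Hel', agentId: 'agent-1' }
```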

AgentTurnExecutor

AIAgent delegates turn execution to AgentTurnExecutor (../adapters/core/src/agent/agent-turn-executor.ts). The executor runs the shared pipeline for every message:

  1. Call onBeforeDispatch() (marks agent active in storage)
  2. Run pre-user-message hooks (hooks can mutate message and session context)
  3. Evaluate shouldUseNativeResume() (skip message history if the connector has native session state)
  4. Call connector.sendMessage() or connector.start()
  5. Fire post-user-message hooks (non-blocking)
  6. Track the message handle for lifecycle events

You do not need to implement or override this — it runs automatically when the bus dispatches agent.sendMessage to your agent.
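
For orientation only, the six steps can be sketched as a single function. This is a simplified, synchronous stand-in under stated assumptions: runTurnSketch, TurnDepsSketch, and TurnConnectorSketch are hypothetical names, the real executor is asynchronous, and the rule used in step 4 (start on the first turn, sendMessage afterwards) is an assumption rather than something the source states.

```typescript
interface TurnConnectorSketch {
  started: boolean;
  start(message: string, priorMessages: string[]): string;
  sendMessage(message: string, priorMessages: string[]): string;
}

interface TurnDepsSketch {
  onBeforeDispatch: () => void;
  preHooks: Array<(message: string) => string>;
  postHooks: Array<(message: string) => void>;
  shouldUseNativeResume: () => boolean;
  storedMessages: string[];
  connector: TurnConnectorSketch;
}

function runTurnSketch(message: string, deps: TurnDepsSketch): string {
  // 1. Mark the agent active.
  deps.onBeforeDispatch();
  // 2. Pre-user-message hooks may rewrite the message.
  let finalMessage = message;
  for (const hook of deps.preHooks) {
    finalMessage = hook(finalMessage);
  }
  // 3. Skip stored history when the connector keeps native session state.
  const priorMessages = deps.shouldUseNativeResume() ? [] : deps.storedMessages;
  // 4. First turn starts the connector; later turns send into it (assumed rule).
  const handle = deps.connector.started
    ? deps.connector.sendMessage(finalMessage, priorMessages)
    : deps.connector.start(finalMessage, priorMessages);
  deps.connector.started = true;
  // 5. Post hooks run after dispatch (non-blocking in the real executor).
  for (const hook of deps.postHooks) {
    hook(finalMessage);
  }
  // 6. Return the handle so the caller can track lifecycle events.
  return handle;
}

const turnTrace: string[] = [];
const turnDeps: TurnDepsSketch = {
  onBeforeDispatch: () => { turnTrace.push('active'); },
  preHooks: [(m) => m.trim()],
  postHooks: [(m) => { turnTrace.push(`post:${m}`); }],
  shouldUseNativeResume: () => true,
  storedMessages: ['earlier message'],
  connector: {
    started: false,
    start: (m, prior) => { turnTrace.push(`start:${m}:${prior.length}`); return 'handle-1'; },
    sendMessage: (m, prior) => { turnTrace.push(`send:${m}:${prior.length}`); return 'handle-2'; },
  },
};

const firstHandle = runTurnSketch('  hi  ', turnDeps);
const secondHandle = runTurnSketch('again', turnDeps);
```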

Layer 3: AIAgentConnector

AIAgentConnector is the SDK bridge. It owns the actual provider API calls and translates provider-specific events into the connector’s scoped bus.

Abstract Methods to Implement

abstract initialize(options?): Promise<void>;
abstract start(message, options?): Promise<AgentStartResult>;
abstract sendMessage(message, options?): Promise<MessageHandle>;
abstract abort(): void;
abstract close(): Promise<void>;
abstract getAdapterSessionId(): Promise<string>;
abstract complete(): Promise<MessageResult | null>;
abstract interrupt(): Promise<void>;

Optional Overrides

| Method | Default | When to Override |
| --- | --- | --- |
| changeModelInPlace(newModel) | Returns false | Provider supports in-session model switching |
| changeCwdInPlace(newCwd) | Returns false | Provider has working directory awareness |
| changeReasoningInPlace(newLevel) | Returns false | Provider supports reasoning level adjustment |
| markToolRefreshPending() | No-op | Provider needs explicit tool list refresh |

Auto-Injected Metadata

The base class automatically injects adapterName, agentId, adapterId, and adapterSessionId into every emit() and requestToolApproval() call. Your connector implementation must not duplicate these fields — doing so overwrites the framework-managed values.
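
A small sketch shows why duplicating these fields is harmful. It assumes the merge lets payload fields win, which is what the warning above implies; injectMetaSketch and ConnectorMetaSketch are hypothetical names, not the base class's actual internals.

```typescript
interface ConnectorMetaSketch {
  adapterName: string;
  agentId: string;
  adapterId: string;
  adapterSessionId: string;
}

// Assumed merge order: payload fields are spread last, so a duplicated key
// in the payload silently replaces the framework-managed value.
function injectMetaSketch<T extends Record<string, unknown>>(
  meta: ConnectorMetaSketch,
  payload: T,
): ConnectorMetaSketch & T {
  return { ...meta, ...payload };
}

const metaSketch: ConnectorMetaSketch = {
  adapterName: 'my-provider',
  agentId: 'agent-1',
  adapterId: 'adapter-1',
  adapterSessionId: 'session-1',
};

// Correct: the payload carries only provider data, metadata is injected.
const cleanEvent = injectMetaSketch(metaSketch, { text: 'hello' });

// Incorrect: the payload repeats agentId and clobbers the managed value.
const clobberedEvent = injectMetaSketch(metaSketch, { text: 'hello', agentId: 'stale-id' });
```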

Mutation Contract

When implementing changeModelInPlace, return true if the provider accepted the change but do not mutate this.model directly. The caller (AIAgent) owns that update after a confirmed change.
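
The contract can be sketched as follows. ModelConnectorSketch and switchModelSketch are hypothetical stand-ins; in particular, how the real AIAgent applies the update after confirmation is not shown in this guide, so the assignment in switchModelSketch is an assumption.

```typescript
class ModelConnectorSketch {
  public model: string;
  private readonly supportedModels: string[];

  constructor(model: string, supportedModels: string[]) {
    this.model = model;
    this.supportedModels = supportedModels;
  }

  // Reports whether the provider accepted the switch. Note that it does
  // NOT assign this.model; that update belongs to the caller.
  changeModelInPlace(newModel: string): boolean {
    return this.supportedModels.includes(newModel);
  }
}

// Hypothetical caller standing in for AIAgent.
function switchModelSketch(connector: ModelConnectorSketch, newModel: string): boolean {
  const accepted = connector.changeModelInPlace(newModel);
  if (accepted) {
    connector.model = newModel; // caller-owned update after confirmation
  }
  return accepted;
}

const modelConnector = new ModelConnectorSketch('fast', ['fast', 'large']);
const acceptedSwitch = switchModelSketch(modelConnector, 'large');
const modelAfterAccept = modelConnector.model;
const rejectedSwitch = switchModelSketch(modelConnector, 'unknown');
const modelAfterReject = modelConnector.model;
```

Keeping the assignment in one place means a rejected change can never leave the connector reporting a model the provider is not actually using.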

StreamSession: The Streaming Adapter Base

Most modern adapters use streaming APIs. The stream-session shared package (../adapters/shared/stream-session/) provides base classes that implement the full connector and agent protocols for streaming adapters:

BaseStreamConnector

BaseStreamConnector (stream-session/src/connector/base-stream-connector.ts) implements the full AIAgentConnector protocol. You extend it and provide four hooks:

| Hook | Purpose |
| --- | --- |
| fetchTools() | Resolve credentials, init SDK client, convert tools to provider format |
| createSession() | Build the streaming session object with all config |
| getTurnSubjects() | Return the namespace subjects for turn lifecycle events |
| afterSessionCreated() | Optional post-creation setup |

The base handles: lazy session initialization, user message queuing, MCP tool injection, tool refresh flags, and canonical turn number tracking.

BaseStreamAgent

BaseStreamAgent (stream-session/src/agent/base-stream-agent.ts) extends AIAgent and implements wireEvents() using a two-tier fan-out:

  1. wireSdkEvents() (abstract — you implement) — Routes the sdk.event catch-all from the connector to typed semantic subjects using createConnectorEventMapping.
  2. wireSemanticEvents() (concrete) — Wires semantic subjects (chunk, usage, toolCalls, messageComplete, etc.) to global agent.* bus subjects.

Your agent subclass implements wireSdkEvents() and a few extraction methods:

type MySubjectSpec = StreamAdapterSubjectSpec<
  typeof MyConnectorSubjects.chunk.$meta.namespace
>;

class MyAgent extends BaseStreamAgent<MyBus, MyConnector, MySubjectSpec> {
  protected wireSdkEvents(): void {
    // Route the sdk.event catch-all to typed subjects by its eventType field
    this.createConnectorEventMapping(
      MyConnectorSubjects.sdk.event,
      'eventType',
      {
        chunk: MyConnectorSubjects.chunk,
        usage: MyConnectorSubjects.usage,
        tool_calls: MyConnectorSubjects.tool_calls,
        message_complete: MyConnectorSubjects.message_complete,
        agent_started: MyConnectorSubjects.agent_started,
        agent_complete: MyConnectorSubjects.agent_complete,
        error: MyConnectorSubjects.error,
      },
      'event',
    );
  }

  protected getConnectorSubjects(): MySubjectSpec { ... }
  protected extractChunkText(payload: Record<string, unknown>): string { ... }
  protected extractUsagePayload(payload: Record<string, unknown>): NormalizedCallUsage { ... }
}
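
The routing that createConnectorEventMapping performs can be pictured with a small discriminant router. This is a sketch under assumptions, not the helper's implementation: createRouterSketch is a hypothetical function, the subject names are placeholder strings, and treating the final 'event' argument as the key that holds the payload is a guess based on the call shape above.

```typescript
type EnvelopeSketch = Record<string, unknown>;
type EmitSketch = (subject: string, payload: unknown) => void;

// Builds a router that reads the discriminant from each envelope and
// forwards the nested payload to the subject mapped for that value.
function createRouterSketch(
  discriminantKey: string,
  mapping: ReadonlyMap<string, string>,
  payloadKey: string,
  emit: EmitSketch,
): (envelope: EnvelopeSketch) => boolean {
  return (envelope) => {
    const kind = envelope[discriminantKey];
    const subject = typeof kind === 'string' ? mapping.get(kind) : undefined;
    if (subject === undefined) {
      return false; // unmapped event types are ignored in this sketch
    }
    emit(subject, envelope[payloadKey]);
    return true;
  };
}

const routed: Array<[string, unknown]> = [];
const route = createRouterSketch(
  'eventType',
  new Map([
    ['chunk', 'connector.chunk'],
    ['usage', 'connector.usage'],
  ]),
  'event',
  (subject, payload) => {
    routed.push([subject, payload]);
  },
);

const handledChunk = route({ eventType: 'chunk', event: { text: 'Hi' } });
const handledUnknown = route({ eventType: 'heartbeat', event: {} });
```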

Bus Subjects

Adapter Subjects (adapter.*)

These are handled by AIAdapter. The base class registers handlers automatically — you do not need to implement them:

| Subject | Direction | Purpose |
| --- | --- | --- |
| adapter.startAgent | RPC | Create and start a new agent |
| adapter.rehydrateAgent | RPC | Resume an agent from stored state |
| adapter.stopAgent | RPC | Stop a running agent |
| adapter.listAgents | RPC | List active agents |
| adapter.getAgent | RPC | Get one active agent by agentId |
| adapter.getCapabilities | RPC | Query adapter capabilities |
| adapter.infer | RPC | One-shot inference (no session) |
| adapter.initialized | Event | Adapter is ready |
| adapter.agent.created | Event | New agent instance created |
| adapter.session.created | Event | Agent started a provider session |
| adapter.session.closed | Event | Provider session ended |
| adapter.session.usage | Event | Aggregated token usage |
| adapter.log | Event | Adapter-level log message |
| adapter.error | Event | Adapter-level error |

adapter.getConfigSchema exists in the contracts package for settings/config UI, but AIAdapter does not auto-register it. Config schema exposure is owned by the adapter subsystem because schemas live on adapter definitions, not live adapter instances.

Agent Subjects (agent.*)

The RPC subjects below are handled by AIAgent, filtered by agentId. The event subjects are emitted by your agent through the base class helpers:

| Subject | Direction | Purpose |
| --- | --- | --- |
| agent.sendMessage | RPC | Send a user message to the agent |
| agent.toolApprove | RPC | Request tool approval from the approval service |
| agent.getCapabilities | RPC | Query agent-level capabilities |
| agent.started | Event | Agent began processing |
| agent.complete | Event | Agent finished (success or error) |
| agent.idle | Event | Agent is waiting for input |
| agent.message_delta | Event | Streaming text chunk |
| agent.message | Event | Assembled complete message |
| agent.reasoning_delta | Event | Streaming reasoning chunk |
| agent.reasoning | Event | Assembled reasoning block |
| agent.tool.use | Event | Tool invocation started |
| agent.tool.started | Event | Tool execution began |
| agent.tool.output | Event | Tool produced output |
| agent.tool.completed | Event | Tool execution finished |
| agent.usage | Event | Per-call token usage |
| agent.turn.started | Event | Turn began |
| agent.turn.completed | Event | Turn finished |
| agent.step.started | Event | Processing step began |
| agent.step.finished | Event | Processing step finished |
| agent.contextWindow.updated | Event | Context window fill level changed |
| agent.model.changed | Event | Model was switched mid-session |
| agent.cwd.changed | Event | Working directory changed |
| agent.session.closed | Event | Agent session ended |

Capabilities

Adapters declare their capabilities as string tokens in the constructor:

capabilities: ['tools', 'systemPrompt:override', 'session:resume']

The capability registry (../adapters/core/src/types/capabilities.ts) uses TypeScript declaration merging. Declare capabilities at construction time and do not mutate them dynamically unless the capability source itself is explicit and source-backed. Built-in capabilities:

| Path | Meaning |
| --- | --- |
| tools | Adapter supports tool/function calling |
| vision | Adapter accepts image inputs |
| structuredOutput | Adapter supports JSON schema output |
| systemPrompt:override | System prompt can be replaced per-session |
| systemPrompt:append | System prompt can be appended to |
| session:resume | Agent can resume from stored session state |
| session:fork | Agent can fork an existing session |
| chat:inTurnMessages | Multiple messages per turn supported |
| modelSwitchInSession | Model can change mid-session |

Query capabilities at runtime:

const caps = parseAIAdapterCapabilities(adapter.capabilities);
caps.tools; // true
caps.systemPromptOverride; // true (colon-paths become camelCase)
caps.hasAll(['tools', 'vision']); // boolean
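
The colon-path to camelCase rule can be illustrated with a standalone sketch. parseCapabilitiesSketch and capabilityKeySketch are hypothetical helpers written for this example; they are not the implementation of parseAIAdapterCapabilities, and the returned shape (a flags record instead of direct properties) is a simplification.

```typescript
// Turns 'systemPrompt:override' into 'systemPromptOverride'.
function capabilityKeySketch(token: string): string {
  return token.replace(/:([A-Za-z])/g, (_match: string, first: string) => first.toUpperCase());
}

interface ParsedCapabilitiesSketch {
  flags: Record<string, boolean>;
  hasAll(required: readonly string[]): boolean;
}

function parseCapabilitiesSketch(tokens: readonly string[]): ParsedCapabilitiesSketch {
  const flags: Record<string, boolean> = {};
  for (const token of tokens) {
    flags[capabilityKeySketch(token)] = true;
  }
  return {
    flags,
    // True only when every required token was declared.
    hasAll: (required) => required.every((token) => tokens.includes(token)),
  };
}

const capsSketch = parseCapabilitiesSketch(['tools', 'systemPrompt:override']);
const hasToolsAndVision = capsSketch.hasAll(['tools', 'vision']);
const hasTools = capsSketch.hasAll(['tools']);
```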

Extending Capabilities

Add provider-specific capabilities via declaration merging:

declare module '@makaio/ai-adapters-core' {
  interface AIAdapterCapabilityRegistry {
    artifacts: { beta: boolean };
  }
}

// Then declare:
capabilities: ['artifacts:beta']
// Query: caps.artifactsBeta

Walkthrough: OpenAI Adapter

The OpenAI adapter at ../adapters/implementations/openai-node/ is the reference implementation for streaming SDK adapters.

adapter.ts — OpenAIAdapter

export class OpenAIAdapter extends AIAdapter<
  OpenAINodeConnectorBus,
  OpenAINodeConnector,
  OpenAIAgent
> {
  public constructor(config?: Partial<AIAdapterConfig>) {
    super({
      name: OpenAINodeAdapterName,
      capabilities: ['tools', 'streaming', 'systemPrompt:override', 'systemPrompt:append'],
      agentFactory: (config) => new OpenAIAgent(config),
      configFactory: OpenAINodeConfig.getConfig,
      connectorFactory: (config) => new OpenAINodeConnector(config),
      namespace: OpenAINs,
      ...config,
    });
  }
}

No onInit or onClose overrides — the adapter is pure infrastructure wiring.

agent.ts — OpenAIAgent

Extends BaseStreamAgent. Implements:

  • wireSdkEvents() — Routes sdk.event envelopes by eventType discriminant to typed semantic subjects using createConnectorEventMapping
  • getConnectorSubjects() — Maps namespace subjects to StreamAdapterSubjectSpec
  • extractChunkText(payload) — Reads choices[0].delta.content
  • extractUsagePayload(payload) — Maps prompt_tokens/completion_tokens to NormalizedCallUsage
  • wireToolApprovalRpc(connector) — Routes the connector’s tool_approval RPC to the global AgentSubjects.toolApprove
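
The two extraction methods can be sketched defensively as plain functions. This is an illustration under assumptions, not the adapter's actual code: the function names and the UsageSketch shape are hypothetical (the real NormalizedCallUsage fields are not shown in this guide), while the payload fields read here are the ones named in the list above.

```typescript
// Hypothetical stand-in for NormalizedCallUsage; the real field names may differ.
interface UsageSketch {
  inputTokens: number;
  outputTokens: number;
}

// Reads choices[0].delta.content, returning '' when any part is missing.
function extractChunkTextSketch(payload: Record<string, unknown>): string {
  const choices = payload['choices'];
  if (!Array.isArray(choices) || choices.length === 0) {
    return '';
  }
  const first = choices[0] as { delta?: { content?: unknown } } | null | undefined;
  const content = first?.delta?.content;
  return typeof content === 'string' ? content : '';
}

// Maps prompt_tokens/completion_tokens to the normalized shape, defaulting to 0.
function extractUsageSketch(payload: Record<string, unknown>): UsageSketch {
  const usage = payload['usage'] as
    | { prompt_tokens?: unknown; completion_tokens?: unknown }
    | null
    | undefined;
  const toNumber = (value: unknown): number => (typeof value === 'number' ? value : 0);
  return {
    inputTokens: toNumber(usage?.prompt_tokens),
    outputTokens: toNumber(usage?.completion_tokens),
  };
}

const chunkText = extractChunkTextSketch({ choices: [{ delta: { content: 'Hel' } }] });
const emptyText = extractChunkTextSketch({ choices: [] });
const usageSketch = extractUsageSketch({ usage: { prompt_tokens: 12, completion_tokens: 3 } });
```

Returning an empty string or zero instead of throwing keeps a malformed chunk from interrupting the stream, which is a design choice of this sketch rather than a documented framework behavior.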

connector.ts — OpenAINodeConnector

Extends BaseStreamConnector. Implements:

  • fetchTools() — Resolves credentials, initializes new OpenAI(...), converts framework tools to OpenAI’s function format
  • createSession() — Constructs OpenAIConnectorSession with system prompt, tools, and streaming config
  • getTurnSubjects() — Returns the connector’s turn namespace subjects

The OpenAIConnectorSession manages the streaming agentic loop. Each turn calls openai.chat.completions.create({ stream: true }) and emits SDK events back to the connector’s scoped bus.

Conformance Tests

The framework ships a shared conformance test suite that validates the full orchestration pipeline for any adapter:

MAKAIO_TEST_ADAPTER=openai-node yarn test adapters/implementations/__tests__

Tests live at ../adapters/implementations/__tests__/ and cover:

  • agents.simple.test.ts — Initialization, processing state, basic round-trip
  • agents.queue.test.ts — Message queue ordering (enqueue, replace, immediate)
  • agents.tool-approval.test.ts — Tool approval RPC flow
  • orchestration/subjects.test.ts — Subject emission coverage
  • orchestration/sendMessage.test.ts — Full send-message pipeline
  • orchestration/continuation.test.ts — Multi-turn continuation
  • orchestration/lifecycle-mutations.test.ts — Model/CWD changes mid-session

Making Your Adapter Conformance-Testable

Export a createTestConfig() function from your adapter’s src/index.ts:

import {
  createTestProviderContext,
  resolveTestConfig,
  type ConformanceTestConfig,
} from '@makaio/ai-adapters-core';
import { registerToolApprovalHandler } from './tool-handling.js';
import { providerDefinitions } from './provider.js';

export async function createTestConfig(): Promise<
  ConformanceTestConfig<MyBus, MyConnector, MyAgent>
> {
  const bus = await MyNamespace.scopedBus();
  const testProviderDef = providerDefinitions[0];

  return {
    createConnector: async (options?) => {
      // Create a connector instance for testing
      const config = await MyConfig.getConfig({
        ...resolveTestConfig(options, bus, testProviderDef),
        sessionId: options?.sessionId ?? 'test-session-id',
      });
      return new MyConnector(config);
    },
    registerToolApprovalHandler,
    bus,
    capabilities: {
      supportsReplace: true,
      supportsInterrupt: true,
      supportsUsageMetrics: true,
    },
    options: {
      primaryModel: { definitionId: 'my-provider', modelName: 'my-fast-model' },
      secondaryModel: { definitionId: 'my-provider', modelName: 'my-default-model' },
    },
    // For orchestration tests:
    createAdapter: async (options) => {
      const adapter = new MyAdapter({ adapterId: options?.adapterId });
      await adapter.init();
      return adapter;
    },
    adapterName: 'my-provider',
    testProviderContext: createTestProviderContext(testProviderDef),
  };
}

When wiring tool approval manually, pass a complete ToolApprovalContext: adapterId, adapterName, agentId, adapterSessionId, and sessionId are all required for routing the global AgentSubjects.toolApprove request correctly.

The shared test harness (__tests__/shared.ts) dynamically imports your adapter by name and calls createTestConfig().

Scaffolding a New Adapter

1. Create the Package

adapters/implementations/my-provider/
├── descriptor.json     # Descriptor discovery contract
├── package.json
├── tsdown.config.ts
├── tsconfig.json
└── src/
    ├── index.ts        # Adapter export + createTestConfig
    ├── server.ts       # Default export for descriptor entrypoints.server
    ├── package.ts      # MakaioExtension with adapters[] contribution
    ├── definition.ts   # Runtime adapter definition
    ├── namespace.ts    # Provider-specific bus namespace
    ├── config.ts       # Configuration factory
    ├── adapter.ts      # AIAdapter subclass
    ├── agent.ts        # AIAgent subclass (or BaseStreamAgent)
    └── connector.ts    # AIAgentConnector subclass (or BaseStreamConnector)

2. Register a Bus Namespace

src/namespace.ts
import { MakaioBus } from '@makaio/bus-core';
import { z } from 'zod';

const MyProviderSchemas = {
  // Provider-specific events (optional)
  'sdk.event': z.object({ eventType: z.string(), data: z.unknown() }),
};

// Exported as MyNamespace to match the import used in adapter.ts
export const MyNamespace = MakaioBus.registerNamespace(
  'adapter:myProvider',
  MyProviderSchemas,
);

3. Implement the Three Layers

For streaming providers, extend BaseStreamConnector and BaseStreamAgent from @makaio/ai-adapters-stream-session. For non-streaming providers (e.g., CLI-backed), extend AIAgentConnector and AIAgent directly.

4. Register the Adapter with an Extension

Adapters are contributed via MakaioExtension.adapters. Create a package.ts that wraps your adapter definition into an extension manifest:

src/package.ts
import type { MakaioExtension } from '@makaio/contracts/extension';
import { adapterDefinition } from './definition.js';

const extension: MakaioExtension = {
  name: 'my-provider',
  displayName: 'My Provider',
  adapters: [{
    manifest: { name: 'my-provider', protocols: ['openai'] },
    definition: adapterDefinition,
  }],
};

export default extension;

Expose that package through src/server.ts and declare "entrypoints": { "server": true } in descriptor.json. The runtime discovers adapters through descriptor-based extension discovery and the adapter subsystem contribution processor. There is no separate adapter discovery pipeline — adapters are extension contribution surfaces like tools or UI. Use plural manifest.protocols on AdapterManifest; keep singular definition.protocol on the runtime adapter definition when the adapter has one active protocol.

5. Run Conformance Tests

MAKAIO_TEST_ADAPTER=my-provider yarn test adapters/implementations/__tests__

If your adapter passes the conformance suite, it works with the full orchestration pipeline: bus request -> adapter -> agent -> tool calls -> events -> response.