Type Alias: BusLike
Makaio Framework / tools-core / BusLike
Type Alias: BusLike
Section titled “Type Alias: BusLike”BusLike =
IMakaioBus
Defined in: ../../../tools/core/src/types.ts:12
Type alias for the bus interface used in tool execution contexts.
Tools receive IMakaioBus instances for type-safe event communication.
This ensures full type safety with typed subjects from @makaio/contracts.
Type Composition
Section titled “Type Composition”-
IMakaioBus- `{ namespace: NamespaceDomain; /**
- Register an event or request handler.
- Events: Fire-and-forget - multiple handlers can listen and execute in parallel.
- Requests: Request-response with middleware support - handlers form a chain.
- Wildcard Support: Use
.$allproperty on subjects to match all subjects in a namespace. - Wildcard handlers receive
unknownpayload and must use type guards. - @param subject - SubjectDefinition object (exact or wildcard)
- @param handler - Handler function (EventHandler for events, RequestHandler for requests)
- @returns Unsubscribe function
- @example Basic event handler with typed payload
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- started: z.object({ agentId: z.string() }),
- });
- const unsubscribe = on(AgentSubjects.started, (context) => {
- context.payload.agentId; // ✅ string - fully typed
- console.debug(‘Agent started:’, context.payload.agentId);
- });
-
- @example Wildcard handlers for namespace-level events
-
- // Matches all subjects in namespace
- on(AgentSubjects.$all, (context) => {
- context.payload; // unknown - must use type guards
- if (‘agentId’ in context.payload) {
-
console.debug('Any agent event:', context.payload.agentId);
- }
- });
-
- @example Request handler with typed request/response
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- toolApprove: {
-
request: z.object({ toolName: z.string() }),
-
response: z.object({ approved: z.boolean() }),
- },
- });
- on(AgentSubjects.toolApprove, (context) => {
- const { toolName } = context.payload; // ✅ fully typed
- context.setResult({ approved: true });
- });
-
- @example Middleware pattern for request chain
-
- on(AgentSubjects.toolApprove, async (context) => {
- console.debug(‘Before approval’);
- await context.next();
- console.debug(‘After approval’);
- });
-
*/ on<Subject extends Subjects & StrictNamespace, IsChannel = Subject[‘$meta’][‘channel’]>( subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition
: never, options?: OnOptions, ): () => void; /** Register an interceptor that runs BEFORE handlers (payload transform, blocking, priority). */ intercept<Subject extends Subjects & StrictNamespace>( subject: Subject, handler: InterceptorHandler<Subject[‘$meta’][‘payload’]>, options?: InterceptOptions, ): () => void;
/**
- Register a one-time event or request handler that auto-unsubscribes after first invocation.
- Wraps the
on()method to automatically unsubscribe after the handler fires once. - The handler is removed BEFORE being invoked to prevent re-entrance issues if the
- handler triggers the same event.
- Events: Fire-and-forget - handler executes once then auto-unsubscribes.
- Requests: Request-response - handler executes once then auto-unsubscribes.
- Wildcard Support: Like
on(), supports.$allproperty for namespace-level patterns. - @param subject - SubjectDefinition object (exact or wildcard)
- @param handler - Handler function (EventHandler for events, RequestHandler for requests)
- @returns Unsubscribe function for manual cleanup if needed
- @example Basic one-time event handler
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- started: z.object({ agentId: z.string() }),
- });
- once(AgentSubjects.started, (context) => {
- context.payload.agentId; // ✅ string - fully typed
- console.debug(‘Agent started once:’, context.payload.agentId);
- });
- await emit(AgentSubjects.started, { agentId: ‘agent-123’ }); // Handler fires
- await emit(AgentSubjects.started, { agentId: ‘agent-456’ }); // Handler does NOT fire
-
- @example Manual unsubscribe before first fire
-
- const unsubscribe = once(AgentSubjects.started, (context) => {
- console.debug(‘This will never run’);
- });
- unsubscribe(); // Manually unsubscribe before event fires
- await emit(AgentSubjects.started, { agentId: ‘agent-123’ }); // Handler does NOT fire
-
- @example Wildcard handler for one-time namespace monitoring
-
- once(AgentSubjects.$all, (context) => {
- context.payload; // unknown - must use type guards
- console.debug(‘First agent event:’, context.payload);
- });
-
- @example One-time request handler
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- toolApprove: {
-
request: z.object({ toolName: z.string() }),
-
response: z.object({ approved: z.boolean() }),
- },
- });
- once(AgentSubjects.toolApprove, (context) => {
- const { toolName } = context.payload; // ✅ fully typed
- context.setResult({ approved: true });
- });
- await request(AgentSubjects.toolApprove, { toolName: ‘deleteFile’ }); // Handler fires
- await request(AgentSubjects.toolApprove, { toolName: ‘createFile’ }); // Handler does NOT fire
-
*/ once<Subject extends Subjects & StrictNamespace, IsChannel = Subject[‘$meta’][‘channel’]>( subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition
: never, ): () => void; /**
- Wait for an event to occur, returning a Promise.
- Note: Request subjects are not supported with the promise version of once().
- Use the callback version for request handlers:
once(subject, handler) - @param subject - Event SubjectDefinition object (exact or wildcard)
- @param options - Options object with: timeoutMs (reject after timeout), filter (only resolve when filter returns true), signal (AbortSignal to cancel waiting)
- @returns Promise that resolves with the event context
- @example Simple await
-
- const ctx = await bus.once(Subjects.init);
- console.debug(‘Event received:’, ctx.payload);
-
- @example With timeout
-
- try {
- const ctx = await bus.once(Subjects.init, { timeoutMs: 5000 });
- } catch (err) {
- if (err instanceof OnceTimeoutError) {
-
console.debug('Timed out waiting for event');
- }
- }
-
- @example With filter (waits for matching event)
-
- const ctx = await bus.once(Subjects.message, {
- filter: (payload) => payload.sessionId === expectedId
- });
- // Resolves only when a message with matching sessionId is received
-
- @example With AbortSignal
-
- const controller = new AbortController();
- const promise = bus.once(Subjects.init, { signal: controller.signal });
- controller.abort(); // Cancels the wait
-
*/ once< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends false ? Subject : never, options?: Subject extends SubjectDefinition ? OnceOptions
: never, ): Promise<Subject extends SubjectDefinition ? Parameters<HandlerForSubjectDefinition >[0] : never>; /** - Emit an event to all registered handlers.
- Events are fire-and-forget - all handlers execute in parallel.
- Handler errors are logged but don’t stop other handlers from executing.
- Note: You cannot emit to wildcard patterns. Use concrete subject keys only.
- Handlers registered with wildcards will still receive the event if it matches.
- @param subject - Concrete event subject (wildcards not allowed)
- @param payload - Event payload
- @param options - Emit options (messageId, correlationId, transports)
-
Transport Routing
Section titled “Transport Routing” -
transports: undefined- Send to ALL registered transports (default)
-
transports: []- Local only, don’t send to any transports
-
transports: ['ws', 'nats']- Send only to specified transports
- @example
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- started: z.object({ agentId: z.string() }),
- });
- // Send to all transports (default)
- await emit(AgentSubjects.started, { agentId: ‘agent-123’ });
- // Local only, no transports
- await emit(
- AgentSubjects.started,
- { agentId: ‘agent-123’ },
- { transports: [] }
- );
- // Send to specific transports
- await emit(
- AgentSubjects.started,
- { agentId: ‘agent-123’ },
- { transports: [‘websocket’] }
- );
- // With tracking IDs:
- await emit(
- AgentSubjects.started,
- { agentId: ‘agent-123’ },
- { correlationId: ‘user-action-123’ }
- );
-
*/ emit< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsWildcard = Subject[‘subject’] extends WildcardSubject ? true : false, IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends false ? IsWildcard extends false ? Subject : never : never, payload: Subject[‘$meta’][‘payload’], options?: EmitOptions, ): Promise
; /** - Execute a request and wait for a response.
- Requests follow a middleware chain pattern - handlers are called in order
- until one calls setResult() or all handlers complete.
- Note: You cannot request via wildcard patterns. Use concrete subject keys only.
- Handlers registered with wildcards will still match if the subject matches their pattern.
- @param subject - Concrete request subject (wildcards not allowed)
- @param payload - Request payload
- @param options - Request options (timeout, correlationId, transports)
- @returns Response value
- @throws {NoHandlerError} If no handler is registered
- @throws {TimeoutError} If request times out
- @throws {ValidationError} If payload validation fails
- @throws {RequestError} If handler throws an error
-
Transport Routing
Section titled “Transport Routing” -
transports: undefined- Send to ALL registered transports (default)
-
transports: []- Local only, don’t send to any transports
-
transports: ['ws', 'nats']- Send only to specified transports
- @example
-
- // ✅ Concrete subject
- const result = await request(
- AgentSubjects.toolApprove,
- { toolName: ‘deleteFile’, args: {}, toolCallId: ‘call_123’ },
- { timeout: 10000 }
- );
- console.debug(result.approved);
- // ❌ Cannot use wildcards
- // await request(‘agent.*’, { … }); // Type error
- // With specific transports
- const result = await request(
- AgentSubjects.toolApprove,
- { toolName: ‘deleteFile’, args: {}, toolCallId: ‘call_123’ },
- { transports: [‘websocket’], timeout: 10000 }
- );
-
*/ request< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>;
/**
- Execute a request, returning a discriminated union instead of throwing for missing handlers.
- Use for optional services. Only NoHandlerError is caught - other errors propagate.
- @see {@link OptionalResult} for return type details */ requestOptional< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<OptionalResult<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>>;
/**
- Execute a broadcast request and collect responses from ALL handlers.
- Unlike
request()which uses a middleware chain and returns the first result, broadcast()executes all handlers in parallel and aggregates their responses.- Use for discovery patterns where multiple nodes may respond (e.g., fs.listSources).
- Note: You cannot broadcast via wildcard patterns. Use concrete subject keys only.
- Handlers registered with wildcards will still match if the subject matches their pattern.
- Handler Usage:
- Handlers should call
ctx.identify(nodeId)beforectx.setResult()to tag their response. - If
identify()is not called, the response is tagged as ‘anonymous’. - @param subject - Concrete request subject (wildcards not allowed)
- @param payload - Request payload
- @param options - Request options (timeout, correlationId)
- @returns Array of { nodeId, payload } responses from all handlers
- @example
-
- // Discover all filesystem sources from all nodes
- const results = await MakaioBus.broadcast(FileSystemSubjects.listSources, {});
- // results: [
- // { nodeId: ‘local’, payload: { sources: […] } },
- // { nodeId: ‘container-1’, payload: { sources: […] } },
- // ]
- // Aggregate sources
- const allSources = results.flatMap(r => r.payload.sources);
-
*/ broadcast< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<BroadcastResult<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>[]>;
scoped: <Domain extends string, Subjects extends SubjectRecord, F, Sc extends Record<string, SubjectSchema>>( input: BusNamespace<Domain, Subjects, F, Sc>, context?: MakaioBusContext, ) => ScopedBus
; /**
- Create a filtered bus with a base payload filter.
- The filter is automatically applied to all
on()andonce()calls. - Optionally provide a type parameter for type-safe filter keys.
- @param filter - Base filter to apply to all subscriptions
- @returns FilteredBus with the specified filter
- @example
-
- // Untyped (loose) - any keys allowed
- const agentBus = MakaioBus.withFilter({ agentId: this.agentId });
- // Type-safe filter keys
- interface AgentPayload { agentId: string; sessionId: string }
- const strictBus = MakaioBus.withFilter
({ agentId: ‘x’ }); -
*/ withFilter: <Payload = unknown>( filter: [unknown] extends [Payload] ? PayloadFilter : TypedPayloadFilter
, ) => IFilteredBus<NamespaceDomain extends string ? NamespaceDomain : string>; /**
- Register a handler that receives ALL messages (events and requests) across all namespaces.
- Debugging/Testing Only: Noops in production (process.env.NODE_ENV === ‘production’).
- Useful for logging, debugging, and test assertions that need visibility into all bus activity.
- Handler receives complete metadata: type, subject, namespace, payload, messageId, correlationId.
- @param handler - Function to invoke for every message
- @returns Unsubscribe function (noop in production)
- @example
-
- const unsubscribe = bus.__onAny((context) => {
- console.debug(
[${context.type}] ${context.namespace}:${context.subject}, context.payload); - });
-
*/ __onAny: (handler: AnyHandler) => () => void; __resetHandlers?: () => void;
/**
- Register a transport by its name property.
- Convenience method that delegates to
getContext().transportRegistry.registerTransport(). - The transport’s
nameproperty is used as the registry key. - @param transport - Transport to register
- @returns Registration object with
unregisterandreadypromise */ registerTransport(transport: BusTransport): TransportRegistration;
/**
- Unregister a transport by name.
- No-op if no transport is registered under that name.
- @param name - Transport name to unregister */ unregisterTransport(name: string): void;
/**
- Disconnect all registered transports and clear the transport map.
- Convenience method for tearing down a bus instance. The inverse of
- passing
transportstocreateBusInstance()or calling registerTransport()individually. */ disconnect(): void;
/**
- Trigger an immediate reconnection attempt on all disconnected transports.
- Delegates to each transport’s
reconnect()method if available. For - transports with exponential-backoff reconnection (e.g. WebSocket), this
- wakes the backoff sleep and returns once the attempt is initiated — not
- once the connection is established. For one-shot transports it resolves
- after the connect attempt completes. Failures are logged but do not reject
- this promise. No-op when all transports are already connected.
*/
reconnect(): Promise
;
/**
- Resolves when all transports registered at connect-time have completed
- subscribe-sync. If {@link connect} was called with the default
awaitReady: true, - this is already resolved when
connect()returns. IfawaitReady: falsewas used, - await this to ensure readiness before dispatching requests.
- Resolves immediately if no transports are registered or
connect()has not been called. */ readonly ready: Promise;
/**
- Connect all registered transports and optionally await subscribe-sync readiness.
- Calls
transport.connect()on every transport registered by this bus instance. - If any transport fails to connect, all transports are disconnected and unregistered
- (rollback) before the error is re-thrown. Pass
{ awaitReady: false }to resolve - as soon as sockets are open without waiting for the subscribe-sync handshake.
- Concurrent calls are safe — a second in-flight call awaits the same promise. Once
- sockets are open, subsequent calls are no-ops unless a prior
connect({ awaitReady: false })left the readiness handshake pending; in that case,- default
connect()still awaitsbus.ready. - @param options - Connection options (see {@link ConnectOptions})
- @throws If any transport’s
connect()orreadypromise rejects (after rollback) */ connect(options?: ConnectOptions): Promise;
getContext(): MakaioBusContext;
registerNamespace<Domain extends string, Schemas extends Record<string, SubjectSchema>>( domain: Domain, schemas: Schemas, options?: NamespaceRegistrationOptions, ): BusNamespace< Domain, SubjectRecordFromSchemaRecord
, FilterablePayloadIntersection<SubjectRecordFromSchemaRecord >, Schemas ;
/** Get the schema for a registered subject, or undefined if not found. */ getSchema
(subject: T | string): SubjectSchema | undefined; /**
- Extend a registered subject’s schema with additional fields.
- Adds new root-level fields to the Zod schema used for dev-mode validation and
- widens the TypeScript payload type. Successive calls accumulate — two packages
- can independently extend the same subject without overwriting each other.
- The returned value is the same runtime SubjectDefinition object — only the
- TypeScript type is widened. Bus routing is unaffected.
- @param subject - SubjectDefinition from a registered namespace
- @param extensions - For request subjects:
{ request?: { field: z.string() }, response?: {...} }. - For event subjects:
{ field: z.string() }(flat record of additional Zod fields). - @returns The same SubjectDefinition with wider TypeScript types
*/
extendSubject<SD extends Subjects & StrictNamespace, Ext extends import(’../extend-subject.js’).SubjectExtension
>( subject: SD, extensions: Ext, ): import(’../extend-subject.js’).ExtendedSubjectDefinition<SD, Ext>; }`
Resolved Shape
Section titled “Resolved Shape”type BusLike = { namespace: unknown; on: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never, options?: OnOptions) => () => void; intercept: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }>(subject: Subject, handler: InterceptorHandler<Subject['$meta']['payload']>, options?: InterceptOptions) => () => void; once: { <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never): () => void; <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends false ? Subject : never, options?: (Subject extends SubjectDefinition ? OnceOptions<Subject> : never) | undefined): Promise<Subject extends SubjectDefinition ? Parameters<HandlerForSubjectDefinition<Subject>>[0] : never>; }; emit: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsWildcard = Subject['subject'] extends '*' ? true : false, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends false ? IsWildcard extends false ? Subject : never : never, payload: Subject['$meta']['payload'], options?: EmitOptions) => Promise<void>; request: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>; requestOptional: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<OptionalResult<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>>; broadcast: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<Array<BroadcastResult<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>>>; scoped: <Domain extends string, Subjects extends SubjectRecord, F, Sc extends Record<string, SubjectSchema>>(input: BusNamespace<Domain, Subjects, F, Sc>, context?: MakaioBusContext) => ScopedBus<Domain>; withFilter: <Payload = unknown>(filter: [unknown] extends [Payload] ? PayloadFilter : TypedPayloadFilter<Payload>) => IFilteredBus<string, unknown, unknown>; __onAny: (handler: AnyHandler) => () => void; __resetHandlers?: (() => void) | undefined; registerTransport: (transport: BusTransport) => TransportRegistration; unregisterTransport: (name: string) => void; disconnect: () => void; reconnect: () => Promise<void>; ready: Promise<void>; connect: (options?: ConnectOptions) => Promise<void>; getContext: () => MakaioBusContext; registerNamespace: <Domain extends string, Schemas extends Record<string, SubjectSchema>>(domain: Domain, schemas: Schemas, options?: NamespaceRegistrationOptions) => BusNamespace<Domain, SubjectRecordFromSchemaRecord<Schemas>, FilterablePayloadIntersection<SubjectRecordFromSchemaRecord<Schemas>>, Schemas>; getSchema: <T extends SubjectDefinition>(subject: T | string) => SubjectSchema | undefined; extendSubject: <SD extends SubjectDefinition & { $meta: { namespace: string; }; }, Ext extends import("../extend-subject.js").SubjectExtension<SD>>(subject: SD, extensions: Ext) => import("../extend-subject.js").ExtendedSubjectDefinition<SD, Ext>;};