Skip to content

Type Alias: BusLike

Makaio Framework


Makaio Framework / tools-core / BusLike

BusLike = IMakaioBus

Defined in: ../../../tools/core/src/types.ts:12

Type alias for the bus interface used in tool execution contexts.

Tools receive IMakaioBus instances for type-safe event communication. This ensures full type safety with typed subjects from @makaio/contracts.

  • BusLike

    • Register an event or request handler.
    • Events: Fire-and-forget - multiple handlers can listen and execute in parallel.
    • Requests: Request-response with middleware support - handlers form a chain.
    • Wildcard Support: Use .$all property on subjects to match all subjects in a namespace.
    • Wildcard handlers receive unknown payload and must use type guards.
    • @param subject - SubjectDefinition object (exact or wildcard)
    • @param handler - Handler function (EventHandler for events, RequestHandler for requests)
    • @returns Unsubscribe function
    • @example Basic event handler with typed payload
    • const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
    • started: z.object({ agentId: z.string() }),
    • });
    • const unsubscribe = on(AgentSubjects.started, (context) => {
    • context.payload.agentId; // ✅ string - fully typed
    • console.debug(‘Agent started:’, context.payload.agentId);
    • });
    • @example Wildcard handlers for namespace-level events
    • // Matches all subjects in namespace
    • on(AgentSubjects.$all, (context) => {
    • context.payload; // unknown - must use type guards
    • if (‘agentId’ in context.payload) {
    • console.debug('Any agent event:', context.payload.agentId);
    • }
    • });
    • @example Request handler with typed request/response
    • const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
    • toolApprove: {
    • request: z.object({ toolName: z.string() }),
    • response: z.object({ approved: z.boolean() }),
    • },
    • });
    • on(AgentSubjects.toolApprove, (context) => {
    • const { toolName } = context.payload; // ✅ fully typed
    • context.setResult({ approved: true });
    • });
    • @example Middleware pattern for request chain
    • on(AgentSubjects.toolApprove, async (context) => {
    • console.debug(‘Before approval’);
    • await context.next();
    • console.debug(‘After approval’);
    • });

    */ on<Subject extends Subjects & StrictNamespace, IsChannel = Subject[‘$meta’][‘channel’]>( subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition : never, options?: OnOptions, ): () => void;

    /** Register an interceptor that runs BEFORE handlers (payload transform, blocking, priority). */ intercept<Subject extends Subjects & StrictNamespace>( subject: Subject, handler: InterceptorHandler<Subject[‘$meta’][‘payload’]>, options?: InterceptOptions, ): () => void;

    /**

    • Register a one-time event or request handler that auto-unsubscribes after first invocation.
    • Wraps the on() method to automatically unsubscribe after the handler fires once.
    • The handler is removed BEFORE being invoked to prevent re-entrance issues if the
    • handler triggers the same event.
    • Events: Fire-and-forget - handler executes once then auto-unsubscribes.
    • Requests: Request-response - handler executes once then auto-unsubscribes.
    • Wildcard Support: Like on(), supports .$all property for namespace-level patterns.
    • @param subject - SubjectDefinition object (exact or wildcard)
    • @param handler - Handler function (EventHandler for events, RequestHandler for requests)
    • @returns Unsubscribe function for manual cleanup if needed
    • @example Basic one-time event handler
    • const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
    • started: z.object({ agentId: z.string() }),
    • });
    • once(AgentSubjects.started, (context) => {
    • context.payload.agentId; // ✅ string - fully typed
    • console.debug(‘Agent started once:’, context.payload.agentId);
    • });
    • await emit(AgentSubjects.started, { agentId: ‘agent-123’ }); // Handler fires
    • await emit(AgentSubjects.started, { agentId: ‘agent-456’ }); // Handler does NOT fire
    • @example Manual unsubscribe before first fire
    • const unsubscribe = once(AgentSubjects.started, (context) => {
    • console.debug(‘This will never run’);
    • });
    • unsubscribe(); // Manually unsubscribe before event fires
    • await emit(AgentSubjects.started, { agentId: ‘agent-123’ }); // Handler does NOT fire
    • @example Wildcard handler for one-time namespace monitoring
    • once(AgentSubjects.$all, (context) => {
    • context.payload; // unknown - must use type guards
    • console.debug(‘First agent event:’, context.payload);
    • });
    • @example One-time request handler
    • const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
    • toolApprove: {
    • request: z.object({ toolName: z.string() }),
    • response: z.object({ approved: z.boolean() }),
    • },
    • });
    • once(AgentSubjects.toolApprove, (context) => {
    • const { toolName } = context.payload; // ✅ fully typed
    • context.setResult({ approved: true });
    • });
    • await request(AgentSubjects.toolApprove, { toolName: ‘deleteFile’ }); // Handler fires
    • await request(AgentSubjects.toolApprove, { toolName: ‘createFile’ }); // Handler does NOT fire

    */ once<Subject extends Subjects & StrictNamespace, IsChannel = Subject[‘$meta’][‘channel’]>( subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition : never, ): () => void;

    /**

    • Wait for an event to occur, returning a Promise.
    • Note: Request subjects are not supported with the promise version of once().
    • Use the callback version for request handlers: once(subject, handler)
    • @param subject - Event SubjectDefinition object (exact or wildcard)
    • @param options - Options object with: timeoutMs (reject after timeout), filter (only resolve when filter returns true), signal (AbortSignal to cancel waiting)
    • @returns Promise that resolves with the event context
    • @example Simple await
    • const ctx = await bus.once(Subjects.init);
    • console.debug(‘Event received:’, ctx.payload);
    • @example With timeout
    • try {
    • const ctx = await bus.once(Subjects.init, { timeoutMs: 5000 });
    • } catch (err) {
    • if (err instanceof OnceTimeoutError) {
    • console.debug('Timed out waiting for event');
    • }
    • }
    • @example With filter (waits for matching event)
    • const ctx = await bus.once(Subjects.message, {
    • filter: (payload) => payload.sessionId === expectedId
    • });
    • // Resolves only when a message with matching sessionId is received
    • @example With AbortSignal
    • const controller = new AbortController();
    • const promise = bus.once(Subjects.init, { signal: controller.signal });
    • controller.abort(); // Cancels the wait

    */ once< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],

    ( subject: IsChannel extends true ? never : IsRequest extends false ? Subject : never, options?: Subject extends SubjectDefinition ? OnceOptions : never, ): Promise<Subject extends SubjectDefinition ? Parameters<HandlerForSubjectDefinition>[0] : never>; /**

    • Emit an event to all registered handlers.
    • Events are fire-and-forget - all handlers execute in parallel.
    • Handler errors are logged but don’t stop other handlers from executing.
    • Note: You cannot emit to wildcard patterns. Use concrete subject keys only.
    • Handlers registered with wildcards will still receive the event if it matches.
    • @param subject - Concrete event subject (wildcards not allowed)
    • @param payload - Event payload
    • @param options - Emit options (messageId, correlationId, transports)
      • transports: undefined - Send to ALL registered transports (default)
      • transports: [] - Local only, don’t send to any transports
      • transports: ['ws', 'nats'] - Send only to specified transports
    • @example
    • const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
    • started: z.object({ agentId: z.string() }),
    • });
    • // Send to all transports (default)
    • await emit(AgentSubjects.started, { agentId: ‘agent-123’ });
    • // Local only, no transports
    • await emit(
    • AgentSubjects.started,
    • { agentId: ‘agent-123’ },
    • { transports: [] }
    • );
    • // Send to specific transports
    • await emit(
    • AgentSubjects.started,
    • { agentId: ‘agent-123’ },
    • { transports: [‘websocket’] }
    • );
    • // With tracking IDs:
    • await emit(
    • AgentSubjects.started,
    • { agentId: ‘agent-123’ },
    • { correlationId: ‘user-action-123’ }
    • );

    */ emit< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsWildcard = Subject[‘subject’] extends WildcardSubject ? true : false, IsChannel = Subject[‘$meta’][‘channel’],

    ( subject: IsChannel extends true ? never : IsRequest extends false ? IsWildcard extends false ? Subject : never : never, payload: Subject[‘$meta’][‘payload’], options?: EmitOptions, ): Promise; /**

    • Execute a request and wait for a response.
    • Requests follow a middleware chain pattern - handlers are called in order
    • until one calls setResult() or all handlers complete.
    • Note: You cannot request via wildcard patterns. Use concrete subject keys only.
    • Handlers registered with wildcards will still match if the subject matches their pattern.
    • @param subject - Concrete request subject (wildcards not allowed)
    • @param payload - Request payload
    • @param options - Request options (timeout, correlationId, transports)
    • @returns Response value
    • @throws {NoHandlerError} If no handler is registered
    • @throws {TimeoutError} If request times out
    • @throws {ValidationError} If payload validation fails
    • @throws {RequestError} If handler throws an error
      • transports: undefined - Send to ALL registered transports (default)
      • transports: [] - Local only, don’t send to any transports
      • transports: ['ws', 'nats'] - Send only to specified transports
    • @example
    • // ✅ Concrete subject
    • const result = await request(
    • AgentSubjects.toolApprove,
    • { toolName: ‘deleteFile’, args: {}, toolCallId: ‘call_123’ },
    • { timeout: 10000 }
    • );
    • console.debug(result.approved);
    • // ❌ Cannot use wildcards
    • // await request(‘agent.*’, { … }); // Type error
    • // With specific transports
    • const result = await request(
    • AgentSubjects.toolApprove,
    • { toolName: ‘deleteFile’, args: {}, toolCallId: ‘call_123’ },
    • { transports: [‘websocket’], timeout: 10000 }
    • );

    */ request< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],

    ( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>;

    /**

    • Execute a request, returning a discriminated union instead of throwing for missing handlers.
    • Use for optional services. Only NoHandlerError is caught - other errors propagate.
    • @see {@link OptionalResult} for return type details */ requestOptional< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],

    ( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<OptionalResult<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>>;

    /**

    • Execute a broadcast request and collect responses from ALL handlers.
    • Unlike request() which uses a middleware chain and returns the first result,
    • broadcast() executes all handlers in parallel and aggregates their responses.
    • Use for discovery patterns where multiple nodes may respond (e.g., fs.listSources).
    • Note: You cannot broadcast via wildcard patterns. Use concrete subject keys only.
    • Handlers registered with wildcards will still match if the subject matches their pattern.
    • Handler Usage:
    • Handlers should call ctx.identify(nodeId) before ctx.setResult() to tag their response.
    • If identify() is not called, the response is tagged as ‘anonymous’.
    • @param subject - Concrete request subject (wildcards not allowed)
    • @param payload - Request payload
    • @param options - Request options (timeout, correlationId)
    • @returns Array of { nodeId, payload } responses from all handlers
    • @example
    • // Discover all filesystem sources from all nodes
    • const results = await MakaioBus.broadcast(FileSystemSubjects.listSources, {});
    • // results: [
    • // { nodeId: ‘local’, payload: { sources: […] } },
    • // { nodeId: ‘container-1’, payload: { sources: […] } },
    • // ]
    • // Aggregate sources
    • const allSources = results.flatMap(r => r.payload.sources);

    */ broadcast< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],

    ( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<BroadcastResult<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>[]>;

    scoped: <Domain extends string, Subjects extends SubjectRecord, F, Sc extends Record<string, SubjectSchema>>( input: BusNamespace<Domain, Subjects, F, Sc>, context?: MakaioBusContext, ) => ScopedBus;

    /**

    • Create a filtered bus with a base payload filter.
    • The filter is automatically applied to all on() and once() calls.
    • Optionally provide a type parameter for type-safe filter keys.
    • @param filter - Base filter to apply to all subscriptions
    • @returns FilteredBus with the specified filter
    • @example
    • // Untyped (loose) - any keys allowed
    • const agentBus = MakaioBus.withFilter({ agentId: this.agentId });
    • // Type-safe filter keys
    • interface AgentPayload { agentId: string; sessionId: string }
    • const strictBus = MakaioBus.withFilter({ agentId: ‘x’ });

    */ withFilter: <Payload = unknown>( filter: [unknown] extends [Payload] ? PayloadFilter : TypedPayloadFilter, ) => IFilteredBus<NamespaceDomain extends string ? NamespaceDomain : string>;

    /**

    • Register a handler that receives ALL messages (events and requests) across all namespaces.
    • Debugging/Testing Only: Noops in production (process.env.NODE_ENV === ‘production’).
    • Useful for logging, debugging, and test assertions that need visibility into all bus activity.
    • Handler receives complete metadata: type, subject, namespace, payload, messageId, correlationId.
    • @param handler - Function to invoke for every message
    • @returns Unsubscribe function (noop in production)
    • @example
    • const unsubscribe = bus.__onAny((context) => {
    • console.debug([${context.type}] ${context.namespace}:${context.subject}, context.payload);
    • });

    */ __onAny: (handler: AnyHandler) => () => void; __resetHandlers?: () => void;

    /**

    • Register a transport by its name property.
    • Convenience method that delegates to getContext().transportRegistry.registerTransport().
    • The transport’s name property is used as the registry key.
    • @param transport - Transport to register
    • @returns Registration object with unregister and ready promise */ registerTransport(transport: BusTransport): TransportRegistration;

    /**

    • Unregister a transport by name.
    • No-op if no transport is registered under that name.
    • @param name - Transport name to unregister */ unregisterTransport(name: string): void;

    /**

    • Disconnect all registered transports and clear the transport map.
    • Convenience method for tearing down a bus instance. The inverse of
    • passing transports to createBusInstance() or calling
    • registerTransport() individually. */ disconnect(): void;

    /**

    • Trigger an immediate reconnection attempt on all disconnected transports.
    • Delegates to each transport’s reconnect() method if available. For
    • transports with exponential-backoff reconnection (e.g. WebSocket), this
    • wakes the backoff sleep and returns once the attempt is initiated — not
    • once the connection is established. For one-shot transports it resolves
    • after the connect attempt completes. Failures are logged but do not reject
    • this promise. No-op when all transports are already connected. */ reconnect(): Promise;

    /**

    • Resolves when all transports registered at connect-time have completed
    • subscribe-sync. If {@link connect} was called with the default awaitReady: true,
    • this is already resolved when connect() returns. If awaitReady: false was used,
    • await this to ensure readiness before dispatching requests.
    • Resolves immediately if no transports are registered or connect() has not been called. */ readonly ready: Promise;

    /**

    • Connect all registered transports and optionally await subscribe-sync readiness.
    • Calls transport.connect() on every transport registered by this bus instance.
    • If any transport fails to connect, all transports are disconnected and unregistered
    • (rollback) before the error is re-thrown. Pass { awaitReady: false } to resolve
    • as soon as sockets are open without waiting for the subscribe-sync handshake.
    • Concurrent calls are safe — a second in-flight call awaits the same promise. Once
    • sockets are open, subsequent calls are no-ops unless a prior
    • connect({ awaitReady: false }) left the readiness handshake pending; in that case,
    • default connect() still awaits bus.ready.
    • @param options - Connection options (see {@link ConnectOptions})
    • @throws If any transport’s connect() or ready promise rejects (after rollback) */ connect(options?: ConnectOptions): Promise;

    getContext(): MakaioBusContext;

    registerNamespace<Domain extends string, Schemas extends Record<string, SubjectSchema>>( domain: Domain, schemas: Schemas, options?: NamespaceRegistrationOptions, ): BusNamespace< Domain, SubjectRecordFromSchemaRecord, FilterablePayloadIntersection<SubjectRecordFromSchemaRecord>, Schemas

    ;

    /** Get the schema for a registered subject, or undefined if not found. */ getSchema(subject: T | string): SubjectSchema | undefined;

    /**

    • Extend a registered subject’s schema with additional fields.
    • Adds new root-level fields to the Zod schema used for dev-mode validation and
    • widens the TypeScript payload type. Successive calls accumulate — two packages
    • can independently extend the same subject without overwriting each other.
    • The returned value is the same runtime SubjectDefinition object — only the
    • TypeScript type is widened. Bus routing is unaffected.
    • @param subject - SubjectDefinition from a registered namespace
    • @param extensions - For request subjects: { request?: { field: z.string() }, response?: {...} }.
    • For event subjects: { field: z.string() } (flat record of additional Zod fields).
    • @returns The same SubjectDefinition with wider TypeScript types */ extendSubject<SD extends Subjects & StrictNamespace, Ext extends import(’../extend-subject.js’).SubjectExtension>( subject: SD, extensions: Ext, ): import(’../extend-subject.js’).ExtendedSubjectDefinition<SD, Ext>; }`
type BusLike = {
namespace: unknown;
on: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never, options?: OnOptions) => () => void;
intercept: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }>(subject: Subject, handler: InterceptorHandler<Subject['$meta']['payload']>, options?: InterceptOptions) => () => void;
once: { <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never): () => void; <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends false ? Subject : never, options?: (Subject extends SubjectDefinition ? OnceOptions<Subject> : never) | undefined): Promise<Subject extends SubjectDefinition ? Parameters<HandlerForSubjectDefinition<Subject>>[0] : never>; };
emit: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsWildcard = Subject['subject'] extends '*' ? true : false, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends false ? IsWildcard extends false ? Subject : never : never, payload: Subject['$meta']['payload'], options?: EmitOptions) => Promise<void>;
request: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>;
requestOptional: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<OptionalResult<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>>;
broadcast: <Subject extends SubjectDefinition & { $meta: { namespace: string; }; }, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<Array<BroadcastResult<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>>>;
scoped: <Domain extends string, Subjects extends SubjectRecord, F, Sc extends Record<string, SubjectSchema>>(input: BusNamespace<Domain, Subjects, F, Sc>, context?: MakaioBusContext) => ScopedBus<Domain>;
withFilter: <Payload = unknown>(filter: [unknown] extends [Payload] ? PayloadFilter : TypedPayloadFilter<Payload>) => IFilteredBus<string, unknown, unknown>;
__onAny: (handler: AnyHandler) => () => void;
__resetHandlers?: (() => void) | undefined;
registerTransport: (transport: BusTransport) => TransportRegistration;
unregisterTransport: (name: string) => void;
disconnect: () => void;
reconnect: () => Promise<void>;
ready: Promise<void>;
connect: (options?: ConnectOptions) => Promise<void>;
getContext: () => MakaioBusContext;
registerNamespace: <Domain extends string, Schemas extends Record<string, SubjectSchema>>(domain: Domain, schemas: Schemas, options?: NamespaceRegistrationOptions) => BusNamespace<Domain, SubjectRecordFromSchemaRecord<Schemas>, FilterablePayloadIntersection<SubjectRecordFromSchemaRecord<Schemas>>, Schemas>;
getSchema: <T extends SubjectDefinition>(subject: T | string) => SubjectSchema | undefined;
extendSubject: <SD extends SubjectDefinition & { $meta: { namespace: string; }; }, Ext extends import("../extend-subject.js").SubjectExtension<SD>>(subject: SD, extensions: Ext) => import("../extend-subject.js").ExtendedSubjectDefinition<SD, Ext>;
};