
Interface: BusTransport

Makaio Framework



Defined in: ../../../packages/bus-core/src/types/transports.ts:260

Transport interface for bus communication.

Transports handle serialization, wire protocol, and connection management for cross-process communication.

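// Abbreviated example: the ws field declaration and the required
// subscribe()/unsubscribe() methods are omitted for brevity.
class WebSocketTransport implements BusTransport {
  readonly name = 'websocket';

  // Serialize the message as JSON and write it to the socket.
  async send(message: BusMessage): Promise<void> {
    this.ws.send(JSON.stringify(message));
  }

  // Parse each incoming frame and pass it to the handler with receive context.
  onReceive(handler: BusReceiveHandler): () => void {
    const listener = (data: string) => {
      const message = JSON.parse(data) as BusMessage;
      const context: TransportReceiveContext = { transportName: this.name };
      void handler(message, context);
    };
    this.ws.on('message', listener);
    // The returned function unsubscribes the listener.
    return () => this.ws.off('message', listener);
  }

  async connect(): Promise<void> {
    await this.ws.connect();
  }

  async disconnect(): Promise<void> {
    await this.ws.close();
  }
}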

name: string

Defined in: ../../../packages/bus-core/src/types/transports.ts:274

Unique transport name used as the registry key.

The name is used by IMakaioBus.registerTransport() to key the transport in the internal registry. It must be unique within a single bus instance. Implementations should declare this as a readonly literal or class field.

class MyTransport implements BusTransport {
  readonly name = 'my-transport';
}

optional onConnected?: () => void

Defined in: ../../../packages/bus-core/src/types/transports.ts:408

Optional callback set by the transport registry during registration. Called each time the transport establishes a connection (initial or reconnect), after authentication and subscription replay are complete.

Returns: void


optional onDisconnected?: () => void

Defined in: ../../../packages/bus-core/src/types/transports.ts:415

Optional callback set by the transport registry during registration. Called when the transport loses its connection unexpectedly. Not called on an explicit disconnect().

Returns: void
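The following is a minimal sketch of how a transport might invoke the two hooks, assuming the registry has already assigned them. The `SketchTransport` class and its `handleSessionEstablished` and `handleUnexpectedClose` methods are hypothetical names used for illustration, and the socket is simulated so the snippet runs on its own.

```typescript
// Hypothetical slice of BusTransport: only the two registry-assigned hooks.
interface ConnectionHooks {
  onConnected?: () => void;
  onDisconnected?: () => void;
}

// Illustrative transport; no real socket is involved.
class SketchTransport implements ConnectionHooks {
  onConnected?: () => void;
  onDisconnected?: () => void;
  private connected = false;

  // Called once authentication and subscription replay have finished.
  handleSessionEstablished(): void {
    this.connected = true;
    this.onConnected?.();
  }

  // Called when the underlying connection drops without a disconnect() call.
  handleUnexpectedClose(): void {
    if (!this.connected) return;
    this.connected = false;
    this.onDisconnected?.();
  }

  // Explicit shutdown: per the docs above, onDisconnected is not invoked here.
  async disconnect(): Promise<void> {
    this.connected = false;
  }
}

// Stand-in for the registry assigning the hooks during registration.
const events: string[] = [];
const transport = new SketchTransport();
transport.onConnected = () => events.push('connected');
transport.onDisconnected = () => events.push('disconnected');

transport.handleSessionEstablished(); // initial connection
transport.handleUnexpectedClose();    // unexpected loss
transport.handleSessionEstablished(); // reconnect
void transport.disconnect();          // explicit: no callback fires
```

In this sketch the recorded sequence contains one `disconnected` entry, because only the unexpected loss triggers the hook.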


optional onNewReadySession?: (promise) => void

Defined in: ../../../packages/bus-core/src/types/transports.ts:401

Optional callback set by the transport registry during registration. Transports that support reconnection call this at the start of each new session so the registry can track the new ready promise for dispatch gating.

Parameters:

  • promise (Promise<void>): The new ready promise for the current session

Returns: void


optional ready?: Promise<void>

Defined in: ../../../packages/bus-core/src/types/transports.ts:392

Optional: Promise that resolves when the transport is fully operational (end-to-end, including remote handler availability).

Resolved after the bus has sent the initial subscribe sync to the transport and the transport has received the BusSubscribeSyncCompleteMessage handshake. Before this resolves, remoteRequestHandlers for this transport may be incomplete and requests should not be routed through it.

Transports that do not implement ready are considered immediately ready.

Transports that do not implement onNewReadySession are single-use: after disconnect() completes, a later connect() call has undefined behavior. Reconnectable transports must call onNewReadySession each time they create a new ready promise so the registry can update its gating.
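As a rough illustration of the reconnectable case, the sketch below creates a fresh ready promise per session and announces it through `onNewReadySession`. The `ReconnectableSketch` class and its `beginSession` and `handleSyncComplete` methods are hypothetical; the registry is reduced to an array that records each promise it is told about.

```typescript
// Hypothetical slice of BusTransport covering only the readiness members.
interface ReadyAware {
  ready?: Promise<void>;
  onNewReadySession?: (promise: Promise<void>) => void;
}

class ReconnectableSketch implements ReadyAware {
  ready?: Promise<void>;
  onNewReadySession?: (promise: Promise<void>) => void;
  private resolveReady: (() => void) | undefined;

  // Start a new session: create a fresh ready promise and announce it.
  beginSession(): void {
    this.ready = new Promise<void>((resolve) => {
      this.resolveReady = resolve;
    });
    this.onNewReadySession?.(this.ready);
  }

  // Invoked when the subscribe-sync-complete handshake has been received.
  handleSyncComplete(): void {
    this.resolveReady?.();
    this.resolveReady = undefined;
  }
}

// Stand-in for the registry: remember every ready promise it is given.
const sessions: Promise<void>[] = [];
const transport = new ReconnectableSketch();
transport.onNewReadySession = (promise) => sessions.push(promise);

transport.beginSession();       // first connection
transport.handleSyncComplete(); // first session becomes ready
transport.beginSession();       // reconnect creates a second promise
```

The registry stand-in ends up holding two distinct promises, and the latest one is the same object as `transport.ready`, which is what dispatch gating would await.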

optional cancelRequest(correlationId, error?): void

Defined in: ../../../packages/bus-core/src/types/transports.ts:346

Optional: cancel/cleanup pending correlation state for an in-flight request.

Used when the caller aborts while the transport-level correlation promise is still pending (e.g. a request sent with timeout 0 that is cancelled through an AbortSignal). Implementations should reject the pending promise and remove the correlation entry if one exists.

Parameters:

  • correlationId (string): Correlation ID to cancel
  • error (Error, optional): Cancellation error to propagate

Returns: void
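One way to picture this is a table of pending requests keyed by correlation ID. The sketch below is an assumption about how a transport could store that state, not the framework's implementation; `CorrelationTable` and `track` are hypothetical names.

```typescript
// Hypothetical pending-request entry kept per correlation ID.
interface PendingEntry {
  resolve: (value: unknown) => void;
  reject: (reason: Error) => void;
}

class CorrelationTable {
  private readonly pending = new Map<string, PendingEntry>();

  // Register an in-flight request and return the promise the caller awaits.
  track(correlationId: string): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      this.pending.set(correlationId, { resolve, reject });
    });
  }

  // Mirrors cancelRequest(correlationId, error?): reject and remove if present.
  cancelRequest(correlationId: string, error?: Error): void {
    const entry = this.pending.get(correlationId);
    if (!entry) return; // unknown or already settled: nothing to clean up
    this.pending.delete(correlationId);
    entry.reject(error ?? new Error(`Request ${correlationId} cancelled`));
  }

  get size(): number {
    return this.pending.size;
  }
}

const table = new CorrelationTable();
const inFlight = table.track('req-1');
inFlight.catch(() => undefined); // avoid an unhandled rejection in this demo

// Caller-side abort: the AbortSignal listener forwards to cancelRequest.
const controller = new AbortController();
controller.signal.addEventListener('abort', () => {
  table.cancelRequest('req-1', new Error('aborted by caller'));
});
controller.abort();
```

Calling `cancelRequest` a second time for the same ID is a no-op in this sketch, which matches the "if present" wording above.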


connect(): Promise<void>

Defined in: ../../../packages/bus-core/src/types/transports.ts:358

Connect the transport.

Returns: Promise<void>


disconnect(): Promise<void>

Defined in: ../../../packages/bus-core/src/types/transports.ts:363

Disconnect the transport.

Returns: Promise<void>


optional getSubscriptions(): Set<string>

Defined in: ../../../packages/bus-core/src/types/transports.ts:440

Optional: Report which subjects this transport is interested in. Enables transport-level filtering for efficiency.

If not implemented, the transport will receive all messages (broadcast mode). If implemented, only messages matching the subscription patterns will be sent.

Patterns can include wildcards (e.g., ‘adapter.*’ matches ‘adapter.log’, ‘adapter.init’, etc.).

Returns: Set<string>. Set of subscription patterns (can include wildcards).
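The filtering decision described above can be sketched as follows. The exact wildcard grammar is not specified on this page, so this example assumes that `*` matches exactly one dot-separated segment; `matchesPattern` and `shouldDeliver` are hypothetical helpers.

```typescript
// Assumption: '*' matches exactly one dot-separated segment.
function matchesPattern(pattern: string, subject: string): boolean {
  const patternParts = pattern.split('.');
  const subjectParts = subject.split('.');
  if (patternParts.length !== subjectParts.length) return false;
  return patternParts.every((part, i) => part === '*' || part === subjectParts[i]);
}

// Hypothetical slice of BusTransport with only the optional method.
interface SubscriptionAware {
  getSubscriptions?(): Set<string>;
}

// Decide whether a message on `subject` should be sent to the transport.
function shouldDeliver(transport: SubscriptionAware, subject: string): boolean {
  const patterns = transport.getSubscriptions?.();
  if (patterns === undefined) return true; // not implemented: broadcast mode
  return Array.from(patterns).some((pattern) => matchesPattern(pattern, subject));
}

const filtered: SubscriptionAware = {
  getSubscriptions: () => new Set(['adapter.*']),
};
const unfiltered: SubscriptionAware = {}; // receives everything

const toFiltered = shouldDeliver(filtered, 'adapter.log');
const skipped = shouldDeliver(filtered, 'user.created');
const toUnfiltered = shouldDeliver(unfiltered, 'user.created');
```

If the real matcher lets a wildcard span several segments, only `matchesPattern` would need to change.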


optional isReady(): boolean

Defined in: ../../../packages/bus-core/src/types/transports.ts:428

Optional: Synchronous readiness check for transport-level gating.

When implemented and returning false, the bus skips this transport for outbound sends (requests, broadcasts, events). The transport continues to receive inbound messages via onReceive.

Complements the async ready promise: isReady() enables non-blocking skip decisions in hot paths, while ready supports await-based flows.

Returns: boolean. true if the transport can send messages, false otherwise.
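A short sketch of the skip decision on the bus side, assuming the bus holds a list of registered transports. `ReadinessAware` and `selectOutbound` are hypothetical names, not part of the documented API.

```typescript
// Hypothetical slice of BusTransport: just the name and the optional check.
interface ReadinessAware {
  readonly name: string;
  isReady?(): boolean;
}

// Pick the transports an outbound send may use right now.
// A transport is skipped only when isReady() exists and returns false.
function selectOutbound(transports: readonly ReadinessAware[]): ReadinessAware[] {
  return transports.filter((transport) => transport.isReady?.() !== false);
}

const transports: ReadinessAware[] = [
  { name: 'websocket', isReady: () => false }, // e.g. still reconnecting
  { name: 'ipc', isReady: () => true },
  { name: 'loopback' },                        // no isReady(): never skipped
];

const eligible = selectOutbound(transports).map((transport) => transport.name);
```

Because the check is synchronous, it can run on every send without awaiting anything, which is the hot-path use the description above refers to.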


optional onBroadcastResults(correlationId, results, error?): void

Defined in: ../../../packages/bus-core/src/types/transports.ts:491

Receive aggregated broadcast results from the transport registry.

Called by the transport registry after executing local handlers and relaying to other transports for a broadcast that arrived from this transport via onReceive. Replaces the legacy send({ type: 'broadcast-response' }) side-channel when implemented.

Transports that manage their own peer-level fan-out (e.g., ServerTransport with multiple WebSocket clients) implement this to receive registry results without abusing the send() contract.

Parameters:

  • correlationId (string): Correlation ID of the originating broadcast
  • results (readonly object[]): Aggregated results from local handlers and relay transports
  • error (BusTransportError, optional): Structured error when broadcast processing failed; results may be empty or partial in this case

Returns: void
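To make the fan-out case more concrete, here is a hedged sketch of a transport that remembers which peer originated each broadcast and routes the aggregated results back to it. `FanOutSketch`, `noteInboundBroadcast`, the `outbox` array, and the `SketchTransportError` shape are all hypothetical; the real `BusTransportError` type is not shown on this page.

```typescript
// Hypothetical error shape standing in for BusTransportError.
interface SketchTransportError {
  message: string;
}

interface PeerReply {
  correlationId: string;
  results: readonly object[];
  error?: SketchTransportError;
}

class FanOutSketch {
  // correlationId -> id of the peer that originated the broadcast
  private readonly origins = new Map<string, string>();
  // Replies queued for peers; a real transport would write to sockets instead.
  readonly outbox: Array<{ peerId: string; reply: PeerReply }> = [];

  // Record the originating peer before passing the broadcast to onReceive handlers.
  noteInboundBroadcast(correlationId: string, peerId: string): void {
    this.origins.set(correlationId, peerId);
  }

  // Receives the registry's aggregated results for a broadcast from this transport.
  onBroadcastResults(
    correlationId: string,
    results: readonly object[],
    error?: SketchTransportError,
  ): void {
    const peerId = this.origins.get(correlationId);
    if (peerId === undefined) return; // no recorded origin: nothing to route
    this.origins.delete(correlationId);
    const reply: PeerReply =
      error === undefined ? { correlationId, results } : { correlationId, results, error };
    this.outbox.push({ peerId, reply });
  }
}

const server = new FanOutSketch();
server.noteInboundBroadcast('bc-1', 'peer-a');
server.onBroadcastResults('bc-1', [{ handled: true }, { handled: false }]);
server.onBroadcastResults('bc-unknown', []); // ignored in this sketch
```

Keeping the results on a dedicated callback means `send()` in this sketch never has to carry a response-typed message back to the originating peer.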


onReceive(handler): () => void

Defined in: ../../../packages/bus-core/src/types/transports.ts:353

Register a handler for incoming messages.

Parameters:

  • handler (BusReceiveHandler): Handler function

Returns: () => void. Unsubscribe function.


optional reconnect(): Promise<void>

Defined in: ../../../packages/bus-core/src/types/transports.ts:374

Optional: Trigger an immediate reconnection attempt, bypassing any backoff wait.

If the transport is currently waiting in an exponential-backoff delay, calling this wakes it immediately so a new connection attempt starts without waiting. If reconnection is disabled on the transport, a one-shot connect attempt is made. No-op when the transport is already connected.

Returns: Promise<void>. Promise that resolves when the attempt is initiated or completes.
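The "wake the backoff delay" behavior can be sketched with a cancellable timer. This is only an illustration under assumed parameters: `WakeableDelay`, `backoffMs`, and the 500 ms base and 30 s cap are hypothetical and not taken from the framework.

```typescript
// Exponential backoff with a cap; base and cap values are illustrative only.
function backoffMs(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// A delay that can be cut short, as reconnect() would do during a backoff wait.
class WakeableDelay {
  private timer: ReturnType<typeof setTimeout> | undefined;
  private wakeUp: (() => void) | undefined;

  // Sleep for `ms` milliseconds unless wake() is called first.
  wait(ms: number): Promise<void> {
    return new Promise<void>((resolve) => {
      const finish = () => {
        if (this.timer !== undefined) clearTimeout(this.timer);
        this.timer = undefined;
        this.wakeUp = undefined;
        resolve();
      };
      this.wakeUp = finish;
      this.timer = setTimeout(finish, ms);
    });
  }

  // Returns true if a delay was pending and has been ended early.
  wake(): boolean {
    if (this.wakeUp === undefined) return false;
    this.wakeUp();
    return true;
  }

  get waiting(): boolean {
    return this.wakeUp !== undefined;
  }
}

const delay = new WakeableDelay();
void delay.wait(backoffMs(3));  // reconnect loop sleeping before its next attempt
const wokeEarly = delay.wake(); // what reconnect() would trigger
const wokeAgain = delay.wake(); // nothing left to wake
```

A reconnect loop built on this would await `delay.wait(...)` between attempts, so waking the delay starts the next attempt immediately.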


send<TMessage>(message, timeout?): Promise<TMessage extends BusRequestMessage ? unknown : TMessage extends BusBroadcastMessage ? object[] : boolean>

Defined in: ../../../packages/bus-core/src/types/transports.ts:326

Send a message over the transport.

Behavior by message type:

  • Requests (BusRequestMessage): Returns the response payload from the handler. Throws if no clients are available to handle the request (server mode).
  • Broadcasts (BusBroadcastMessage): Returns an array of result objects (object[]), per the return type in the signature.
  • Events (BusEventMessage): Returns a boolean delivery status.
      • true: Delivered to at least one recipient
      • false: No recipients available (not an error)
  • Other messages (heartbeat, subscribe, etc.): Returns true if sent successfully.

Error handling:

  • Throws when transport is disconnected
  • Throws when sending requests with no connected clients (server mode only)
  • Never throws for events - returns false instead

Timeout contract: The timeout value flows from the caller’s bus.request({ timeout }) option through the dispatch layer to the transport’s correlation tracker. A value of 0 means no automatic timeout: the promise stays open until it is resolved or rejected externally (e.g. by the caller’s own AbortSignal).

Type parameters:

  • TMessage extends BusMessage

Parameters:

  • message (TMessage): Message to send
  • timeout (number, optional): Correlation timeout in milliseconds; 0 means no automatic timeout

Returns: Promise<TMessage extends BusRequestMessage ? unknown : TMessage extends BusBroadcastMessage ? object[] : boolean>. Promise resolving to the response payload (requests), the result array (broadcasts), or the delivery status (events and other messages).

// Request: type-safe response handling
const request: BusRequestMessage = {
  type: 'request',
  subject: 'user.get',
  namespace: 'api',
  payload: { id: 123 },
  correlationId: 'req-1',
  messageId: 'msg-1',
};
const response = await transport.send(request, 5000); // response: unknown

// Event: delivery status
const event: BusEventMessage = {
  type: 'event',
  subject: 'user.created',
  namespace: 'api',
  payload: { id: 123 },
  messageId: 'evt-1',
};
const delivered = await transport.send(event, 0); // delivered: boolean
if (!delivered) {
  console.warn('Event not delivered - no subscribers');
}
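The timeout contract for send() can be illustrated with a small correlation tracker that arms a timer only for positive timeouts. This is a sketch under assumptions: `TimeoutTracker`, `track`, `resolve`, and `hasTimer` are hypothetical names, and the real tracker may differ.

```typescript
// Hypothetical pending entry: settle callbacks plus an optional timer handle.
interface Pending {
  resolve: (value: unknown) => void;
  reject: (reason: Error) => void;
  timer: ReturnType<typeof setTimeout> | undefined;
}

class TimeoutTracker {
  private readonly pending = new Map<string, Pending>();

  // Track a request; a timeout of 0 arms no timer, so only external
  // resolution or rejection can settle the promise.
  track(correlationId: string, timeoutMs: number): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      const timer =
        timeoutMs > 0
          ? setTimeout(() => {
              this.settle(correlationId, (entry) =>
                entry.reject(new Error(`Request ${correlationId} timed out after ${timeoutMs} ms`)),
              );
            }, timeoutMs)
          : undefined;
      this.pending.set(correlationId, { resolve, reject, timer });
    });
  }

  // Deliver a response payload for a tracked request.
  resolve(correlationId: string, payload: unknown): void {
    this.settle(correlationId, (entry) => entry.resolve(payload));
  }

  hasTimer(correlationId: string): boolean {
    return this.pending.get(correlationId)?.timer !== undefined;
  }

  // Remove the entry, clear its timer, then run the settle action.
  private settle(correlationId: string, finish: (entry: Pending) => void): void {
    const entry = this.pending.get(correlationId);
    if (!entry) return;
    if (entry.timer !== undefined) clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    finish(entry);
  }
}

const tracker = new TimeoutTracker();
void tracker.track('req-open', 0);     // no automatic timeout
void tracker.track('req-timed', 5000); // rejects after 5 s unless resolved
const openHasTimer = tracker.hasTimer('req-open');
const timedHasTimer = tracker.hasTimer('req-timed');
tracker.resolve('req-timed', { ok: true }); // response arrived in time
tracker.resolve('req-open', null);          // settled externally
```

In this sketch the entry tracked with timeout 0 never gets a timer, so something else (a response, or a cancellation from the caller) has to settle it.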

subscribe(subject, filter?, priorities?): Promise<void>

Defined in: ../../../packages/bus-core/src/types/transports.ts:461

Subscribe to a subject for smart-routing.

Transports use this to track which subjects have active handlers, enabling targeted message delivery instead of broadcast.

When priorities is provided the transport should include the priority information in the next outbound subscribe wire message for this subject, enabling cross-transport priority-based dispatch on the remote side. Calling this method again with a new priorities array replaces the previously advertised set for that subject (re-subscribe semantics).

Transports that do not need subscription management (e.g. local-only loopback transports) should provide a no-op implementation.

Parameters:

  • subject (string): Subject pattern to subscribe to (can include wildcards)
  • filter (PayloadFilter, optional): Payload filter for fine-grained routing
  • priorities (number[], optional): Handler priorities registered for this subject; an empty array signals event-only handlers that do not participate in priority dispatch

Returns: Promise<void>
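The re-subscribe semantics for priorities can be sketched with a per-subject record that is overwritten on each call. `SubscriptionBook`, `prioritiesFor`, and the `SketchPayloadFilter` shape are hypothetical; the real PayloadFilter type is not shown on this page, and a real transport would also emit a subscribe wire message.

```typescript
// Hypothetical filter shape standing in for PayloadFilter.
type SketchPayloadFilter = Record<string, unknown>;

interface SubscriptionRecord {
  filter: SketchPayloadFilter | undefined;
  priorities: number[] | undefined;
}

class SubscriptionBook {
  private readonly subjects = new Map<string, SubscriptionRecord>();

  // Track the subject; a later call replaces the previously advertised priorities.
  async subscribe(
    subject: string,
    filter?: SketchPayloadFilter,
    priorities?: number[],
  ): Promise<void> {
    this.subjects.set(subject, { filter, priorities: priorities?.slice() });
  }

  // Priorities that would go into the next outbound subscribe message.
  prioritiesFor(subject: string): readonly number[] | undefined {
    return this.subjects.get(subject)?.priorities;
  }

  has(subject: string): boolean {
    return this.subjects.has(subject);
  }
}

const book = new SubscriptionBook();
void book.subscribe('user.get', undefined, [10, 5]);
void book.subscribe('user.get', undefined, [1]); // replaces [10, 5]
void book.subscribe('user.created', undefined, []); // event-only handlers
```

Storing a copy of the array keeps later mutations by the caller from changing what the transport advertises.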


unsubscribe(subject): Promise<void>

Defined in: ../../../packages/bus-core/src/types/transports.ts:473

Unsubscribe from a subject.

Called when the last handler for a subject is removed, allowing the transport to stop routing messages for that subject.

Transports that do not need subscription management should provide a no-op implementation.

Parameters:

  • subject (string): Subject pattern to unsubscribe from

Returns: Promise<void>