Abstract Class: BaseLogOrchestrator<TRecord, TState>

Makaio Framework


Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:41

Abstract base class for log import orchestrators.

Supports JSONL (byte offset cursors) and JSON (mtime-based) file formats. Concrete implementations provide adapter-specific file parsing.

TRecord

The adapter’s native log record type

TState = unknown

The adapter’s resumable state type (default: unknown)

protected new BaseLogOrchestrator<TRecord, TState>(config, importer): BaseLogOrchestrator<TRecord, TState>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:64

config: LogOrchestratorConfig

importer: LogImporter<TRecord, TState>

BaseLogOrchestrator<TRecord, TState>

protected readonly config: Required<Omit<LogOrchestratorConfig, "directory" | "checkMakaioManaged">> & object

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:42

optional checkMakaioManaged?: (sessionId: string) => Promise<boolean>

optional directory?: string


protected readonly eventQueue: LogImportEventQueue

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:48


protected readonly importer: LogImporter<TRecord, TState>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:54

Importer instance - subclasses create and manage their own typed instance.


abstract protected readonly logPrefix: string

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:51

Log prefix for console output - set by subclass


protected readonly watcher: LogImportWatcher

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:47

protected buildCursorSessionContext(context): object & object

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:147

Builds the serialized session context for cursor persistence.

LogImportSessionContext<TState>

The import session context to serialize.

Serialized cursor session context.


dispose(): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:232

Promise<void>


abstract protected getLogFilePattern(): string

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:98

Get the glob pattern for log files to watch.

string


protected getMaxRecords(): number | undefined

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:138

Returns the maximum number of records to parse per file. Override in subclasses to limit parsing (e.g., discovery mode).

number | undefined

Maximum record count, or undefined for no limit.


protected handleFileChange(event): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:273

Handle a file-change event: resolve cursor state and route to first-read or incremental-read as appropriate.

Subclasses (e.g., DiscoveryOrchestrator) may override this method to substitute an entirely different dispatch strategy.

LogFileChangeEvent

File change event from the watcher

Promise<void>
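
The routing described above can be pictured with a small sketch. Everything here is an assumption for illustration: `CursorSketch`, `RouteSketch`, and `routeFileChangeSketch` are hypothetical names, and the "skip" outcome and the full re-read for JSON files are guesses about reasonable behavior, not the documented implementation.

```typescript
// Hypothetical stand-in for a persisted cursor; the real cursor type is not shown on this page.
interface CursorSketch {
  byteOffset: number; // last byte consumed (used for JSONL files)
  mtimeMs: number; // last seen modification time (used for JSON files)
}

type RouteSketch =
  | { kind: "first-read"; startOffset: number }
  | { kind: "incremental-read"; startOffset: number }
  | { kind: "skip" };

// Decide how a changed file should be read, given the cursor saved for it (if any).
function routeFileChangeSketch(
  cursor: CursorSketch | undefined,
  fileSize: number,
  mtimeMs: number,
  isJsonFormat: boolean,
): RouteSketch {
  // No cursor yet: this is the first time the file is seen.
  if (cursor === undefined) return { kind: "first-read", startOffset: 0 };

  if (isJsonFormat) {
    // Assumption: a JSON file is re-read from the start whenever its mtime advances.
    return mtimeMs > cursor.mtimeMs
      ? { kind: "incremental-read", startOffset: 0 }
      : { kind: "skip" };
  }

  // JSONL: resume from the saved byte offset if the file has grown.
  return fileSize > cursor.byteOffset
    ? { kind: "incremental-read", startOffset: cursor.byteOffset }
    : { kind: "skip" };
}
```

A subclass that overrides handleFileChange would replace this decision wholesale rather than adjust one branch.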


protected handleFirstRead(filePath, records, bytesRead, mtime, isJsonFormat, startOffset, emitLifecycleEvents?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:351

Handle the first read of a log file.

Extracts session context, emits session lifecycle events, processes records, and saves the cursor. Subclasses may override to change behavior (e.g., DiscoveryOrchestrator skips message processing).

string

Path to the log file

TRecord[]

Records parsed from the file

number

Bytes read during this parse

Date

File modification time

boolean

Whether the file uses JSON (mtime-based) format

number

Byte offset this read started from

boolean = true

Whether to emit session and started lifecycle events for this pass

Promise<void>


protected handleIncrementalRead(filePath, records, cursorContext, bytesRead, mtime, isJsonFormat, startOffset): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:418

Handle an incremental read of a log file.

Processes new records since the last cursor position. Subclasses may override to skip incremental processing (e.g., DiscoveryOrchestrator).

string

Path to the log file

TRecord[]

New records since last read

object & object

Existing cursor session context

number

Bytes read during this parse

Date

File modification time

boolean

Whether the file uses JSON (mtime-based) format

number

Byte offset this read started from

Promise<void>


isEnabled(): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:173

boolean


isRunning(): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:177

boolean


protected isSessionSkipped(adapterSessionId): Promise<boolean>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:514

Determine whether an adapter session should be skipped from import.

Delegates to the managed-session cache, so that concurrent checks for the same adapterSessionId share one storage lookup and tracked skips update the import statistics.

string

External adapter session ID being evaluated

Promise<boolean>

Promise resolving to true when the session is Makaio-managed and should be skipped
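
The idea of concurrent checks sharing one storage lookup can be sketched by caching the in-flight promise per session ID. This is a minimal illustration under assumptions: `ManagedSessionCacheSketch` and its members are hypothetical names, and the real managed-session cache may differ (for example in eviction or error handling).

```typescript
// Hypothetical cache: one lookup promise per session ID, shared by all callers.
class ManagedSessionCacheSketch {
  private readonly lookups = new Map<string, Promise<boolean>>();
  private readonly check: (sessionId: string) => Promise<boolean>;
  // Number of distinct sessions found to be managed (and therefore skipped).
  skippedCount = 0;

  constructor(check: (sessionId: string) => Promise<boolean>) {
    this.check = check;
  }

  isSkipped(sessionId: string): Promise<boolean> {
    let lookup = this.lookups.get(sessionId);
    if (lookup === undefined) {
      // First caller starts the lookup; later callers reuse the same promise.
      lookup = this.check(sessionId).then((managed) => {
        if (managed) this.skippedCount += 1;
        return managed;
      });
      // Note: a rejected lookup stays cached in this sketch.
      this.lookups.set(sessionId, lookup);
    }
    return lookup;
  }
}
```

In this sketch the statistic is counted once per session rather than once per call, which is one possible reading of "tracked skips".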


protected maybeUpdateCursor(filePath, bytesRead, startOffset, mtime, isJsonFormat, sessionContext?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:493

filePath: string

bytesRead: number

startOffset: number

mtime: Date

isJsonFormat: boolean

sessionContext?: object & object

Promise<void>


abstract protected parseFile(filePath, startOffset, maxRecords?): Promise<ParseFileResult<TRecord>>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:118

Parse a log file from the given byte offset.

string

Path to the log file

number

Byte offset to start reading from

number

Optional maximum number of records to return (for shallow discovery)

Promise<ParseFileResult<TRecord>>
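
As a rough illustration of what a JSONL implementation of parseFile might do, the sketch below parses complete lines from an in-memory byte array, starting at a byte offset and honoring an optional record limit. `SketchParseResult` and `parseJsonlBytesSketch` are hypothetical; the fields of the real ParseFileResult are not shown on this page, and a real implementation would read from disk.

```typescript
// Hypothetical result shape; the real ParseFileResult<TRecord> may differ.
interface SketchParseResult<T> {
  records: T[];
  bytesRead: number; // bytes consumed from startOffset, complete lines only
}

function parseJsonlBytesSketch<T>(
  data: Uint8Array,
  startOffset: number,
  maxRecords?: number,
): SketchParseResult<T> {
  const decoder = new TextDecoder("utf-8");
  const records: T[] = [];
  let pos = startOffset;

  while (pos < data.length) {
    if (maxRecords !== undefined && records.length >= maxRecords) break;

    const newline = data.indexOf(0x0a, pos);
    // An unterminated trailing line may still be mid-write; leave it for the next read.
    if (newline === -1) break;

    const line = decoder.decode(data.subarray(pos, newline)).trim();
    // JSON.parse throws on a malformed line; this sketch does not recover from that.
    if (line.length > 0) records.push(JSON.parse(line) as T);
    pos = newline + 1;
  }

  return { records, bytesRead: pos - startOffset };
}
```

Counting only complete lines in `bytesRead` means a cursor derived from it never points into the middle of a record.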


protected queueCursorUpdate(filePath, bytesRead, startOffset, mtime, isJsonFormat, sessionContext?, precedingEventPromises?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:549

Enqueue a cursor write as a PQueue task so it executes after all previously queued events for the same batch have been emitted.

PQueue runs tasks with concurrency=1 in FIFO order, so placing the cursor write at the back of the queue, after all queueEvent calls, guarantees that the cursor advances only once every preceding event in the batch has been delivered. This prevents a race in which the process exits after queueEvent but before the queue drains, leaving events lost even though the cursor has already advanced.

string

Path to the log file

number

Total bytes read at the new cursor position

number

Byte offset this read started from

Date

File modification time

boolean

Whether the file uses JSON (mtime-based) format

object & object

Serialized session context for the cursor

Promise<void>[] = []

Emission promises queued before this cursor write

Promise<void>

Promise that resolves when the cursor write has completed
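
The ordering guarantee can be illustrated with a hand-rolled serial queue instead of PQueue. This is a sketch under assumptions: `SerialQueueSketch` and `importBatchSketch` are hypothetical names, and skipping the cursor write when a delivery fails is an assumed policy, not confirmed by this page.

```typescript
// Minimal FIFO queue with concurrency 1: each task starts after the previous one settles.
class SerialQueueSketch {
  private tail: Promise<void> = Promise.resolve();

  add(task: () => Promise<void>): Promise<void> {
    const run = this.tail.then(task);
    // Keep the chain usable even if this task rejects.
    this.tail = run.catch(() => undefined);
    return run;
  }
}

// Queue every event first, then queue the cursor write behind them.
function importBatchSketch<E>(
  queue: SerialQueueSketch,
  events: E[],
  emit: (event: E) => Promise<void>,
  writeCursor: () => Promise<void>,
): Promise<void> {
  const deliveries = events.map((event) => queue.add(() => emit(event)));
  const allDelivered = Promise.all(deliveries);
  // Attach a handler now so a failed delivery is not reported as unhandled;
  // the cursor task below still observes the failure.
  allDelivered.catch(() => undefined);

  return queue.add(async () => {
    await allDelivered; // rejects if any delivery failed, so the cursor is not written
    await writeCursor();
  });
}
```

Because the cursor write is itself a queue task, it cannot run until every earlier task has settled, and a crash before that point leaves the old cursor in place so the batch is re-read.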


protected queueEvent(event): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:527

Queue a normalized event and return its delivery promise.

NormalizedEvent

Normalized event to emit

Promise<void>

Promise that resolves after delivery or rejects on emit failure


protected shouldSkipFile(_filePath): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:108

Determine whether a discovered file should be skipped before processing.

Default implementation never skips. Subclasses may override to exclude files by name (e.g., ephemeral compaction summary files).

string

Absolute path to the candidate file

boolean

true to skip the file entirely, false to process normally
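
An override that excludes files by name might look like the following sketch. The function name and the `compaction-summary-` prefix are hypothetical; the actual naming of ephemeral files depends on the adapter.

```typescript
// Hypothetical predicate: skip files whose base name marks them as ephemeral summaries.
function shouldSkipFileSketch(filePath: string): boolean {
  // Take the last path segment, accepting both "/" and "\" separators.
  const fileName = filePath.split(/[\\/]/).pop() ?? "";
  return fileName.startsWith("compaction-summary-");
}
```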


start(): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:181

Promise<void>


stop(): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:210

Promise<void>


protected trackFileChange(event): void

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:327

Track a watcher-triggered file import so shutdown can wait for any cursor work that the handler enqueues before draining the shared FIFO queue.

LogFileChangeEvent

File change event to process

void
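
One way to let shutdown wait for handler work is to keep a set of in-flight promises and await it before draining the queue. The sketch below shows only that bookkeeping; `InFlightTrackerSketch` and its members are hypothetical and are not taken from the framework.

```typescript
// Hypothetical tracker for handler promises that are still running.
class InFlightTrackerSketch {
  private readonly inFlight = new Set<Promise<void>>();

  get pendingCount(): number {
    return this.inFlight.size;
  }

  track(work: Promise<void>): void {
    // Swallow failures here; the tracker only cares about completion.
    const tracked: Promise<void> = work
      .catch(() => undefined)
      .then(() => {
        this.inFlight.delete(tracked);
      });
    this.inFlight.add(tracked);
  }

  // Resolves once no tracked work remains, including work added while waiting.
  async waitForIdle(): Promise<void> {
    while (this.inFlight.size > 0) {
      await Promise.all(Array.from(this.inFlight));
    }
  }
}
```

A stop routine could call `waitForIdle()` first and only then wait for the shared queue to empty, so cursor writes enqueued late by a handler are not missed.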


protected trackImportedSession(adapterSessionId): void

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:401

Record that a session has been imported, updating import statistics.

Protected to allow subclasses to update stats when processing additional sessions (e.g., compress child sessions from compacted files).

string

The session ID that was imported

void


protected updateCursor(filePath, bytesRead, mtime, sessionContext?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:251

filePath: string

bytesRead: number

mtime: Date

sessionContext?: object & object

Promise<void>


protected usesJsonFormat(): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:164

Check if this orchestrator uses JSON format (mtime-based cursor).

boolean

True if JSON format, false for JSONL


protected validateRecords(records): TRecord[]

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:129

Validate and filter parsed records. Default: returns records as-is.

TRecord[]

Raw parsed records

TRecord[]

Validated/filtered records
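
A subclass that overrides validateRecords would typically drop records that do not match the adapter's expected shape. The sketch below uses a type guard for that; `SketchRecord` and its fields are hypothetical and stand in for an adapter's TRecord.

```typescript
// Hypothetical record shape standing in for an adapter's TRecord.
interface SketchRecord {
  sessionId: string;
  timestamp: string;
}

// Type guard: true only for non-null objects with the required string fields.
function isSketchRecord(value: unknown): value is SketchRecord {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate["sessionId"] === "string" &&
    typeof candidate["timestamp"] === "string"
  );
}

// Keep well-formed records and silently drop the rest.
function validateRecordsSketch(records: unknown[]): SketchRecord[] {
  return records.filter(isSketchRecord);
}
```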


static createDefaultCheckMakaioManaged(): (sessionId) => Promise<boolean>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:243

Returns a function that checks whether a session is Makaio-managed.

(sessionId) => Promise<boolean>