
Callosum Protocol#

Callosum is a JSON-per-line message bus for real-time event distribution across solstone services.

Protocol#

Transport: Unix domain socket at journal/health/callosum.sock

Format: Newline-delimited JSON. Broadcast to all connected clients.

Message Structure:

{
  "tract": "source_subsystem",
  "event": "event_type",
  "ts": 1234567890123,
  // ... tract-specific fields
}

Required Fields:

  • tract - Source subsystem identifier (string)
  • event - Event type within tract (string)
  • ts - Timestamp in epoch milliseconds (auto-added by server if missing)

Behavior:

  • All connections are bidirectional (can emit and receive)
  • No routing, no filtering - all messages broadcast to all clients
  • Clients should drain socket continuously to prevent backpressure
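The framing rules above can be sketched in a few lines of standard-library Python. This is a minimal illustration, not the code in think/callosum.py: the names encode_message and LineDecoder are hypothetical, and filling in ts on the client side simply mirrors what the server is documented to do when the field is missing.

```python
import json
import time


def encode_message(tract: str, event: str, **fields) -> bytes:
    """Serialize one message as a single newline-terminated JSON line."""
    msg = {"tract": tract, "event": event, **fields}
    # Mirror the documented server behavior: add ts (milliseconds) if absent.
    msg.setdefault("ts", int(time.time() * 1000))
    return (json.dumps(msg) + "\n").encode("utf-8")


class LineDecoder:
    """Accumulate raw socket bytes and return complete JSON messages."""

    def __init__(self) -> None:
        self._buffer = b""

    def feed(self, chunk: bytes) -> list[dict]:
        self._buffer += chunk
        # Everything before the last newline is complete; the tail is kept
        # until the rest of that line arrives in a later chunk.
        *lines, self._buffer = self._buffer.split(b"\n")
        return [json.loads(line) for line in lines if line.strip()]
```

A client would call feed() with each chunk returned by recv() in a loop that never blocks on other work, which is what draining the socket continuously amounts to in practice.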

Tract Registry#

Note: This registry is kept intentionally high-level. For detailed field schemas and current implementation, always refer to the source files listed - they are the authoritative reference.

cortex - Agent execution events#

Source: think/cortex.py
Events: request, start, thinking, tool_start, tool_end, finish, error, agent_updated, info, status
Details: See CORTEX.md for agent lifecycle, configuration, and event schemas

supervisor - Process lifecycle management#

Source: think/supervisor.py
Events: started, stopped, restarting, status, queue
Listens for: request (task spawn), restart (service restart)
Key fields: ref (instance ID), service (name), pid, exit_code
Purpose: Unified lifecycle events for all supervised processes (services and tasks)

Per-command task queue: Tasks are serialized by command name (e.g., "indexer"):

  • If no task with that command is running → run immediately
  • If command is already running → queue the request (FIFO)
  • Deduped by exact cmd match (same command+args won't queue twice)
  • When task completes → next queued request runs automatically

Ref tracking: Callers can provide a ref field in requests to track completion:

  • If omitted, supervisor generates a timestamp-based ref
  • stopped events include the ref, allowing callers to match their request
  • When duplicate requests are deduped, their refs are coalesced - all refs receive stopped events when the single execution completes
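The queueing and ref-coalescing rules above can be modeled with a small in-memory class. This is a toy sketch, not the logic in think/supervisor.py. It assumes the command name is the second element of cmd (as in ["sol", "indexer", ...]) and that deduplication applies only to entries still waiting in the queue; the class and method names are hypothetical.

```python
from collections import defaultdict, deque


class CommandQueue:
    """Toy model of the per-command queue; not the supervisor's actual code."""

    def __init__(self) -> None:
        self.running = {}                 # command name -> refs of the running task
        self.queued = defaultdict(deque)  # command name -> FIFO of {"cmd", "refs"}

    def request(self, cmd: list[str], ref: str) -> str:
        """Return "run", "queued", or "deduped" for an incoming request."""
        name = cmd[1]  # assumption: cmd looks like ["sol", "<command>", ...args]
        if name not in self.running:
            self.running[name] = [ref]
            return "run"
        for entry in self.queued[name]:
            if entry["cmd"] == cmd:
                entry["refs"].append(ref)  # coalesce refs onto the queued entry
                return "deduped"
        self.queued[name].append({"cmd": cmd, "refs": [ref]})
        return "queued"

    def complete(self, name: str):
        """Finish the running task; return (refs to notify, next cmd or None)."""
        finished = self.running.pop(name)
        if self.queued[name]:
            entry = self.queued[name].popleft()
            self.running[name] = entry["refs"]
            return finished, entry["cmd"]
        return finished, None
```

Every ref in the first element returned by complete() would receive its own stopped event, which is how a caller whose request was deduplicated still learns that the work finished.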

Queue event: Emitted when queue state changes:

{"tract": "supervisor", "event": "queue", "command": "indexer", "running": "ref123", "queued": 2, "queue": [{"refs": ["ref456"], "cmd": ["sol", "indexer", "--rescan"]}]}

logs - Process output streaming#

Source: think/runner.py
Events: exec, line, exit
Key fields: ref (correlates with supervisor), name, stream (stdout/stderr), line
Purpose: Real-time stdout/stderr streaming and process exit events

observe - Multimodal capture and processing#

Sources:

  • Capture: standalone observer services (solstone-linux, solstone-tmux, solstone-macos) upload via remote ingest
  • Processing: observe/sense.py, observe/describe.py, observe/transcribe/

Events:

| Event | Emitter | Purpose |
|---|---|---|
| status | sense | Periodic state (every 5s) - see emit_status() in source |
| observing | ingest | Recording window boundary crossed, files saved |
| detected | sense | File detected, handler spawned |
| described | describe | Vision analysis complete |
| transcribed | transcribe | Audio transcription complete (includes VAD metadata) |
| observed | sense | All files for segment fully processed (may include errors) |

Common fields: day, segment, remote (for remote uploads), stream (stream name, e.g., "archon", "import.apple")

observing event fields:

  • meta (dict, optional): Metadata dict from remote observer. Contains host, platform, and any client-provided fields (e.g., facet, setting). Passed to handlers via SEGMENT_META env var and unrolled into JSONL metadata headers.
  • stream (str, optional): Stream name identifying the segment source. Set by observers, remote ingest, and importer.

observed event fields:

  • stream (str, optional): Stream name, forwarded from the originating observing event.
  • error (bool, optional): true if any handler failed during segment processing
  • errors (list[str], optional): Error descriptions for failed handlers (e.g., ["transcribe exit 1"])

Correlation: detected.ref matches logs.exec.ref; segment groups files from the same capture window

Event Log: Observe, dream, and activity tract events with day + segment are logged to <day>/<segment>/events.jsonl by supervisor
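The event-log rule can be expressed as a small path helper. This is a sketch under two assumptions: that the <day>/<segment> directories sit under a journal root directory, and that events missing either field are simply not logged. The name event_log_path is hypothetical and does not come from the supervisor source.

```python
from pathlib import Path
from typing import Optional

# Tracts whose events are written to the per-segment log, per the text above.
LOGGED_TRACTS = {"observe", "dream", "activity"}


def event_log_path(journal: Path, msg: dict) -> Optional[Path]:
    """Return <journal>/<day>/<segment>/events.jsonl, or None if not logged."""
    if msg.get("tract") not in LOGGED_TRACTS:
        return None
    day, segment = msg.get("day"), msg.get("segment")
    if not day or not segment:
        # Events without both day and segment have no per-segment log.
        return None
    return journal / day / segment / "events.jsonl"
```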

importer - Media import processing#

Source: think/importers/cli.py
Events: started, status, completed, error
Key fields: import_id (correlates all events), stage, segments (created segment keys), stream (stream name, e.g., "import.apple")
Stages: initialization, segmenting, transcribing, summarizing
Purpose: Track media file import from upload through transcription to segment creation

dream - Generator and agent processing#

Source: think/dream.py
Events: started, status, group_started, group_completed, agent_started, agent_completed, completed, segments_started, segments_completed
Key fields: mode ("daily"/"segment"/"activity"/"flush"), day, segment (when mode="segment" or "flush"), activity and facet (when mode="activity")
Purpose: Track dream processing from generators through scheduled agents

status - Periodic progress (every ~5s). Fields: mode, day, segment, stream, agents_completed, agents_total, current_group_priority, current_agents (list of running agent names). In --segments batch mode, also includes segments_completed, segments_total. In activity mode, includes activity, facet.

activity - Activity lifecycle events#

Sources: talent/activity_state.py (post-hook), talent/activities.py (post-hook)
Events: live, recorded
Event Log: Logged to <day>/<segment>/events.jsonl by supervisor

live - Emitted per active activity per segment (new or continuing). Provides real-time activity tracking.
Key fields: facet, day, segment, id, activity (type), since, description, level, active_entities

recorded - Emitted when a completed activity record is written to journal. Supervisor queues a per-activity dream task on receipt.
Key fields: facet, day, segment, id, activity (type), segments (full span), level_avg, description, active_entities

sync - Remote segment synchronization#

Source: observe/sync.py
Events: status
Key fields: queue_size, segment, state, host, platform
Purpose: Track remote sync service status for segment uploads to central server

notification - In-app notification display#

Source: convey/static/websocket.js (client-side listener; any service can emit)
Events: any (event name is not interpreted)
Key fields: title (string), message (string), icon (string, emoji), action (string, URL path), facet (string), autoDismiss (number, ms), app (string, app name)
Defaults: app → "system", icon → "📬", title → "Notification" (applied by AppServices.notifications.show())
Purpose: Forward Callosum events directly to the browser notification UI. Any service can trigger an in-app notification card by emitting to this tract.

Example:

callosum_send("notification", "show", title="Import Complete", message="3 segments imported", icon="📥", autoDismiss=5000)

navigate - Browser navigation#

Source: think/tools/navigate.py (sol call navigate)
Events: request
Key fields: path (string, URL path), facet (string, facet name); at least one is required
Consumer: convey/static/websocket.js (built-in listener)
Purpose: Navigate the browser to a URL path and/or switch to a facet:

  • facet only: triggers selectFacet() without a page reload
  • path only: triggers a full page load
  • path + facet: sets the facet cookie before navigating


Key Concepts#

Correlation ID (ref): Universal identifier for process instances, used across tracts to correlate events. Auto-generated as epoch milliseconds if not provided.

Field Semantics:

  • service - Human-readable name (e.g., "cortex", "sol import")
  • ref - Unique instance ID (changes on each restart)
  • pid - Operating system process ID

Implementation#

Source: think/callosum.py

Client APIs#

CallosumConnection - Long-lived bidirectional connection with background thread

from think.callosum import CallosumConnection

conn = CallosumConnection()
conn.start(callback=handle_message)  # Start with optional message handler
conn.emit("tract", "event", field1="value")  # Queue message for send
conn.stop()  # Clean shutdown

callosum_send() - One-shot fire-and-forget for simple cases

from think.callosum import callosum_send

callosum_send("observe", "described", day="20251102", segment="143045_300")

CallosumServer - Broadcast server (started in-process by supervisor)

Convey Integration#

  • convey.emit() - Non-blocking emission from route handlers (uses shared bridge connection)
  • apps.events - Server-side event handlers via @on_event decorator

See APPS.md for app event handler patterns.

CLI Tools#

sol callosum / sol callosum listen - Listen to events on the message bus

sol callosum                              # Stream all events as JSONL
sol callosum listen --tract cortex        # Filter to cortex tract
sol callosum listen --event finish -p     # Pretty-print finish events
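Because the listener emits JSONL, its output can also be filtered in a script. The sketch below mimics the effect of the --tract and --event filters on a stream of lines; it is an illustration of the idea, not the CLI's implementation, and filter_events is a hypothetical name.

```python
import json
from typing import Iterable, Iterator, Optional


def filter_events(lines: Iterable[str], tract: Optional[str] = None,
                  event: Optional[str] = None) -> Iterator[dict]:
    """Yield parsed messages that match the given tract and/or event."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between messages
        msg = json.loads(line)
        if tract is not None and msg.get("tract") != tract:
            continue
        if event is not None and msg.get("event") != event:
            continue
        yield msg
```

Passing sys.stdin as lines lets a script consume the output of sol callosum through a pipe and react only to, say, cortex finish events.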

sol callosum send - Send a message to the bus

sol callosum send observe described day=20250101 segment=143045_300
sol callosum send '{"tract":"test","event":"ping","data":42}'
echo '{"tract":"test","event":"ping"}' | sol callosum send

Common Patterns#

Event-Driven Processing Chain#

The observe pipeline demonstrates event-driven handoffs:

observe.observing (files saved)
    ↓ sense (listening via Callosum)
observe.detected (handler spawned)
    ↓ logs.exec (process started)
observe.described / observe.transcribed (processing complete)
    ↓ sense tracks completion
observe.observed (segment fully processed)
    ↓ supervisor triggers dream, tracks flush timer
dream.completed
    ↓ apps/entities/events.py updates entity activity
activity.recorded (activity span completed)
    ↓ supervisor queues per-activity dream
dream --activity (runs schedule="activity" agents)

[If no new segments for FLUSH_TIMEOUT (1h):]
    ↓ supervisor queues flush
dream --flush (runs hook.flush agents to close dangling state)

See think/supervisor.py:_handle_segment_observed() for the observe→dream trigger and _handle_activity_recorded() for activity→dream.

Activity-scheduled agents declare schedule: "activity" with a required activities list (activity types to match, or ["*"] for all). They receive the activity's segment span as transcript source and $activity_* template variables in their prompts.

Status Event Pattern#

Long-running services emit status events every 5 seconds for health monitoring:

  • Supervisor checks event freshness to detect stale processes
  • UI displays live state from status events
  • See status emission methods in observer, sense, cortex for examples
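A freshness check of this kind can be sketched as follows. The 15-second threshold (three missed 5-second intervals) and the FreshnessTracker name are assumptions made for illustration; the supervisor's actual threshold and bookkeeping may differ.

```python
class FreshnessTracker:
    """Remember the latest status timestamp per tract and flag stale ones."""

    def __init__(self, max_age_ms: int = 15_000) -> None:
        # Assumed threshold: roughly three missed 5-second status intervals.
        self.max_age_ms = max_age_ms
        self.last_seen = {}  # tract -> ts of the most recent status event

    def observe(self, msg: dict) -> None:
        """Record a message; only status events count toward freshness."""
        if msg.get("event") == "status":
            self.last_seen[msg["tract"]] = msg["ts"]

    def stale(self, now_ms: int) -> list[str]:
        """Return tracts whose last status is older than the threshold."""
        return sorted(
            tract for tract, ts in self.last_seen.items()
            if now_ms - ts > self.max_age_ms
        )
```

Feeding every received message to observe() and calling stale() on a timer is enough to drive a simple health indicator.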

Request/Response via Callosum#

For async task dispatch, use supervisor's request handling:

from convey import emit
emit("supervisor", "request", ref=task_id, cmd=["sol", "import", path])
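To learn when the dispatched task finishes, the caller can watch for the supervisor's stopped event carrying the same ref. The helpers below are a hedged sketch of that matching logic: build_request and is_completion are hypothetical names, and representing the auto-generated ref as a string of epoch milliseconds is an assumption.

```python
import time
from typing import Optional, Sequence


def build_request(cmd: Sequence[str], ref: Optional[str] = None) -> dict:
    """Build a supervisor request message, generating a ref if none is given."""
    # Assumption: a generated ref is the current epoch time in ms, as a string.
    ref = ref or str(int(time.time() * 1000))
    return {"tract": "supervisor", "event": "request", "ref": ref, "cmd": list(cmd)}


def is_completion(msg: dict, ref: str) -> bool:
    """True if msg is the supervisor stopped event for the given ref."""
    return (
        msg.get("tract") == "supervisor"
        and msg.get("event") == "stopped"
        and msg.get("ref") == ref
    )
```

A message handler registered on a long-lived connection could call is_completion() on each incoming message and read exit_code from the one that matches.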

For agent requests, use the cortex client:

from think.cortex_client import cortex_request
agent_id = cortex_request(prompt="...", name="default")

See think/cortex_client.py for the full API.