
System Internals: Per-Service Details

Each section follows the event pipeline from a Home Assistant WebSocket frame through to a recorded execution. A top-to-bottom pass traces a single inbound event through every service it touches.

Bus Internals

Each app gets a Bus handle — a lightweight per-app object that delegates to the shared BusService singleton. The Bus translates on_*() calls into Listener objects. BusService indexes those listeners by topic and drives dispatch.

flowchart TD
    accTitle: Bus Event Routing
    accDescr: From app subscription through predicate filtering to handler invocation

    subgraph registration["Registration"]
        on["Bus.on_*()"]
        pca["Predicates (P)<br/>Conditions (C)<br/>Accessors (A)"]
        L["Listener"]
        on --> pca --> L
    end

    subgraph routing["BusService Router"]
        exact["Exact topics<br/><i>light.kitchen</i>"]
        glob["Glob topics<br/><i>light.*</i>"]
    end

    subgraph dispatch["Dispatch"]
        match["Predicate check"]
        exec["CommandExecutor"]
        handler["App handler"]
        match --> exec --> handler
    end

    L -- "add_listener()" --> exact & glob
    exact & glob -- "event arrives" --> match

    style registration fill:#e8f0ff,stroke:#6688cc
    style routing fill:#f0f8e8,stroke:#88aa66
    style dispatch fill:#fff0e8,stroke:#cc8844

Event Dispatch Pipeline

BusService.dispatch() runs these steps on every inbound event.

A state_changed event for light.office expands into three candidate topics in specificity order:

  1. hass.event.state_changed.light.office (entity-exact)
  2. hass.event.state_changed.light.* (domain-glob)
  3. hass.event.state_changed (base topic)

Events with other event_type values expand to only the base topic.

BusService iterates over the candidate topics in order and collects matching listeners from the Router, which keeps two separate indexes: one for exact topics and one for glob topics. Each listener is collected at most once, deduplicated by listener_id.

Each collected listener runs Listener.matches(event). Predicates registered via P.* and conditions registered via C.* are evaluated here. Listeners that fail the predicate are silently skipped.

Each passing listener spawns a TaskBucket task that calls CommandExecutor.execute(). All matching listeners for a given event then run concurrently on the event loop rather than one after another.
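The dispatch steps can be sketched in plain asyncio. This is a minimal sketch, not the framework's code: SketchListener, SketchRouter, expand_topics(), and dispatch() are hypothetical names, the event dict shape is assumed, glob matching is assumed to follow fnmatch rules, and asyncio.create_task() stands in for TaskBucket and CommandExecutor.

```python
import asyncio
import fnmatch
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

Event = dict[str, Any]


@dataclass
class SketchListener:
    """Hypothetical stand-in for Listener with only the fields dispatch needs."""

    listener_id: int
    topic: str
    handler: Callable[[Event], Awaitable[None]]
    predicate: Callable[[Event], bool] = lambda event: True
    priority: int = 0


class SketchRouter:
    """Keeps exact topics and glob topics in two separate indexes."""

    def __init__(self) -> None:
        self.exact: dict[str, list[SketchListener]] = {}
        self.globs: list[SketchListener] = []

    def add_listener(self, listener: SketchListener) -> None:
        if any(ch in listener.topic for ch in "*?["):
            self.globs.append(listener)
        else:
            self.exact.setdefault(listener.topic, []).append(listener)

    def collect(self, topic: str) -> list[SketchListener]:
        found = list(self.exact.get(topic, []))
        found += [g for g in self.globs if fnmatch.fnmatchcase(topic, g.topic)]
        return found


def expand_topics(event: Event) -> list[str]:
    """Candidate topics, most specific first."""
    base = f"hass.event.{event['event_type']}"
    if event["event_type"] != "state_changed":
        return [base]
    entity_id = event["data"]["entity_id"]
    domain = entity_id.split(".", 1)[0]
    return [f"{base}.{entity_id}", f"{base}.{domain}.*", base]


async def dispatch(router: SketchRouter, event: Event) -> None:
    seen: set[int] = set()
    matched: list[SketchListener] = []
    for topic in expand_topics(event):
        for listener in router.collect(topic):
            if listener.listener_id in seen:
                continue  # each listener is collected at most once
            seen.add(listener.listener_id)
            if listener.predicate(event):  # failing listeners are skipped silently
                matched.append(listener)
    # Spawn higher-priority listeners first; all tasks then run concurrently.
    matched.sort(key=lambda item: item.priority, reverse=True)
    tasks = [asyncio.create_task(item.handler(event)) for item in matched]
    await asyncio.gather(*tasks)
```

Because asyncio starts ready tasks in the order they were created, a priority-100 listener begins running before lower-priority listeners, even though all of them then run concurrently.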

Listener Internal Structure

Listener composes four sub-structs:

| Sub-struct | Holds |
|---|---|
| ListenerIdentity | Ownership and telemetry fields (app key, name, topic, source location) |
| ListenerOptions | Behavioral timing parameters (debounce, throttle, once, priority, immediate) |
| HandlerInvoker | Handler invocation, dispatch, and rate limiting |
| DurationConfig | Duration-hold configuration and timer lifecycle |

Registration writes to the database before it returns: sub.listener.db_id is a valid integer as soon as the awaited bus.on_*() call completes.

Rate Limiting

HandlerInvoker delegates to RateLimiter when debounce or throttle is set.

RateLimiter.debounced_call() cancels any pending debounce task before spawning a replacement. Each replacement captures the current event in its closure, so only the most recent event reaches the handler once the quiet window elapses; the cancelled task's closure, and the event it captured, are discarded.

RateLimiter.throttled_call() compares time.monotonic() with the time of the last invocation and drops the call if fewer than throttle seconds have elapsed; otherwise it records the new timestamp and invokes the handler. Because no await separates the check from the set, the check-and-set is atomic under asyncio's single-threaded event loop.
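The two strategies can be sketched as follows, assuming one limiter instance per listener. SketchRateLimiter is a hypothetical stand-in for RateLimiter; only the method names debounced_call() and throttled_call() come from the text, and their signatures here are assumptions.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class SketchRateLimiter:
    """Hypothetical stand-in for RateLimiter; one instance per listener."""

    def __init__(self) -> None:
        self._pending: asyncio.Task[None] | None = None
        self._last_invocation: float | None = None

    def debounced_call(self, handler: Handler, event: Any, debounce: float) -> None:
        # Cancel the pending task; its closure, and the event it captured, are discarded.
        if self._pending is not None and not self._pending.done():
            self._pending.cancel()

        async def fire_after_quiet() -> None:
            await asyncio.sleep(debounce)
            await handler(event)  # this closure captures only the latest event

        self._pending = asyncio.create_task(fire_after_quiet())

    async def throttled_call(self, handler: Handler, event: Any, throttle: float) -> bool:
        now = time.monotonic()
        # No await between the check and the set, so no other coroutine can interleave.
        if self._last_invocation is not None and now - self._last_invocation < throttle:
            return False  # call dropped
        self._last_invocation = now
        await handler(event)
        return True
```

With three events arriving 10 ms apart and debounce=0.05, only the third event reaches the handler; with throttle=10, a second call made immediately after the first is dropped.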

Listener Behavior Options

| Option | Effect |
|---|---|
| debounce=N | Each new event restarts an N-second quiet window; the handler fires once, with the most recent event, after N seconds without new events |
| throttle=N | The handler fires immediately; further calls are then suppressed for N seconds |
| duration=N | The handler fires only if the predicate still matches after N seconds |
| once=True | The listener auto-cancels after its first successful invocation |
| priority=N | Higher values dispatch first; StateProxy uses priority 100 |

Scheduler Internals

Scheduler exposes convenience methods (run_in, run_once, run_every, run_daily, run_cron, schedule) that translate into trigger objects. All jobs enter a shared min-heap inside SchedulerService.

flowchart TD
    accTitle: Scheduler Job Pipeline
    accDescr: From convenience methods through triggers to the dispatch loop

    subgraph api["Scheduler API"]
        methods["run_*() / schedule()"]
    end

    subgraph triggers["Triggers"]
        T["Trigger<br/><i>implements TriggerProtocol</i>"]
    end

    subgraph engine["SchedulerService"]
        heap["Min-heap<br/>by next_run"]
        loop["serve() loop"]
        exec["CommandExecutor"]
        heap -- "pop due" --> loop --> exec
    end

    methods --> T
    T -- "ScheduledJob" --> heap
    exec -. "re-enqueue<br/>if recurring" .-> heap

    style api fill:#e8f0ff,stroke:#6688cc
    style triggers fill:#f0f8e8,stroke:#88aa66
    style engine fill:#fff0e8,stroke:#cc8844

Trigger Evaluation Loop

SchedulerService.serve() loops indefinitely. Each iteration:

  1. Calls _ScheduledJobQueue.pop_due_and_peek_next(now): pops all jobs whose next_run is at or before now and returns the next scheduled time.
  2. Spawns a TaskBucket task for each due job via CommandExecutor.
  3. Sleeps until the next job's next_run time, clamped between min_delay and max_delay.

When no jobs are queued, the loop sleeps for default_delay seconds. The kick() method interrupts the sleep immediately; it is invoked when a newly registered job has an earlier run time than the current sleep target, so the loop can recompute its wake-up time.
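A minimal sketch of the queue and the serve() loop, assuming an asyncio.Event as the kick mechanism and asyncio.create_task() in place of TaskBucket and CommandExecutor. SketchJob, SketchJobQueue, sleep_seconds(), and the parameter defaults are hypothetical; only pop_due_and_peek_next(), min_delay, max_delay, and default_delay are named in the text. Re-enqueueing recurring jobs is left out.

```python
from __future__ import annotations

import asyncio
import heapq
import itertools
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine


@dataclass(order=True)
class SketchJob:
    next_run: float
    seq: int  # tie-breaker so equal next_run values pop in insertion order
    name: str = field(compare=False)


class SketchJobQueue:
    """Hypothetical min-heap keyed by next_run."""

    def __init__(self) -> None:
        self._heap: list[SketchJob] = []
        self._seq = itertools.count()

    def push(self, name: str, next_run: float) -> SketchJob:
        job = SketchJob(next_run, next(self._seq), name)
        heapq.heappush(self._heap, job)
        return job

    def pop_due_and_peek_next(self, now: float) -> tuple[list[SketchJob], float | None]:
        due: list[SketchJob] = []
        while self._heap and self._heap[0].next_run <= now:
            due.append(heapq.heappop(self._heap))
        return due, (self._heap[0].next_run if self._heap else None)


def sleep_seconds(now: float, next_run: float | None, *, min_delay: float,
                  max_delay: float, default_delay: float) -> float:
    if next_run is None:
        return default_delay  # nothing queued
    return min(max(next_run - now, min_delay), max_delay)


async def serve(queue: SketchJobQueue,
                run_job: Callable[[SketchJob], Coroutine[Any, Any, None]],
                kick: asyncio.Event, stop: asyncio.Event, *, min_delay: float = 0.01,
                max_delay: float = 30.0, default_delay: float = 30.0) -> None:
    running: set[asyncio.Task[None]] = set()  # keep references to spawned tasks
    while not stop.is_set():
        due, next_run = queue.pop_due_and_peek_next(time.monotonic())
        for job in due:
            task = asyncio.create_task(run_job(job))
            running.add(task)
            task.add_done_callback(running.discard)
        delay = sleep_seconds(time.monotonic(), next_run, min_delay=min_delay,
                              max_delay=max_delay, default_delay=default_delay)
        kick.clear()
        try:
            # kick.set() wakes the loop early, e.g. when an earlier job is registered.
            await asyncio.wait_for(kick.wait(), timeout=delay)
        except asyncio.TimeoutError:
            pass
```

The seq counter means heapq never has to compare job names when two jobs share a next_run value.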

Trigger-to-Job Translation

| Convenience method | Trigger object | Behavior |
|---|---|---|
| run_in(fn, delay=N) | After | One-shot after N seconds |
| run_once(fn, at=T) | Once | One-shot at a specific time |
| run_every(fn, seconds=N) | Every | Recurring every N seconds |
| run_daily(fn, at="HH:MM") | Daily | Wall-clock daily at HH:MM |
| run_cron(fn, expression=E) | Cron | Cron expression (parsed with croniter) |
| schedule(fn, trigger=T) | Custom T | Any object implementing TriggerProtocol |

Daily uses CronTrigger internally rather than a 24-hour interval. A naive fixed interval would drift across DST transitions. CronTrigger computes next_run in the configured timezone and handles fall-back ambiguity by selecting the second (post-transition) occurrence.

Missed-Job Handling

SchedulerService does not make up missed executions. A job whose next_run passed during a shutdown or restart fires once on the next pop_due_and_peek_next() call, not once per skipped interval. Every trigger calls advance_past(now) to move next_run past the current time, so the job schedules forward from now rather than from its originally missed time.
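One way an interval trigger could implement this, assuming it keeps the job on its original cadence: advance_past() and fire_due() here are hypothetical module-level functions, and a real trigger might instead compute now + interval.

```python
import math
from typing import Callable


def advance_past(next_run: float, interval: float, now: float) -> float:
    """Return the first slot on the job's original grid that is strictly after now."""
    if next_run > now:
        return next_run  # not overdue: nothing to skip
    skipped = math.floor((now - next_run) / interval) + 1
    return next_run + skipped * interval


def fire_due(next_run: float, interval: float, now: float, fire: Callable[[], None]) -> float:
    """Run an overdue job once, however many slots were missed, then reschedule it."""
    if next_run <= now:
        fire()
        return advance_past(next_run, interval, now)
    return next_run
```

A job due every 60 seconds that was last scheduled for t=0 and is popped at t=3600 fires once and is rescheduled for t=3660, instead of firing 61 times to catch up.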

Jitter and Job Groups

jitter=N adds a random offset drawn from [0, N] seconds at enqueue time. The group= parameter attaches a label to a job, and Scheduler.cancel_group(name) cancels every job carrying that label. Named jobs (name=) support deduplication: if_exists="skip" leaves the existing job in place; if_exists="replace" cancels the existing job and registers the new one.
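The three features can be modeled with a small in-memory registry. SketchJob and SketchJobRegistry are hypothetical; the parameter names jitter, group, and if_exists come from the text, but the data layout and the ValueError on unknown if_exists values are assumptions.

```python
from __future__ import annotations

import random
from dataclasses import dataclass


@dataclass
class SketchJob:
    name: str
    next_run: float
    group: str | None = None
    cancelled: bool = False


class SketchJobRegistry:
    """Hypothetical registry showing jitter, groups, and name-based deduplication."""

    def __init__(self, rng: random.Random | None = None) -> None:
        self.jobs: dict[str, SketchJob] = {}
        self._rng = rng or random.Random()

    def add(self, name: str, run_at: float, *, jitter: float = 0.0,
            group: str | None = None, if_exists: str = "skip") -> SketchJob:
        existing = self.jobs.get(name)
        if existing is not None and not existing.cancelled:
            if if_exists == "skip":
                return existing  # leave the existing job in place
            if if_exists != "replace":
                raise ValueError(f"unknown if_exists value: {if_exists!r}")
            existing.cancelled = True  # replace: cancel, then register the new job below
        # Jitter is drawn once, at enqueue time, from [0, jitter] seconds.
        offset = self._rng.uniform(0.0, jitter) if jitter > 0 else 0.0
        job = SketchJob(name, run_at + offset, group)
        self.jobs[name] = job
        return job

    def cancel_group(self, group: str) -> int:
        cancelled = 0
        for job in self.jobs.values():
            if job.group == group and not job.cancelled:
                job.cancelled = True
                cancelled += 1
        return cancelled
```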

StateManager and StateProxy

StateProxy is a shared singleton maintaining an in-memory cache of all entity states. StateManager is the per-app interface over it — the self.states handle — providing typed access with Pydantic model validation. App code never touches StateProxy directly.

flowchart TD
    accTitle: State Management
    accDescr: How entity states flow from HA through the cache to typed app access

    subgraph sources["Cache Population"]
        bus_sub["Bus subscription<br/>(priority 100)"]
        poll["Periodic poll<br/>(run_every)"]
    end

    subgraph proxy["StateProxy"]
        cache["In-memory dict<br/>entity_id to HassStateDict"]
    end

    subgraph access["StateManager (per-app)"]
        attr["self.states.light<br/><i>DomainStates[LightState]</i>"]
        item["self.states[CustomState]<br/><i>DomainStates[T]</i>"]
        get["self.states.get(entity_id)<br/><i>raw lookup</i>"]
    end

    subgraph convert["Type Conversion"]
        SR["StateRegistry<br/>domain to model class"]
        TR["TypeRegistry<br/>scalar conversion"]
    end

    bus_sub --> cache
    poll --> cache
    cache --> attr & item & get
    attr & item --> SR & TR

    style sources fill:#f0f8e8,stroke:#88aa66
    style proxy fill:#fff0e8,stroke:#cc8844
    style access fill:#e8f0ff,stroke:#6688cc
    style convert fill:#f8f0ff,stroke:#8866cc

Cache Population

StateProxy declares depends_on: [WebsocketService, ApiResource, BusService, SchedulerService]. Once all four dependencies are ready, on_initialize() runs two setup steps.

First, subscribe_to_events() registers a bus subscription on Topic.HASS_EVENT_STATE_CHANGED at priority 100. Because higher-priority listeners dispatch first, StateProxy's handler updates the cache before any lower-priority user handler sees the event, so those handlers observe current state.

Second, _load_cache() bulk-fetches all entity states via get_states_raw() and populates the states dict. A periodic run_every job re-runs _load_cache() at state_proxy_poll_interval_seconds intervals to recover from any missed events.

Lock-Free Reads

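StateProxy.get_state() reads from self.states without acquiring a lock. This is safe because StateProxy runs on a single asyncio event loop and, in CPython, an individual dict lookup or item assignment is atomic, so a reader sees either an entity's old state or its new state, never a partial write. Writers hold a FairAsyncRLock while updating the dict so that multi-step updates spanning await points cannot interleave and corrupt it. Readers never contend with each other.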
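A sketch of the lock-free read and locked write split. SketchStateCache and its method names are hypothetical, and asyncio.Lock stands in for FairAsyncRLock (unlike FairAsyncRLock, asyncio.Lock is neither reentrant nor documented as fair).

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

StateDict = dict[str, Any]


class SketchStateCache:
    """Hypothetical stand-in for StateProxy's cache; asyncio.Lock replaces FairAsyncRLock."""

    def __init__(self) -> None:
        self.states: dict[str, StateDict] = {}
        self._write_lock = asyncio.Lock()

    def get_state(self, entity_id: str) -> StateDict | None:
        # Lock-free read: a plain, synchronous dict lookup.
        return self.states.get(entity_id)

    async def apply_state_changed(self, entity_id: str, new_state: StateDict | None) -> None:
        async with self._write_lock:
            if new_state is None:
                self.states.pop(entity_id, None)  # Home Assistant sends new_state=None on removal
            else:
                self.states[entity_id] = new_state

    async def reload(self, fetch_all: Callable[[], Awaitable[list[StateDict]]]) -> None:
        async with self._write_lock:
            fresh = await fetch_all()
            # Build the new dict off to the side, then swap it in with one assignment,
            # so a reader sees either the old cache or the new one, never a mix.
            self.states = {state["entity_id"]: state for state in fresh}
```

Building the reloaded dict separately and swapping it in with one assignment is one way to keep readers lock-free during a bulk reload.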

Type Conversion and context_id Caching

DomainStates wraps a StateProxy and a model class. On each entity access, DomainStates._validate_or_return_from_cache() extracts the context_id from the raw state dict (the ID of the Home Assistant context that produced the state). If it matches the context_id stored in the cached CacheValue, the previously validated Pydantic model is returned without re-running validation. A new context_id triggers a full validation pass and replaces the cached entry.
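The caching rule can be sketched without Pydantic. SketchCacheValue and SketchDomainStates are hypothetical; a plain validate callable stands in for Pydantic model validation, and the raw state is assumed to carry its context ID at raw["context"]["id"], as Home Assistant state objects do.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass
class SketchCacheValue(Generic[T]):
    context_id: str
    model: T


class SketchDomainStates(Generic[T]):
    def __init__(self, raw_states: dict[str, dict[str, Any]],
                 validate: Callable[[dict[str, Any]], T]) -> None:
        self._raw_states = raw_states  # stands in for StateProxy.states
        self._validate = validate      # stands in for Pydantic validation
        self._cache: dict[str, SketchCacheValue[T]] = {}
        self.validation_count = 0

    def get(self, entity_id: str) -> T:
        raw = self._raw_states[entity_id]
        context_id = raw["context"]["id"]
        cached = self._cache.get(entity_id)
        if cached is not None and cached.context_id == context_id:
            return cached.model  # same context: reuse the validated model
        model = self._validate(raw)  # new context: validate and replace the entry
        self.validation_count += 1
        self._cache[entity_id] = SketchCacheValue(context_id, model)
        return model
```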

StateManager.__getattr__ caches DomainStates instances by model class in _domain_states_cache. Accessing self.states.light multiple times returns the same DomainStates object.

Disconnect and Reconnect

On WebSocket disconnect, StateProxy clears self.states and calls mark_not_ready(). State reads during this window raise ResourceNotReadyError. On reconnect, _load_cache() bulk-reloads all states, then subscribe_to_events() re-registers the bus subscription. mark_ready() then unblocks any waiters.
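The disconnect and reconnect cycle maps naturally onto an asyncio.Event. SketchReadyCache, wait_ready(), and the callback parameters are hypothetical; ResourceNotReadyError is defined locally as a stand-in for the framework exception of the same name.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

StateDict = dict[str, Any]


class ResourceNotReadyError(RuntimeError):
    """Local stand-in for the framework exception of the same name."""


class SketchReadyCache:
    """Hypothetical model of StateProxy's disconnect/reconnect cycle."""

    def __init__(self) -> None:
        self.states: dict[str, StateDict] = {}
        self._ready = asyncio.Event()  # set == ready; starts not ready

    def get_state(self, entity_id: str) -> StateDict | None:
        if not self._ready.is_set():
            raise ResourceNotReadyError("state cache is not ready")
        return self.states.get(entity_id)

    def on_disconnect(self) -> None:
        self.states.clear()
        self._ready.clear()  # plays the role of mark_not_ready()

    async def on_reconnect(
        self,
        load_all: Callable[[], Awaitable[dict[str, StateDict]]],
        resubscribe: Callable[[], Awaitable[None]],
    ) -> None:
        self.states = await load_all()  # bulk reload, like _load_cache()
        await resubscribe()             # re-register, like subscribe_to_events()
        self._ready.set()               # like mark_ready(): wakes every wait_ready() caller

    async def wait_ready(self) -> None:
        await self._ready.wait()
```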

Api Internals

The per-app Api handle delegates all network I/O to two shared singletons: ApiResource (REST) and WebsocketService (WebSocket).

flowchart TD
    accTitle: Api Transport
    accDescr: How per-app Api delegates to shared REST and WebSocket transports

    subgraph app["Per-App"]
        Api
    end

    subgraph transport["Shared Singletons"]
        AR["ApiResource<br/>(aiohttp)"]
        WS["WebsocketService"]
    end

    subgraph ha["Home Assistant"]
        REST["REST API"]
        WSAPI["WebSocket API"]
    end

    Api -- "get_state(id)" --> AR
    Api -- "call_service()<br/>get_states()" --> WS
    AR -- "HTTP" --> REST
    WS -- "WS frame" --> WSAPI

    style app fill:#e8f0ff,stroke:#6688cc
    style transport fill:#fff0e8,stroke:#cc8844
    style ha fill:#f0f0f0,stroke:#999

Transport Routing

| Method | Transport | Pattern |
|---|---|---|
| get_state(entity_id) | REST | GET /api/states/{id} |
| get_state_raw(entity_id) | REST | GET /api/states/{id} |
| get_states() | WebSocket | get_states command |
| call_service() | WebSocket | call_service command |
| fire_event() | WebSocket | fire_event command |
| ws_send_and_wait() | WebSocket | Raw message; blocks until the result arrives |
| ws_send_json() | WebSocket | Raw message; fire-and-forget |
| rest_request() | REST | Raw aiohttp request |

Api.rest_request() and Api.ws_send_and_wait() are escape hatches for HA API surface not covered by the typed methods.

Authentication

HassetteConfig.token holds a long-lived access token. ApiResource sends it in an Authorization: Bearer header on every REST request. On the WebSocket, the client authenticates right after connecting by answering Home Assistant's auth_required message with an auth frame containing the token. Auth failures raise InvalidAuthError, a FatalError subclass, so the system shuts down immediately rather than retrying.
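Home Assistant's WebSocket auth flow has three messages: the server sends auth_required, the client replies with an auth message carrying access_token, and the server answers auth_ok or auth_invalid. The sketch below abstracts the socket as hypothetical recv/send callables so it runs without a WebSocket library; ws_authenticate() and rest_auth_headers() are hypothetical names, and InvalidAuthError is a local stand-in.

```python
import json
from typing import Awaitable, Callable


class InvalidAuthError(Exception):
    """Local stand-in; in the framework this is a FatalError subclass."""


def rest_auth_headers(token: str) -> dict[str, str]:
    # The Home Assistant REST API expects a bearer token in the Authorization header.
    return {"Authorization": f"Bearer {token}"}


async def ws_authenticate(recv: Callable[[], Awaitable[str]],
                          send: Callable[[str], Awaitable[None]], token: str) -> str:
    """Run the WebSocket auth handshake and return the server's ha_version."""
    first = json.loads(await recv())
    if first.get("type") != "auth_required":
        raise RuntimeError(f"expected auth_required, got {first!r}")
    await send(json.dumps({"type": "auth", "access_token": token}))
    reply = json.loads(await recv())
    if reply.get("type") == "auth_ok":
        return reply.get("ha_version", "")
    if reply.get("type") == "auth_invalid":
        # Fatal: retrying with the same token cannot succeed.
        raise InvalidAuthError(reply.get("message", "auth_invalid"))
    raise RuntimeError(f"unexpected auth reply: {reply!r}")
```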

Connection Management

ApiResource holds a single aiohttp.ClientSession. WebsocketService manages the WebSocket connection with tenacity retry logic (default 5 attempts with exponential jitter, configurable via connect_retry_max_attempts). On reconnect, StateProxy bulk-reloads state and re-registers its subscription. Per-app Api instances share the same underlying connections. There is no per-app connection pool.
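A stdlib sketch of a bounded retry loop with exponential backoff and jitter. It uses "full jitter" (a uniform delay up to an exponential cap), which may not match the exact tenacity wait strategy WebsocketService configures; connect_with_retry(), the local FatalError class, and the base and cap values are hypothetical.

```python
from __future__ import annotations

import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class FatalError(Exception):
    """Local stand-in for the framework's non-retryable error base class."""


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    *,
    max_attempts: int = 5,
    base: float = 1.0,
    cap: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    rng: random.Random | None = None,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    rng = rng or random.Random()
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except FatalError:
            raise  # e.g. rejected credentials: retrying cannot help
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Full jitter: sleep a uniform amount up to the exponential cap.
            await sleep(rng.uniform(0.0, min(cap, base * 2 ** (attempt - 1))))
    raise AssertionError("unreachable")
```

Injecting sleep keeps the policy testable without real delays, and letting FatalError escape immediately mirrors the shutdown-on-auth-failure behavior described above.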

Database Internals

DatabaseService stores all telemetry in a local SQLite file. Schema management uses SQLite's native PRAGMA user_version with numbered .sql migration files.

Schema

The database has five tables:

| Table | Purpose |
|---|---|
| sessions | One row per Hassette process run; tracks start/stop time and error info |
| listeners | One row per registered bus listener; natural key (app_key, instance_index, name, topic) |
| scheduled_jobs | One row per registered scheduler job; natural key (app_key, instance_index, job_name) |
| executions | One row per handler invocation or job execution; a kind column discriminates between the two |
| log_records | Captured log lines, linked to executions via execution_id |

executions stores one row per handler invocation or job execution. The kind column holds 'handler' or 'job'. listener_id and job_id are nullable foreign keys into listeners and scheduled_jobs respectively. A CHECK constraint enforces that exactly one is non-null per row: CHECK ((listener_id IS NOT NULL) + (job_id IS NOT NULL) = 1).

Six views (active_listeners, active_app_listeners, active_framework_listeners, and their scheduled-job equivalents) pre-filter retired registrations.

Migration System

The migration runner reads PRAGMA user_version from the on-disk database and applies, in order, each numbered .sql file whose number is above that version. Every migration runs inside BEGIN IMMEDIATE / COMMIT, with PRAGMA user_version = N as the final statement before COMMIT. A crash mid-migration discards the whole transaction and leaves the version at N-1; the next startup retries from that point.
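A minimal sketch of the runner using Python's sqlite3 module, assuming migrations are supplied as a hypothetical {version: sql_text} dict rather than read from .sql files. Because Connection.executescript() commits any pending transaction before it runs, the BEGIN IMMEDIATE / COMMIT pair is placed inside the script itself.

```python
import sqlite3


def apply_migrations(conn: sqlite3.Connection, migrations: dict[int, str]) -> int:
    """Apply every migration numbered above the on-disk user_version, in order.

    Assumes `conn` was opened with isolation_level=None and that each
    migration's SQL ends with a semicolon.
    """
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    for version in sorted(v for v in migrations if v > current):
        script = (
            "BEGIN IMMEDIATE;\n"
            f"{migrations[version]}\n"
            f"PRAGMA user_version = {int(version)};\n"  # last statement before COMMIT
            "COMMIT;"
        )
        try:
            conn.executescript(script)
        except sqlite3.Error:
            if conn.in_transaction:
                conn.execute("ROLLBACK")  # user_version stays at the previous value
            raise
        current = version
    return current
```

SQLite DDL and the user_version header field are both transactional, so a failed migration leaves neither a half-built schema nor an advanced version number.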

On a fresh database (user_version = 0), the runner sets auto_vacuum = INCREMENTAL via a separate connection before any transaction. PRAGMA auto_vacuum cannot be changed inside BEGIN IMMEDIATE, so it must precede the first transaction.

DatabaseService._handle_schema_version() runs before migrations:

| On-disk version | Action |
|---|---|
| Matches expected head | No action |
| Older than expected head | Log a warning, delete the database, and let migrations recreate it |
| 0 on an existing file | Treat as an unversioned legacy schema; delete and recreate |
| Newer than expected head | Raise SchemaVersionError; manual intervention required |

SchemaVersionError is declared in DatabaseService.restart_spec.fatal_error_names, so a version-ahead database stops the process immediately rather than retrying.

Write Pipeline

DatabaseService serializes all writes through an asyncio.Queue drained by a single background _db_write_worker() task. Callers submit a coroutine to DatabaseService.submit() or place a raw item via enqueue(). The worker processes items one at a time. Each item is a (coroutine, future) pair; when a future is present, the result or exception is delivered through it.
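The single-writer pattern can be sketched with asyncio primitives. SketchWriteQueue is hypothetical; the names submit(), enqueue(), and _db_write_worker() come from the text, but their signatures and the logging of fire-and-forget failures are assumptions.

```python
from __future__ import annotations

import asyncio
import contextlib
import logging
from typing import Any, Coroutine

logger = logging.getLogger(__name__)


class SketchWriteQueue:
    """Hypothetical stand-in for DatabaseService's single-writer queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[tuple[Coroutine[Any, Any, Any], asyncio.Future[Any] | None]] = asyncio.Queue()
        self._worker: asyncio.Task[None] | None = None

    def start(self) -> None:
        self._worker = asyncio.create_task(self._db_write_worker())

    async def _db_write_worker(self) -> None:
        while True:
            coro, future = await self._queue.get()
            try:
                result = await coro  # exactly one write runs at a time
            except Exception as exc:
                if future is None:
                    logger.exception("fire-and-forget write failed")
                elif not future.done():
                    future.set_exception(exc)
            else:
                if future is not None and not future.done():
                    future.set_result(result)
            finally:
                self._queue.task_done()

    def enqueue(self, coro: Coroutine[Any, Any, Any],
                future: asyncio.Future[Any] | None = None) -> None:
        self._queue.put_nowait((coro, future))

    async def submit(self, coro: Coroutine[Any, Any, Any]) -> Any:
        future = asyncio.get_running_loop().create_future()
        self.enqueue(coro, future)
        return await future  # result or exception is delivered through the future

    async def stop(self) -> None:
        if self._worker is not None:
            self._worker.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker
```

Even when two callers submit concurrently, their writes never overlap: the second write starts only after the first one finishes.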

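A dedicated read connection (_read_db) runs with PRAGMA query_only = ON and a 5-second busy timeout. Read queries bypass the write queue, so they never wait behind queued writes; the busy timeout bounds how long a read waits if SQLite reports the database as locked.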
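With Python's sqlite3 module, the timeout argument of sqlite3.connect() sets the busy timeout, and PRAGMA query_only makes the connection reject writes. open_connections() is a hypothetical helper, not the service's code.

```python
import sqlite3


def open_connections(path: str) -> tuple[sqlite3.Connection, sqlite3.Connection]:
    """Open a writer and a read-only reader on the same SQLite file."""
    writer = sqlite3.connect(path)
    reader = sqlite3.connect(path, timeout=5.0)  # busy timeout: wait up to 5 s on a locked database
    reader.execute("PRAGMA query_only = ON")     # any write on this connection now fails
    return writer, reader
```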

Synchronous Registration

BusService and SchedulerService declare depends_on: [DatabaseService]. The database is ready before any listener or job registration runs. Each bus.on_*() call awaits the DatabaseService.submit() call inline, so sub.listener.db_id is a valid integer when the awaited registration returns. Scheduler methods behave identically.

Retention

A background loop in DatabaseService.serve() runs retention cleanup every _RETENTION_INTERVAL_SECONDS seconds. _RETENTION_TABLES declares each managed table with its retention column. Each entry carries a retention_days_getter lambda that reads the configured value from HassetteConfig. A separate size-failsafe loop runs on startup and periodically. When the database exceeds a configured size threshold, it deletes old rows in batches and runs incremental vacuum.
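A sketch of table-driven retention. RetentionSpec, RETENTION_TABLES, run_retention(), the timestamp column names, and the config attribute names are all hypothetical; only the idea of a per-table retention column plus a retention_days_getter lambda comes from the text. The size failsafe is not modeled.

```python
from __future__ import annotations

import sqlite3
import time
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class RetentionSpec:
    table: str   # trusted constant, never user input (interpolated into SQL below)
    column: str  # timestamp column, seconds since the epoch in this sketch
    retention_days_getter: Callable[[Any], int]


# Hypothetical table/column/config names; the real _RETENTION_TABLES may differ.
RETENTION_TABLES = (
    RetentionSpec("executions", "started_at", lambda cfg: cfg.execution_retention_days),
    RetentionSpec("log_records", "created_at", lambda cfg: cfg.log_retention_days),
)


def run_retention(conn: sqlite3.Connection, config: Any, now: float | None = None) -> dict[str, int]:
    """Delete rows older than each table's retention window; return rows deleted per table."""
    now = time.time() if now is None else now
    deleted: dict[str, int] = {}
    for spec in RETENTION_TABLES:
        cutoff = now - spec.retention_days_getter(config) * 86_400
        cursor = conn.execute(f"DELETE FROM {spec.table} WHERE {spec.column} < ?", (cutoff,))
        deleted[spec.table] = cursor.rowcount
    conn.commit()
    return deleted
```

Reading the retention value through a getter at run time, rather than once at import, means a configuration change takes effect on the next cleanup pass.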

Web/UI Layer

WebApiService starts a uvicorn/FastAPI server. Two data source services provide live state and historical telemetry to the frontend.

flowchart TD
    accTitle: Web Layer
    accDescr: How the frontend connects to backend data sources

    subgraph browser["Browser"]
        SPA["Preact SPA"]
    end

    subgraph server["WebApiService"]
        rest["REST endpoints<br/>/api/health, /api/apps,<br/>/api/telemetry/*, ..."]
        ws["/api/ws<br/>WebSocket"]
        static["Static files<br/>SPA catch-all"]
    end

    subgraph data["Data Sources"]
        RQS["RuntimeQueryService<br/><i>live state, event buffer,<br/>WS broadcast</i>"]
        TQS["TelemetryQueryService<br/><i>SQLite: listeners, jobs,<br/>errors, sessions</i>"]
    end

    SPA -- "fetch" --> rest
    SPA <-- "push events" --> ws
    rest --> RQS & TQS
    ws --> RQS

    style browser fill:#e8f0ff,stroke:#6688cc
    style server fill:#fff0e8,stroke:#cc8844
    style data fill:#f0f8e8,stroke:#88aa66

WebApiService

WebApiService.serve() calls create_fastapi_app() and hands the resulting app to uvicorn.Server by way of a uvicorn.Config. The server runs with ws="websockets-sansio" and lifespan="off". On CancelledError during shutdown, the service awaits asyncio.shield(server.shutdown()) to give uvicorn a graceful exit window before propagating the cancellation.
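The shield-on-cancel pattern can be shown without a real web server. FakeServer is a hypothetical stand-in for uvicorn.Server, and run_server() is a hypothetical name for the behavior described above: await asyncio.shield(server.shutdown()), then re-raise.

```python
import asyncio


class FakeServer:
    """Hypothetical stand-in for uvicorn.Server (the real one is built from a uvicorn.Config)."""

    def __init__(self) -> None:
        self.shutdown_finished = False
        self._stopped = asyncio.Event()

    async def serve(self) -> None:
        await self._stopped.wait()  # runs until shut down

    async def shutdown(self) -> None:
        await asyncio.sleep(0.01)  # pretend to drain open connections
        self.shutdown_finished = True
        self._stopped.set()


async def run_server(server: FakeServer) -> None:
    try:
        await server.serve()
    except asyncio.CancelledError:
        # Shield the graceful shutdown so a second cancellation cannot cancel the
        # shutdown coroutine itself, then re-raise so the caller still sees the cancel.
        await asyncio.shield(server.shutdown())
        raise
```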

When config.web_api.run is False, serve() blocks on shutdown_event.wait() without binding a port. The dependency graph remains intact, and services that depend on WebApiService being ready still start normally.

RuntimeQueryService

RuntimeQueryService subscribes to bus events on initialization and maintains a bounded in-memory event buffer. On each event worth pushing to browsers (state changes, app status changes, execution completions), buffer_and_broadcast() appends the event to the buffer and fans it out to all registered WebSocket clients.

Each connected client gets its own asyncio.Queue, bounded at _WS_CLIENT_QUEUE_MAX entries. When a slow client's queue is full, further frames for that client are dropped and a rate-limited log line is emitted. Clients register via register_ws_client() and deregister via unregister_ws_client().
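A sketch of the buffer and fan-out. SketchBroadcaster, its default sizes, and the 10-second log interval are hypothetical; only register_ws_client(), unregister_ws_client(), and buffer_and_broadcast() are named in the text.

```python
from __future__ import annotations

import asyncio
import collections
import logging
import time
from typing import Any

logger = logging.getLogger(__name__)


class SketchBroadcaster:
    """Hypothetical stand-in for RuntimeQueryService's buffer and fan-out."""

    def __init__(self, buffer_size: int = 500, client_queue_max: int = 100,
                 drop_log_interval: float = 10.0) -> None:
        self.buffer: collections.deque[Any] = collections.deque(maxlen=buffer_size)
        self._clients: set[asyncio.Queue[Any]] = set()
        self._client_queue_max = client_queue_max
        self._drop_log_interval = drop_log_interval
        self._last_drop_log = float("-inf")
        self.dropped = 0

    def register_ws_client(self) -> asyncio.Queue[Any]:
        queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=self._client_queue_max)
        self._clients.add(queue)
        return queue

    def unregister_ws_client(self, queue: asyncio.Queue[Any]) -> None:
        self._clients.discard(queue)

    def buffer_and_broadcast(self, message: Any) -> None:
        self.buffer.append(message)  # the oldest entry falls off once maxlen is reached
        for queue in self._clients:
            try:
                queue.put_nowait(message)  # never blocks on a slow client
            except asyncio.QueueFull:
                self.dropped += 1
                now = time.monotonic()
                if now - self._last_drop_log >= self._drop_log_interval:
                    self._last_drop_log = now
                    logger.warning("WebSocket client queue full; dropping frames")
```

Using put_nowait() keeps one stalled browser tab from delaying delivery to every other client.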

TelemetryQueryService

TelemetryQueryService serves all historical data: listener registrations, job registrations, execution records, log lines, and session history. Queries run against DatabaseService.read_db (the dedicated read connection) to avoid contending with the write worker.

SPA Routing

create_fastapi_app() mounts /assets and /fonts via StaticFiles for the built SPA output. A spa_catch_all handler covers every remaining path: it serves root-level static files directly, returns 404 for API paths and for any other filename with an extension in _STATIC_EXTENSIONS, and returns index.html for everything else. Serving index.html for unknown paths lets the Preact SPA handle routing on the client side.
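The catch-all's decision order can be expressed as a pure function, independent of FastAPI. spa_route() and its (status, file) return value are hypothetical, and the extension set below is an illustrative guess, not the real _STATIC_EXTENSIONS.

```python
from pathlib import PurePosixPath

# Hypothetical extension list; the real _STATIC_EXTENSIONS may differ.
STATIC_EXTENSIONS = frozenset({
    ".js", ".css", ".map", ".png", ".svg", ".ico", ".json", ".txt", ".woff", ".woff2",
})


def spa_route(path: str, root_files: frozenset[str]) -> tuple[int, str]:
    """Return (status, file_to_serve) for a path not handled by /assets or /fonts."""
    name = path.lstrip("/")
    if name in root_files:
        return 200, name  # root-level static file, e.g. favicon.ico
    if name == "api" or name.startswith("api/"):
        return 404, ""  # unknown API route: never answer with HTML
    if PurePosixPath(name).suffix.lower() in STATIC_EXTENSIONS:
        return 404, ""  # missing asset: a 404 beats serving index.html as JavaScript
    return 200, "index.html"  # client-side route, resolved by the Preact router
```

Checking real files first, then API and asset-like paths, keeps broken asset links visible as 404s instead of silently loading the app shell.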