Scheduler

Hassette scheduling lives on self.scheduler. All methods are async and return a ScheduledJob object for cancellation.

Coming from synchronous AppDaemon?

The mechanical rule: declare on_initialize as async def and put await in front of every scheduling call. Omitting await means the job is never scheduled — no error, just silence. Migration Concepts covers the async model.
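
The effect of a missing await can be reproduced with plain asyncio. The sketch below does not use Hassette: run_in here is a hypothetical stand-in that records the request in a list, which is enough to show that calling a coroutine function without await only creates a coroutine object and never runs its body.

```python
import asyncio

scheduled = []  # records every request whose coroutine body actually ran


async def run_in(callback_name, delay):
    # Hypothetical stand-in for a scheduling method, NOT Hassette's implementation
    scheduled.append((callback_name, delay))
    return f"job:{callback_name}"


async def main():
    forgotten = run_in("forgotten_task", 60)  # no await: the body never runs
    job = await run_in("awaited_task", 60)    # awaited: the body runs
    forgotten.close()  # discard the unused coroutine object cleanly
    return job


job = asyncio.run(main())
print(scheduled)  # [('awaited_task', 60)]
```

Only the awaited call shows up in scheduled, which matches the rule above: every scheduling call needs await.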

Method Equivalents

| AppDaemon | Hassette | Notes |
|---|---|---|
| self.run_in(cb, 60) | await self.scheduler.run_in(cb, delay=60) | Delay in seconds |
| self.run_once(cb, time(7, 30)) | await self.scheduler.run_once(cb, at="07:30") | "HH:MM" string or ZonedDateTime (from the whenever library, which ships with Hassette) |
| self.run_every(cb, "now", 300) | await self.scheduler.run_every(cb, seconds=300) | Use hours=, minutes=, or seconds= |
| self.run_minutely(cb) | await self.scheduler.run_minutely(cb) | Every 1 minute |
| self.run_hourly(cb, time(0, 30)) | await self.scheduler.run_hourly(cb) | Every 1 hour |
| self.run_daily(cb, time(7, 30)) | await self.scheduler.run_daily(cb, at="07:30") | Wall-clock, DST-safe |
| self.cancel_timer(handle) | job.cancel() | Cancel via the returned job object |
| (none) | await self.scheduler.run_cron(cb, "0 7 * * *") | Hassette-only; cron expression |
| (none) | await self.scheduler.schedule(cb, trigger) | Hassette-only; custom trigger object |
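
For readers new to cron syntax: a standard five-field expression reads minute, hour, day of month, month, day of week, so "0 7 * * *" means 07:00 every day. The sketch below shows how an "HH:MM" time maps to such an expression. daily_cron is a hypothetical helper written for illustration; it is not part of Hassette and makes no claim about how Hassette builds its triggers internally.

```python
def daily_cron(at: str) -> str:
    """Convert an "HH:MM" string to a five-field daily cron expression.

    Hypothetical helper for illustration only, not a Hassette API.
    """
    hour_text, minute_text = at.split(":")
    hour, minute = int(hour_text), int(minute_text)
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f"time out of range: {at!r}")
    # Field order: minute hour day-of-month month day-of-week
    return f"{minute} {hour} * * *"


print(daily_cron("07:00"))  # 0 7 * * *
print(daily_cron("07:30"))  # 30 7 * * *
```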

run_daily is now cron-backed

Hassette's run_daily fires at the specified wall-clock time every day, handling DST transitions correctly. An interval-based approach drifts by an hour across a DST boundary. The cron-backed implementation does not.
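
The drift is easy to reproduce with the Python standard library. This sketch assumes the America/New_York zone and the 2025-03-09 spring-forward date as an example, and uses datetime and zoneinfo rather than Hassette or whenever. It compares "24 elapsed hours later" with "same wall-clock time tomorrow".

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

tz = ZoneInfo("America/New_York")
# 07:30 local time on the day before DST starts (2025-03-09 in this zone)
first_run = datetime(2025, 3, 8, 7, 30, tzinfo=tz)

# Interval-based: exactly 24 elapsed hours, computed in UTC
interval_next = (
    first_run.astimezone(timezone.utc) + timedelta(hours=24)
).astimezone(tz)

# Wall-clock: the same local time on the next calendar day
wall_clock_next = first_run + timedelta(days=1)

print(interval_next.strftime("%Y-%m-%d %H:%M"))    # 2025-03-09 08:30
print(wall_clock_next.strftime("%Y-%m-%d %H:%M"))  # 2025-03-09 07:30
```

The interval-based run lands an hour late in local time, while the wall-clock run stays at 07:30.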

Every scheduling call returns a ScheduledJob. Call .cancel() on it to stop the job.

Callback Signatures

AppDaemon requires all schedule callbacks to match def my_callback(self, **kwargs). The kwargs dict carries any data you passed at registration, plus an internal __thread_id key.

Hassette accepts any callable, async or sync, with any parameters. To pass data to the handler, give the scheduling call args= or kwargs= — the values arrive as parameters on the handler. App[MyConfig] in the example pairs the app with its config class; self.app_config replaces AppDaemon's self.args (see Configuration):

from hassette import App, AppConfig


class MyConfig(AppConfig):
    color_name: str = "red"


class NightLight(App[MyConfig]):
    # Called at startup and on reload
    async def on_initialize(self):
        # Schedule run_daily_callback() to run at 7pm every day
        job = await self.scheduler.run_daily(self.run_daily_callback, at="19:00")
        self.logger.info("Scheduled job: %r", job)

        # 2025-10-13 19:57:02.670 INFO hassette.NightLight.0.on_initialize:11 - Scheduled job: ScheduledJob(name='run_daily_callback', owner=NightLight.0)

    # The scheduler calls this callback every day at 7pm
    async def run_daily_callback(self):
        # Ask Home Assistant to turn the light on, using the color from the app config
        await self.api.turn_on("light.office_light_1", color_name=self.app_config.color_name)

No fixed signature. No **kwargs unwrapping.
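
To make the args= and kwargs= forwarding concrete, here is a minimal sketch of the general pattern in plain Python. invoke, set_scene, and log_message are hypothetical names, and this is not Hassette's scheduler code: it only shows stored arguments arriving as ordinary parameters on a sync or async handler. Unlike Hassette, this sketch calls the sync handler inline rather than in a thread pool.

```python
import asyncio
import inspect


async def invoke(handler, args=(), kwargs=None):
    """Call a handler with stored arguments, awaiting it if it is async."""
    result = handler(*args, **(kwargs or {}))
    if inspect.isawaitable(result):
        result = await result
    return result


async def set_scene(room: str, brightness: int = 100):
    # Async handler with ordinary parameters, no **kwargs unwrapping
    return f"{room} at {brightness}%"


def log_message(message: str):
    # Sync handler; called inline here for simplicity
    return message.upper()


async def main():
    first = await invoke(set_scene, args=("porch",), kwargs={"brightness": 40})
    second = await invoke(log_message, kwargs={"message": "sunset"})
    return first, second


results = asyncio.run(main())
print(results)  # ('porch at 40%', 'SUNSET')
```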

Migration Example

A before/after comparison. The AppDaemon app below schedules a single run_daily callback; the Hassette app after it shows run_in, run_daily, and run_every together:

from appdaemon.plugins.hass import Hass


class NightLight(Hass):
    # function which will be called at startup and reload
    def initialize(self):
        # Schedule a daily callback that will call run_daily_callback() at 7pm every night
        self.run_daily(self.run_daily_callback, "19:00:00")

    # Our callback function will be called by the scheduler every day at 7pm
    def run_daily_callback(self, **kwargs):
        # Call to Home Assistant to turn the porch light on
        self.turn_on("light.porch")

Hassette:

from hassette import App


class MySchedulerApp(App):
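    async def on_initialize(self):
        # Run once, 60 seconds from now
        await self.scheduler.run_in(self.delayed_task, delay=60)
        # Run every day at 07:30 wall-clock time
        await self.scheduler.run_daily(self.morning_task, at="07:30")
        # Run every 5 minutes; keep the ScheduledJob so it can be cancelled later
        self.periodic_job = await self.scheduler.run_every(self.periodic_task, seconds=300)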

    async def delayed_task(self):
        pass

    async def morning_task(self):
        pass

    async def periodic_task(self):
        pass

Key changes:

  • Call scheduling methods on self.scheduler, not directly on self
  • await every scheduling call
  • run_daily takes at="HH:MM" instead of a datetime.time object
  • run_every takes hours=, minutes=, or seconds= instead of a positional interval
  • Jobs return ScheduledJob objects; cancel with job.cancel() instead of self.cancel_timer(handle)

Blocking Work

In AppDaemon, sync callbacks run on worker threads rather than on the event loop, so blocking IO in a callback is generally safe.

In Hassette, sync callables passed to the scheduler run in a thread pool automatically. Write a plain def callback and Hassette detects it is not a coroutine. No extra configuration needed.

import requests

def periodic_sync_task(self):
    # Blocking call is fine here: sync callbacks run in a thread pool
    data = requests.get("http://example.com/api").json()
    ...

Async callbacks run directly in the event loop. For blocking IO inside an async def callback, offload the work with asyncio.to_thread() or self.task_bucket.run_in_thread(). self.task_bucket is a helper on every App instance for running blocking code without stalling other apps:

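import asyncio
import requests

async def periodic_async_task(self):
    # Run the blocking request in a worker thread so the event loop stays free
    response = await asyncio.to_thread(requests.get, "http://example.com/api")
    data = response.json()
    ...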
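
Why offloading matters can be shown without Hassette or a network call. In this sketch, time.sleep stands in for a blocking request, and count_ticks is a hypothetical helper that counts how often a heartbeat task gets to run while the work is in progress. A blocking call made directly inside an async function starves the heartbeat. The same call wrapped in asyncio.to_thread does not.

```python
import asyncio
import time


async def count_ticks(work) -> int:
    """Run work() while a heartbeat task counts how often the loop gets control."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat task start
    await work()
    beat.cancel()
    return ticks


async def blocking_work():
    time.sleep(0.2)  # blocks the event loop for the whole duration


async def offloaded_work():
    await asyncio.to_thread(time.sleep, 0.2)  # blocks a worker thread instead


async def main():
    return await count_ticks(blocking_work), await count_ticks(offloaded_work)


blocked_ticks, offloaded_ticks = asyncio.run(main())
print(blocked_ticks, offloaded_ticks)
```

The first count is zero because nothing else can run while the loop is blocked. The second is positive because the loop stays responsive while the worker thread sleeps.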

AppSync is for sync lifecycle hooks (on_initialize_sync, on_shutdown_sync). Sync scheduler callbacks already run in a thread pool regardless of base class — for migrating scheduling alone, App is the right choice.

Verify the Migration

Run hassette job --app <key> to confirm the jobs registered with the expected next-run times, and hassette log --app <key> to watch callbacks fire.

See Also