Task Bucket
TaskBucket
Bases: Resource
Track and clean up a set of tasks for a service/app.
Source code in src/hassette/task_bucket/task_bucket.py
config_cancel_timeout: int | float
property
Return the task cancellation timeout from the config.
config_log_level: LOG_LEVEL_TYPE
property
Return the log level from the config.
add(task: asyncio.Task[Any]) -> None
Add a task to the bucket and attach exception logging.
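The pattern described here (track a task, then log its failure when it finishes) can be sketched with plain asyncio. This is a minimal illustration, not Hassette's implementation: `MiniBucket`, its `errors` list, and the logger name are hypothetical names introduced for the example.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger("mini_bucket")  # hypothetical logger name


class MiniBucket:
    """Hypothetical stand-in for illustration; not Hassette's TaskBucket."""

    def __init__(self) -> None:
        self.tasks: set[asyncio.Task[Any]] = set()
        self.errors: list[BaseException] = []

    def add(self, task: asyncio.Task[Any]) -> None:
        # Hold a strong reference so the task is not garbage collected early.
        self.tasks.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task[Any]) -> None:
        self.tasks.discard(task)
        if task.cancelled():
            return  # cancellation is not treated as an error
        exc = task.exception()
        if exc is not None:
            self.errors.append(exc)
            logger.error("Task %s failed", task.get_name(), exc_info=exc)


async def demo() -> MiniBucket:
    bucket = MiniBucket()

    async def boom() -> None:
        raise ValueError("boom")

    task = asyncio.create_task(boom(), name="boom")
    bucket.add(task)
    await asyncio.wait([task])
    await asyncio.sleep(0)  # give any remaining done callbacks a chance to run
    return bucket
```

Calling `task.exception()` inside the done callback also marks the exception as retrieved, which avoids asyncio's "Task exception was never retrieved" warning.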
install_exception_recorder(recorder: ExceptionRecorderT) -> None
Install a callback that is called for each non-CancelledError task exception.
Called from the task's done callback, after the error is logged. The recorder receives the completed task and the exception.
Intended for test infrastructure (e.g., AppTestHarness drain) that needs to
collect task exceptions regardless of whether the task completed during an
asyncio.wait call or between iterations.
Multiple recorders may be installed; all are called in installation order (FIFO) when an exception occurs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| recorder | ExceptionRecorderT | Callable | required |
uninstall_exception_recorder(recorder: ExceptionRecorderT) -> None
Remove a previously installed exception recorder.
Safe to call even if the recorder was never installed; in that case it is a no-op. Removes the first occurrence only, on the assumption that each installed recorder is a distinct callable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| recorder | ExceptionRecorderT | The recorder callable to remove. | required |
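The install and uninstall semantics described above (FIFO dispatch, first-occurrence removal, no-op when the recorder is absent) can be sketched with a plain list. `RecorderList`, `Recorder`, and `notify` are hypothetical names for illustration only. The `(task, exception)` call shape follows the description above; the exact definition of `ExceptionRecorderT` is not shown on this page.

```python
import asyncio
from collections.abc import Callable
from typing import Any

# Hypothetical alias mirroring the documented role of ExceptionRecorderT.
Recorder = Callable[[asyncio.Task[Any], BaseException], None]


class RecorderList:
    """Hypothetical helper: FIFO dispatch and first-occurrence removal."""

    def __init__(self) -> None:
        self._recorders: list[Recorder] = []

    def install(self, recorder: Recorder) -> None:
        self._recorders.append(recorder)

    def uninstall(self, recorder: Recorder) -> None:
        try:
            self._recorders.remove(recorder)  # removes the first occurrence only
        except ValueError:
            pass  # never installed: no-op

    def notify(self, task: asyncio.Task[Any], exc: BaseException) -> None:
        # Iterate over a copy so a recorder may uninstall itself safely.
        for recorder in list(self._recorders):
            recorder(task, exc)


async def demo() -> list[str]:
    calls: list[str] = []
    recorders = RecorderList()
    recorders.install(lambda task, exc: calls.append(f"first:{type(exc).__name__}"))
    recorders.install(lambda task, exc: calls.append(f"second:{type(exc).__name__}"))
    recorders.uninstall(print)  # was never installed, so nothing happens

    async def boom() -> None:
        raise RuntimeError("boom")

    task = asyncio.create_task(boom())
    await asyncio.wait([task])
    exc = task.exception()
    if exc is not None:
        recorders.notify(task, exc)
    return calls
```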
spawn(coro: CoroLikeT[T], *, name: str | None = None) -> asyncio.Task[T]
Convenience: create and track a new task.
run_in_thread(fn: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> CoroutineType[Any, Any, R]
Run a synchronous function in a separate thread.
This is a thin wrapper around asyncio.to_thread, but ensures that the current TaskBucket context
is preserved in the new thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[P, R] | The synchronous function to run. | required |
| *args | P.args | Positional arguments to pass to the function. | () |
| **kwargs | P.kwargs | Keyword arguments to pass to the function. | {} |
Returns:
| Type | Description |
|---|---|
| CoroutineType[Any, Any, R] | A coroutine that resolves to the return value of fn. |
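As a rough illustration of what "preserving context in the new thread" means, the sketch below uses only the standard library. `asyncio.to_thread` copies the current `contextvars` context into the worker thread. Whether Hassette tracks the current TaskBucket through a context variable is an assumption here, and `current_owner` is a hypothetical variable standing in for it.

```python
import asyncio
import contextvars
import threading

# Hypothetical context variable standing in for "the current TaskBucket".
current_owner: contextvars.ContextVar[str] = contextvars.ContextVar(
    "current_owner", default="none"
)


def read_owner() -> tuple[str, bool]:
    # Runs in a worker thread; reports the context value it sees and
    # whether it is executing outside the main thread.
    in_worker = threading.current_thread() is not threading.main_thread()
    return current_owner.get(), in_worker


async def demo() -> tuple[str, bool]:
    current_owner.set("my-app")
    # asyncio.to_thread propagates the caller's context to the worker thread.
    return await asyncio.to_thread(read_owner)
```

The worker sees `"my-app"` rather than the default because the context is copied when the call is submitted.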
post_to_loop(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None
Schedule a callable on the event loop from any thread.
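For readers unfamiliar with the underlying mechanism, the standard-library way to hand a callable to an event loop from another thread is `loop.call_soon_threadsafe`. The sketch below shows that primitive on its own. It is an assumption that `post_to_loop` builds on it, and the function names are hypothetical.

```python
import asyncio
import threading


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    received: list[str] = []
    done = asyncio.Event()

    def on_loop(message: str) -> None:
        # Executes on the event loop thread, so touching loop state is safe.
        received.append(message)
        done.set()

    def worker() -> None:
        # call_soon_threadsafe takes positional arguments only; keyword
        # arguments would need functools.partial.
        loop.call_soon_threadsafe(on_loop, "hello from worker")

    thread = threading.Thread(target=worker)
    thread.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    thread.join()
    return received
```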
make_async_adapter(fn: Callable[P, R] | Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]
make_async_adapter(fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]
make_async_adapter(fn: Callable[P, R]) -> Callable[P, Awaitable[R]]
Normalize a callable (sync or async) into an async callable with the same signature.
- If fn is async: await it.
- If fn is sync: run it in Hassette's thread pool executor via TaskBucket.run_in_thread.
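The two cases above can be sketched as follows. This is a simplified stand-in: `to_async` is a hypothetical name, and it offloads sync callables with `asyncio.to_thread` instead of TaskBucket.run_in_thread so that the example runs without Hassette.

```python
import asyncio
import functools
import inspect
from collections.abc import Awaitable, Callable
from typing import Any


def to_async(fn: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Hypothetical adapter: return an async callable for sync or async input."""
    if inspect.iscoroutinefunction(fn):
        return fn  # already async: calling it yields an awaitable

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Offload the blocking call so it does not stall the event loop.
        return await asyncio.to_thread(fn, *args, **kwargs)

    return wrapper


def add_sync(a: int, b: int) -> int:
    return a + b


async def add_async(a: int, b: int) -> int:
    return a + b


async def demo() -> tuple[int, int]:
    # Both callables are now awaited the same way.
    return await to_async(add_sync)(1, 2), await to_async(add_async)(3, 4)
```

Normalizing once at registration time means the caller never has to branch on whether a handler is sync or async.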
run_sync(fn: Coroutine[Any, Any, R], timeout_seconds: int | float | None = None) -> R
Run an async function in a synchronous context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Coroutine[Any, Any, R] | The async function to run. | required |
| timeout_seconds | int \| float \| None | The timeout for the function call. None uses the config value. | None |
Returns:
| Type | Description |
|---|---|
| R | The result of the function call. |
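One common way to run a coroutine from synchronous code while a loop runs in another thread is `asyncio.run_coroutine_threadsafe`, blocking on the returned future with a timeout. The sketch below shows that general technique. It is an assumption that `run_sync` works along these lines, and `run_blocking` is a hypothetical helper.

```python
import asyncio
import threading
from collections.abc import Coroutine
from typing import Any, TypeVar

T = TypeVar("T")


async def compute() -> int:
    await asyncio.sleep(0.01)
    return 42


def run_blocking(
    loop: asyncio.AbstractEventLoop,
    coro: Coroutine[Any, Any, T],
    timeout_seconds: float,
) -> T:
    # Submit the coroutine to the loop's thread and block this thread
    # until the result arrives or the timeout expires.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result(timeout=timeout_seconds)


def demo() -> int:
    # Run an event loop in a background thread, as a long-lived service would.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        return run_blocking(loop, compute(), timeout_seconds=5)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```

With this technique the blocking call must come from a thread other than the loop's own. Blocking the loop thread on a future that only the loop can complete would deadlock.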
run_on_loop_thread(fn: typing.Callable[..., R], *args: Any, **kwargs: Any) -> R
async
Run a synchronous function on the main event loop thread.
This is useful for ensuring that loop-affine code runs in the correct context.
create_task_on_loop(coro: Coroutine[Any, Any, Any], *, name: str | None = None) -> asyncio.Task[Any]
Create a task on the main event loop thread, in this bucket's context.
pending_tasks() -> list[asyncio.Task[Any]]
Return a snapshot list of non-completed tasks in this bucket.
This is the recommended public accessor for drain helpers and test
infrastructure. Returns a fresh list, so callers can iterate over it safely
even if the internal set is mutated in the meantime, without risking a RuntimeError.
Returns:
| Type | Description |
|---|---|
| list[Task[Any]] | A list of tasks that are currently running (not yet done). |
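The value of returning a snapshot can be shown with a plain set of tasks. The helper below is a hypothetical stand-in for the accessor, not Hassette's code. It filters out finished tasks and returns a new list, so the original set can change while the caller iterates.

```python
import asyncio
from typing import Any


def pending_snapshot(tasks: set[asyncio.Task[Any]]) -> list[asyncio.Task[Any]]:
    # Build a new list so later changes to `tasks` cannot affect iteration.
    return [task for task in tasks if not task.done()]


async def demo() -> tuple[int, int]:
    tracked: set[asyncio.Task[Any]] = set()
    finished = asyncio.create_task(asyncio.sleep(0))
    running = asyncio.create_task(asyncio.sleep(60))
    tracked.update({finished, running})
    await finished

    snapshot = pending_snapshot(tracked)
    for task in snapshot:
        # Mutating the set here is safe because we iterate over the snapshot.
        tracked.discard(task)
        task.cancel()
    await asyncio.gather(*snapshot, return_exceptions=True)
    return len(snapshot), len(tracked)
```

Iterating over the set itself while discarding from it would raise `RuntimeError: Set changed size during iteration`.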
cancel_all_sync() -> None
Cancel all tracked tasks without awaiting completion (fire-and-forget).
cancel_all() -> None
async
Cancel all tracked tasks, wait for them to finish, and log stragglers.
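The cancel, wait, and report-stragglers sequence can be sketched with `asyncio.wait` and a timeout. This is an illustration under assumptions: `cancel_and_wait` is a hypothetical helper, and the timeout is passed explicitly here, whereas the real method presumably takes it from `config_cancel_timeout`.

```python
import asyncio
from typing import Any


async def cancel_and_wait(
    tasks: list[asyncio.Task[Any]], timeout: float
) -> list[str]:
    """Cancel tasks, wait up to `timeout`, return names of tasks still running."""
    pending = [task for task in tasks if not task.done()]
    for task in pending:
        task.cancel()
    if not pending:
        return []
    _, stragglers = await asyncio.wait(pending, timeout=timeout)
    return sorted(task.get_name() for task in stragglers)


async def demo() -> list[str]:
    async def polite() -> None:
        await asyncio.sleep(60)

    async def stubborn() -> None:
        try:
            await asyncio.sleep(60)
        except asyncio.CancelledError:
            await asyncio.sleep(0.5)  # keeps running for a while after cancel

    tasks = [
        asyncio.create_task(polite(), name="polite"),
        asyncio.create_task(stubborn(), name="stubborn"),
    ]
    await asyncio.sleep(0)  # let both tasks start
    stragglers = await cancel_and_wait(tasks, timeout=0.05)
    await asyncio.gather(*tasks, return_exceptions=True)  # clean up before exit
    return stragglers
```

A task that swallows `CancelledError` and keeps working shows up as a straggler, which is the case worth logging.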