# Background Tasks
Aksara includes a built-in PostgreSQL-backed task queue for work that should run outside the request path without adding Redis, Celery, or another service.
## Overview
The built-in worker stores queued jobs in aksara_tasks and processes them from
the application lifespan. All state lives in your existing PostgreSQL database —
no extra infrastructure required.
Use it for:
- sending email after a write succeeds
- generating exports or reports
- calling third-party APIs without blocking the response
- lightweight asynchronous housekeeping
- recurring background jobs (heartbeats, digest emails, cleanup sweeps)
## Defining a Task
Register a task with @task.

```python
from aksara import task

@task
async def send_welcome_email(user_id: str) -> dict[str, str]:
    return {"status": "sent", "user_id": user_id}
```
The decorator keeps the function callable in normal Python code and also adds
an async .enqueue() helper.
### Named Tasks
Override the auto-derived name (defaults to module.qualname):
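A sketch of what this looks like, assuming the decorator accepts a name= keyword mirroring queue= (check the API reference for the exact spelling):

```python
@task(name="emails.send_welcome")
async def send_welcome_email(user_id: str) -> dict:
    ...
```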
### Named Queues
Assign a task to a specific queue to isolate different workloads:

```python
@task(queue="emails")
async def send_welcome_email(user_id: str) -> dict:
    ...

@task(queue="reports")
async def generate_monthly_report(period: str) -> dict:
    ...
```
Workers can be bound to specific queues — see Worker Configuration.
### Recurring Tasks
Register a task with every= to have the worker schedule it automatically:

```python
from datetime import timedelta

from aksara import task

@task(every=timedelta(minutes=5))
async def send_digest_emails() -> dict:
    # runs every 5 minutes
    ...

@task(every=300)  # int/float seconds also accepted
async def heartbeat_ping() -> dict:
    ...
```
Recurring tasks are tracked in aksara_cron_state. The worker uses an atomic
database claim so that multiple running instances never double-enqueue the same
tick — only one instance wins per interval.
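The claim can be pictured as a single conditional UPDATE (illustrative SQL only; the column names here are assumptions, not Aksara's actual schema):

```sql
-- Only one instance's UPDATE matches the WHERE clause per tick; the others
-- see zero rows returned and skip enqueueing.
UPDATE aksara_cron_state
SET last_enqueued_at = now()
WHERE task_name = $1
  AND (last_enqueued_at IS NULL
       OR last_enqueued_at <= now() - $2::interval)
RETURNING task_name;
```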
## Enqueueing Work
Queue a task with .enqueue() or the top-level enqueue_task() helper.

```python
user = await User.objects.create(email="ada@example.com")
queued = await send_welcome_email.enqueue(str(user.id))
assert queued.status == "pending"
```
### Delayed Execution
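Pass delay_seconds at enqueue time to make the task eligible only after a delay; a sketch (the option simply pushes the row's available_at into the future):

```python
# Becomes eligible roughly 30 seconds from now instead of immediately
await send_welcome_email.enqueue(str(user.id), delay_seconds=30)
```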
### Per-Enqueue Overrides

```python
await send_welcome_email.enqueue(
    str(user.id),
    delay_seconds=30,
    max_attempts=5,
    queue="priority",  # override the queue at enqueue time
)
```
## Worker Configuration
### Automatic (Default)
When tasks_enabled=True, Aksara(...) starts a TaskWorker during app
startup and stops it gracefully during shutdown.

```python
from aksara.conf import Settings, configure

configure(Settings(
    database_url="postgresql://postgres:postgres@localhost/myapp",
    tasks_enabled=True,
    task_poll_interval_seconds=1.0,
    task_max_attempts=3,
))
```
### Manual Worker
Run the worker directly for full control:

```python
from aksara.tasks import TaskWorker
from aksara.db import Database

db = Database("postgresql://localhost/myapp")
await db.connect()

worker = TaskWorker(
    db,
    poll_interval=0.25,
    retry_delay_seconds=5.0,
    concurrency=4,
)
await worker.start()
```
### Queue Binding
Bind a worker to one or more named queues. A worker with queues=None
(the default) processes all queues.

```python
# Dedicated email worker
email_worker = TaskWorker(db, queues=["emails"])

# Handle multiple queues on one worker
mixed_worker = TaskWorker(db, queues=["emails", "reports"])

# Process all queues (default)
general_worker = TaskWorker(db, queues=None)
```
### Concurrency
By default a worker processes one task at a time. Increase concurrency to
process multiple tasks simultaneously within a single worker process:
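For a single worker, pass the concurrency parameter shown in the Manual Worker example:

```python
worker = TaskWorker(db, concurrency=4)  # up to 4 tasks in flight at once
```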
Or set globally:
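A sketch using the task_concurrency setting from the quick reference table:

```python
configure(Settings(
    database_url="postgresql://localhost/myapp",
    tasks_enabled=True,
    task_concurrency=4,
))
```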
Each slot runs as an independent asyncio task. The worker drains all in-flight tasks before shutting down.
## Retry Semantics
### Retry Budget
Each task row tracks attempts and max_attempts. On failure the task is
rescheduled until attempts == max_attempts, after which it is marked failed.
### Exponential Backoff
Retry delays grow exponentially by default:
| Attempt | Default delay (base=2.0, delay=5s) |
|---|---|
| 1 | 5 s |
| 2 | 10 s |
| 3 | 20 s |
| 4 | 40 s |
The delay is capped at retry_max_delay_seconds (default: 3600 s / 1 hour).
```python
worker = TaskWorker(
    db,
    retry_delay_seconds=5.0,
    retry_backoff_base=2.0,
    retry_max_delay_seconds=3600.0,
)
```
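The schedule in the table follows from a simple formula; a self-contained sketch (mirroring the parameters above, not the library's code):

```python
def retry_delay(attempt: int,
                base_delay: float = 5.0,
                backoff_base: float = 2.0,
                max_delay: float = 3600.0) -> float:
    # The n-th retry waits base_delay * backoff_base**(n-1) seconds,
    # capped at max_delay.
    return min(base_delay * backoff_base ** (attempt - 1), max_delay)

delays = [retry_delay(n) for n in range(1, 5)]  # matches the table
```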
Set retry_backoff_base=1.0 to restore a flat constant delay:
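With the worker parameters shown above:

```python
worker = TaskWorker(
    db,
    retry_delay_seconds=5.0,
    retry_backoff_base=1.0,  # every retry waits a flat 5 s
)
```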
## Stale Lock Recovery
If a worker process crashes after claiming a task but before completing it, the
row stays status='running' indefinitely. Aksara detects this automatically:
Every lock_recovery_interval_seconds (default: 60 s) the worker queries for
tasks whose locked_at is older than stale_lock_timeout_seconds (default:
300 s / 5 min) and resets them to pending so another worker can retry them.
```python
worker = TaskWorker(
    db,
    stale_lock_timeout_seconds=300.0,     # age before "stuck"
    lock_recovery_interval_seconds=60.0,  # how often to sweep
)
```
You can also trigger recovery manually:
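A sketch of a manual trigger (the method name recover_stale_tasks is an assumption; check the TaskWorker API for the exact name):

```python
recovered = await worker.recover_stale_tasks()  # hypothetical method name
```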
## Completed Task Cleanup
Task rows accumulate forever unless you configure a TTL. Set
result_ttl_seconds to have the worker automatically purge old completed
rows:
```python
worker = TaskWorker(
    db,
    result_ttl_seconds=7 * 86400,     # keep completed rows for 7 days
    cleanup_interval_seconds=3600,    # run cleanup hourly
)
```
Purge manually or with a custom status filter:
```python
# Purge completed rows older than 7 days
count = await worker.purge_old_tasks(older_than_seconds=7 * 86400)

# Also purge failed rows older than 30 days
count = await worker.purge_old_tasks(
    statuses=("completed", "failed"),
    older_than_seconds=30 * 86400,
)
```
## Inspecting Task State

```python
from aksara import get_task_record

record = await get_task_record(queued.id)
assert record is not None
assert record.status in {"pending", "running", "completed", "failed"}

# Available fields
record.id
record.task_name
record.queue
record.status
record.attempts
record.max_attempts
record.last_error     # last failure message
record.result         # return value (when completed)
record.available_at   # when the task becomes eligible
record.locked_at      # when the worker claimed it
record.completed_at
record.created_at
record.updated_at
```
## CLI — Task Queue Management
Aksara ships an aksara tasks command group for operational visibility and
dead-letter management.
### Stats
Shows row counts grouped by status and queue:

```
Queue: default
  pending      3
  running      1
  completed  412
  failed       2

Total
  pending      3
  running      1
  completed  412
  failed       2
```
### List Tasks

```bash
aksara tasks list                       # failed tasks (default)
aksara tasks list --status pending
aksara tasks list --status failed --queue emails --limit 50
aksara tasks list --task-name send_welcome   # substring match on task_name
```
### Re-enqueue Failed Tasks
Re-enqueue resets attempts, locked_at, last_error, and available_at
so the task is picked up as fresh:

```bash
aksara tasks reenqueue <uuid>           # one task by ID
aksara tasks reenqueue --all            # all failed tasks
aksara tasks reenqueue --all --queue emails
aksara tasks reenqueue --all --yes      # skip confirmation
```
### Purge Old Records

```bash
aksara tasks purge                      # completed, >7 days
aksara tasks purge --status failed --older-than-days 30
aksara tasks purge --status completed --status failed -d 1 --yes
```
## Payload Serialization
Task arguments and return values are stored as JSONB. The following Python types are normalized automatically:

| Type | Encoding |
|---|---|
| dict, list, str, int, float, bool | Native JSON |
| UUID | str(uuid) |
| datetime | ISO 8601 string |
| set | Sorted list |
| Pydantic model | .model_dump(mode="json") |
| dataclass | dataclasses.asdict() |
| Object with .to_dict() | .to_dict() |
Tasks should be idempotent when possible, and payloads should be kept small.
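The normalization rules can be sketched in plain Python. This is an illustration of the table above, not Aksara's actual encoder; the Pydantic branch is omitted to keep the sketch dependency-free:

```python
import dataclasses
from datetime import datetime
from typing import Any
from uuid import UUID

def normalize(value: Any) -> Any:
    # JSON-native values pass through; everything else is converted to a
    # JSON-friendly shape, recursing into containers.
    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()  # ISO 8601
    if isinstance(value, set):
        return sorted(normalize(v) for v in value)
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return normalize(dataclasses.asdict(value))
    if isinstance(value, dict):
        return {str(k): normalize(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [normalize(v) for v in value]
    if hasattr(value, "to_dict"):
        return normalize(value.to_dict())
    raise TypeError(f"cannot serialize {type(value).__name__}")

@dataclasses.dataclass
class Report:
    period: str
    row_count: int
```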
## Operational Notes
- aksara_tasks and aksara_cron_state are created lazily on first use via an idempotent CREATE TABLE IF NOT EXISTS.
- The queue column is added to existing aksara_tasks tables via ALTER TABLE … ADD COLUMN IF NOT EXISTS — safe to run against a live table.
- Workers claim jobs with FOR UPDATE SKIP LOCKED, so multiple app instances share the same queue safely.
- The built-in queue is intentionally lightweight. For very high throughput (thousands of tasks/second) or cross-language consumers, consider a dedicated external queue like RabbitMQ or Kafka.
## Settings Quick Reference

| Setting | Default | Description |
|---|---|---|
| tasks_enabled | True | Start worker automatically |
| task_poll_interval_seconds | 1.0 | Idle polling frequency |
| task_max_attempts | 3 | Global retry budget |
| task_retry_delay_seconds | 5.0 | Base retry delay |
| task_retry_backoff_base | 2.0 | Exponential backoff multiplier |
| task_retry_max_delay_seconds | 3600.0 | Retry delay ceiling |
| task_concurrency | 1 | Parallel slots per worker |
| task_stale_lock_timeout_seconds | 300.0 | Age before a running task is considered crashed |
| task_lock_recovery_interval_seconds | 60.0 | How often stale-lock sweep runs |
| task_result_ttl_seconds | None | Age before completed rows are purged (disabled by default) |
| task_cleanup_interval_seconds | 3600.0 | How often TTL cleanup runs |
| task_cron_check_interval_seconds | 30.0 | How often recurring tasks are checked |
See Settings Reference for environment variable names.