Skip to content

Caching

Speed up your application with caching.


Overview

Aksara provides a flexible caching system:

  • Query caching — Cache database query results
  • Response caching — Cache API responses
  • Object caching — Cache individual objects
  • Function caching — Cache function return values

Configuration

Basic Setup

# settings.py
AKSARA = {
    "CACHE": {
        "default": {
            "backend": "memory",
            "ttl": 300,  # 5 minutes
        }
    }
}

Redis Backend

AKSARA = {
    "CACHE": {
        "default": {
            "backend": "redis",
            "url": "redis://localhost:6379/0",
            "ttl": 300,
        }
    }
}

Multiple Caches

AKSARA = {
    "CACHE": {
        "default": {
            "backend": "redis",
            "url": "redis://localhost:6379/0",
            "ttl": 300,
        },
        "sessions": {
            "backend": "redis",
            "url": "redis://localhost:6379/1",
            "ttl": 86400,  # 24 hours
        },
        "queries": {
            "backend": "memory",
            "max_size": 1000,
            "ttl": 60,
        }
    }
}

Function Caching

Basic Usage

from aksara.cache import cached

@cached(ttl=300)
async def get_popular_posts():
    """Return the ten most-viewed published posts; the result is cached for 300s."""
    return await Post.objects.filter(
        is_published=True
    ).order_by("-view_count").limit(10).all()

# First call: hits database
posts = await get_popular_posts()

# Second call: returns cached result
posts = await get_popular_posts()

Cache Keys

@cached(ttl=300)
async def get_user_posts(user_id: str):
    """Return all posts authored by *user_id*; cached for 300s under an auto-generated key."""
    # Automatic key: "get_user_posts:user_id=abc123"
    return await Post.objects.filter(author_id=user_id).all()

# Custom key
@cached(ttl=300, key="posts:{user_id}")
async def get_user_posts(user_id: str):
    """Same query as above, but cached under the explicit key template posts:{user_id}."""
    return await Post.objects.filter(author_id=user_id).all()

# Custom key function
def make_key(user_id, status=None):
    """Build a cache key of the form ``user_posts:<user_id>:<status or 'all'>``."""
    suffix = status if status else "all"
    return f"user_posts:{user_id}:{suffix}"

@cached(ttl=300, key_func=make_key)
async def get_user_posts(user_id: str, status: str = None):
    """Return *user_id*'s posts, optionally filtered by *status*; cache key built by make_key."""
    qs = Post.objects.filter(author_id=user_id)
    if status:
        qs = qs.filter(status=status)
    return await qs.all()

Conditional Caching

@cached(ttl=300, condition=lambda result: len(result) > 0)
async def search_posts(query: str):
    """Title-substring search; the condition means only non-empty results are cached."""
    return await Post.objects.filter(title__contains=query).all()

Query Caching

Cache QuerySet Results

from aksara.cache import cache

# Manual caching
cache_key = "active_users"
users = await cache.get(cache_key)

if users is None:
    users = await User.objects.filter(is_active=True).all()
    await cache.set(cache_key, users, ttl=300)

# Or use get_or_set
users = await cache.get_or_set(
    "active_users",
    lambda: User.objects.filter(is_active=True).all(),
    ttl=300
)

QuerySet Cache Method

# Cache enabled queryset (if supported)
posts = await Post.objects.filter(
    is_published=True
).cache(ttl=300).all()

Object Caching

Cache Single Objects

from aksara.cache import cache

async def get_user(user_id: str) -> User:
    """Return the user with *user_id*, serving from cache when possible.

    On a miss the user is loaded from the database and backfilled into the
    cache for 600s under the key "user:<user_id>".
    """
    key = f"user:{user_id}"

    cached_user = await cache.get(key)
    if cached_user is not None:
        return cached_user

    fresh = await User.objects.get(id=user_id)
    await cache.set(key, fresh, ttl=600)
    return fresh

Cache Mixin

from aksara.cache import CacheMixin

class User(CacheMixin, Model):
    """User model with instance caching provided by CacheMixin (TTL from Meta.cache_ttl)."""

    # Unique login/contact address for the account.
    email = fields.Email(unique=True)
    # Display name, capped at 100 characters.
    name = fields.String(max_length=100)

    class Meta:
        # Cached instances expire after 600 seconds.
        cache_ttl = 600

# Automatic caching
user = await User.objects.cached_get(id=user_id)

# Invalidate on save
await user.save()  # Auto-invalidates cache

Response Caching

Cache API Responses

from aksara.api import ViewSet, action
from aksara.cache import cache_response

class PostViewSet(ViewSet):
    """Post endpoints with response-level caching via @cache_response."""

    @cache_response(ttl=60)
    async def list(self, request):
        """List published posts; the serialized response is cached for 60s."""
        posts = await Post.objects.filter(is_published=True).all()
        return PostSerializer(posts, many=True).data

    @cache_response(ttl=300, key="post:{pk}")
    async def retrieve(self, request, pk=None):
        """Fetch one post; cached for 300s under the key template post:{pk}."""
        post = await Post.objects.get(id=pk)
        return PostDetailSerializer(post).data

Vary Cache by User

@cache_response(ttl=60, vary_on=["user_id"])
async def list(self, request):
    """Per-user listing; vary_on keeps a separate cache entry per user_id."""
    # Unauthenticated requests share a single "anonymous" cache entry.
    user_id = request.user.id if request.user else "anonymous"
    posts = await get_posts_for_user(user_id)
    return PostSerializer(posts, many=True).data

Vary by Headers

@cache_response(ttl=300, vary_headers=["Accept-Language"])
async def list(self, request):
    """Response cached per Accept-Language header value."""
    # Different cache for each language
    return posts

Cache Invalidation

Manual Invalidation

from aksara.cache import cache

# Delete specific key
await cache.delete("user:abc123")

# Delete multiple keys
await cache.delete_many(["user:abc123", "user:def456"])

# Delete by pattern (Redis)
await cache.delete_pattern("user:*")

# Clear all
await cache.clear()

Signal-Based Invalidation

from aksara.signals import post_save, post_delete
from aksara.cache import cache

@post_save(Post)
async def invalidate_post_cache(sender, instance, **kwargs):
    """On Post save, evict every cache entry the change can affect:
    the post itself, the global list, and the author's post list."""
    await cache.delete(f"post:{instance.id}")
    await cache.delete("posts:all")
    await cache.delete(f"posts:author:{instance.author_id}")

@post_delete(Post)
async def invalidate_on_delete(sender, instance, **kwargs):
    """On Post delete, evict the post's own entry and the global post list."""
    await cache.delete(f"post:{instance.id}")
    await cache.delete("posts:all")

Decorator-Based Invalidation

from aksara.cache import invalidates

@invalidates("posts:all", "post:{post_id}")
async def update_post(post_id: str, data: dict):
    """Apply *data* fields to the post and save it; the @invalidates decorator
    then drops the "posts:all" and "post:<post_id>" cache entries."""
    post = await Post.objects.get(id=post_id)
    for key, value in data.items():
        setattr(post, key, value)
    await post.save()
    return post

Cache Backends

Memory Backend

AKSARA = {
    "CACHE": {
        "default": {
            "backend": "memory",
            "max_size": 1000,  # Max entries
            "ttl": 300,
        }
    }
}

Memory Cache Limitations

Memory cache is not shared between processes. Use Redis in production.

Redis Backend

AKSARA = {
    "CACHE": {
        "default": {
            "backend": "redis",
            "url": "redis://localhost:6379/0",
            "ttl": 300,
            "key_prefix": "myapp:",
        }
    }
}

Custom Backend

from aksara.cache import CacheBackend

class MyBackend(CacheBackend):
    """Skeleton for a custom cache backend: implement the four core operations."""

    async def get(self, key: str):
        """Return the value stored under *key*, or None on a miss."""
        # Implementation
        pass

    async def set(self, key: str, value, ttl: int = None):
        """Store *value* under *key*; expire after *ttl* seconds (None = backend default)."""
        # Implementation
        pass

    async def delete(self, key: str):
        """Remove *key* if present."""
        # Implementation
        pass

    async def clear(self):
        """Drop every entry held by this backend."""
        # Implementation
        pass

# Register
AKSARA = {
    "CACHE": {
        "default": {
            "backend": "myapp.cache.MyBackend",
            "custom_option": "value",
        }
    }
}

Cache Utilities

Cache Object

from aksara.cache import cache, get_cache

# Default cache
await cache.set("key", "value")
value = await cache.get("key")

# Named cache
query_cache = get_cache("queries")
await query_cache.set("key", "value")

Cache Methods

# Get with default
value = await cache.get("key", default="not_found")

# Check existence
exists = await cache.exists("key")

# Get remaining TTL
ttl = await cache.ttl("key")

# Increment/decrement (Redis)
await cache.incr("counter")
await cache.decr("counter")

# Set if not exists
was_set = await cache.set_nx("lock:task", "1", ttl=60)

Lock

from aksara.cache import cache

# Distributed lock
async with cache.lock("process:task", timeout=30):
    # Only one process can execute this
    await expensive_operation()

Caching Patterns

Cache-Aside

async def get_user(user_id: str) -> User:
    """Cache-aside read: serve from cache, falling back to the DB and backfilling."""
    # 1. Try cache
    user = await cache.get(f"user:{user_id}")

    if user is None:
        # 2. Load from database
        user = await User.objects.get(id=user_id)

        # 3. Store in cache
        await cache.set(f"user:{user_id}", user, ttl=600)

    return user

Write-Through

async def update_user(user_id: str, data: dict) -> User:
    """Write-through update: persist *data* to the database, then refresh the cache
    so readers never observe a stale entry."""
    # Update database
    user = await User.objects.get(id=user_id)
    for key, value in data.items():
        setattr(user, key, value)
    await user.save()

    # Update cache
    await cache.set(f"user:{user_id}", user, ttl=600)

    return user

Cache Stampede Prevention

from aksara.cache import cache

async def get_expensive_data():
    """Return "expensive_data" from cache, recomputing it at most once per expiry.

    Fix over the naive version: the original acquired the distributed lock on
    EVERY call, serializing all readers even on cache hits. Here the cache is
    read lock-free first (fast path); only a miss takes the lock, and the
    cache is re-checked inside the lock so concurrent waiters reuse the value
    the first lock holder computed (double-checked rebuild — this is what
    actually prevents a stampede).
    """
    cache_key = "expensive_data"

    # Fast path: no lock needed when the value is already cached.
    data = await cache.get(cache_key)
    if data is not None:
        return data

    # Miss: serialize the rebuild so only one worker computes the value.
    async with cache.lock(f"lock:{cache_key}", timeout=30):
        # Re-check: another worker may have populated the cache while we
        # were waiting for the lock.
        data = await cache.get(cache_key)

        if data is None:
            data = await compute_expensive_data()
            await cache.set(cache_key, data, ttl=300)

    return data

Best Practices

Do

  • Use meaningful cache keys
  • Set appropriate TTLs
  • Invalidate on data changes
  • Monitor cache hit rates
  • Use Redis in production

Don't

  • Don't cache user-specific data without variation
  • Don't cache sensitive data without encryption
  • Don't cache with infinite TTL
  • Don't ignore cache invalidation

Key Naming

# Good: hierarchical, descriptive
"user:abc123:profile"
"posts:published:page:1"
"org:xyz:members:count"

# Bad: flat, unclear
"data1"
"cache_key"

Monitoring

Cache Statistics

from aksara.cache import cache

stats = await cache.stats()
print(f"Hits: {stats['hits']}")
print(f"Misses: {stats['misses']}")
print(f"Hit ratio: {stats['hit_ratio']:.2%}")

Debug Caching

import logging

logging.getLogger("aksara.cache").setLevel(logging.DEBUG)

# Logs:
# DEBUG - Cache HIT: user:abc123
# DEBUG - Cache MISS: posts:all
# DEBUG - Cache SET: posts:all (ttl=300)