Django Expert Examples

Externalized from the agent definition per the few-shot-examples rule (#1587).

Django Expert — Worked Examples

Externalized from the agent definition per the few-shot-examples rule (#1587).

Your Process — Sample Implementations

1. Codebase Audit

# Identify slow views via Django Debug Toolbar queries
# Check for common anti-patterns
grep -r "\.objects\.all()" apps/ --include="*.py"
grep -r "select_related\|prefetch_related" apps/ --include="*.py"

# Run Django system checks
python manage.py check --deploy

# Find large migration files that may indicate schema issues
find . -path "*/migrations/*.py" -size +20k | sort -rh | head -10

2. ORM Optimization

# PROBLEM: N+1 queries — one query per related object
def get_orders(request):
    orders = Order.objects.filter(user=request.user)
    for order in orders:
        # Each access hits the DB again
        print(order.customer.name)      # N queries for customer
        print(order.items.count())      # N queries for item counts
    return orders

# SOLUTION: select_related for FK/OneToOne, prefetch_related for M2M/reverse FK
def get_orders(request):
    orders = (
        Order.objects.filter(user=request.user)
        .select_related("customer")              # JOIN — single query
        .prefetch_related(
            Prefetch(
                "items",
                queryset=OrderItem.objects.select_related("product"),
            )
        )
        .annotate(item_count=Count("items"))     # DB-level aggregation
        .order_by("-created_at")
    )
    return orders

# Custom manager encapsulates query logic
class OrderManager(models.Manager):
    def for_user(self, user):
        return (
            self.get_queryset()
            .filter(user=user)
            .select_related("customer")
            .prefetch_related("items__product")
            .annotate(item_count=Count("items"))
        )

    def pending(self):
        return self.get_queryset().filter(status=Order.Status.PENDING)

class Order(models.Model):
    class Status(models.TextChoices):
        PENDING = "pending", "Pending"
        CONFIRMED = "confirmed", "Confirmed"
        SHIPPED = "shipped", "Shipped"

    objects = OrderManager()

3. DRF API Design

# Serializer with nested writes and validation
from rest_framework import serializers
from rest_framework.validators import UniqueTogetherValidator

class OrderItemSerializer(serializers.ModelSerializer):
    product_name = serializers.CharField(source="product.name", read_only=True)
    subtotal = serializers.SerializerMethodField()

    class Meta:
        model = OrderItem
        fields = ["id", "product", "product_name", "quantity", "unit_price", "subtotal"]
        read_only_fields = ["unit_price"]

    def get_subtotal(self, obj) -> str:
        return str(obj.quantity * obj.unit_price)


class OrderSerializer(serializers.ModelSerializer):
    items = OrderItemSerializer(many=True)
    status_display = serializers.CharField(source="get_status_display", read_only=True)

    class Meta:
        model = Order
        fields = ["id", "customer", "status", "status_display", "items", "created_at"]
        read_only_fields = ["created_at"]

    def validate(self, data):
        # Cross-field validation
        if data.get("status") == Order.Status.SHIPPED and not data.get("tracking_number"):
            raise serializers.ValidationError(
                {"tracking_number": "Tracking number required when shipping."}
            )
        return data

    def create(self, validated_data):
        items_data = validated_data.pop("items")
        order = Order.objects.create(**validated_data)
        OrderItem.objects.bulk_create(
            [OrderItem(order=order, **item) for item in items_data]
        )
        return order


# ViewSet with queryset optimization and action methods
from rest_framework import viewsets, permissions, status
from rest_framework.decorators import action
from rest_framework.response import Response

class OrderViewSet(viewsets.ModelViewSet):
    serializer_class = OrderSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        return Order.objects.for_user(self.request.user)

    def perform_create(self, serializer):
        serializer.save(user=self.request.user)

    @action(detail=True, methods=["post"])
    def confirm(self, request, pk=None):
        order = self.get_object()
        if order.status != Order.Status.PENDING:
            return Response(
                {"detail": "Only pending orders can be confirmed."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        order.status = Order.Status.CONFIRMED
        order.save(update_fields=["status"])
        send_confirmation_email.delay(order.id)  # Celery task
        return Response(OrderSerializer(order).data)

4. Middleware Patterns

# Custom middleware: timing and request ID injection
import time
import uuid
import logging
from django.utils.deprecation import MiddlewareMixin

logger = logging.getLogger("django.request")

class RequestTimingMiddleware(MiddlewareMixin):
    """Adds X-Request-ID header and logs request duration."""

    def process_request(self, request):
        request.request_id = str(uuid.uuid4())
        request._start_time = time.monotonic()

    def process_response(self, request, response):
        duration_ms = (time.monotonic() - request._start_time) * 1000
        response["X-Request-ID"] = getattr(request, "request_id", "")
        response["X-Response-Time"] = f"{duration_ms:.2f}ms"

        if duration_ms > 500:
            logger.warning(
                "Slow request: %s %s took %.2fms (request_id=%s)",
                request.method,
                request.path,
                duration_ms,
                request.request_id,
            )
        return response


# Middleware for current user context (signals, models)
import threading

_thread_local = threading.local()

class CurrentUserMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        _thread_local.user = getattr(request, "user", None)
        response = self.get_response(request)
        _thread_local.user = None
        return response

def get_current_user():
    return getattr(_thread_local, "user", None)

5. Celery Integration

# tasks.py — idempotent tasks with retry strategy
from celery import shared_task
from celery.utils.log import get_task_logger
from django.db import transaction

logger = get_task_logger(__name__)

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # 1 minute
    acks_late=True,           # Acknowledge only after successful completion
    reject_on_worker_lost=True,
)
def send_confirmation_email(self, order_id: int) -> dict:
    """Send order confirmation email. Idempotent — safe to retry."""
    try:
        order = Order.objects.select_related("customer").get(pk=order_id)

        if order.confirmation_sent_at:
            logger.info("Confirmation already sent for order %d, skipping.", order_id)
            return {"status": "skipped", "order_id": order_id}

        with transaction.atomic():
            # Mark sent atomically before sending to prevent duplicate sends
            rows = Order.objects.filter(
                pk=order_id, confirmation_sent_at__isnull=True
            ).update(confirmation_sent_at=now())

            if rows == 0:
                return {"status": "skipped", "order_id": order_id}

            _send_email(order)  # Actual send after DB lock

        return {"status": "sent", "order_id": order_id}

    except Order.DoesNotExist:
        logger.error("Order %d not found, not retrying.", order_id)
        return {"status": "not_found", "order_id": order_id}

    except Exception as exc:
        logger.warning("Email send failed for order %d: %s", order_id, exc)
        raise self.retry(exc=exc)


# celery.py — app configuration
from celery import Celery
from celery.schedules import crontab

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "apps.accounts.tasks.cleanup_expired_sessions",
        "schedule": crontab(hour=2, minute=0),  # 2am daily
    },
    "send-daily-digest": {
        "task": "apps.notifications.tasks.send_daily_digest",
        "schedule": crontab(hour=8, minute=0, day_of_week="mon-fri"),
    },
}

6. Settings Management

# settings/base.py — environment-driven configuration
from pathlib import Path
import environ

env = environ.Env(
    DEBUG=(bool, False),
    ALLOWED_HOSTS=(list, []),
)

BASE_DIR = Path(__file__).resolve().parent.parent
environ.Env.read_env(BASE_DIR / ".env")

SECRET_KEY = env("SECRET_KEY")
DEBUG = env("DEBUG")
ALLOWED_HOSTS = env("ALLOWED_HOSTS")

DATABASES = {
    "default": env.db("DATABASE_URL")  # postgres://user:pass@host/db
}

CACHES = {
    "default": env.cache("REDIS_URL")  # redis://localhost:6379/0
}

CELERY_BROKER_URL = env("REDIS_URL")
CELERY_RESULT_BACKEND = env("REDIS_URL")

# Security hardening for production
SECURE_HSTS_SECONDS = env.int("SECURE_HSTS_SECONDS", default=0)
SECURE_SSL_REDIRECT = env.bool("SECURE_SSL_REDIRECT", default=False)
SESSION_COOKIE_SECURE = env.bool("SESSION_COOKIE_SECURE", default=False)
CSRF_COOKIE_SECURE = env.bool("CSRF_COOKIE_SECURE", default=False)

Few-Shot Examples

Example 1: N+1 Query Detection and Fix

Input: "Our `/api/orders/` endpoint is slow — 300ms for 25 orders"

Diagnosis with Django Debug Toolbar:

GET /api/orders/
51 queries | 287ms

Root cause:

# views.py — missing prefetch
class OrderViewSet(viewsets.ModelViewSet):
    def get_queryset(self):
        return Order.objects.filter(user=self.request.user)
        # Missing: each serializer access to order.customer and order.items
        # triggers separate DB hits → 1 + 25 + 25 = 51 queries

Fix:

class OrderViewSet(viewsets.ModelViewSet):
    def get_queryset(self):
        return (
            Order.objects.filter(user=self.request.user)
            .select_related("customer")
            .prefetch_related(
                Prefetch("items", queryset=OrderItem.objects.select_related("product"))
            )
            .annotate(item_count=Count("items"))
        )

Result: 51 queries → 3 queries. Response time: 287ms → 18ms.


Example 2: DRF Serializer for Nested Create

Input: "We need to create an order with multiple line items in one API call"

# serializers.py
class CreateOrderSerializer(serializers.ModelSerializer):
    items = OrderItemSerializer(many=True, min_length=1)

    class Meta:
        model = Order
        fields = ["customer", "shipping_address", "items"]

    def validate_items(self, items):
        # Validate product availability
        product_ids = [item["product"].id for item in items]
        unavailable = Product.objects.filter(
            id__in=product_ids, available=False
        ).values_list("id", flat=True)
        if unavailable:
            raise serializers.ValidationError(
                f"Products unavailable: {list(unavailable)}"
            )
        return items

    @transaction.atomic
    def create(self, validated_data):
        items_data = validated_data.pop("items")
        order = Order.objects.create(
            user=self.context["request"].user, **validated_data
        )
        # Bulk create for efficiency
        OrderItem.objects.bulk_create([
            OrderItem(
                order=order,
                product=item["product"],
                quantity=item["quantity"],
                unit_price=item["product"].price,  # Capture price at time of order
            )
            for item in items_data
        ])
        return order

Example 3: Zero-Downtime Migration for Adding a Non-Nullable Column

Input: "We need to add a required `sku` field to Product — how do we migrate without downtime?"

# Step 1: Add nullable column (deploy 1 — backward compatible)
# migrations/0042_product_add_sku_nullable.py
class Migration(migrations.Migration):
    def up(self, schema_editor):
        schema_editor.add_column(
            "products_product",
            models.CharField(max_length=64, null=True, blank=True),
        )

# Step 2: Backfill data (data migration — run separately or in step 1)
# migrations/0043_product_backfill_sku.py
def backfill_sku(apps, schema_editor):
    Product = apps.get_model("products", "Product")
    products = Product.objects.filter(sku__isnull=True)
    for batch_start in range(0, products.count(), 1000):
        batch = products[batch_start : batch_start + 1000]
        for product in batch:
            product.sku = generate_sku(product.name, product.id)
        Product.objects.bulk_update(batch, ["sku"])

# Step 3: Add NOT NULL constraint (deploy 2 — after all rows backfilled)
# migrations/0044_product_sku_not_null.py
class Migration(migrations.Migration):
    def up(self, schema_editor):
        schema_editor.execute(
            "ALTER TABLE products_product ALTER COLUMN sku SET NOT NULL"
        )