Python Celery Task Queues for Video Metadata Processing

Why Background Processing for a Viral Video Vault

ViralVidVault tracks viral videos across 7 regions — six in Europe plus the US. The main cron pipeline needs to be fast — fetch, score, store. But secondary tasks like thumbnail quality checks, deep metadata enrichment, and stale link removal are too slow for the critical path. Celery lets us offload this work to background workers.

Celery Configuration

# celery_app.py
from celery import Celery

# Broker (DB 0) carries task messages; a separate Redis DB (1) stores results.
app = Celery(
    "viralvidvault",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

_settings = {
    # JSON everywhere: safe, language-agnostic serialization.
    "task_serializer": "json",
    "result_serializer": "json",
    "accept_content": ["json"],
    "timezone": "UTC",
    # Ack only after the task finishes, so a crashed worker's task is redelivered.
    "task_acks_late": True,
    "worker_prefetch_multiplier": 2,
    # Soft limit raises inside the task; the hard limit kills the worker process.
    "task_soft_time_limit": 90,
    "task_time_limit": 120,
    # Dedicated queues let each workload be scaled independently.
    "task_routes": {
        "tasks.score_virality_batch": {"queue": "scoring"},
        "tasks.validate_thumbnails": {"queue": "thumbnails"},
        "tasks.detect_dead_videos": {"queue": "cleanup"},
    },
}
app.conf.update(**_settings)

Task 1: Batch Virality Re-Scoring

The cron pipeline does a quick initial score, but background workers do a deeper analysis with historical data:

# tasks.py
import sqlite3
from celery_app import app

# Per-region scoring baselines used by score_virality_batch:
#   velocity:   views/hour treated as the 100%-viral pace for that region
#   engagement: (likes + 2*comments) per 100 views treated as 100% engagement
# Unknown regions fall back to the "GB" entry.
# NOTE(review): presumably calibrated from historical regional data — confirm.
REGIONAL_BASELINES = {
    "PL": {"velocity": 10000, "engagement": 6.0},
    "NL": {"velocity": 8000, "engagement": 5.5},
    "SE": {"velocity": 5000, "engagement": 5.0},
    "NO": {"velocity": 4000, "engagement": 5.0},
    "AT": {"velocity": 6000, "engagement": 5.0},
    "GB": {"velocity": 20000, "engagement": 4.5},
    "US": {"velocity": 50000, "engagement": 5.0},
}

@app.task(bind=True, max_retries=2)
def score_virality_batch(self, video_ids: list[str], db_path: str) -> dict:
    """Deep virality scoring with historical velocity analysis.

    For each video, compares the two most recent snapshots to derive an
    hourly view velocity and an engagement rate, normalizes both against
    the regional baseline, and blends them into a score that is written
    back to the ``videos`` table.

    Args:
        video_ids: IDs of the videos to (re)score.
        db_path: Path to the SQLite database with snapshots and videos.

    Returns:
        Mapping of video_id -> scoring details, or a ``reason`` entry when
        the video could not be scored.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    results: dict[str, dict] = {}

    try:
        for vid in video_ids:
            rows = conn.execute(
                """SELECT view_count, likes, comments, region,
                          strftime('%s', fetched_at) as ts
                   FROM video_snapshots
                   WHERE video_id = ?
                   ORDER BY fetched_at DESC LIMIT 5""",
                (vid,),
            ).fetchall()

            # Velocity needs at least two snapshots to compute a delta.
            if len(rows) < 2:
                results[vid] = {"score": 0, "reason": "insufficient data"}
                continue

            latest, previous = rows[0], rows[1]
            region = latest["region"]
            # Unknown regions fall back to the GB baseline.
            baseline = REGIONAL_BASELINES.get(region, REGIONAL_BASELINES["GB"])

            time_delta = int(latest["ts"]) - int(previous["ts"])
            if time_delta <= 0:
                # Duplicate or out-of-order snapshots; no rate can be derived.
                results[vid] = {"score": 0, "reason": "no time delta"}
                continue

            # Views gained per hour between the two snapshots.
            view_velocity = (latest["view_count"] - previous["view_count"]) / time_delta * 3600
            # Comments weigh 2x likes; max() guards against a zero view count.
            engagement = (
                (latest["likes"] + latest["comments"] * 2) / max(latest["view_count"], 1)
            ) * 100

            # Normalize against the regional baseline, capped at 100.
            # NOTE(review): a shrinking view count yields a negative velocity
            # term, so the final score can go below 0 — confirm that is intended.
            velocity_norm = min(100, (view_velocity / baseline["velocity"]) * 100)
            engagement_norm = min(100, (engagement / baseline["engagement"]) * 100)

            score = velocity_norm * 0.55 + engagement_norm * 0.45

            # Persist the refreshed score.
            conn.execute(
                "UPDATE videos SET virality_score = ? WHERE video_id = ?",
                (round(score, 1), vid),
            )

            results[vid] = {
                "score": round(score, 1),
                "velocity": round(view_velocity),
                "engagement": round(engagement, 2),
                "region": region,
                "label": "VIRAL" if score >= 85 else "TRENDING" if score >= 60 else "NORMAL",
            }

        conn.commit()
    finally:
        # Fix: always release the connection, even when scoring raises —
        # the original leaked it on any exception mid-batch.
        conn.close()
    return results

Task 2: Thumbnail Quality Validation

import requests
from PIL import Image
from io import BytesIO

@app.task(bind=True, max_retries=3, default_retry_delay=30)
def validate_thumbnails(self, videos: list[dict]) -> dict:
    """Check thumbnail accessibility and quality.

    Each video dict must carry ``video_id`` and ``thumbnail_url``.
    Thumbnails smaller than 200x150 are counted as placeholder images.

    Returns:
        Counts of valid/broken/placeholder thumbnails plus per-video
        details for every non-valid one.
    """
    report = {"valid": 0, "broken": 0, "placeholder": 0, "details": []}

    for v in videos:
        try:
            # Fix: use the response as a context manager so the HTTP
            # connection is released on every path — the original never
            # closed it, and its stream=True was pointless because
            # .content downloads the whole body anyway.
            with requests.get(v["thumbnail_url"], timeout=8) as resp:
                if resp.status_code != 200:
                    report["broken"] += 1
                    report["details"].append({"id": v["video_id"], "status": "broken"})
                    continue
                body = resp.content

            img = Image.open(BytesIO(body))
            w, h = img.size

            if w < 200 or h < 150:
                report["placeholder"] += 1
                report["details"].append({"id": v["video_id"], "status": "placeholder", "size": f"{w}x{h}"})
            else:
                report["valid"] += 1

        except Exception as e:
            # Best-effort: any network or image-decode failure counts as
            # broken rather than aborting the whole batch.
            report["broken"] += 1
            report["details"].append({"id": v["video_id"], "status": "error", "msg": str(e)})

    return report

Task 3: Dead Video Detection

Videos get deleted or go private. Detect them early:

@app.task(rate_limit="20/m")
def detect_dead_videos(video_ids: list[str]) -> dict:
    """Check which videos are still publicly accessible.

    Probes YouTube's oEmbed endpoint for each ID; a 200 response means the
    video is still public, anything else (or a network failure) marks it
    dead.
    """
    alive = []
    dead = []

    for video_id in video_ids:
        oembed_url = (
            f"https://www.youtube.com/oembed?url=https://youtube.com/watch?v={video_id}"
        )
        try:
            response = requests.head(oembed_url, timeout=8)
        except requests.RequestException:
            # Treat timeouts / connection errors the same as removed videos.
            dead.append(video_id)
            continue

        if response.status_code == 200:
            alive.append(video_id)
        else:
            dead.append(video_id)

    return {"alive": len(alive), "dead": len(dead), "dead_ids": dead}

Dispatching from the Pipeline

from celery import group
from tasks import score_virality_batch, validate_thumbnails, detect_dead_videos

def background_processing(new_videos: list[dict], stale_ids: list[str], db_path: str):
    """Fan secondary work out to the dedicated Celery queues.

    Enqueues deep re-scoring (in batches), thumbnail validation, and
    dead-link detection without blocking the caller.
    """
    batch_size = 50

    # Deep re-scoring: one task per batch of 50 video IDs.
    all_ids = [video["video_id"] for video in new_videos]
    for start in range(0, len(all_ids), batch_size):
        score_virality_batch.apply_async(
            args=[all_ids[start:start + batch_size], db_path],
            queue="scoring",
        )

    # Thumbnail checks receive the full video dicts (they need the URLs).
    validate_thumbnails.apply_async(args=[new_videos], queue="thumbnails")

    # Only probe for dead links when there is something stale to check.
    if stale_ids:
        detect_dead_videos.apply_async(args=[stale_ids], queue="cleanup")

Running Workers

# Scoring workers -- CPU bound
celery -A celery_app worker --queues=scoring --concurrency=4 --loglevel=info

# Thumbnail workers -- IO bound
celery -A celery_app worker --queues=thumbnails --concurrency=8

# Cleanup -- rate limited
celery -A celery_app worker --queues=cleanup --concurrency=2

Celery transformed how ViralVidVault processes metadata. The cron pipeline went from 15 minutes to 3 minutes, with background workers handling the heavy lifting asynchronously.

This article is part of the Building ViralVidVault series. Check out ViralVidVault to see these techniques in action.

Leave a Reply