Why Background Processing for a Viral Video Vault
ViralVidVault tracks viral videos across seven regions: six in Europe plus the US. The main cron pipeline needs to be fast: fetch, score, store. But secondary tasks like thumbnail quality checks, deep metadata enrichment, and stale-link removal are too slow for the critical path. Celery lets us offload this work to background workers.
Celery Configuration
# celery_app.py
from celery import Celery

app = Celery(
    "viralvidvault",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    # Ack after completion, so a crashed worker's tasks get redelivered
    task_acks_late=True,
    worker_prefetch_multiplier=2,
    # Soft limit raises SoftTimeLimitExceeded inside the task;
    # the hard limit kills the worker process outright
    task_soft_time_limit=90,
    task_time_limit=120,
    # Route each task family to its own queue so workers can specialise
    task_routes={
        "tasks.score_virality_batch": {"queue": "scoring"},
        "tasks.validate_thumbnails": {"queue": "thumbnails"},
        "tasks.detect_dead_videos": {"queue": "cleanup"},
    },
)
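One consequence of those limits: a task that runs past 90 seconds gets a SoftTimeLimitExceeded raised inside it, and 30 seconds later the hard limit kills the process. Long batches can use that window to return partial results. A minimal sketch (expensive_analysis is a hypothetical stand-in for any slow per-item step):

from celery.exceptions import SoftTimeLimitExceeded

@app.task(bind=True)
def slow_batch(self, items: list[str]) -> dict:
    processed = {}
    try:
        for item in items:
            processed[item] = expensive_analysis(item)  # hypothetical slow step
    except SoftTimeLimitExceeded:
        # Raised at the 90s soft limit; return partial work
        # before the 120s hard kill can discard everything
        pass
    return processed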
Task 1: Batch Virality Re-Scoring
The cron pipeline does a quick initial score, but background workers do a deeper analysis with historical data:
# tasks.py
import sqlite3

from celery_app import app

# Per-region baselines: expected views/hour velocity and engagement %,
# used to normalise raw metrics so regions of different sizes are comparable
REGIONAL_BASELINES = {
    "PL": {"velocity": 10000, "engagement": 6.0},
    "NL": {"velocity": 8000, "engagement": 5.5},
    "SE": {"velocity": 5000, "engagement": 5.0},
    "NO": {"velocity": 4000, "engagement": 5.0},
    "AT": {"velocity": 6000, "engagement": 5.0},
    "GB": {"velocity": 20000, "engagement": 4.5},
    "US": {"velocity": 50000, "engagement": 5.0},
}

@app.task(bind=True, max_retries=2)
def score_virality_batch(self, video_ids: list[str], db_path: str) -> dict:
    """Deep virality scoring with historical velocity analysis."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    results = {}
    try:
        for vid in video_ids:
            rows = conn.execute(
                """SELECT view_count, likes, comments, region,
                          strftime('%s', fetched_at) AS ts
                   FROM video_snapshots
                   WHERE video_id = ?
                   ORDER BY fetched_at DESC LIMIT 5""",
                (vid,),
            ).fetchall()
            if len(rows) < 2:
                results[vid] = {"score": 0, "reason": "insufficient data"}
                continue
            latest, previous = rows[0], rows[1]
            region = latest["region"]
            # Unknown regions fall back to the GB baseline
            baseline = REGIONAL_BASELINES.get(region, REGIONAL_BASELINES["GB"])
            time_delta = int(latest["ts"]) - int(previous["ts"])
            if time_delta <= 0:
                results[vid] = {"score": 0, "reason": "no time delta"}
                continue
            # Views gained per hour since the previous snapshot
            view_velocity = (latest["view_count"] - previous["view_count"]) / time_delta * 3600
            # Weighted engagement as a percentage of views (comments count double)
            engagement = (
                (latest["likes"] + latest["comments"] * 2) / max(latest["view_count"], 1)
            ) * 100
            # Normalise against the regional baseline, capped at 100
            velocity_norm = min(100, (view_velocity / baseline["velocity"]) * 100)
            engagement_norm = min(100, (engagement / baseline["engagement"]) * 100)
            score = velocity_norm * 0.55 + engagement_norm * 0.45
            # Update in database
            conn.execute(
                "UPDATE videos SET virality_score = ? WHERE video_id = ?",
                (round(score, 1), vid),
            )
            results[vid] = {
                "score": round(score, 1),
                "velocity": round(view_velocity),
                "engagement": round(engagement, 2),
                "region": region,
                "label": "VIRAL" if score >= 85 else "TRENDING" if score >= 60 else "NORMAL",
            }
        conn.commit()
    except sqlite3.OperationalError as exc:
        # Another worker (or the cron pipeline) may hold the write lock; retry the batch
        raise self.retry(exc=exc, countdown=15)
    finally:
        conn.close()
    return results
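One practical caveat with this setup: several scoring workers share one SQLite file, and SQLite allows only a single writer at a time. Enabling WAL mode and a connect timeout reduces how often the retry path above fires. A small sketch, assuming the same db_path as above (open_scoring_db is just an illustrative helper name):

def open_scoring_db(db_path: str) -> sqlite3.Connection:
    # timeout=10 makes sqlite3 wait for a lock instead of raising immediately
    conn = sqlite3.connect(db_path, timeout=10)
    conn.row_factory = sqlite3.Row
    # WAL mode lets readers proceed while one writer holds the lock
    conn.execute("PRAGMA journal_mode=WAL")
    return conn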
Task 2: Thumbnail Quality Validation
import requests
from PIL import Image
from io import BytesIO

@app.task(bind=True, max_retries=3, default_retry_delay=30)
def validate_thumbnails(self, videos: list[dict]) -> dict:
    """Check thumbnail accessibility and quality."""
    report = {"valid": 0, "broken": 0, "placeholder": 0, "details": []}
    for v in videos:
        try:
            resp = requests.get(v["thumbnail_url"], timeout=8)
            if resp.status_code != 200:
                report["broken"] += 1
                report["details"].append({"id": v["video_id"], "status": "broken"})
                continue
            img = Image.open(BytesIO(resp.content))
            w, h = img.size
            # Tiny images are almost always provider placeholders, not real thumbnails
            if w < 200 or h < 150:
                report["placeholder"] += 1
                report["details"].append(
                    {"id": v["video_id"], "status": "placeholder", "size": f"{w}x{h}"}
                )
            else:
                report["valid"] += 1
        except Exception as e:
            # Network failures and unreadable images both count as broken
            report["broken"] += 1
            report["details"].append({"id": v["video_id"], "status": "error", "msg": str(e)})
    return report
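Since the Redis result backend is configured, the report can also be fetched synchronously, for example from an ad-hoc audit script outside the cron hot path (videos here stands for any list of dicts with video_id and thumbnail_url keys):

result = validate_thumbnails.apply_async(args=[videos], queue="thumbnails")
report = result.get(timeout=120)  # blocks until the worker finishes
print(f"valid={report['valid']}, broken={report['broken']}, placeholder={report['placeholder']}")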
Task 3: Dead Video Detection
Videos get deleted or go private. Detect them early:
@app.task(rate_limit="20/m")
def detect_dead_videos(video_ids: list[str]) -> dict:
    """Check which videos are still publicly accessible."""
    alive, dead = [], []
    for vid in video_ids:
        try:
            # oEmbed answers 200 only for public videos; deleted or private ones return 4xx
            resp = requests.head(
                f"https://www.youtube.com/oembed?url=https://youtube.com/watch?v={vid}",
                timeout=8,
            )
            (alive if resp.status_code == 200 else dead).append(vid)
        except requests.RequestException:
            # Conservative: network failures are counted as dead
            dead.append(vid)
    return {"alive": len(alive), "dead": len(dead), "dead_ids": dead}
Dispatching from the Pipeline
from celery import group
from tasks import score_virality_batch, validate_thumbnails, detect_dead_videos

def background_processing(new_videos: list[dict], stale_ids: list[str], db_path: str):
    # Re-score in batches of 50, dispatched together as one group
    batches = [
        [v["video_id"] for v in new_videos[i:i + 50]]
        for i in range(0, len(new_videos), 50)
    ]
    if batches:
        group(
            score_virality_batch.s(batch_ids, db_path) for batch_ids in batches
        ).apply_async(queue="scoring")
    # Validate thumbnails for everything fetched this run
    validate_thumbnails.apply_async(
        args=[new_videos],
        queue="thumbnails",
    )
    # Check stale videos for dead links
    if stale_ids:
        detect_dead_videos.apply_async(
            args=[stale_ids],
            queue="cleanup",
        )
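These dispatches are fire-and-forget. If the pipeline ever needs the merged scoring results back, the group can be upgraded to a chord, which fans all batch results into a callback once every batch finishes (chords require a result backend, which is already configured above). A sketch with a hypothetical merge_score_reports aggregator:

from celery import chord

@app.task
def merge_score_reports(batch_results: list[dict]) -> dict:
    # Hypothetical aggregator: flatten the per-batch dicts into one report
    merged = {}
    for batch in batch_results:
        merged.update(batch)
    return merged

# Inside background_processing, where batches and db_path are in scope:
chord(
    score_virality_batch.s(batch_ids, db_path) for batch_ids in batches
)(merge_score_reports.s())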
Running Workers
# Scoring workers -- CPU bound
celery -A celery_app worker --queues=scoring --concurrency=4 --loglevel=info
# Thumbnail workers -- IO bound
celery -A celery_app worker --queues=thumbnails --concurrency=8
# Cleanup -- rate limited
celery -A celery_app worker --queues=cleanup --concurrency=2
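One operational tweak worth considering: Pillow workloads tend to grow worker memory over long runs, and recycling worker processes after a fixed task count keeps that bounded. The flag is standard Celery; the threshold of 200 is just a starting point:

# Recycle each thumbnail worker process after 200 tasks to cap memory growth
celery -A celery_app worker --queues=thumbnails --concurrency=8 --max-tasks-per-child=200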
Celery transformed how ViralVidVault processes metadata. The cron pipeline went from 15 minutes to 3 minutes, with background workers handling the heavy lifting asynchronously.
This article is part of the Building ViralVidVault series. Check out ViralVidVault to see these techniques in action.
