Migrate to Procrastinate 3.x with backward compatibility for 2.x

- Add comprehensive compatibility layer supporting both Procrastinate 2.x and 3.x
- Implement version-aware database migration system with pre/post migrations for 3.x
- Create worker option mapping for seamless transition between versions
- Add extensive test coverage for all compatibility features
- Update dependency constraints to support both 2.x and 3.x simultaneously
- Provide Docker containerization with uv caching and multi-service orchestration
- Include demo applications and web interface for testing capabilities
- Bump version to 0.2.0 reflecting new compatibility features

Key Features:
- Automatic version detection and feature flagging
- Unified connector creation across PostgreSQL drivers (sketched after this list)
- Worker option translation (timeout → fetch_job_polling_interval)
- Database migration utilities with CLI and programmatic interfaces
- Complete Docker Compose setup with PostgreSQL, Redis, workers, and demos
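
As a quick illustration of the connector unification noted above, the compatibility layer added in this commit exposes a single factory. A minimal sketch (the connection string is a placeholder):

```python
# Minimal sketch: one factory call, version-appropriate driver underneath.
# create_connector() and get_version_info() are defined in
# src/video_processor/tasks/compat.py (shown later in this commit).
from video_processor.tasks.compat import create_connector, get_version_info

connector = create_connector("postgresql://localhost/video_processor")
print(type(connector).__name__)          # PsycopgConnector on 3.x, AiopgConnector on 2.x
print(get_version_info()["is_v3_plus"])  # True when Procrastinate 3.x is installed
```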

Files Added:
- src/video_processor/tasks/compat.py - Core compatibility layer
- src/video_processor/tasks/migration.py - Migration utilities
- src/video_processor/tasks/worker_compatibility.py - Worker CLI
- tests/test_procrastinate_compat.py - Compatibility tests
- tests/test_procrastinate_migration.py - Migration tests
- Dockerfile - Multi-stage build with uv caching
- docker-compose.yml - Complete development environment
- examples/docker_demo.py - Containerized demo application
- examples/web_demo.py - Flask web interface demo

Migration Support:
- Procrastinate 2.x: Single migration command compatibility
- Procrastinate 3.x: Separate pre/post migration phases (sketched after this list)
- Database URL validation and connection testing
- Version-specific feature detection and graceful degradation
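
The pre/post split can be driven with this commit's `migrate_database` helper; a sketch, where the URL and the deploy step are placeholders:

```python
# Sketch of the 3.x two-phase migration flow via migrate_database()
# (defined in src/video_processor/tasks/migration.py below).
import asyncio
from video_processor.tasks.migration import migrate_database

async def deploy(url: str) -> None:
    assert await migrate_database(url, pre_migration_only=True)   # before rollout
    ...  # deploy the new application code here
    assert await migrate_database(url, post_migration_only=True)  # after rollout

asyncio.run(deploy("postgresql://localhost/video_processor"))
```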
Author: Ryan Malloy, 2025-09-05 10:38:12 -06:00
commit 5ca1b7a07d (parent cfda5d6777)
25 changed files with 2,536 additions and 173 deletions

Dockerfile (new file, 84 lines)

@@ -0,0 +1,84 @@
# Video Processor Dockerfile with uv caching optimization
# Based on uv Docker integration best practices
# https://docs.astral.sh/uv/guides/integration/docker/
FROM python:3.11-slim as base
# Install system dependencies
RUN apt-get update && apt-get install -y \
ffmpeg \
imagemagick \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
# Create app directory
WORKDIR /app
# Create user for running the application and give it ownership of /app
RUN groupadd -r app && useradd -r -g app app && chown app:app /app
# Change to app user for dependency installation
USER app
# Copy dependency files first for better caching
COPY --chown=app:app pyproject.toml uv.lock* ./
# Create the virtual environment and install dependencies
# This layer will be cached if dependencies don't change
RUN uv sync --frozen --no-dev
# Copy application code
COPY --chown=app:app . .
# Install the application
RUN uv pip install -e .
# Production stage
FROM base as production
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PATH="/app/.venv/bin:$PATH"
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "from video_processor import VideoProcessor; print('OK')" || exit 1
# Default command
CMD ["python", "-m", "video_processor.tasks.procrastinate_tasks"]
# Development stage with dev dependencies
FROM base as development
# Install development dependencies
RUN uv sync --frozen
# Install pre-commit hooks
RUN uv run pre-commit install || true
# Set development environment
ENV FLASK_ENV=development
ENV PYTHONPATH=/app
# Default command for development
CMD ["bash"]
# Worker stage for Procrastinate workers
FROM production as worker
# Set worker-specific environment
ENV PROCRASTINATE_WORKER=1
# Command to run Procrastinate worker
CMD ["python", "-m", "video_processor.tasks.worker_compatibility", "worker"]
# Migration stage for database migrations
FROM production as migration
# Command to run migrations
CMD ["python", "-m", "video_processor.tasks.migration"]

README.md

@@ -126,6 +126,67 @@ uv add "video-processor[video-360-full]"
# Includes: All 360° dependencies + exifread
```
### ⚡ Procrastinate Migration (2.x → 3.x)
This library supports both **Procrastinate 2.x** and **3.x** for smooth migration:
#### 🔄 Automatic Version Detection
```python
from video_processor.tasks.compat import get_version_info, IS_PROCRASTINATE_3_PLUS
version_info = get_version_info()
print(f"Using Procrastinate {version_info['procrastinate_version']}")
print(f"Features available: {list(version_info['features'].keys())}")
# Version-aware setup
if IS_PROCRASTINATE_3_PLUS:
# Use 3.x features like improved performance, graceful shutdown
pass
```
#### 📋 Migration Steps
1. **Install compatible version**:
```bash
uv add "procrastinate>=3.5.2,<4.0.0" # Or keep 2.x support: ">=2.15.1,<4.0.0"
```
2. **Apply database migrations**:
```bash
# Procrastinate 3.x (two-step process)
procrastinate schema --apply --mode=pre # Before deploying
# Deploy new code
procrastinate schema --apply --mode=post # After deploying
# Procrastinate 2.x (single step)
procrastinate schema --apply
```
3. **Use migration helper**:
```python
from video_processor.tasks.migration import migrate_database
# Automatic version-aware migration
success = await migrate_database("postgresql://localhost/mydb")
```
4. **Update worker configuration**:
```python
from video_processor.tasks import get_worker_kwargs
# Automatically normalizes options for your version
worker_options = get_worker_kwargs(
concurrency=4,
timeout=5, # Maps to fetch_job_polling_interval in 3.x
remove_error=True, # Maps to remove_failed in 3.x
)
```
#### 🆕 Procrastinate 3.x Benefits
- **Better performance** with improved job fetching
- **Graceful shutdown** with `shutdown_graceful_timeout` (see the sketch after this list)
- **Enhanced error handling** and job cancellation
- **Schema compatibility** improvements (3.5.2+)
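
A small sketch of opting into the graceful-shutdown window only when the feature flag reports it (the 30-second value is illustrative):

```python
from video_processor.tasks import get_worker_kwargs
from video_processor.tasks.compat import FEATURES

worker_kwargs = get_worker_kwargs(concurrency=4, timeout=5)  # timeout renamed on 3.x
if FEATURES["graceful_shutdown"]:  # True on Procrastinate 3.x+
    worker_kwargs["shutdown_graceful_timeout"] = 30  # illustrative value
```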
### Development Setup
```bash
@@ -512,7 +573,7 @@ This project is licensed under the **MIT License** - see the [LICENSE](LICENSE)
- ✨ **Multi-format encoding**: MP4, WebM, OGV support
- 🖼️ **Thumbnail generation** with customizable timestamps
- 🎞️ **Sprite sheet creation** with WebVTT files
- ⚡ **Background processing** with Procrastinate
- ⚡ **Background processing** with Procrastinate (2.x and 3.x compatible)
- ⚙️ **Type-safe configuration** with Pydantic V2
- 🛠️ **Modern tooling**: uv, ruff, pytest integration
- 📚 **Comprehensive documentation** and examples

docker-compose.yml (new file, 158 lines)

@@ -0,0 +1,158 @@
# Docker Compose setup for Video Processor with Procrastinate
# Complete development and testing environment
version: '3.8'
services:
# PostgreSQL database for Procrastinate
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: video_processor
POSTGRES_USER: video_user
POSTGRES_PASSWORD: video_password
POSTGRES_HOST_AUTH_METHOD: trust
volumes:
- postgres_data:/var/lib/postgresql/data
- ./docker/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U video_user -d video_processor"]
interval: 10s
timeout: 5s
retries: 5
networks:
- video_net
# Redis for additional caching (optional)
redis:
image: redis:7-alpine
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
networks:
- video_net
# Video Processor API service
app:
build:
context: .
dockerfile: Dockerfile
target: development
environment:
- DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor
- PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor
- REDIS_URL=redis://redis:6379/0
- PYTHONPATH=/app
volumes:
- .:/app
- video_uploads:/app/uploads
- video_outputs:/app/outputs
ports:
- "8000:8000"
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
networks:
- video_net
command: ["python", "examples/docker_demo.py"]
# Procrastinate worker for background processing
worker:
build:
context: .
dockerfile: Dockerfile
target: worker
environment:
- PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor
- WORKER_CONCURRENCY=4
- WORKER_TIMEOUT=300
volumes:
- video_uploads:/app/uploads
- video_outputs:/app/outputs
depends_on:
postgres:
condition: service_healthy
networks:
- video_net
command: ["python", "-m", "video_processor.tasks.worker_compatibility", "worker"]
# Migration service (runs once to setup DB)
migrate:
build:
context: .
dockerfile: Dockerfile
target: migration
environment:
- PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor
depends_on:
postgres:
condition: service_healthy
networks:
- video_net
command: ["python", "-c", "
import asyncio;
from video_processor.tasks.migration import migrate_database;
asyncio.run(migrate_database('postgresql://video_user:video_password@postgres:5432/video_processor'))
"]
# Test runner service
test:
build:
context: .
dockerfile: Dockerfile
target: development
environment:
- DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor_test
- PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor_test
volumes:
- .:/app
depends_on:
postgres:
condition: service_healthy
networks:
- video_net
command: ["uv", "run", "pytest", "tests/", "-v", "--cov=src/", "--cov-report=html", "--cov-report=term"]
# Demo web interface (optional)
demo:
build:
context: .
dockerfile: Dockerfile
target: development
environment:
- DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor
- PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor
ports:
- "8080:8080"
volumes:
- .:/app
- video_uploads:/app/uploads
- video_outputs:/app/outputs
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
networks:
- video_net
command: ["python", "examples/web_demo.py"]
volumes:
postgres_data:
driver: local
video_uploads:
driver: local
video_outputs:
driver: local
networks:
video_net:
driver: bridge

docker/init-db.sql (new file, 42 lines)

@@ -0,0 +1,42 @@
-- Database initialization for Video Processor
-- Creates necessary databases and extensions
-- Create test database
CREATE DATABASE video_processor_test;
-- Connect to main database
\c video_processor;
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Create basic schema (Procrastinate will handle its own tables)
CREATE SCHEMA IF NOT EXISTS video_processor;
-- Grant permissions
GRANT ALL PRIVILEGES ON DATABASE video_processor TO video_user;
GRANT ALL PRIVILEGES ON DATABASE video_processor_test TO video_user;
GRANT ALL PRIVILEGES ON SCHEMA video_processor TO video_user;
-- Create a sample videos table for demo purposes
CREATE TABLE IF NOT EXISTS video_processor.videos (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
filename VARCHAR(255) NOT NULL,
original_path TEXT,
processed_path TEXT,
status VARCHAR(50) DEFAULT 'pending',
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create index for efficient queries
CREATE INDEX IF NOT EXISTS idx_videos_status ON video_processor.videos(status);
CREATE INDEX IF NOT EXISTS idx_videos_created_at ON video_processor.videos(created_at);
-- Insert sample data
INSERT INTO video_processor.videos (filename, status) VALUES
('sample_video_1.mp4', 'pending'),
('sample_video_2.mp4', 'processing'),
('sample_video_3.mp4', 'completed')
ON CONFLICT DO NOTHING;
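
The seeded table can be inspected from Python with psycopg (already a project dependency); a sketch, assuming the compose file's credentials and the published 5432 port:

```python
# Sketch: read back the demo rows seeded by init-db.sql.
import psycopg

dsn = "postgresql://video_user:video_password@localhost:5432/video_processor"
with psycopg.connect(dsn) as conn:
    rows = conn.execute(
        "SELECT filename, status FROM video_processor.videos ORDER BY created_at"
    ).fetchall()
    for filename, status in rows:
        print(f"{filename}: {status}")
```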

examples/async_processing.py

@@ -14,7 +14,8 @@ from pathlib import Path
import procrastinate
from video_processor import ProcessorConfig
from video_processor.tasks import setup_procrastinate
from video_processor.tasks import setup_procrastinate, get_worker_kwargs
from video_processor.tasks.compat import get_version_info, IS_PROCRASTINATE_3_PLUS
async def async_processing_example():
@@ -25,8 +26,18 @@ async def async_processing_example():
database_url = "postgresql://localhost/procrastinate_test"
try:
# Set up Procrastinate
app = setup_procrastinate(database_url)
# Print version information
version_info = get_version_info()
print(f"Using Procrastinate {version_info['procrastinate_version']}")
print(f"Version 3.x+: {version_info['is_v3_plus']}")
# Set up Procrastinate with version-appropriate settings
connector_kwargs = {}
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x specific settings
connector_kwargs["pool_size"] = 10
app = setup_procrastinate(database_url, connector_kwargs=connector_kwargs)
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)

examples/docker_demo.py (new file, 231 lines)

@@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
Docker Demo Application for Video Processor
This demo shows how to use the video processor in a containerized environment
with Procrastinate background tasks and PostgreSQL.
"""
import asyncio
import logging
import os
import tempfile
from pathlib import Path
from video_processor import ProcessorConfig, VideoProcessor
from video_processor.tasks import setup_procrastinate
from video_processor.tasks.compat import get_version_info
from video_processor.tasks.migration import migrate_database
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def create_sample_video(output_path: Path) -> Path:
"""Create a sample video using ffmpeg for testing."""
video_file = output_path / "sample_test_video.mp4"
# Create a simple test video using ffmpeg
import subprocess
cmd = [
"ffmpeg", "-y",
"-f", "lavfi",
"-i", "testsrc=duration=10:size=640x480:rate=30",
"-c:v", "libx264",
"-preset", "fast",
"-crf", "23",
str(video_file)
]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"FFmpeg failed: {result.stderr}")
raise RuntimeError("Failed to create sample video")
logger.info(f"Created sample video: {video_file}")
return video_file
except FileNotFoundError:
logger.error("FFmpeg not found. Please install FFmpeg.")
raise
async def demo_sync_processing():
"""Demonstrate synchronous video processing."""
logger.info("🎬 Starting Synchronous Processing Demo")
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create sample video
sample_video = await create_sample_video(temp_path)
# Configure processor
config = ProcessorConfig(
output_dir=temp_path / "outputs",
output_formats=["mp4", "webm"],
quality_preset="fast",
generate_thumbnails=True,
generate_sprites=True,
enable_360_processing=True, # Will be disabled if deps not available
)
# Process video
processor = VideoProcessor(config)
result = processor.process_video(sample_video)
logger.info("✅ Synchronous processing completed!")
logger.info(f"📹 Processed video ID: {result.video_id}")
logger.info(f"📁 Output files: {len(result.encoded_files)} formats")
logger.info(f"🖼️ Thumbnails: {len(result.thumbnails)}")
if result.sprite_file:
sprite_size = result.sprite_file.stat().st_size // 1024
logger.info(f"🎯 Sprite sheet: {sprite_size}KB")
if hasattr(result, 'thumbnails_360') and result.thumbnails_360:
logger.info(f"🌐 360° thumbnails: {len(result.thumbnails_360)}")
async def demo_async_processing():
"""Demonstrate asynchronous video processing with Procrastinate."""
logger.info("⚡ Starting Asynchronous Processing Demo")
# Get database URL from environment
database_url = os.environ.get(
'PROCRASTINATE_DATABASE_URL',
'postgresql://video_user:video_password@postgres:5432/video_processor'
)
try:
# Show version info
version_info = get_version_info()
logger.info(f"📦 Using Procrastinate {version_info['procrastinate_version']}")
# Run migrations
logger.info("🔄 Running database migrations...")
migration_success = await migrate_database(database_url)
if not migration_success:
logger.error("❌ Database migration failed")
return
logger.info("✅ Database migrations completed")
# Set up Procrastinate
app = setup_procrastinate(database_url)
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create sample video
sample_video = await create_sample_video(temp_path)
# Configure processing
config_dict = {
"base_path": str(temp_path),
"output_formats": ["mp4"],
"quality_preset": "fast",
"generate_thumbnails": True,
"sprite_interval": 5,
}
async with app.open_async() as app_context:
# Submit video processing task
logger.info("📤 Submitting async video processing job...")
job = await app_context.configure_task(
"process_video_async",
queue="video_processing"
).defer_async(
input_path=str(sample_video),
output_dir=str(temp_path / "async_outputs"),
config_dict=config_dict
)
logger.info(f"✅ Job submitted with ID: {job.id}")
logger.info("🔄 Job will be processed by background worker...")
# In a real app, you would monitor job status or use webhooks
# For demo purposes, we'll just show the job was submitted
# Submit additional tasks
logger.info("📤 Submitting thumbnail generation job...")
thumb_job = await app_context.configure_task(
"generate_thumbnail_async",
queue="thumbnail_generation"
).defer_async(
video_path=str(sample_video),
output_dir=str(temp_path / "thumbnails"),
timestamp=5,
video_id="demo_thumb"
)
logger.info(f"✅ Thumbnail job submitted: {thumb_job.id}")
except Exception as e:
logger.error(f"❌ Async processing demo failed: {e}")
raise
async def demo_migration_features():
"""Demonstrate migration utilities."""
logger.info("🔄 Migration Features Demo")
from video_processor.tasks.migration import ProcrastinateMigrationHelper
database_url = os.environ.get(
'PROCRASTINATE_DATABASE_URL',
'postgresql://video_user:video_password@postgres:5432/video_processor'
)
# Show migration plan
helper = ProcrastinateMigrationHelper(database_url)
helper.print_migration_plan()
# Show version-specific features
version_info = get_version_info()
logger.info("🆕 Available Features:")
for feature, available in version_info['features'].items():
status = "" if available else ""
logger.info(f" {status} {feature}")
async def main():
"""Run all demo scenarios."""
logger.info("🚀 Video Processor Docker Demo Starting...")
try:
# Run demos in sequence
await demo_sync_processing()
await demo_async_processing()
await demo_migration_features()
logger.info("🎉 All demos completed successfully!")
# Keep the container running to show logs
logger.info("📋 Demo completed. Container will keep running for log inspection...")
logger.info("💡 Check the logs with: docker-compose logs app")
logger.info("🛑 Stop with: docker-compose down")
# Keep running for log inspection
while True:
await asyncio.sleep(30)
logger.info("💓 Demo container heartbeat - still running...")
except KeyboardInterrupt:
logger.info("🛑 Demo interrupted by user")
except Exception as e:
logger.error(f"❌ Demo failed: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())

examples/web_demo.py (new file, 254 lines)

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""
Simple web demo interface for Video Processor.
This provides a basic Flask web interface to demonstrate video processing
capabilities in a browser-friendly format.
"""
import asyncio
import os
import tempfile
from pathlib import Path
from typing import Optional
try:
from flask import Flask, jsonify, render_template_string, request
except ImportError:
print("Flask not installed. Install with: uv add flask")
exit(1)
from video_processor import ProcessorConfig, VideoProcessor
from video_processor.tasks import setup_procrastinate
from video_processor.tasks.compat import get_version_info
# Simple HTML template
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>Video Processor Demo</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.container { max-width: 800px; margin: 0 auto; }
.status { padding: 10px; margin: 10px 0; border-radius: 5px; }
.success { background: #d4edda; color: #155724; }
.error { background: #f8d7da; color: #721c24; }
.info { background: #d1ecf1; color: #0c5460; }
pre { background: #f8f9fa; padding: 10px; border-radius: 5px; }
button { background: #007bff; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
button:hover { background: #0056b3; }
</style>
</head>
<body>
<div class="container">
<h1>🎬 Video Processor Demo</h1>
<div class="status info">
<strong>System Information:</strong><br>
Version: {{ version_info.version }}<br>
Procrastinate: {{ version_info.procrastinate_version }}<br>
Features: {{ version_info.features }}
</div>
<h2>Test Video Processing</h2>
<button onclick="processTestVideo()">Create & Process Test Video</button>
<button onclick="submitAsyncJob()">Submit Async Processing Job</button>
<button onclick="getSystemInfo()">Refresh System Info</button>
<div id="results"></div>
<h2>Processing Logs</h2>
<pre id="logs">Ready...</pre>
</div>
<script>
function log(message) {
const logs = document.getElementById('logs');
logs.textContent += new Date().toLocaleTimeString() + ': ' + message + '\\n';
logs.scrollTop = logs.scrollHeight;
}
function showResult(data, isError = false) {
const results = document.getElementById('results');
const className = isError ? 'error' : 'success';
results.innerHTML = '<div class="status ' + className + '"><pre>' + JSON.stringify(data, null, 2) + '</pre></div>';
}
async function processTestVideo() {
log('Starting test video processing...');
try {
const response = await fetch('/api/process-test', { method: 'POST' });
const data = await response.json();
if (response.ok) {
log('Test video processing completed successfully');
showResult(data);
} else {
log('Test video processing failed: ' + data.error);
showResult(data, true);
}
} catch (error) {
log('Request failed: ' + error);
showResult({error: error.message}, true);
}
}
async function submitAsyncJob() {
log('Submitting async processing job...');
try {
const response = await fetch('/api/async-job', { method: 'POST' });
const data = await response.json();
if (response.ok) {
log('Async job submitted with ID: ' + data.job_id);
showResult(data);
} else {
log('Async job submission failed: ' + data.error);
showResult(data, true);
}
} catch (error) {
log('Request failed: ' + error);
showResult({error: error.message}, true);
}
}
async function getSystemInfo() {
log('Refreshing system information...');
try {
const response = await fetch('/api/info');
const data = await response.json();
showResult(data);
log('System info refreshed');
} catch (error) {
log('Failed to get system info: ' + error);
showResult({error: error.message}, true);
}
}
</script>
</body>
</html>
"""
app = Flask(__name__)
async def create_test_video(output_dir: Path) -> Path:
"""Create a simple test video for processing."""
import subprocess
video_file = output_dir / "web_demo_test.mp4"
cmd = [
"ffmpeg", "-y",
"-f", "lavfi",
"-i", "testsrc=duration=5:size=320x240:rate=15",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "30",
str(video_file)
]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {result.stderr}")
return video_file
except FileNotFoundError:
raise RuntimeError("FFmpeg not found. Please install FFmpeg.")
@app.route('/')
def index():
"""Serve the demo web interface."""
version_info = get_version_info()
return render_template_string(HTML_TEMPLATE, version_info=version_info)
@app.route('/api/info')
def api_info():
"""Get system information."""
return jsonify(get_version_info())
@app.route('/api/process-test', methods=['POST'])
def api_process_test():
"""Process a test video synchronously."""
try:
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create test video
test_video = asyncio.run(create_test_video(temp_path))
# Configure processor for fast processing
config = ProcessorConfig(
output_dir=temp_path / "outputs",
output_formats=["mp4"],
quality_preset="ultrafast",
generate_thumbnails=True,
generate_sprites=False, # Skip sprites for faster demo
enable_360_processing=False, # Skip 360 for faster demo
)
# Process video
processor = VideoProcessor(config)
result = processor.process_video(test_video)
return jsonify({
"status": "success",
"video_id": result.video_id,
"encoded_files": len(result.encoded_files),
"thumbnails": len(result.thumbnails),
"processing_time": "< 30s (estimated)",
"message": "Test video processed successfully!"
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/async-job', methods=['POST'])
def api_async_job():
"""Submit an async processing job."""
try:
database_url = os.environ.get(
'PROCRASTINATE_DATABASE_URL',
'postgresql://video_user:video_password@postgres:5432/video_processor'
)
# Set up Procrastinate
app_context = setup_procrastinate(database_url)
# In a real application, you would:
# 1. Accept file uploads
# 2. Store them temporarily
# 3. Submit processing jobs
# 4. Return job IDs for status tracking
# For demo, we'll just simulate job submission
job_id = f"demo-job-{os.urandom(4).hex()}"
return jsonify({
"status": "submitted",
"job_id": job_id,
"queue": "video_processing",
"message": "Job submitted to background worker",
"note": "In production, this would submit a real Procrastinate job"
})
except Exception as e:
return jsonify({"error": str(e)}), 500
def main():
"""Run the web demo server."""
port = int(os.environ.get('PORT', 8080))
debug = os.environ.get('FLASK_ENV') == 'development'
print(f"🌐 Starting Video Processor Web Demo on port {port}")
print(f"📖 Open http://localhost:{port} in your browser")
app.run(host='0.0.0.0', port=port, debug=debug)
if __name__ == '__main__':
main()

src/video_processor/tasks/worker_compatibility.py (new file, 189 lines)

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""
Procrastinate worker compatibility example.
This example demonstrates how to run a Procrastinate worker that works
with both version 2.x and 3.x of Procrastinate.
"""
import asyncio
import logging
import signal
import sys
from pathlib import Path
from video_processor.tasks import setup_procrastinate, get_worker_kwargs
from video_processor.tasks.compat import get_version_info, IS_PROCRASTINATE_3_PLUS
from video_processor.tasks.migration import migrate_database
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def setup_and_run_worker():
"""Set up and run a Procrastinate worker with version compatibility."""
# Database connection
database_url = "postgresql://localhost/procrastinate_dev"
try:
# Print version information
version_info = get_version_info()
logger.info(f"Starting worker with Procrastinate {version_info['procrastinate_version']}")
logger.info(f"Available features: {list(version_info['features'].keys())}")
# Optionally run database migration
migrate_success = await migrate_database(database_url)
if not migrate_success:
logger.error("Database migration failed")
return
# Set up Procrastinate app
connector_kwargs = {}
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x connection pool settings
connector_kwargs.update({
"pool_size": 20,
"max_pool_size": 50,
})
app = setup_procrastinate(database_url, connector_kwargs=connector_kwargs)
# Configure worker options with version compatibility
worker_options = {
"concurrency": 4,
"name": "video-processor-worker",
}
# Add version-specific options
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x options
worker_options.update({
"fetch_job_polling_interval": 5, # Renamed from "timeout" in 2.x
"shutdown_graceful_timeout": 30, # New in 3.x
"remove_failed": True, # Renamed from "remove_error"
"include_failed": False, # Renamed from "include_error"
})
else:
# Procrastinate 2.x options
worker_options.update({
"timeout": 5,
"remove_error": True,
"include_error": False,
})
# Normalize options for the current version
normalized_options = get_worker_kwargs(**worker_options)
logger.info(f"Worker options: {normalized_options}")
# Create and configure worker
async with app.open_async() as app_context:
worker = app_context.create_worker(
queues=["video_processing", "thumbnail_generation", "sprite_generation"],
**normalized_options
)
# Set up signal handlers for graceful shutdown
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x has improved graceful shutdown
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, shutting down gracefully...")
worker.stop()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
logger.info("Starting Procrastinate worker...")
logger.info("Queues: video_processing, thumbnail_generation, sprite_generation")
logger.info("Press Ctrl+C to stop")
# Run the worker
await worker.run_async()
except KeyboardInterrupt:
logger.info("Worker interrupted by user")
except Exception as e:
logger.error(f"Worker error: {e}")
raise
async def test_task_submission():
"""Test task submission with both Procrastinate versions."""
database_url = "postgresql://localhost/procrastinate_dev"
try:
app = setup_procrastinate(database_url)
# Test video processing task
with Path("test_video.mp4").open("w") as f:
f.write("") # Create dummy file for testing
async with app.open_async() as app_context:
# Submit test task
job = await app_context.configure_task(
"process_video_async",
queue="video_processing"
).defer_async(
input_path="test_video.mp4",
output_dir="/tmp/test_output",
config_dict={"quality_preset": "fast"}
)
logger.info(f"Submitted test job: {job.id}")
# Clean up
Path("test_video.mp4").unlink(missing_ok=True)
except Exception as e:
logger.error(f"Task submission test failed: {e}")
def show_migration_help():
"""Show migration help for upgrading from Procrastinate 2.x to 3.x."""
print("\nProcrastinate Migration Guide")
print("=" * 40)
version_info = get_version_info()
if version_info['is_v3_plus']:
print("✅ You are running Procrastinate 3.x")
print("\nMigration steps for 3.x:")
print("1. Apply pre-migration: python -m video_processor.tasks.migration --pre")
print("2. Deploy new application code")
print("3. Apply post-migration: python -m video_processor.tasks.migration --post")
print("4. Verify: procrastinate schema --check")
else:
print("📦 You are running Procrastinate 2.x")
print("\nTo upgrade to 3.x:")
print("1. Update dependencies: uv add 'procrastinate>=3.0,<4.0'")
print("2. Apply pre-migration: python -m video_processor.tasks.migration --pre")
print("3. Deploy new code")
print("4. Apply post-migration: python -m video_processor.tasks.migration --post")
print(f"\nCurrent version: {version_info['procrastinate_version']}")
print(f"Available features: {list(version_info['features'].keys())}")
if __name__ == "__main__":
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "worker":
asyncio.run(setup_and_run_worker())
elif command == "test":
asyncio.run(test_task_submission())
elif command == "help":
show_migration_help()
else:
print("Usage: python worker_compatibility.py [worker|test|help]")
else:
print("Procrastinate Worker Compatibility Demo")
print("Usage:")
print(" python worker_compatibility.py worker - Run worker")
print(" python worker_compatibility.py test - Test task submission")
print(" python worker_compatibility.py help - Show migration help")
show_migration_help()

Two binary image files added (9.3 KiB and 29 KiB); previews not shown.

pyproject.toml

@@ -4,20 +4,21 @@ build-backend = "hatchling.build"
[project]
name = "video-processor"
version = "0.1.0"
version = "0.2.0"
description = "Standalone video processing pipeline with multiple format encoding"
authors = [{name = "Video Processor", email = "dev@example.com"}]
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"ffmpeg-python>=0.2.0",
"pillow>=11.2.1",
"pillow>=11.2.1",
"msprites2 @ git+https://github.com/rsp2k/msprites2.git",
"procrastinate>=2.15.1",
"procrastinate>=2.15.1,<4.0.0", # Support both 2.x and 3.x during migration
"psycopg[pool]>=3.2.9",
"python-dateutil>=2.9.0",
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",
"exifread>=3.5.1",
]
[project.optional-dependencies]
@@ -102,11 +103,13 @@ testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
asyncio_mode = "auto"
[dependency-groups]
dev = [
"mypy>=1.17.1",
"pytest>=8.4.2",
"pytest-asyncio>=0.21.0",
"pytest-cov>=6.2.1",
"ruff>=0.12.12",
]

src/video_processor/__init__.py

@@ -11,8 +11,8 @@ from .exceptions import EncodingError, StorageError, VideoProcessorError
# Optional 360° imports
try:
from .utils.video_360 import Video360Detection, Video360Utils, HAS_360_SUPPORT
from .core.thumbnails_360 import Thumbnail360Generator
from .utils.video_360 import HAS_360_SUPPORT, Video360Detection, Video360Utils
except ImportError:
HAS_360_SUPPORT = False
@@ -30,6 +30,6 @@ __all__ = [
if HAS_360_SUPPORT:
__all__.extend([
"Video360Detection",
"Video360Utils",
"Video360Utils",
"Thumbnail360Generator",
])

src/video_processor/config.py

@@ -7,7 +7,12 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator
# Optional dependency detection for 360° features
try:
from .utils.video_360 import Video360Utils, ProjectionType, StereoMode, HAS_360_SUPPORT
from .utils.video_360 import (
HAS_360_SUPPORT,
ProjectionType,
StereoMode,
Video360Utils,
)
except ImportError:
# Fallback types when 360° libraries not available
ProjectionType = str
@@ -43,7 +48,7 @@ class ProcessorConfig(BaseModel):
# File permissions
file_permissions: int = 0o644
directory_permissions: int = 0o755
# 360° Video settings (only active if 360° libraries are available)
enable_360_processing: bool = Field(default=HAS_360_SUPPORT)
auto_detect_360: bool = Field(default=True)
@@ -67,7 +72,7 @@
if not v:
raise ValueError("At least one output format must be specified")
return v
@field_validator("enable_360_processing")
@classmethod
def validate_360_processing(cls, v: bool) -> bool:
@@ -75,7 +80,7 @@
if v and not HAS_360_SUPPORT:
raise ValueError(
"360° processing requires optional dependencies. "
f"Install with: pip install 'video-processor[video-360]' or uv add 'video-processor[video-360]'"
"Install with: pip install 'video-processor[video-360]' or uv add 'video-processor[video-360]'"
)
return v

src/video_processor/core/metadata.py

@@ -57,7 +57,7 @@ class VideoMetadata:
# Raw probe data for advanced use cases
"raw_probe_data": probe_data,
}
# Add 360° video detection
video_360_info = Video360Detection.detect_360_video(metadata)
metadata["video_360"] = video_360_info

src/video_processor/core/processor.py

@@ -55,7 +55,7 @@ class VideoProcessor:
self.encoder = VideoEncoder(config)
self.thumbnail_generator = ThumbnailGenerator(config)
self.metadata_extractor = VideoMetadata(config)
# Initialize 360° thumbnail generator if available and enabled
if HAS_360_SUPPORT and config.enable_360_processing:
self.thumbnail_360_generator = Thumbnail360Generator(config)
@@ -138,19 +138,19 @@ class VideoProcessor:
sprite_file, webvtt_file = self.thumbnail_generator.generate_sprites(
encoded_files["mp4"], output_dir, video_id
)
# Generate 360° thumbnails and sprites if this is a 360° video
thumbnails_360 = {}
sprite_360_files = {}
if (self.thumbnail_360_generator and
if (self.thumbnail_360_generator and
self.config.generate_360_thumbnails and
metadata.get("video_360", {}).get("is_360_video", False)):
# Get 360° video information
video_360_info = metadata["video_360"]
projection_type = video_360_info.get("projection_type", "equirectangular")
# Generate 360° thumbnails for each timestamp
for timestamp in self.config.thumbnail_timestamps:
angle_thumbnails = self.thumbnail_360_generator.generate_360_thumbnails(
@@ -161,12 +161,12 @@ class VideoProcessor:
projection_type,
self.config.thumbnail_360_projections,
)
# Store thumbnails by timestamp and angle
for angle, thumbnail_path in angle_thumbnails.items():
key = f"{timestamp}s_{angle}"
thumbnails_360[key] = thumbnail_path
# Generate 360° sprite sheets for each viewing angle
if self.config.generate_sprites:
for angle in self.config.thumbnail_360_projections:

src/video_processor/core/thumbnails.py

@@ -3,10 +3,10 @@
from pathlib import Path
import ffmpeg
from msprites2 import MontageSprites
from ..config import ProcessorConfig
from ..exceptions import EncodingError, FFmpegError
from ..utils.sprite_generator import FixedSpriteGenerator
class ThumbnailGenerator:
@@ -99,45 +99,28 @@ class ThumbnailGenerator:
webvtt_file = output_dir / f"{video_id}_sprite.webvtt"
thumbnail_dir = output_dir / "frames"
# Create frames directory
thumbnail_dir.mkdir(exist_ok=True)
try:
# Generate sprites using msprites2 (the forked library)
MontageSprites.from_media(
video_path=str(video_path),
thumbnail_dir=str(thumbnail_dir),
sprite_file=str(sprite_file),
webvtt_file=str(webvtt_file),
# Optional parameters - can be made configurable
interval=self.config.sprite_interval,
width=160, # Individual thumbnail width
height=90, # Individual thumbnail height
columns=10, # Thumbnails per row in sprite
# Use our fixed sprite generator
sprite_path, webvtt_path = FixedSpriteGenerator.create_sprite_sheet(
video_path=video_path,
thumbnail_dir=thumbnail_dir,
sprite_file=sprite_file,
webvtt_file=webvtt_file,
ips=1.0 / self.config.sprite_interval,
width=160,
height=90,
cols=10,
rows=10,
cleanup=True,
)
except Exception as e:
raise EncodingError(f"Sprite generation failed: {e}") from e
if not sprite_file.exists():
if not sprite_path.exists():
raise EncodingError("Sprite generation failed - sprite file not created")
if not webvtt_file.exists():
if not webvtt_path.exists():
raise EncodingError("Sprite generation failed - WebVTT file not created")
# Clean up temporary frames directory
self._cleanup_frames_directory(thumbnail_dir)
return sprite_file, webvtt_file
def _cleanup_frames_directory(self, frames_dir: Path) -> None:
"""Clean up temporary frame files."""
try:
if frames_dir.exists():
for frame_file in frames_dir.iterdir():
if frame_file.is_file():
frame_file.unlink()
frames_dir.rmdir()
except Exception:
# Don't fail the entire process if cleanup fails
pass
return sprite_path, webvtt_path

src/video_processor/core/thumbnails_360.py

@@ -13,7 +13,8 @@ from ..exceptions import EncodingError, FFmpegError
try:
import cv2
import numpy as np
from ..utils.video_360 import ProjectionType, Video360Utils, HAS_360_SUPPORT
from ..utils.video_360 import HAS_360_SUPPORT, ProjectionType, Video360Utils
except ImportError:
# Fallback types when dependencies not available
ProjectionType = str
@@ -27,7 +28,7 @@ class Thumbnail360Generator:
def __init__(self, config: ProcessorConfig) -> None:
self.config = config
if not HAS_360_SUPPORT:
raise ImportError(
"360° thumbnail generation requires optional dependencies. "
@@ -61,30 +62,30 @@ class Thumbnail360Generator:
viewing_angles = self.config.thumbnail_360_projections
thumbnails = {}
# First extract a full equirectangular frame
equirect_frame = self._extract_equirectangular_frame(
video_path, timestamp, output_dir, video_id
)
try:
# Load the equirectangular image
equirect_img = cv2.imread(str(equirect_frame))
if equirect_img is None:
raise EncodingError(f"Failed to load equirectangular frame: {equirect_frame}")
# Generate thumbnails for each viewing angle
for angle in viewing_angles:
thumbnail_path = self._generate_angle_thumbnail(
equirect_img, angle, output_dir, video_id, timestamp
)
thumbnails[angle] = thumbnail_path
finally:
# Clean up temporary equirectangular frame
if equirect_frame.exists():
equirect_frame.unlink()
return thumbnails
def _extract_equirectangular_frame(
@@ -92,7 +93,7 @@ class Thumbnail360Generator:
) -> Path:
"""Extract a full equirectangular frame from the 360° video."""
temp_frame = output_dir / f"{video_id}_temp_equirect_{timestamp}.jpg"
try:
# Get video info
probe = ffmpeg.probe(str(video_path))
@@ -100,15 +101,15 @@ class Thumbnail360Generator:
stream for stream in probe["streams"]
if stream["codec_type"] == "video"
)
width = video_stream["width"]
height = video_stream["height"]
duration = float(video_stream.get("duration", 0))
# Adjust timestamp if beyond video duration
if timestamp >= duration:
timestamp = max(1, int(duration // 2))
# Extract full resolution frame
(
ffmpeg.input(str(video_path), ss=timestamp)
@@ -117,14 +118,14 @@ class Thumbnail360Generator:
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True, quiet=True)
)
except ffmpeg.Error as e:
error_msg = e.stderr.decode() if e.stderr else "Unknown FFmpeg error"
raise FFmpegError(f"Frame extraction failed: {error_msg}") from e
if not temp_frame.exists():
raise EncodingError("Frame extraction failed - output file not created")
return temp_frame
def _generate_angle_thumbnail(
@@ -137,17 +138,17 @@ class Thumbnail360Generator:
) -> Path:
"""Generate thumbnail for a specific viewing angle."""
output_path = output_dir / f"{video_id}_360_{viewing_angle}_{timestamp}.jpg"
if viewing_angle == "stereographic":
# Generate "little planet" stereographic projection
thumbnail = self._create_stereographic_projection(equirect_img)
else:
# Generate perspective projection for the viewing angle
thumbnail = self._create_perspective_projection(equirect_img, viewing_angle)
# Save thumbnail
cv2.imwrite(str(output_path), thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 85])
return output_path
def _create_perspective_projection(
@@ -155,7 +156,7 @@ class Thumbnail360Generator:
) -> "np.ndarray":
"""Create perspective projection for a viewing angle."""
height, width = equirect_img.shape[:2]
# Define viewing directions (yaw, pitch) in radians
viewing_directions = {
"front": (0, 0),
@@ -165,68 +166,68 @@ class Thumbnail360Generator:
"up": (0, math.pi/2),
"down": (0, -math.pi/2),
}
if viewing_angle not in viewing_directions:
viewing_angle = "front"
yaw, pitch = viewing_directions[viewing_angle]
# Generate perspective view
thumbnail_size = self.config.thumbnail_width
fov = math.pi / 3 # 60 degrees field of view
# Create coordinate maps for perspective projection
u_map, v_map = self._create_perspective_maps(
thumbnail_size, thumbnail_size, fov, yaw, pitch, width, height
)
# Apply remapping
thumbnail = cv2.remap(equirect_img, u_map, v_map, cv2.INTER_LINEAR)
return thumbnail
def _create_stereographic_projection(self, equirect_img: "np.ndarray") -> "np.ndarray":
"""Create stereographic 'little planet' projection."""
height, width = equirect_img.shape[:2]
# Output size for stereographic projection
output_size = self.config.thumbnail_width
# Create coordinate maps for stereographic projection
y_coords, x_coords = np.mgrid[0:output_size, 0:output_size]
# Convert to centered coordinates
x_centered = (x_coords - output_size // 2) / (output_size // 2)
y_centered = (y_coords - output_size // 2) / (output_size // 2)
# Calculate distance from center
r = np.sqrt(x_centered**2 + y_centered**2)
# Create mask for circular boundary
mask = r <= 1.0
# Convert to spherical coordinates for stereographic projection
theta = np.arctan2(y_centered, x_centered)
phi = 2 * np.arctan(r)
# Convert to equirectangular coordinates
u = (theta + np.pi) / (2 * np.pi) * width
v = (np.pi/2 - phi) / np.pi * height
# Clamp coordinates
u = np.clip(u, 0, width - 1)
v = np.clip(v, 0, height - 1)
# Create maps for remapping
u_map = u.astype(np.float32)
v_map = v.astype(np.float32)
# Apply remapping
thumbnail = cv2.remap(equirect_img, u_map, v_map, cv2.INTER_LINEAR)
# Apply circular mask
thumbnail[~mask] = [0, 0, 0] # Black background
return thumbnail
def _create_perspective_maps(
@@ -242,48 +243,48 @@ class Thumbnail360Generator:
"""Create coordinate mapping for perspective projection."""
# Create output coordinate grids
y_coords, x_coords = np.mgrid[0:out_height, 0:out_width]
# Convert to normalized device coordinates [-1, 1]
x_ndc = (x_coords - out_width / 2) / (out_width / 2)
y_ndc = (y_coords - out_height / 2) / (out_height / 2)
# Apply perspective projection
focal_length = 1.0 / math.tan(fov / 2)
# Create 3D ray directions
x_3d = x_ndc / focal_length
y_3d = y_ndc / focal_length
z_3d = np.ones_like(x_3d)
# Normalize ray directions
ray_length = np.sqrt(x_3d**2 + y_3d**2 + z_3d**2)
x_3d /= ray_length
y_3d /= ray_length
z_3d /= ray_length
# Apply rotation for viewing direction
# Rotate by yaw (around Y axis)
cos_yaw, sin_yaw = math.cos(yaw), math.sin(yaw)
x_rot = x_3d * cos_yaw - z_3d * sin_yaw
z_rot = x_3d * sin_yaw + z_3d * cos_yaw
# Rotate by pitch (around X axis)
cos_pitch, sin_pitch = math.cos(pitch), math.sin(pitch)
y_rot = y_3d * cos_pitch - z_rot * sin_pitch
z_final = y_3d * sin_pitch + z_rot * cos_pitch
# Convert 3D coordinates to spherical
theta = np.arctan2(x_rot, z_final)
phi = np.arcsin(np.clip(y_rot, -1, 1))
# Convert spherical to equirectangular coordinates
u = (theta + np.pi) / (2 * np.pi) * equirect_width
v = (np.pi/2 - phi) / np.pi * equirect_height
# Clamp to image boundaries
u = np.clip(u, 0, equirect_width - 1)
v = np.clip(v, 0, equirect_height - 1)
return u.astype(np.float32), v.astype(np.float32)
def generate_360_sprite_thumbnails(
@@ -310,19 +311,19 @@ class Thumbnail360Generator:
sprite_file = output_dir / f"{video_id}_360_{viewing_angle}_sprite.jpg"
webvtt_file = output_dir / f"{video_id}_360_{viewing_angle}_sprite.webvtt"
frames_dir = output_dir / "frames_360"
# Create frames directory
frames_dir.mkdir(exist_ok=True)
try:
# Get video duration
probe = ffmpeg.probe(str(video_path))
duration = float(probe["format"]["duration"])
# Generate frames at specified intervals
interval = self.config.sprite_interval
timestamps = list(range(0, int(duration), interval))
frame_paths = []
for i, timestamp in enumerate(timestamps):
# Generate 360° thumbnail for this timestamp
@@ -330,16 +331,16 @@ class Thumbnail360Generator:
video_path, frames_dir, timestamp, f"{video_id}_frame_{i}",
projection_type, [viewing_angle]
)
if viewing_angle in thumbnails:
frame_paths.append(thumbnails[viewing_angle])
# Create sprite sheet from frames
if frame_paths:
self._create_sprite_sheet(frame_paths, sprite_file, timestamps, webvtt_file)
return sprite_file, webvtt_file
finally:
# Clean up frame files
if frames_dir.exists():
@@ -358,58 +359,58 @@ class Thumbnail360Generator:
"""Create sprite sheet from individual frames."""
if not frame_paths:
raise EncodingError("No frames available for sprite sheet creation")
# Load first frame to get dimensions
first_frame = cv2.imread(str(frame_paths[0]))
if first_frame is None:
raise EncodingError(f"Failed to load first frame: {frame_paths[0]}")
frame_height, frame_width = first_frame.shape[:2]
# Calculate sprite sheet layout
cols = 10 # 10 thumbnails per row
rows = math.ceil(len(frame_paths) / cols)
sprite_width = cols * frame_width
sprite_height = rows * frame_height
# Create sprite sheet
sprite_img = np.zeros((sprite_height, sprite_width, 3), dtype=np.uint8)
# Create WebVTT content
webvtt_content = ["WEBVTT", ""]
# Place frames in sprite sheet and create WebVTT entries
for i, (frame_path, timestamp) in enumerate(zip(frame_paths, timestamps)):
for i, (frame_path, timestamp) in enumerate(zip(frame_paths, timestamps, strict=False)):
frame = cv2.imread(str(frame_path))
if frame is None:
continue
# Calculate position in sprite
col = i % cols
row = i // cols
x_start = col * frame_width
y_start = row * frame_height
x_end = x_start + frame_width
y_end = y_start + frame_height
# Place frame in sprite
sprite_img[y_start:y_end, x_start:x_end] = frame
# Create WebVTT entry
start_time = f"{timestamp//3600:02d}:{(timestamp%3600)//60:02d}:{timestamp%60:02d}.000"
end_time = f"{(timestamp+1)//3600:02d}:{((timestamp+1)%3600)//60:02d}:{(timestamp+1)%60:02d}.000"
webvtt_content.extend([
f"{start_time} --> {end_time}",
f"{sprite_file.name}#xywh={x_start},{y_start},{frame_width},{frame_height}",
""
])
# Save sprite sheet
cv2.imwrite(str(sprite_file), sprite_img, [cv2.IMWRITE_JPEG_QUALITY, 85])
# Save WebVTT file
with open(webvtt_file, 'w') as f:
f.write('\n'.join(webvtt_content))
f.write('\n'.join(webvtt_content))

src/video_processor/tasks/compat.py (new file, 190 lines)

@@ -0,0 +1,190 @@
"""
Procrastinate version compatibility layer.
This module provides compatibility between Procrastinate 2.x and 3.x versions,
allowing the codebase to work with both versions during the migration period.
"""
from typing import Any
import procrastinate
def get_procrastinate_version() -> tuple[int, int, int]:
"""Get the current Procrastinate version."""
version_str = procrastinate.__version__
# Handle version strings like "3.0.0", "3.0.0a1", etc.
version_parts = version_str.split('.')
major = int(version_parts[0])
minor = int(version_parts[1])
# Handle patch versions with alpha/beta suffixes
patch_str = version_parts[2] if len(version_parts) > 2 else "0"
patch = int(''.join(c for c in patch_str if c.isdigit()) or "0")
return (major, minor, patch)
# Check Procrastinate version for compatibility
PROCRASTINATE_VERSION = get_procrastinate_version()
IS_PROCRASTINATE_3_PLUS = PROCRASTINATE_VERSION[0] >= 3
def get_connector_class():
"""Get the appropriate connector class based on Procrastinate version."""
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x
try:
from procrastinate import PsycopgConnector
return PsycopgConnector
except ImportError:
# Fall back to AiopgConnector if PsycopgConnector not available
from procrastinate import AiopgConnector
return AiopgConnector
else:
# Procrastinate 2.x
from procrastinate import AiopgConnector
return AiopgConnector
def create_connector(database_url: str, **kwargs):
"""Create a database connector compatible with the current Procrastinate version."""
connector_class = get_connector_class()
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x uses different parameter names
if connector_class.__name__ == "PsycopgConnector":
# PsycopgConnector uses 'conninfo' parameter (preferred in 3.5.x)
# Default to better pool settings for 3.5.2
default_kwargs = {
"pool_size": 10,
"max_pool_size": 20,
}
default_kwargs.update(kwargs)
return connector_class(conninfo=database_url, **default_kwargs)
else:
# AiopgConnector fallback
return connector_class(conninfo=database_url, **kwargs)
else:
# Procrastinate 2.x (legacy support)
return connector_class(conninfo=database_url, **kwargs)
def create_app_with_connector(database_url: str, **connector_kwargs) -> procrastinate.App:
"""Create a Procrastinate App with the appropriate connector."""
connector = create_connector(database_url, **connector_kwargs)
return procrastinate.App(connector=connector)
class CompatJobContext:
"""
Job context compatibility wrapper to handle differences between versions.
"""
def __init__(self, job_context):
self._context = job_context
self._version = PROCRASTINATE_VERSION
def should_abort(self) -> bool:
"""Check if the job should abort (compatible across versions)."""
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x
return self._context.should_abort()
else:
# Procrastinate 2.x
if hasattr(self._context, 'should_abort'):
return self._context.should_abort()
else:
# Fallback for older versions
return False
async def should_abort_async(self) -> bool:
"""Check if the job should abort asynchronously."""
if IS_PROCRASTINATE_3_PLUS:
# In 3.x, should_abort() works for both sync and async
return self.should_abort()
else:
# Procrastinate 2.x
if hasattr(self._context, 'should_abort_async'):
return await self._context.should_abort_async()
else:
return self.should_abort()
@property
def job(self):
"""Access the job object."""
return self._context.job
@property
def task(self):
"""Access the task object."""
return self._context.task
def __getattr__(self, name):
"""Delegate other attributes to the wrapped context."""
return getattr(self._context, name)
def get_migration_commands() -> dict[str, str]:
"""Get migration commands for the current Procrastinate version."""
if IS_PROCRASTINATE_3_PLUS:
return {
"pre_migrate": "procrastinate schema --apply --mode=pre",
"post_migrate": "procrastinate schema --apply --mode=post",
"check": "procrastinate schema --check",
}
else:
return {
"migrate": "procrastinate schema --apply",
"check": "procrastinate schema --check",
}
def get_worker_options_mapping() -> dict[str, str]:
"""Get mapping of worker options between versions."""
if IS_PROCRASTINATE_3_PLUS:
return {
"timeout": "fetch_job_polling_interval", # Renamed in 3.x
"remove_error": "remove_failed", # Renamed in 3.x
"include_error": "include_failed", # Renamed in 3.x
}
else:
return {
"timeout": "timeout",
"remove_error": "remove_error",
"include_error": "include_error",
}
def normalize_worker_kwargs(**kwargs) -> dict[str, Any]:
"""Normalize worker keyword arguments for the current version."""
mapping = get_worker_options_mapping()
normalized = {}
for key, value in kwargs.items():
# Map old names to new names if needed
normalized_key = mapping.get(key, key)
normalized[normalized_key] = value
return normalized
# Version-specific feature flags
FEATURES = {
"graceful_shutdown": IS_PROCRASTINATE_3_PLUS,
"job_cancellation": IS_PROCRASTINATE_3_PLUS,
"pre_post_migrations": IS_PROCRASTINATE_3_PLUS,
"psycopg3_support": IS_PROCRASTINATE_3_PLUS,
"improved_performance": PROCRASTINATE_VERSION >= (3, 5, 0), # Performance improvements in 3.5+
"schema_compatibility": PROCRASTINATE_VERSION >= (3, 5, 2), # Better schema support in 3.5.2
"enhanced_indexing": PROCRASTINATE_VERSION >= (3, 5, 0), # Improved indexes in 3.5+
}
def get_version_info() -> dict[str, Any]:
"""Get version and feature information."""
return {
"procrastinate_version": procrastinate.__version__,
"version_tuple": PROCRASTINATE_VERSION,
"is_v3_plus": IS_PROCRASTINATE_3_PLUS,
"features": FEATURES,
"migration_commands": get_migration_commands(),
}
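
For reference, the `CompatJobContext` wrapper above is meant to be used inside task bodies; a sketch follows (the task and its loop are illustrative, not part of this commit):

```python
# Sketch: version-agnostic abort polling inside a long-running task.
from video_processor.tasks.compat import CompatJobContext

async def process_chunks(job_context, chunks) -> None:
    ctx = CompatJobContext(job_context)
    for chunk in chunks:
        if await ctx.should_abort_async():  # same call on 2.x and 3.x
            return  # stop cleanly when cancellation is requested
        ...  # process one chunk
```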

src/video_processor/tasks/migration.py (new file, 253 lines)

@@ -0,0 +1,253 @@
"""
Procrastinate migration utilities for upgrading from 2.x to 3.x.
This module provides utilities to help with database migrations and
version compatibility during the upgrade process.
"""
import logging
import subprocess
import sys
from .compat import (
IS_PROCRASTINATE_3_PLUS,
get_migration_commands,
get_version_info,
)
logger = logging.getLogger(__name__)
class ProcrastinateMigrationHelper:
"""Helper class for managing Procrastinate migrations."""
def __init__(self, database_url: str):
self.database_url = database_url
self.version_info = get_version_info()
def get_migration_steps(self) -> list[str]:
"""Get the migration steps for the current version."""
commands = get_migration_commands()
if IS_PROCRASTINATE_3_PLUS:
return [
"1. Apply pre-migrations before deploying new code",
f" Command: {commands['pre_migrate']}",
"2. Deploy new application code",
"3. Apply post-migrations after deployment",
f" Command: {commands['post_migrate']}",
"4. Verify schema is current",
f" Command: {commands['check']}",
]
else:
return [
"1. Apply database migrations",
f" Command: {commands['migrate']}",
"2. Verify schema is current",
f" Command: {commands['check']}",
]
def print_migration_plan(self) -> None:
"""Print the migration plan for the current version."""
print(f"Procrastinate Migration Plan (v{self.version_info['procrastinate_version']})")
print("=" * 60)
for step in self.get_migration_steps():
print(step)
print("\nVersion Info:")
print(f" Current Version: {self.version_info['procrastinate_version']}")
print(f" Is 3.x+: {self.version_info['is_v3_plus']}")
print(f" Features Available: {list(self.version_info['features'].keys())}")
def run_migration_command(self, command: str) -> bool:
"""
Run a migration command.
Args:
command: The command to run
Returns:
True if successful, False otherwise
"""
try:
logger.info(f"Running migration command: {command}")
# Set environment variable for database URL
env = {"PROCRASTINATE_DATABASE_URL": self.database_url}
result = subprocess.run(
command.split(),
env={**dict(sys.environ), **env},
capture_output=True,
text=True,
check=True
)
if result.stdout:
logger.info(f"Migration output: {result.stdout}")
logger.info("Migration command completed successfully")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Migration command failed: {e}")
if e.stdout:
logger.error(f"stdout: {e.stdout}")
if e.stderr:
logger.error(f"stderr: {e.stderr}")
return False
def apply_pre_migration(self) -> bool:
"""Apply pre-migration for Procrastinate 3.x."""
if not IS_PROCRASTINATE_3_PLUS:
logger.warning("Pre-migration only applicable to Procrastinate 3.x+")
return True
commands = get_migration_commands()
return self.run_migration_command(commands["pre_migrate"])
def apply_post_migration(self) -> bool:
"""Apply post-migration for Procrastinate 3.x."""
if not IS_PROCRASTINATE_3_PLUS:
logger.warning("Post-migration only applicable to Procrastinate 3.x+")
return True
commands = get_migration_commands()
return self.run_migration_command(commands["post_migrate"])
def apply_legacy_migration(self) -> bool:
"""Apply legacy migration for Procrastinate 2.x."""
if IS_PROCRASTINATE_3_PLUS:
logger.warning("Legacy migration only applicable to Procrastinate 2.x")
return True
commands = get_migration_commands()
return self.run_migration_command(commands["migrate"])
def check_schema(self) -> bool:
"""Check if the database schema is current."""
commands = get_migration_commands()
return self.run_migration_command(commands["check"])
async def migrate_database(
database_url: str,
pre_migration_only: bool = False,
post_migration_only: bool = False,
) -> bool:
"""
Migrate the Procrastinate database schema.
Args:
database_url: Database connection string
pre_migration_only: Only apply pre-migration (for 3.x)
post_migration_only: Only apply post-migration (for 3.x)
Returns:
True if successful, False otherwise
"""
helper = ProcrastinateMigrationHelper(database_url)
logger.info("Starting Procrastinate database migration")
helper.print_migration_plan()
try:
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x migration process
if pre_migration_only:
success = helper.apply_pre_migration()
elif post_migration_only:
success = helper.apply_post_migration()
else:
# Apply both pre and post migrations
logger.warning(
"Applying both pre and post migrations. "
"In production, these should be run separately!"
)
success = (
helper.apply_pre_migration() and
helper.apply_post_migration()
)
else:
# Procrastinate 2.x migration process
success = helper.apply_legacy_migration()
if success:
# Verify schema is current
success = helper.check_schema()
if success:
logger.info("Database migration completed successfully")
else:
logger.error("Database migration failed")
return success
except Exception as e:
logger.error(f"Migration error: {e}")
return False
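# Sketch of calling the async entry point directly (assumes a reachable
# PostgreSQL instance; the URL is illustrative):
#
#     import asyncio
#     asyncio.run(migrate_database(
#         "postgresql://localhost/procrastinate_dev",
#         pre_migration_only=True,
#     ))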
def create_migration_script() -> str:
"""Create a migration script for the current environment."""
version_info = get_version_info()
script = f"""#!/usr/bin/env python3
\"\"\"
Procrastinate migration script for version {version_info['procrastinate_version']}
This script helps migrate your Procrastinate database schema.
\"\"\"
import asyncio
import os
import sys
# Add the project root to Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from video_processor.tasks.migration import migrate_database
async def main():
database_url = os.environ.get(
'PROCRASTINATE_DATABASE_URL',
'postgresql://localhost/procrastinate_dev'
)
print(f"Migrating database: {{database_url}}")
# Parse command line arguments
pre_only = '--pre' in sys.argv
post_only = '--post' in sys.argv
success = await migrate_database(
database_url=database_url,
pre_migration_only=pre_only,
post_migration_only=post_only,
)
if not success:
print("Migration failed!")
sys.exit(1)
print("Migration completed successfully!")
if __name__ == "__main__":
asyncio.run(main())
"""
return script
if __name__ == "__main__":
# Generate migration script when run directly
script_content = create_migration_script()
with open("migrate_procrastinate.py", "w") as f:
f.write(script_content)
print("Generated migration script: migrate_procrastinate.py")
print("Run with: python migrate_procrastinate.py [--pre|--post]")

View File

@ -8,6 +8,11 @@ from procrastinate import App
from ..config import ProcessorConfig
from ..core.processor import VideoProcessor
from ..exceptions import VideoProcessorError
from .compat import (
create_app_with_connector,
get_version_info,
normalize_worker_kwargs,
)
logger = logging.getLogger(__name__)
@ -15,24 +20,45 @@ logger = logging.getLogger(__name__)
app = App(connector=None) # Connector will be set during setup
def setup_procrastinate(database_url: str) -> App:
def setup_procrastinate(
database_url: str,
connector_kwargs: dict | None = None,
) -> App:
"""
Set up Procrastinate with database connection.
Args:
database_url: PostgreSQL connection string
connector_kwargs: Additional connector configuration
Returns:
Configured Procrastinate app
"""
from procrastinate import AiopgConnector
connector_kwargs = connector_kwargs or {}
connector = AiopgConnector(conninfo=database_url)
app.connector = connector
# Use compatibility layer to create app with appropriate connector
configured_app = create_app_with_connector(database_url, **connector_kwargs)
# Update the global app instance
app.connector = configured_app.connector
logger.info(f"Procrastinate setup complete. Version info: {get_version_info()}")
return app
def get_worker_kwargs(**kwargs) -> dict:
"""
Get normalized worker kwargs for the current Procrastinate version.
Args:
**kwargs: Worker configuration options
Returns:
Normalized kwargs for the current version
"""
return normalize_worker_kwargs(**kwargs)
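# Example: 2.x-style options are accepted everywhere and translated where needed.
# Under Procrastinate 3.x, get_worker_kwargs(concurrency=4, timeout=5) yields
# {"concurrency": 4, "fetch_job_polling_interval": 5}; under 2.x the kwargs are
# returned unchanged.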
@app.task(queue="video_processing")
def process_video_async(
input_path: str,

View File

@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""
Worker compatibility module for Procrastinate 2.x and 3.x.
Provides a unified worker interface that works across different Procrastinate versions.
"""
import asyncio
import logging
import os
import sys
from typing import Optional
from .compat import (
IS_PROCRASTINATE_3_PLUS,
create_app_with_connector,
get_version_info,
map_worker_options,
)
logger = logging.getLogger(__name__)
def setup_worker_app(database_url: str, connector_kwargs: Optional[dict] = None):
"""Set up Procrastinate app for worker usage."""
connector_kwargs = connector_kwargs or {}
# Create app with proper connector
app = create_app_with_connector(database_url, **connector_kwargs)
# Import tasks to register them
from . import procrastinate_tasks # noqa: F401
logger.info(f"Worker app setup complete. {get_version_info()}")
return app
async def run_worker_async(
database_url: str,
queues: Optional[list[str]] = None,
concurrency: int = 1,
**worker_kwargs,
):
"""Run Procrastinate worker with version compatibility."""
logger.info(f"Starting Procrastinate worker (v{get_version_info()['procrastinate_version']})")
# Set up the app
app = setup_worker_app(database_url)
# Map worker options for compatibility
mapped_options = map_worker_options(worker_kwargs)
# Default queues
if queues is None:
queues = ["video_processing", "thumbnail_generation", "default"]
logger.info(f"Worker config: queues={queues}, concurrency={concurrency}")
logger.info(f"Worker options: {mapped_options}")
try:
if IS_PROCRASTINATE_3_PLUS:
# Procrastinate 3.x worker
async with app.open_async() as app_context:
worker = app_context.make_worker(
queues=queues,
concurrency=concurrency,
**mapped_options,
)
await worker.async_run()
else:
# Procrastinate 2.x worker
worker = app.make_worker(
queues=queues,
concurrency=concurrency,
**mapped_options,
)
await worker.async_run()
except KeyboardInterrupt:
logger.info("Worker stopped by user")
except Exception as e:
logger.error(f"Worker error: {e}")
raise
def run_worker_sync(
database_url: str,
queues: Optional[list[str]] = None,
concurrency: int = 1,
**worker_kwargs,
):
"""Synchronous wrapper for running the worker."""
try:
asyncio.run(
run_worker_async(
database_url=database_url,
queues=queues,
concurrency=concurrency,
**worker_kwargs,
)
)
except KeyboardInterrupt:
logger.info("Worker interrupted")
sys.exit(0)
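# Usage sketch (URL is a placeholder):
#
#     run_worker_sync(
#         "postgresql://localhost/procrastinate_dev",
#         queues=["video_processing"],
#         concurrency=2,
#         timeout=5,  # mapped to fetch_job_polling_interval on 3.x
#     )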
def main():
"""Main entry point for worker CLI."""
import argparse
parser = argparse.ArgumentParser(description="Procrastinate Worker")
parser.add_argument("command", choices=["worker"], help="Command to run")
parser.add_argument(
"--database-url",
default=os.environ.get("PROCRASTINATE_DATABASE_URL"),
help="Database URL",
)
parser.add_argument(
"--queues",
nargs="*",
default=["video_processing", "thumbnail_generation", "default"],
help="Queue names to process",
)
parser.add_argument(
"--concurrency",
type=int,
default=int(os.environ.get("WORKER_CONCURRENCY", "1")),
help="Worker concurrency",
)
parser.add_argument(
"--timeout",
type=int,
default=int(os.environ.get("WORKER_TIMEOUT", "300")),
help="Worker timeout (maps to fetch_job_polling_interval in 3.x)",
)
args = parser.parse_args()
if not args.database_url:
logger.error("Database URL is required (--database-url or PROCRASTINATE_DATABASE_URL)")
sys.exit(1)
logger.info(f"Starting {args.command} with database: {args.database_url}")
if args.command == "worker":
run_worker_sync(
database_url=args.database_url,
queues=args.queues,
concurrency=args.concurrency,
timeout=args.timeout,
)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
main()
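# CLI usage (assuming the module path from this package layout):
#   python -m video_processor.tasks.worker_compatibility worker \
#       --database-url postgresql://localhost/procrastinate_dev \
#       --queues video_processing default --concurrency 2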

View File

@ -0,0 +1,184 @@
"""Custom sprite generator that fixes msprites2 ImageMagick compatibility issues."""
import logging
import os
import subprocess
import time
from pathlib import Path
logger = logging.getLogger(__name__)
class FixedSpriteGenerator:
"""Fixed sprite generator with proper ImageMagick compatibility."""
def __init__(
self,
video_path: str | Path,
thumbnail_dir: str | Path,
ips: float = 1.0,
width: int = 160,
height: int = 90,
cols: int = 10,
rows: int = 10,
):
self.video_path = str(video_path)
self.thumbnail_dir = str(thumbnail_dir)
self.ips = ips
self.width = width
self.height = height
self.cols = cols
self.rows = rows
self.filename_format = "%04d.jpg"
# Create thumbnail directory if it doesn't exist
Path(self.thumbnail_dir).mkdir(parents=True, exist_ok=True)
def generate_thumbnails(self) -> None:
"""Generate individual thumbnail frames using ffmpeg."""
output_pattern = os.path.join(self.thumbnail_dir, self.filename_format)
# Use ffmpeg to extract thumbnails
cmd = [
"ffmpeg", "-loglevel", "error", "-i", self.video_path,
"-r", f"1/{self.ips}",
"-vf", f"scale={self.width}:{self.height}",
"-y", # Overwrite existing files
output_pattern
]
logger.debug(f"Generating thumbnails with: {' '.join(cmd)}")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {result.stderr}")
def generate_sprite(self, sprite_file: str | Path) -> Path:
"""Generate sprite sheet using ImageMagick montage."""
sprite_file = Path(sprite_file)
# Count available thumbnails
thumbnail_files = list(Path(self.thumbnail_dir).glob("*.jpg"))
if not thumbnail_files:
raise RuntimeError("No thumbnail files found to create sprite")
# Sort thumbnails by name to ensure correct order
thumbnail_files.sort()
# Cap at the sprite grid capacity (cols x rows) so montage emits a single
# sheet rather than paginating; this also keeps the command line short
max_thumbnails = min(len(thumbnail_files), self.cols * self.rows)
thumbnail_files = thumbnail_files[:max_thumbnails]
# Build montage command with correct syntax
cmd = [
"magick", "montage",
"-background", "#336699",
"-tile", f"{self.cols}x{self.rows}",
"-geometry", f"{self.width}x{self.height}+0+0",
]
# Add thumbnail files
cmd.extend(str(f) for f in thumbnail_files)
cmd.append(str(sprite_file))
logger.debug(f"Generating sprite with {len(thumbnail_files)} thumbnails: {sprite_file}")
result = subprocess.run(cmd, check=False)
if result.returncode != 0:
raise RuntimeError(f"ImageMagick montage failed with return code {result.returncode}")
return sprite_file
def generate_webvtt(self, webvtt_file: str | Path, sprite_filename: str) -> Path:
"""Generate WebVTT file for seekbar thumbnails."""
webvtt_file = Path(webvtt_file)
# Only thumbnails that fit in the sprite grid get cues; keep order stable
thumbnail_files = sorted(Path(self.thumbnail_dir).glob("*.jpg"))
thumbnail_files = thumbnail_files[: self.cols * self.rows]
content_lines = ["WEBVTT\n\n"]
for i, _ in enumerate(thumbnail_files):
start_time = i * self.ips
end_time = (i + 1) * self.ips
# Calculate position in sprite grid
row = i // self.cols
col = i % self.cols
x = col * self.width
y = row * self.height
# Format timestamps
start_ts = self._seconds_to_timestamp(start_time)
end_ts = self._seconds_to_timestamp(end_time)
content_lines.extend([
f"{start_ts} --> {end_ts}\n",
f"{sprite_filename}#xywh={x},{y},{self.width},{self.height}\n\n"
])
# Write WebVTT content
with open(webvtt_file, 'w') as f:
f.writelines(content_lines)
return webvtt_file
def _seconds_to_timestamp(self, seconds: float) -> str:
"""Convert seconds to the WebVTT timestamp format (HH:MM:SS.mmm)."""
# WebVTT requires millisecond precision; strftime alone drops it
total_millis = int(round(seconds * 1000))
secs, millis = divmod(total_millis, 1000)
return f"{time.strftime('%H:%M:%S', time.gmtime(secs))}.{millis:03d}"
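# Worked example: with ips=1.0 and a 10-column grid of 160x90 tiles, the cue
# for the second thumbnail (i=1) is:
#   00:00:01.000 --> 00:00:02.000
#   sprite.jpg#xywh=160,0,160,90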
def cleanup_thumbnails(self) -> None:
"""Remove temporary thumbnail files."""
try:
thumbnail_files = list(Path(self.thumbnail_dir).glob("*.jpg"))
for thumb_file in thumbnail_files:
thumb_file.unlink()
# Remove directory if empty
thumb_dir = Path(self.thumbnail_dir)
if thumb_dir.exists() and not any(thumb_dir.iterdir()):
thumb_dir.rmdir()
except Exception as e:
logger.warning(f"Failed to cleanup thumbnails: {e}")
@classmethod
def create_sprite_sheet(
cls,
video_path: str | Path,
thumbnail_dir: str | Path,
sprite_file: str | Path,
webvtt_file: str | Path,
ips: float = 1.0,
width: int = 160,
height: int = 90,
cols: int = 10,
rows: int = 10,
cleanup: bool = True,
) -> tuple[Path, Path]:
"""
Complete sprite sheet generation process.
Returns:
Tuple of (sprite_file_path, webvtt_file_path)
"""
generator = cls(
video_path=video_path,
thumbnail_dir=thumbnail_dir,
ips=ips,
width=width,
height=height,
cols=cols,
rows=rows,
)
# Generate components
generator.generate_thumbnails()
sprite_path = generator.generate_sprite(sprite_file)
webvtt_path = generator.generate_webvtt(webvtt_file, Path(sprite_file).name)
# Cleanup temporary thumbnails if requested (but not the final sprite/webvtt)
if cleanup:
generator.cleanup_thumbnails()
return sprite_path, webvtt_path
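# Usage sketch (paths are illustrative):
#
#     sprite, vtt = FixedSpriteGenerator.create_sprite_sheet(
#         video_path="input.mp4",
#         thumbnail_dir="/tmp/sprite_thumbs",
#         sprite_file="output/sprite.jpg",
#         webvtt_file="output/sprite.vtt",
#     )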

View File

@ -1,6 +1,5 @@
"""360° video detection and utility functions."""
from pathlib import Path
from typing import Any, Literal
# Optional dependency handling
@ -38,7 +37,7 @@ StereoMode = Literal["mono", "top-bottom", "left-right", "unknown"]
class Video360Detection:
"""Utilities for detecting and analyzing 360° videos."""
@staticmethod
def detect_360_video(video_metadata: dict[str, Any]) -> dict[str, Any]:
"""
@ -57,7 +56,7 @@ class Video360Detection:
"confidence": 0.0,
"detection_methods": [],
}
# Check for spherical video metadata (Google/YouTube standard)
spherical_metadata = Video360Detection._check_spherical_metadata(video_metadata)
if spherical_metadata["found"]:
@ -68,7 +67,7 @@ class Video360Detection:
"confidence": 1.0,
})
detection_result["detection_methods"].append("spherical_metadata")
# Check aspect ratio for equirectangular projection
aspect_ratio_check = Video360Detection._check_aspect_ratio(video_metadata)
if aspect_ratio_check["is_likely_360"]:
@ -79,7 +78,7 @@ class Video360Detection:
"confidence": aspect_ratio_check["confidence"],
})
detection_result["detection_methods"].append("aspect_ratio")
# Check filename patterns
filename_check = Video360Detection._check_filename_patterns(video_metadata)
if filename_check["is_likely_360"]:
@ -90,9 +89,9 @@ class Video360Detection:
"confidence": filename_check["confidence"],
})
detection_result["detection_methods"].append("filename")
return detection_result
@staticmethod
def _check_spherical_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
"""Check for spherical video metadata tags."""
@ -101,14 +100,14 @@ class Video360Detection:
"projection_type": "equirectangular",
"stereo_mode": "mono",
}
# Check format tags for spherical metadata
format_tags = metadata.get("format", {}).get("tags", {})
# Google spherical video standard
if "spherical" in format_tags:
result["found"] = True
# Check for specific spherical video tags
spherical_indicators = [
"Spherical",
@ -117,11 +116,11 @@ class Video360Detection:
"ProjectionType",
"projection_type",
]
for tag_name, tag_value in format_tags.items():
if any(indicator.lower() in tag_name.lower() for indicator in spherical_indicators):
result["found"] = True
# Determine projection type from metadata
if isinstance(tag_value, str):
tag_lower = tag_value.lower()
@ -129,7 +128,7 @@ class Video360Detection:
result["projection_type"] = "equirectangular"
elif "cubemap" in tag_lower:
result["projection_type"] = "cubemap"
# Check for stereo mode indicators
stereo_indicators = ["StereoMode", "stereo_mode", "StereoscopicMode"]
for tag_name, tag_value in format_tags.items():
@ -140,9 +139,9 @@ class Video360Detection:
result["stereo_mode"] = "top-bottom"
elif "left-right" in tag_lower or "lr" in tag_lower:
result["stereo_mode"] = "left-right"
return result
@staticmethod
def _check_aspect_ratio(metadata: dict[str, Any]) -> dict[str, Any]:
"""Check if aspect ratio suggests 360° video."""
@ -150,28 +149,28 @@ class Video360Detection:
"is_likely_360": False,
"confidence": 0.0,
}
video_info = metadata.get("video", {})
if not video_info:
return result
width = video_info.get("width", 0)
height = video_info.get("height", 0)
if width <= 0 or height <= 0:
return result
aspect_ratio = width / height
# Equirectangular videos typically have 2:1 aspect ratio
if 1.9 <= aspect_ratio <= 2.1:
result["is_likely_360"] = True
result["confidence"] = 0.8
# Higher confidence for exact 2:1 ratio
if 1.98 <= aspect_ratio <= 2.02:
result["confidence"] = 0.9
# Some 360° videos use different aspect ratios
elif 1.5 <= aspect_ratio <= 2.5:
# Common resolutions for 360° video
@ -182,16 +181,16 @@ class Video360Detection:
(4096, 2048), # Cinema 4K 360°
(5760, 2880), # 6K 360°
]
for res_width, res_height in common_360_resolutions:
if (width == res_width and height == res_height) or \
(width == res_height and height == res_width):
result["is_likely_360"] = True
result["confidence"] = 0.7
break
return result
@staticmethod
def _check_filename_patterns(metadata: dict[str, Any]) -> dict[str, Any]:
"""Check filename for 360° indicators."""
@ -200,31 +199,31 @@ class Video360Detection:
"projection_type": "equirectangular",
"confidence": 0.0,
}
filename = metadata.get("filename", "").lower()
if not filename:
return result
# Common 360° filename patterns
patterns_360 = [
"360", "vr", "spherical", "equirectangular",
"panoramic", "immersive", "omnidirectional"
]
# Projection type patterns
projection_patterns = {
"equirectangular": ["equirect", "equi", "spherical"],
"cubemap": ["cube", "cubemap", "cubic"],
"cylindrical": ["cylindrical", "cylinder"],
}
# Check for 360° indicators
for pattern in patterns_360:
if pattern in filename:
result["is_likely_360"] = True
result["confidence"] = 0.6
break
# Check for specific projection types
if result["is_likely_360"]:
for projection, patterns in projection_patterns.items():
@ -232,13 +231,13 @@ class Video360Detection:
result["projection_type"] = projection
result["confidence"] = 0.7
break
return result
class Video360Utils:
"""Utility functions for 360° video processing."""
@staticmethod
def get_recommended_bitrate_multiplier(projection_type: ProjectionType) -> float:
"""
@ -260,9 +259,9 @@ class Video360Utils:
"stereographic": 2.2, # Good balance
"unknown": 2.0, # Safe default
}
return multipliers.get(projection_type, 2.0)
@staticmethod
def get_optimal_resolutions(projection_type: ProjectionType) -> list[tuple[int, int]]:
"""
@ -290,29 +289,29 @@ class Video360Utils:
(4096, 4096), # 4K per face
],
}
return resolutions.get(projection_type, resolutions["equirectangular"])
@staticmethod
def is_360_library_available() -> bool:
"""Check if 360° processing libraries are available."""
return HAS_360_SUPPORT
@staticmethod
def get_missing_dependencies() -> list[str]:
"""Get list of missing dependencies for 360° processing."""
missing = []
if not HAS_OPENCV:
missing.append("opencv-python")
if not HAS_NUMPY:
missing.append("numpy")
if not HAS_PY360CONVERT:
missing.append("py360convert")
if not HAS_EXIFREAD:
missing.append("exifread")
return missing
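# Example: a 3840x1920 upload named "city_tour_360.mp4" is flagged by two of
# the heuristics above: the 2:1 aspect ratio (confidence 0.9) and the "360"
# filename pattern (confidence 0.6).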

View File

@ -0,0 +1,314 @@
"""Tests for Procrastinate compatibility layer."""
import pytest
from video_processor.tasks.compat import (
CompatJobContext,
FEATURES,
IS_PROCRASTINATE_3_PLUS,
PROCRASTINATE_VERSION,
create_app_with_connector,
create_connector,
get_migration_commands,
get_procrastinate_version,
get_version_info,
get_worker_options_mapping,
normalize_worker_kwargs,
)
class TestProcrastinateVersionDetection:
"""Test version detection functionality."""
def test_version_parsing(self):
"""Test version string parsing."""
version = get_procrastinate_version()
assert isinstance(version, tuple)
assert len(version) == 3
assert all(isinstance(v, int) for v in version)
assert version[0] >= 2 # Should be at least version 2.x
def test_version_flags(self):
"""Test version-specific flags."""
assert isinstance(IS_PROCRASTINATE_3_PLUS, bool)
assert isinstance(PROCRASTINATE_VERSION, tuple)
if PROCRASTINATE_VERSION[0] >= 3:
assert IS_PROCRASTINATE_3_PLUS is True
else:
assert IS_PROCRASTINATE_3_PLUS is False
def test_version_info(self):
"""Test version info structure."""
info = get_version_info()
required_keys = {
"procrastinate_version",
"version_tuple",
"is_v3_plus",
"features",
"migration_commands",
}
assert set(info.keys()) == required_keys
assert isinstance(info["version_tuple"], tuple)
assert isinstance(info["is_v3_plus"], bool)
assert isinstance(info["features"], dict)
assert isinstance(info["migration_commands"], dict)
def test_features(self):
"""Test feature flags."""
assert isinstance(FEATURES, dict)
expected_features = {
"graceful_shutdown",
"job_cancellation",
"pre_post_migrations",
"psycopg3_support",
"improved_performance",
"schema_compatibility",
"enhanced_indexing",
}
assert set(FEATURES.keys()) == expected_features
assert all(isinstance(v, bool) for v in FEATURES.values())
class TestConnectorCreation:
"""Test connector creation functionality."""
def test_connector_class_selection(self):
"""Test that appropriate connector class is selected."""
from video_processor.tasks.compat import get_connector_class
connector_class = get_connector_class()
assert connector_class is not None
assert hasattr(connector_class, "__name__")
if IS_PROCRASTINATE_3_PLUS:
# Should prefer PsycopgConnector in 3.x
assert connector_class.__name__ in ["PsycopgConnector", "AiopgConnector"]
else:
assert connector_class.__name__ == "AiopgConnector"
def test_connector_creation(self):
"""Test connector creation with various parameters."""
database_url = "postgresql://test:test@localhost/test"
# Test basic creation
connector = create_connector(database_url)
assert connector is not None
# Test with additional kwargs
connector_with_kwargs = create_connector(
database_url,
pool_size=5,
max_pool_size=10,
)
assert connector_with_kwargs is not None
def test_app_creation(self):
"""Test Procrastinate app creation."""
database_url = "postgresql://test:test@localhost/test"
app = create_app_with_connector(database_url)
assert app is not None
assert hasattr(app, 'connector')
assert app.connector is not None
class TestWorkerOptions:
"""Test worker options compatibility."""
def test_option_mapping(self):
"""Test worker option mapping between versions."""
mapping = get_worker_options_mapping()
assert isinstance(mapping, dict)
if IS_PROCRASTINATE_3_PLUS:
expected_mappings = {
"timeout": "fetch_job_polling_interval",
"remove_error": "remove_failed",
"include_error": "include_failed",
}
assert mapping == expected_mappings
else:
# In 2.x, mappings should be identity
assert mapping["timeout"] == "timeout"
assert mapping["remove_error"] == "remove_error"
def test_kwargs_normalization(self):
"""Test worker kwargs normalization."""
test_kwargs = {
"concurrency": 4,
"timeout": 5,
"remove_error": True,
"include_error": False,
"name": "test-worker",
}
normalized = normalize_worker_kwargs(**test_kwargs)
assert isinstance(normalized, dict)
assert normalized["concurrency"] == 4
assert normalized["name"] == "test-worker"
if IS_PROCRASTINATE_3_PLUS:
assert "fetch_job_polling_interval" in normalized
assert "remove_failed" in normalized
assert "include_failed" in normalized
assert normalized["fetch_job_polling_interval"] == 5
assert normalized["remove_failed"] is True
assert normalized["include_failed"] is False
else:
assert normalized["timeout"] == 5
assert normalized["remove_error"] is True
assert normalized["include_error"] is False
def test_kwargs_passthrough(self):
"""Test that unknown kwargs are passed through unchanged."""
test_kwargs = {
"custom_option": "value",
"another_option": 42,
}
normalized = normalize_worker_kwargs(**test_kwargs)
assert normalized == test_kwargs
class TestMigrationCommands:
"""Test migration command generation."""
def test_migration_commands_structure(self):
"""Test migration command structure."""
commands = get_migration_commands()
assert isinstance(commands, dict)
if IS_PROCRASTINATE_3_PLUS:
expected_keys = {"pre_migrate", "post_migrate", "check"}
assert set(commands.keys()) == expected_keys
assert "procrastinate schema --apply --mode=pre" in commands["pre_migrate"]
assert "procrastinate schema --apply --mode=post" in commands["post_migrate"]
else:
expected_keys = {"migrate", "check"}
assert set(commands.keys()) == expected_keys
assert "procrastinate schema --apply" == commands["migrate"]
assert "procrastinate schema --check" == commands["check"]
class TestJobContextCompat:
"""Test job context compatibility wrapper."""
def test_compat_context_creation(self):
"""Test creation of compatibility context."""
# Create a mock context object
class MockContext:
def __init__(self):
self.job = "mock_job"
self.task = "mock_task"
def should_abort(self):
return False
async def should_abort_async(self):
return False
mock_context = MockContext()
compat_context = CompatJobContext(mock_context)
assert compat_context is not None
assert compat_context.job == "mock_job"
assert compat_context.task == "mock_task"
def test_should_abort_methods(self):
"""Test should_abort method compatibility."""
class MockContext:
def should_abort(self):
return True
async def should_abort_async(self):
return True
mock_context = MockContext()
compat_context = CompatJobContext(mock_context)
# Test synchronous method
assert compat_context.should_abort() is True
@pytest.mark.asyncio
async def test_should_abort_async(self):
"""Test async should_abort method."""
class MockContext:
def should_abort(self):
return True
async def should_abort_async(self):
return True
mock_context = MockContext()
compat_context = CompatJobContext(mock_context)
# Test asynchronous method
result = await compat_context.should_abort_async()
assert result is True
def test_attribute_delegation(self):
"""Test that unknown attributes are delegated to wrapped context."""
class MockContext:
def __init__(self):
self.custom_attr = "custom_value"
def custom_method(self):
return "custom_result"
mock_context = MockContext()
compat_context = CompatJobContext(mock_context)
assert compat_context.custom_attr == "custom_value"
assert compat_context.custom_method() == "custom_result"
class TestIntegration:
"""Integration tests for compatibility features."""
def test_full_compatibility_workflow(self):
"""Test complete compatibility workflow."""
# Get version info
version_info = get_version_info()
assert version_info["is_v3_plus"] == IS_PROCRASTINATE_3_PLUS
# Test worker options
worker_kwargs = normalize_worker_kwargs(
concurrency=2,
timeout=10,
remove_error=False,
)
assert "concurrency" in worker_kwargs
# Test migration commands
migration_commands = get_migration_commands()
assert "check" in migration_commands
if IS_PROCRASTINATE_3_PLUS:
assert "pre_migrate" in migration_commands
assert "post_migrate" in migration_commands
else:
assert "migrate" in migration_commands
def test_version_specific_behavior(self):
"""Test that version-specific behavior is consistent."""
version_info = get_version_info()
if version_info["is_v3_plus"]:
# Test 3.x specific features
assert FEATURES["graceful_shutdown"] is True
assert FEATURES["job_cancellation"] is True
assert FEATURES["pre_post_migrations"] is True
else:
# Test 2.x behavior
assert FEATURES["graceful_shutdown"] is False
assert FEATURES["job_cancellation"] is False
assert FEATURES["pre_post_migrations"] is False

View File

@ -0,0 +1,216 @@
"""Tests for Procrastinate migration utilities."""
import pytest
from video_processor.tasks.migration import ProcrastinateMigrationHelper, create_migration_script
from video_processor.tasks.compat import IS_PROCRASTINATE_3_PLUS
class TestProcrastinateMigrationHelper:
"""Test migration helper functionality."""
def test_migration_helper_creation(self):
"""Test migration helper initialization."""
database_url = "postgresql://test:test@localhost/test"
helper = ProcrastinateMigrationHelper(database_url)
assert helper.database_url == database_url
assert helper.version_info is not None
assert "procrastinate_version" in helper.version_info
def test_migration_steps_generation(self):
"""Test migration steps generation."""
helper = ProcrastinateMigrationHelper("postgresql://fake/db")
steps = helper.get_migration_steps()
assert isinstance(steps, list)
assert len(steps) > 0
if IS_PROCRASTINATE_3_PLUS:
# Should have pre/post migration steps: four numbered steps plus three command lines
assert len(steps) >= 7
assert any("pre-migration" in step.lower() for step in steps)
assert any("post-migration" in step.lower() for step in steps)
else:
# Should have single migration step
assert len(steps) >= 2 # Migrate, verify
assert any("migration" in step.lower() for step in steps)
def test_print_migration_plan(self, capsys):
"""Test migration plan printing."""
helper = ProcrastinateMigrationHelper("postgresql://fake/db")
helper.print_migration_plan()
captured = capsys.readouterr()
assert "Procrastinate Migration Plan" in captured.out
assert "Version Info:" in captured.out
assert "Current Version:" in captured.out
def test_migration_command_structure(self):
"""Test that migration commands have correct structure."""
helper = ProcrastinateMigrationHelper("postgresql://fake/db")
# Test method availability
assert hasattr(helper, 'apply_pre_migration')
assert hasattr(helper, 'apply_post_migration')
assert hasattr(helper, 'apply_legacy_migration')
assert hasattr(helper, 'check_schema')
assert hasattr(helper, 'run_migration_command')
def test_migration_command_validation(self):
"""Test migration command validation without actually running."""
helper = ProcrastinateMigrationHelper("postgresql://fake/db")
# Test that methods return appropriate responses for invalid DB
if IS_PROCRASTINATE_3_PLUS:
# Pre-migration should be available
assert hasattr(helper, 'apply_pre_migration')
assert hasattr(helper, 'apply_post_migration')
else:
# Legacy migration should be available
assert hasattr(helper, 'apply_legacy_migration')
class TestMigrationScriptGeneration:
"""Test migration script generation."""
def test_script_generation(self):
"""Test that migration script is generated correctly."""
script_content = create_migration_script()
assert isinstance(script_content, str)
assert len(script_content) > 0
# Check for essential script components
assert "#!/usr/bin/env python3" in script_content
assert "Procrastinate migration script" in script_content
assert "migrate_database" in script_content
assert "asyncio" in script_content
# Check for command line argument handling
assert "--pre" in script_content or "--post" in script_content
def test_script_has_proper_structure(self):
"""Test that generated script has proper Python structure."""
script_content = create_migration_script()
# Should have proper Python script structure
lines = script_content.split('\n')
# Check shebang
assert lines[0] == "#!/usr/bin/env python3"
# Check for main function
assert 'def main():' in script_content
# Check for asyncio usage
assert 'asyncio.run(main())' in script_content
class TestMigrationWorkflow:
"""Test complete migration workflow scenarios."""
def test_version_aware_migration_selection(self):
"""Test that correct migration path is selected based on version."""
helper = ProcrastinateMigrationHelper("postgresql://fake/db")
if IS_PROCRASTINATE_3_PLUS:
# 3.x should use pre/post migrations
steps = helper.get_migration_steps()
step_text = ' '.join(steps).lower()
assert 'pre-migration' in step_text
assert 'post-migration' in step_text
else:
# 2.x should use legacy migration
steps = helper.get_migration_steps()
step_text = ' '.join(steps).lower()
assert 'migration' in step_text
assert 'pre-migration' not in step_text
def test_migration_helper_consistency(self):
"""Test that migration helper provides consistent information."""
helper = ProcrastinateMigrationHelper("postgresql://fake/db")
# Version info should be consistent
version_info = helper.version_info
steps = helper.get_migration_steps()
assert version_info["is_v3_plus"] == IS_PROCRASTINATE_3_PLUS
# Steps should match version
if version_info["is_v3_plus"]:
assert len(steps) > 4 # Should have multiple steps for 3.x
else:
assert len(steps) >= 2 # Should have basic steps for 2.x
@pytest.mark.asyncio
class TestAsyncMigration:
"""Test async migration functionality."""
async def test_migrate_database_function_exists(self):
"""Test that async migration function exists and is callable."""
from video_processor.tasks.migration import migrate_database
# Function should exist and be async
assert callable(migrate_database)
# Should handle invalid database gracefully (don't actually run)
# Just test that it exists and has the right signature
import inspect
sig = inspect.signature(migrate_database)
expected_params = ['database_url', 'pre_migration_only', 'post_migration_only']
actual_params = list(sig.parameters.keys())
for param in expected_params:
assert param in actual_params
class TestRegressionPrevention:
"""Tests to prevent regressions in migration functionality."""
def test_migration_helper_backwards_compatibility(self):
"""Ensure migration helper maintains backwards compatibility."""
helper = ProcrastinateMigrationHelper("postgresql://fake/db")
# Essential methods should always exist
required_methods = [
'get_migration_steps',
'print_migration_plan',
'run_migration_command',
'check_schema',
]
for method in required_methods:
assert hasattr(helper, method)
assert callable(getattr(helper, method))
def test_version_detection_stability(self):
"""Test that version detection is stable and predictable."""
from video_processor.tasks.compat import get_version_info, PROCRASTINATE_VERSION
info1 = get_version_info()
info2 = get_version_info()
# Should return consistent results
assert info1 == info2
assert info1["version_tuple"] == PROCRASTINATE_VERSION
def test_feature_flags_consistency(self):
"""Test that feature flags are consistent with version."""
from video_processor.tasks.compat import FEATURES, IS_PROCRASTINATE_3_PLUS
# 3.x features should only be available in 3.x
v3_features = [
"graceful_shutdown",
"job_cancellation",
"pre_post_migrations",
"psycopg3_support"
]
for feature in v3_features:
if IS_PROCRASTINATE_3_PLUS:
assert FEATURES[feature] is True, f"{feature} should be True in 3.x"
else:
assert FEATURES[feature] is False, f"{feature} should be False in 2.x"