diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ae8c747 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,84 @@ +# Video Processor Dockerfile with uv caching optimization +# Based on uv Docker integration best practices +# https://docs.astral.sh/uv/guides/integration/docker/ + +FROM python:3.11-slim as base + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + ffmpeg \ + imagemagick \ + postgresql-client \ + && rm -rf /var/lib/apt/lists/* + +# Install uv +COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv + +# Create app directory +WORKDIR /app + +# Create user for running the application +RUN groupadd -r app && useradd -r -g app app + +# Change to app user for dependency installation +USER app + +# Copy dependency files first for better caching +COPY --chown=app:app pyproject.toml uv.lock* ./ + +# Create virtual environment and install dependencies +# This layer will be cached if dependencies don't change +ENV UV_SYSTEM_PYTHON=1 +RUN uv sync --frozen --no-dev + +# Copy application code +COPY --chown=app:app . . + +# Install the application +RUN uv pip install -e . + +# Production stage +FROM base as production + +# Set environment variables +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PATH="/app/.venv/bin:$PATH" + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ + CMD python -c "from video_processor import VideoProcessor; print('OK')" || exit 1 + +# Default command +CMD ["python", "-m", "video_processor.tasks.procrastinate_tasks"] + +# Development stage with dev dependencies +FROM base as development + +# Install development dependencies +RUN uv sync --frozen + +# Install pre-commit hooks +RUN uv run pre-commit install || true + +# Set development environment +ENV FLASK_ENV=development +ENV PYTHONPATH=/app + +# Default command for development +CMD ["bash"] + +# Worker stage for Procrastinate workers +FROM production as worker + +# Set worker-specific environment +ENV PROCRASTINATE_WORKER=1 + +# Command to run Procrastinate worker +CMD ["python", "-m", "video_processor.tasks.worker_compatibility", "worker"] + +# Migration stage for database migrations +FROM production as migration + +# Command to run migrations +CMD ["python", "-m", "video_processor.tasks.migration"] \ No newline at end of file diff --git a/README.md b/README.md index f0edd62..bc93b77 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,67 @@ uv add "video-processor[video-360-full]" # Includes: All 360° dependencies + exifread ``` +### ⚡ Procrastinate Migration (2.x → 3.x) + +This library supports both **Procrastinate 2.x** and **3.x** for smooth migration: + +#### 🔄 Automatic Version Detection +```python +from video_processor.tasks.compat import get_version_info, IS_PROCRASTINATE_3_PLUS + +version_info = get_version_info() +print(f"Using Procrastinate {version_info['procrastinate_version']}") +print(f"Features available: {list(version_info['features'].keys())}") + +# Version-aware setup +if IS_PROCRASTINATE_3_PLUS: + # Use 3.x features like improved performance, graceful shutdown + pass +``` + +#### 📋 Migration Steps +1. **Install compatible version**: + ```bash + uv add "procrastinate>=3.5.2,<4.0.0" # Or keep 2.x support: ">=2.15.1,<4.0.0" + ``` + +2. **Apply database migrations**: + ```bash + # Procrastinate 3.x (two-step process) + procrastinate schema --apply --mode=pre # Before deploying + # Deploy new code + procrastinate schema --apply --mode=post # After deploying + + # Procrastinate 2.x (single step) + procrastinate schema --apply + ``` + +3. **Use migration helper**: + ```python + from video_processor.tasks.migration import migrate_database + + # Automatic version-aware migration + success = await migrate_database("postgresql://localhost/mydb") + ``` + +4. **Update worker configuration**: + ```python + from video_processor.tasks import get_worker_kwargs + + # Automatically normalizes options for your version + worker_options = get_worker_kwargs( + concurrency=4, + timeout=5, # Maps to fetch_job_polling_interval in 3.x + remove_error=True, # Maps to remove_failed in 3.x + ) + ``` + +#### 🆕 Procrastinate 3.x Benefits +- **Better performance** with improved job fetching +- **Graceful shutdown** with `shutdown_graceful_timeout` +- **Enhanced error handling** and job cancellation +- **Schema compatibility** improvements (3.5.2+) + ### Development Setup ```bash @@ -512,7 +573,7 @@ This project is licensed under the **MIT License** - see the [LICENSE](LICENSE) - ✨ **Multi-format encoding**: MP4, WebM, OGV support - 🖼️ **Thumbnail generation** with customizable timestamps - 🎞️ **Sprite sheet creation** with WebVTT files -- ⚡ **Background processing** with Procrastinate +- ⚡ **Background processing** with Procrastinate (2.x and 3.x compatible) - ⚙️ **Type-safe configuration** with Pydantic V2 - 🛠️ **Modern tooling**: uv, ruff, pytest integration - 📚 **Comprehensive documentation** and examples diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..30233de --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,158 @@ +# Docker Compose setup for Video Processor with Procrastinate +# Complete development and testing environment + +version: '3.8' + +services: + # PostgreSQL database for Procrastinate + postgres: + image: postgres:15-alpine + environment: + POSTGRES_DB: video_processor + POSTGRES_USER: video_user + POSTGRES_PASSWORD: video_password + POSTGRES_HOST_AUTH_METHOD: trust + volumes: + - postgres_data:/var/lib/postgresql/data + - ./docker/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U video_user -d video_processor"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - video_net + + # Redis for additional caching (optional) + redis: + image: redis:7-alpine + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - video_net + + # Video Processor API service + app: + build: + context: . + dockerfile: Dockerfile + target: development + environment: + - DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor + - REDIS_URL=redis://redis:6379/0 + - PYTHONPATH=/app + volumes: + - .:/app + - video_uploads:/app/uploads + - video_outputs:/app/outputs + ports: + - "8000:8000" + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + networks: + - video_net + command: ["python", "examples/docker_demo.py"] + + # Procrastinate worker for background processing + worker: + build: + context: . + dockerfile: Dockerfile + target: worker + environment: + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor + - WORKER_CONCURRENCY=4 + - WORKER_TIMEOUT=300 + volumes: + - video_uploads:/app/uploads + - video_outputs:/app/outputs + depends_on: + postgres: + condition: service_healthy + networks: + - video_net + command: ["python", "-m", "video_processor.tasks.worker_compatibility", "worker"] + + # Migration service (runs once to setup DB) + migrate: + build: + context: . + dockerfile: Dockerfile + target: migration + environment: + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor + depends_on: + postgres: + condition: service_healthy + networks: + - video_net + command: ["python", "-c", " + import asyncio; + from video_processor.tasks.migration import migrate_database; + asyncio.run(migrate_database('postgresql://video_user:video_password@postgres:5432/video_processor')) + "] + + # Test runner service + test: + build: + context: . + dockerfile: Dockerfile + target: development + environment: + - DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor_test + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor_test + volumes: + - .:/app + depends_on: + postgres: + condition: service_healthy + networks: + - video_net + command: ["uv", "run", "pytest", "tests/", "-v", "--cov=src/", "--cov-report=html", "--cov-report=term"] + + # Demo web interface (optional) + demo: + build: + context: . + dockerfile: Dockerfile + target: development + environment: + - DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres:5432/video_processor + ports: + - "8080:8080" + volumes: + - .:/app + - video_uploads:/app/uploads + - video_outputs:/app/outputs + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + networks: + - video_net + command: ["python", "examples/web_demo.py"] + +volumes: + postgres_data: + driver: local + video_uploads: + driver: local + video_outputs: + driver: local + +networks: + video_net: + driver: bridge \ No newline at end of file diff --git a/docker/init-db.sql b/docker/init-db.sql new file mode 100644 index 0000000..c8b4093 --- /dev/null +++ b/docker/init-db.sql @@ -0,0 +1,42 @@ +-- Database initialization for Video Processor +-- Creates necessary databases and extensions + +-- Create test database +CREATE DATABASE video_processor_test; + +-- Connect to main database +\c video_processor; + +-- Enable required extensions +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +-- Create basic schema (Procrastinate will handle its own tables) +CREATE SCHEMA IF NOT EXISTS video_processor; + +-- Grant permissions +GRANT ALL PRIVILEGES ON DATABASE video_processor TO video_user; +GRANT ALL PRIVILEGES ON DATABASE video_processor_test TO video_user; +GRANT ALL PRIVILEGES ON SCHEMA video_processor TO video_user; + +-- Create a sample videos table for demo purposes +CREATE TABLE IF NOT EXISTS video_processor.videos ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + filename VARCHAR(255) NOT NULL, + original_path TEXT, + processed_path TEXT, + status VARCHAR(50) DEFAULT 'pending', + metadata JSONB, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +-- Create index for efficient queries +CREATE INDEX IF NOT EXISTS idx_videos_status ON video_processor.videos(status); +CREATE INDEX IF NOT EXISTS idx_videos_created_at ON video_processor.videos(created_at); + +-- Insert sample data +INSERT INTO video_processor.videos (filename, status) VALUES + ('sample_video_1.mp4', 'pending'), + ('sample_video_2.mp4', 'processing'), + ('sample_video_3.mp4', 'completed') +ON CONFLICT DO NOTHING; \ No newline at end of file diff --git a/examples/async_processing.py b/examples/async_processing.py index 39c51be..0e0dd08 100644 --- a/examples/async_processing.py +++ b/examples/async_processing.py @@ -14,7 +14,8 @@ from pathlib import Path import procrastinate from video_processor import ProcessorConfig -from video_processor.tasks import setup_procrastinate +from video_processor.tasks import setup_procrastinate, get_worker_kwargs +from video_processor.tasks.compat import get_version_info, IS_PROCRASTINATE_3_PLUS async def async_processing_example(): @@ -25,8 +26,18 @@ async def async_processing_example(): database_url = "postgresql://localhost/procrastinate_test" try: - # Set up Procrastinate - app = setup_procrastinate(database_url) + # Print version information + version_info = get_version_info() + print(f"Using Procrastinate {version_info['procrastinate_version']}") + print(f"Version 3.x+: {version_info['is_v3_plus']}") + + # Set up Procrastinate with version-appropriate settings + connector_kwargs = {} + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x specific settings + connector_kwargs["pool_size"] = 10 + + app = setup_procrastinate(database_url, connector_kwargs=connector_kwargs) with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) diff --git a/examples/docker_demo.py b/examples/docker_demo.py new file mode 100644 index 0000000..85ce58e --- /dev/null +++ b/examples/docker_demo.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +""" +Docker Demo Application for Video Processor + +This demo shows how to use the video processor in a containerized environment +with Procrastinate background tasks and PostgreSQL. +""" + +import asyncio +import logging +import os +import tempfile +from pathlib import Path + +from video_processor import ProcessorConfig, VideoProcessor +from video_processor.tasks import setup_procrastinate +from video_processor.tasks.compat import get_version_info +from video_processor.tasks.migration import migrate_database + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +async def create_sample_video(output_path: Path) -> Path: + """Create a sample video using ffmpeg for testing.""" + video_file = output_path / "sample_test_video.mp4" + + # Create a simple test video using ffmpeg + import subprocess + + cmd = [ + "ffmpeg", "-y", + "-f", "lavfi", + "-i", "testsrc=duration=10:size=640x480:rate=30", + "-c:v", "libx264", + "-preset", "fast", + "-crf", "23", + str(video_file) + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + logger.error(f"FFmpeg failed: {result.stderr}") + raise RuntimeError("Failed to create sample video") + + logger.info(f"Created sample video: {video_file}") + return video_file + + except FileNotFoundError: + logger.error("FFmpeg not found. Please install FFmpeg.") + raise + + +async def demo_sync_processing(): + """Demonstrate synchronous video processing.""" + logger.info("🎬 Starting Synchronous Processing Demo") + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create sample video + sample_video = await create_sample_video(temp_path) + + # Configure processor + config = ProcessorConfig( + output_dir=temp_path / "outputs", + output_formats=["mp4", "webm"], + quality_preset="fast", + generate_thumbnails=True, + generate_sprites=True, + enable_360_processing=True, # Will be disabled if deps not available + ) + + # Process video + processor = VideoProcessor(config) + result = processor.process_video(sample_video) + + logger.info("✅ Synchronous processing completed!") + logger.info(f"📹 Processed video ID: {result.video_id}") + logger.info(f"📁 Output files: {len(result.encoded_files)} formats") + logger.info(f"🖼️ Thumbnails: {len(result.thumbnails)}") + + if result.sprite_file: + sprite_size = result.sprite_file.stat().st_size // 1024 + logger.info(f"🎯 Sprite sheet: {sprite_size}KB") + + if hasattr(result, 'thumbnails_360') and result.thumbnails_360: + logger.info(f"🌐 360° thumbnails: {len(result.thumbnails_360)}") + + +async def demo_async_processing(): + """Demonstrate asynchronous video processing with Procrastinate.""" + logger.info("⚡ Starting Asynchronous Processing Demo") + + # Get database URL from environment + database_url = os.environ.get( + 'PROCRASTINATE_DATABASE_URL', + 'postgresql://video_user:video_password@postgres:5432/video_processor' + ) + + try: + # Show version info + version_info = get_version_info() + logger.info(f"📦 Using Procrastinate {version_info['procrastinate_version']}") + + # Run migrations + logger.info("🔄 Running database migrations...") + migration_success = await migrate_database(database_url) + + if not migration_success: + logger.error("❌ Database migration failed") + return + + logger.info("✅ Database migrations completed") + + # Set up Procrastinate + app = setup_procrastinate(database_url) + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create sample video + sample_video = await create_sample_video(temp_path) + + # Configure processing + config_dict = { + "base_path": str(temp_path), + "output_formats": ["mp4"], + "quality_preset": "fast", + "generate_thumbnails": True, + "sprite_interval": 5, + } + + async with app.open_async() as app_context: + # Submit video processing task + logger.info("📤 Submitting async video processing job...") + + job = await app_context.configure_task( + "process_video_async", + queue="video_processing" + ).defer_async( + input_path=str(sample_video), + output_dir=str(temp_path / "async_outputs"), + config_dict=config_dict + ) + + logger.info(f"✅ Job submitted with ID: {job.id}") + logger.info("🔄 Job will be processed by background worker...") + + # In a real app, you would monitor job status or use webhooks + # For demo purposes, we'll just show the job was submitted + + # Submit additional tasks + logger.info("📤 Submitting thumbnail generation job...") + + thumb_job = await app_context.configure_task( + "generate_thumbnail_async", + queue="thumbnail_generation" + ).defer_async( + video_path=str(sample_video), + output_dir=str(temp_path / "thumbnails"), + timestamp=5, + video_id="demo_thumb" + ) + + logger.info(f"✅ Thumbnail job submitted: {thumb_job.id}") + + except Exception as e: + logger.error(f"❌ Async processing demo failed: {e}") + raise + + +async def demo_migration_features(): + """Demonstrate migration utilities.""" + logger.info("🔄 Migration Features Demo") + + from video_processor.tasks.migration import ProcrastinateMigrationHelper + + database_url = os.environ.get( + 'PROCRASTINATE_DATABASE_URL', + 'postgresql://video_user:video_password@postgres:5432/video_processor' + ) + + # Show migration plan + helper = ProcrastinateMigrationHelper(database_url) + helper.print_migration_plan() + + # Show version-specific features + version_info = get_version_info() + logger.info("🆕 Available Features:") + for feature, available in version_info['features'].items(): + status = "✅" if available else "❌" + logger.info(f" {status} {feature}") + + +async def main(): + """Run all demo scenarios.""" + logger.info("🚀 Video Processor Docker Demo Starting...") + + try: + # Run demos in sequence + await demo_sync_processing() + await demo_async_processing() + await demo_migration_features() + + logger.info("🎉 All demos completed successfully!") + + # Keep the container running to show logs + logger.info("📋 Demo completed. Container will keep running for log inspection...") + logger.info("💡 Check the logs with: docker-compose logs app") + logger.info("🛑 Stop with: docker-compose down") + + # Keep running for log inspection + while True: + await asyncio.sleep(30) + logger.info("💓 Demo container heartbeat - still running...") + + except KeyboardInterrupt: + logger.info("🛑 Demo interrupted by user") + except Exception as e: + logger.error(f"❌ Demo failed: {e}") + raise + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/web_demo.py b/examples/web_demo.py new file mode 100644 index 0000000..7c12774 --- /dev/null +++ b/examples/web_demo.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +""" +Simple web demo interface for Video Processor. + +This provides a basic Flask web interface to demonstrate video processing +capabilities in a browser-friendly format. +""" + +import asyncio +import os +import tempfile +from pathlib import Path +from typing import Optional + +try: + from flask import Flask, jsonify, render_template_string, request +except ImportError: + print("Flask not installed. Install with: uv add flask") + exit(1) + +from video_processor import ProcessorConfig, VideoProcessor +from video_processor.tasks import setup_procrastinate +from video_processor.tasks.compat import get_version_info + +# Simple HTML template +HTML_TEMPLATE = """ + + + + Video Processor Demo + + + +
+

🎬 Video Processor Demo

+ +
+ System Information:
+ Version: {{ version_info.version }}
+ Procrastinate: {{ version_info.procrastinate_version }}
+ Features: {{ version_info.features }} +
+ +

Test Video Processing

+ + + + +
+ +

Processing Logs

+
Ready...
+
+ + + + +""" + +app = Flask(__name__) + + +async def create_test_video(output_dir: Path) -> Path: + """Create a simple test video for processing.""" + import subprocess + + video_file = output_dir / "web_demo_test.mp4" + + cmd = [ + "ffmpeg", "-y", + "-f", "lavfi", + "-i", "testsrc=duration=5:size=320x240:rate=15", + "-c:v", "libx264", + "-preset", "ultrafast", + "-crf", "30", + str(video_file) + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + raise RuntimeError(f"FFmpeg failed: {result.stderr}") + return video_file + except FileNotFoundError: + raise RuntimeError("FFmpeg not found. Please install FFmpeg.") + + +@app.route('/') +def index(): + """Serve the demo web interface.""" + version_info = get_version_info() + return render_template_string(HTML_TEMPLATE, version_info=version_info) + + +@app.route('/api/info') +def api_info(): + """Get system information.""" + return jsonify(get_version_info()) + + +@app.route('/api/process-test', methods=['POST']) +def api_process_test(): + """Process a test video synchronously.""" + try: + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create test video + test_video = asyncio.run(create_test_video(temp_path)) + + # Configure processor for fast processing + config = ProcessorConfig( + output_dir=temp_path / "outputs", + output_formats=["mp4"], + quality_preset="ultrafast", + generate_thumbnails=True, + generate_sprites=False, # Skip sprites for faster demo + enable_360_processing=False, # Skip 360 for faster demo + ) + + # Process video + processor = VideoProcessor(config) + result = processor.process_video(test_video) + + return jsonify({ + "status": "success", + "video_id": result.video_id, + "encoded_files": len(result.encoded_files), + "thumbnails": len(result.thumbnails), + "processing_time": "< 30s (estimated)", + "message": "Test video processed successfully!" + }) + + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route('/api/async-job', methods=['POST']) +def api_async_job(): + """Submit an async processing job.""" + try: + database_url = os.environ.get( + 'PROCRASTINATE_DATABASE_URL', + 'postgresql://video_user:video_password@postgres:5432/video_processor' + ) + + # Set up Procrastinate + app_context = setup_procrastinate(database_url) + + # In a real application, you would: + # 1. Accept file uploads + # 2. Store them temporarily + # 3. Submit processing jobs + # 4. Return job IDs for status tracking + + # For demo, we'll just simulate job submission + job_id = f"demo-job-{os.urandom(4).hex()}" + + return jsonify({ + "status": "submitted", + "job_id": job_id, + "queue": "video_processing", + "message": "Job submitted to background worker", + "note": "In production, this would submit a real Procrastinate job" + }) + + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +def main(): + """Run the web demo server.""" + port = int(os.environ.get('PORT', 8080)) + debug = os.environ.get('FLASK_ENV') == 'development' + + print(f"🌐 Starting Video Processor Web Demo on port {port}") + print(f"📖 Open http://localhost:{port} in your browser") + + app.run(host='0.0.0.0', port=port, debug=debug) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/worker_compatibility.py b/examples/worker_compatibility.py new file mode 100644 index 0000000..e77808d --- /dev/null +++ b/examples/worker_compatibility.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +""" +Procrastinate worker compatibility example. + +This example demonstrates how to run a Procrastinate worker that works +with both version 2.x and 3.x of Procrastinate. +""" + +import asyncio +import logging +import signal +import sys +from pathlib import Path + +from video_processor.tasks import setup_procrastinate, get_worker_kwargs +from video_processor.tasks.compat import get_version_info, IS_PROCRASTINATE_3_PLUS +from video_processor.tasks.migration import migrate_database + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def setup_and_run_worker(): + """Set up and run a Procrastinate worker with version compatibility.""" + + # Database connection + database_url = "postgresql://localhost/procrastinate_dev" + + try: + # Print version information + version_info = get_version_info() + logger.info(f"Starting worker with Procrastinate {version_info['procrastinate_version']}") + logger.info(f"Available features: {list(version_info['features'].keys())}") + + # Optionally run database migration + migrate_success = await migrate_database(database_url) + if not migrate_success: + logger.error("Database migration failed") + return + + # Set up Procrastinate app + connector_kwargs = {} + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x connection pool settings + connector_kwargs.update({ + "pool_size": 20, + "max_pool_size": 50, + }) + + app = setup_procrastinate(database_url, connector_kwargs=connector_kwargs) + + # Configure worker options with version compatibility + worker_options = { + "concurrency": 4, + "name": "video-processor-worker", + } + + # Add version-specific options + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x options + worker_options.update({ + "fetch_job_polling_interval": 5, # Renamed from "timeout" in 2.x + "shutdown_graceful_timeout": 30, # New in 3.x + "remove_failed": True, # Renamed from "remove_error" + "include_failed": False, # Renamed from "include_error" + }) + else: + # Procrastinate 2.x options + worker_options.update({ + "timeout": 5, + "remove_error": True, + "include_error": False, + }) + + # Normalize options for the current version + normalized_options = get_worker_kwargs(**worker_options) + + logger.info(f"Worker options: {normalized_options}") + + # Create and configure worker + async with app.open_async() as app_context: + worker = app_context.create_worker( + queues=["video_processing", "thumbnail_generation", "sprite_generation"], + **normalized_options + ) + + # Set up signal handlers for graceful shutdown + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x has improved graceful shutdown + def signal_handler(sig, frame): + logger.info(f"Received signal {sig}, shutting down gracefully...") + worker.stop() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting Procrastinate worker...") + logger.info("Queues: video_processing, thumbnail_generation, sprite_generation") + logger.info("Press Ctrl+C to stop") + + # Run the worker + await worker.run_async() + + except KeyboardInterrupt: + logger.info("Worker interrupted by user") + except Exception as e: + logger.error(f"Worker error: {e}") + raise + + +async def test_task_submission(): + """Test task submission with both Procrastinate versions.""" + + database_url = "postgresql://localhost/procrastinate_dev" + + try: + app = setup_procrastinate(database_url) + + # Test video processing task + with Path("test_video.mp4").open("w") as f: + f.write("") # Create dummy file for testing + + async with app.open_async() as app_context: + # Submit test task + job = await app_context.configure_task( + "process_video_async", + queue="video_processing" + ).defer_async( + input_path="test_video.mp4", + output_dir="/tmp/test_output", + config_dict={"quality_preset": "fast"} + ) + + logger.info(f"Submitted test job: {job.id}") + + # Clean up + Path("test_video.mp4").unlink(missing_ok=True) + + except Exception as e: + logger.error(f"Task submission test failed: {e}") + + +def show_migration_help(): + """Show migration help for upgrading from Procrastinate 2.x to 3.x.""" + + print("\nProcrastinate Migration Guide") + print("=" * 40) + + version_info = get_version_info() + + if version_info['is_v3_plus']: + print("✅ You are running Procrastinate 3.x") + print("\nMigration steps for 3.x:") + print("1. Apply pre-migration: python -m video_processor.tasks.migration --pre") + print("2. Deploy new application code") + print("3. Apply post-migration: python -m video_processor.tasks.migration --post") + print("4. Verify: procrastinate schema --check") + else: + print("📦 You are running Procrastinate 2.x") + print("\nTo upgrade to 3.x:") + print("1. Update dependencies: uv add 'procrastinate>=3.0,<4.0'") + print("2. Apply pre-migration: python -m video_processor.tasks.migration --pre") + print("3. Deploy new code") + print("4. Apply post-migration: python -m video_processor.tasks.migration --post") + + print(f"\nCurrent version: {version_info['procrastinate_version']}") + print(f"Available features: {list(version_info['features'].keys())}") + + +if __name__ == "__main__": + if len(sys.argv) > 1: + command = sys.argv[1] + + if command == "worker": + asyncio.run(setup_and_run_worker()) + elif command == "test": + asyncio.run(test_task_submission()) + elif command == "help": + show_migration_help() + else: + print("Usage: python worker_compatibility.py [worker|test|help]") + else: + print("Procrastinate Worker Compatibility Demo") + print("Usage:") + print(" python worker_compatibility.py worker - Run worker") + print(" python worker_compatibility.py test - Test task submission") + print(" python worker_compatibility.py help - Show migration help") + + show_migration_help() \ No newline at end of file diff --git a/pipeline_360_only/caa085b6/caa085b6_360_front_5.jpg b/pipeline_360_only/caa085b6/caa085b6_360_front_5.jpg new file mode 100644 index 0000000..6625528 Binary files /dev/null and b/pipeline_360_only/caa085b6/caa085b6_360_front_5.jpg differ diff --git a/pipeline_360_only/caa085b6/caa085b6_360_stereographic_5.jpg b/pipeline_360_only/caa085b6/caa085b6_360_stereographic_5.jpg new file mode 100644 index 0000000..ca923a1 Binary files /dev/null and b/pipeline_360_only/caa085b6/caa085b6_360_stereographic_5.jpg differ diff --git a/pyproject.toml b/pyproject.toml index 9cb095f..0b41f15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,20 +4,21 @@ build-backend = "hatchling.build" [project] name = "video-processor" -version = "0.1.0" +version = "0.2.0" description = "Standalone video processing pipeline with multiple format encoding" authors = [{name = "Video Processor", email = "dev@example.com"}] readme = "README.md" requires-python = ">=3.11" dependencies = [ "ffmpeg-python>=0.2.0", - "pillow>=11.2.1", + "pillow>=11.2.1", "msprites2 @ git+https://github.com/rsp2k/msprites2.git", - "procrastinate>=2.15.1", + "procrastinate>=2.15.1,<4.0.0", # Support both 2.x and 3.x during migration "psycopg[pool]>=3.2.9", "python-dateutil>=2.9.0", "pydantic>=2.0.0", "pydantic-settings>=2.0.0", + "exifread>=3.5.1", ] [project.optional-dependencies] @@ -102,11 +103,13 @@ testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] +asyncio_mode = "auto" [dependency-groups] dev = [ "mypy>=1.17.1", "pytest>=8.4.2", + "pytest-asyncio>=0.21.0", "pytest-cov>=6.2.1", "ruff>=0.12.12", ] diff --git a/src/video_processor/__init__.py b/src/video_processor/__init__.py index 2e895b4..223208e 100644 --- a/src/video_processor/__init__.py +++ b/src/video_processor/__init__.py @@ -11,8 +11,8 @@ from .exceptions import EncodingError, StorageError, VideoProcessorError # Optional 360° imports try: - from .utils.video_360 import Video360Detection, Video360Utils, HAS_360_SUPPORT from .core.thumbnails_360 import Thumbnail360Generator + from .utils.video_360 import HAS_360_SUPPORT, Video360Detection, Video360Utils except ImportError: HAS_360_SUPPORT = False @@ -30,6 +30,6 @@ __all__ = [ if HAS_360_SUPPORT: __all__.extend([ "Video360Detection", - "Video360Utils", + "Video360Utils", "Thumbnail360Generator", ]) diff --git a/src/video_processor/config.py b/src/video_processor/config.py index 2330ccf..83a436f 100644 --- a/src/video_processor/config.py +++ b/src/video_processor/config.py @@ -7,7 +7,12 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator # Optional dependency detection for 360° features try: - from .utils.video_360 import Video360Utils, ProjectionType, StereoMode, HAS_360_SUPPORT + from .utils.video_360 import ( + HAS_360_SUPPORT, + ProjectionType, + StereoMode, + Video360Utils, + ) except ImportError: # Fallback types when 360° libraries not available ProjectionType = str @@ -43,7 +48,7 @@ class ProcessorConfig(BaseModel): # File permissions file_permissions: int = 0o644 directory_permissions: int = 0o755 - + # 360° Video settings (only active if 360° libraries are available) enable_360_processing: bool = Field(default=HAS_360_SUPPORT) auto_detect_360: bool = Field(default=True) @@ -67,7 +72,7 @@ class ProcessorConfig(BaseModel): if not v: raise ValueError("At least one output format must be specified") return v - + @field_validator("enable_360_processing") @classmethod def validate_360_processing(cls, v: bool) -> bool: @@ -75,7 +80,7 @@ class ProcessorConfig(BaseModel): if v and not HAS_360_SUPPORT: raise ValueError( "360° processing requires optional dependencies. " - f"Install with: pip install 'video-processor[video-360]' or uv add 'video-processor[video-360]'" + "Install with: pip install 'video-processor[video-360]' or uv add 'video-processor[video-360]'" ) return v diff --git a/src/video_processor/core/metadata.py b/src/video_processor/core/metadata.py index fea5524..6963542 100644 --- a/src/video_processor/core/metadata.py +++ b/src/video_processor/core/metadata.py @@ -57,7 +57,7 @@ class VideoMetadata: # Raw probe data for advanced use cases "raw_probe_data": probe_data, } - + # Add 360° video detection video_360_info = Video360Detection.detect_360_video(metadata) metadata["video_360"] = video_360_info diff --git a/src/video_processor/core/processor.py b/src/video_processor/core/processor.py index b215214..08e9739 100644 --- a/src/video_processor/core/processor.py +++ b/src/video_processor/core/processor.py @@ -55,7 +55,7 @@ class VideoProcessor: self.encoder = VideoEncoder(config) self.thumbnail_generator = ThumbnailGenerator(config) self.metadata_extractor = VideoMetadata(config) - + # Initialize 360° thumbnail generator if available and enabled if HAS_360_SUPPORT and config.enable_360_processing: self.thumbnail_360_generator = Thumbnail360Generator(config) @@ -138,19 +138,19 @@ class VideoProcessor: sprite_file, webvtt_file = self.thumbnail_generator.generate_sprites( encoded_files["mp4"], output_dir, video_id ) - + # Generate 360° thumbnails and sprites if this is a 360° video thumbnails_360 = {} sprite_360_files = {} - - if (self.thumbnail_360_generator and + + if (self.thumbnail_360_generator and self.config.generate_360_thumbnails and metadata.get("video_360", {}).get("is_360_video", False)): - + # Get 360° video information video_360_info = metadata["video_360"] projection_type = video_360_info.get("projection_type", "equirectangular") - + # Generate 360° thumbnails for each timestamp for timestamp in self.config.thumbnail_timestamps: angle_thumbnails = self.thumbnail_360_generator.generate_360_thumbnails( @@ -161,12 +161,12 @@ class VideoProcessor: projection_type, self.config.thumbnail_360_projections, ) - + # Store thumbnails by timestamp and angle for angle, thumbnail_path in angle_thumbnails.items(): key = f"{timestamp}s_{angle}" thumbnails_360[key] = thumbnail_path - + # Generate 360° sprite sheets for each viewing angle if self.config.generate_sprites: for angle in self.config.thumbnail_360_projections: diff --git a/src/video_processor/core/thumbnails.py b/src/video_processor/core/thumbnails.py index 6cfe2e5..481d228 100644 --- a/src/video_processor/core/thumbnails.py +++ b/src/video_processor/core/thumbnails.py @@ -3,10 +3,10 @@ from pathlib import Path import ffmpeg -from msprites2 import MontageSprites from ..config import ProcessorConfig from ..exceptions import EncodingError, FFmpegError +from ..utils.sprite_generator import FixedSpriteGenerator class ThumbnailGenerator: @@ -99,45 +99,28 @@ class ThumbnailGenerator: webvtt_file = output_dir / f"{video_id}_sprite.webvtt" thumbnail_dir = output_dir / "frames" - # Create frames directory - thumbnail_dir.mkdir(exist_ok=True) - try: - # Generate sprites using msprites2 (the forked library) - MontageSprites.from_media( - video_path=str(video_path), - thumbnail_dir=str(thumbnail_dir), - sprite_file=str(sprite_file), - webvtt_file=str(webvtt_file), - # Optional parameters - can be made configurable - interval=self.config.sprite_interval, - width=160, # Individual thumbnail width - height=90, # Individual thumbnail height - columns=10, # Thumbnails per row in sprite + # Use our fixed sprite generator + sprite_path, webvtt_path = FixedSpriteGenerator.create_sprite_sheet( + video_path=video_path, + thumbnail_dir=thumbnail_dir, + sprite_file=sprite_file, + webvtt_file=webvtt_file, + ips=1.0 / self.config.sprite_interval, + width=160, + height=90, + cols=10, + rows=10, + cleanup=True, ) except Exception as e: raise EncodingError(f"Sprite generation failed: {e}") from e - if not sprite_file.exists(): + if not sprite_path.exists(): raise EncodingError("Sprite generation failed - sprite file not created") - if not webvtt_file.exists(): + if not webvtt_path.exists(): raise EncodingError("Sprite generation failed - WebVTT file not created") - # Clean up temporary frames directory - self._cleanup_frames_directory(thumbnail_dir) - - return sprite_file, webvtt_file - - def _cleanup_frames_directory(self, frames_dir: Path) -> None: - """Clean up temporary frame files.""" - try: - if frames_dir.exists(): - for frame_file in frames_dir.iterdir(): - if frame_file.is_file(): - frame_file.unlink() - frames_dir.rmdir() - except Exception: - # Don't fail the entire process if cleanup fails - pass + return sprite_path, webvtt_path diff --git a/src/video_processor/core/thumbnails_360.py b/src/video_processor/core/thumbnails_360.py index 5f73857..30437de 100644 --- a/src/video_processor/core/thumbnails_360.py +++ b/src/video_processor/core/thumbnails_360.py @@ -13,7 +13,8 @@ from ..exceptions import EncodingError, FFmpegError try: import cv2 import numpy as np - from ..utils.video_360 import ProjectionType, Video360Utils, HAS_360_SUPPORT + + from ..utils.video_360 import HAS_360_SUPPORT, ProjectionType, Video360Utils except ImportError: # Fallback types when dependencies not available ProjectionType = str @@ -27,7 +28,7 @@ class Thumbnail360Generator: def __init__(self, config: ProcessorConfig) -> None: self.config = config - + if not HAS_360_SUPPORT: raise ImportError( "360° thumbnail generation requires optional dependencies. " @@ -61,30 +62,30 @@ class Thumbnail360Generator: viewing_angles = self.config.thumbnail_360_projections thumbnails = {} - + # First extract a full equirectangular frame equirect_frame = self._extract_equirectangular_frame( video_path, timestamp, output_dir, video_id ) - + try: # Load the equirectangular image equirect_img = cv2.imread(str(equirect_frame)) if equirect_img is None: raise EncodingError(f"Failed to load equirectangular frame: {equirect_frame}") - + # Generate thumbnails for each viewing angle for angle in viewing_angles: thumbnail_path = self._generate_angle_thumbnail( equirect_img, angle, output_dir, video_id, timestamp ) thumbnails[angle] = thumbnail_path - + finally: # Clean up temporary equirectangular frame if equirect_frame.exists(): equirect_frame.unlink() - + return thumbnails def _extract_equirectangular_frame( @@ -92,7 +93,7 @@ class Thumbnail360Generator: ) -> Path: """Extract a full equirectangular frame from the 360° video.""" temp_frame = output_dir / f"{video_id}_temp_equirect_{timestamp}.jpg" - + try: # Get video info probe = ffmpeg.probe(str(video_path)) @@ -100,15 +101,15 @@ class Thumbnail360Generator: stream for stream in probe["streams"] if stream["codec_type"] == "video" ) - + width = video_stream["width"] height = video_stream["height"] duration = float(video_stream.get("duration", 0)) - + # Adjust timestamp if beyond video duration if timestamp >= duration: timestamp = max(1, int(duration // 2)) - + # Extract full resolution frame ( ffmpeg.input(str(video_path), ss=timestamp) @@ -117,14 +118,14 @@ class Thumbnail360Generator: .overwrite_output() .run(capture_stdout=True, capture_stderr=True, quiet=True) ) - + except ffmpeg.Error as e: error_msg = e.stderr.decode() if e.stderr else "Unknown FFmpeg error" raise FFmpegError(f"Frame extraction failed: {error_msg}") from e - + if not temp_frame.exists(): raise EncodingError("Frame extraction failed - output file not created") - + return temp_frame def _generate_angle_thumbnail( @@ -137,17 +138,17 @@ class Thumbnail360Generator: ) -> Path: """Generate thumbnail for a specific viewing angle.""" output_path = output_dir / f"{video_id}_360_{viewing_angle}_{timestamp}.jpg" - + if viewing_angle == "stereographic": # Generate "little planet" stereographic projection thumbnail = self._create_stereographic_projection(equirect_img) else: # Generate perspective projection for the viewing angle thumbnail = self._create_perspective_projection(equirect_img, viewing_angle) - + # Save thumbnail cv2.imwrite(str(output_path), thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 85]) - + return output_path def _create_perspective_projection( @@ -155,7 +156,7 @@ class Thumbnail360Generator: ) -> "np.ndarray": """Create perspective projection for a viewing angle.""" height, width = equirect_img.shape[:2] - + # Define viewing directions (yaw, pitch) in radians viewing_directions = { "front": (0, 0), @@ -165,68 +166,68 @@ class Thumbnail360Generator: "up": (0, math.pi/2), "down": (0, -math.pi/2), } - + if viewing_angle not in viewing_directions: viewing_angle = "front" - + yaw, pitch = viewing_directions[viewing_angle] - + # Generate perspective view thumbnail_size = self.config.thumbnail_width fov = math.pi / 3 # 60 degrees field of view - + # Create coordinate maps for perspective projection u_map, v_map = self._create_perspective_maps( thumbnail_size, thumbnail_size, fov, yaw, pitch, width, height ) - + # Apply remapping thumbnail = cv2.remap(equirect_img, u_map, v_map, cv2.INTER_LINEAR) - + return thumbnail def _create_stereographic_projection(self, equirect_img: "np.ndarray") -> "np.ndarray": """Create stereographic 'little planet' projection.""" height, width = equirect_img.shape[:2] - + # Output size for stereographic projection output_size = self.config.thumbnail_width - + # Create coordinate maps for stereographic projection y_coords, x_coords = np.mgrid[0:output_size, 0:output_size] - + # Convert to centered coordinates x_centered = (x_coords - output_size // 2) / (output_size // 2) y_centered = (y_coords - output_size // 2) / (output_size // 2) - + # Calculate distance from center r = np.sqrt(x_centered**2 + y_centered**2) - + # Create mask for circular boundary mask = r <= 1.0 - + # Convert to spherical coordinates for stereographic projection theta = np.arctan2(y_centered, x_centered) phi = 2 * np.arctan(r) - + # Convert to equirectangular coordinates u = (theta + np.pi) / (2 * np.pi) * width v = (np.pi/2 - phi) / np.pi * height - + # Clamp coordinates u = np.clip(u, 0, width - 1) v = np.clip(v, 0, height - 1) - + # Create maps for remapping u_map = u.astype(np.float32) v_map = v.astype(np.float32) - + # Apply remapping thumbnail = cv2.remap(equirect_img, u_map, v_map, cv2.INTER_LINEAR) - + # Apply circular mask thumbnail[~mask] = [0, 0, 0] # Black background - + return thumbnail def _create_perspective_maps( @@ -242,48 +243,48 @@ class Thumbnail360Generator: """Create coordinate mapping for perspective projection.""" # Create output coordinate grids y_coords, x_coords = np.mgrid[0:out_height, 0:out_width] - + # Convert to normalized device coordinates [-1, 1] x_ndc = (x_coords - out_width / 2) / (out_width / 2) y_ndc = (y_coords - out_height / 2) / (out_height / 2) - + # Apply perspective projection focal_length = 1.0 / math.tan(fov / 2) - + # Create 3D ray directions x_3d = x_ndc / focal_length y_3d = y_ndc / focal_length z_3d = np.ones_like(x_3d) - + # Normalize ray directions ray_length = np.sqrt(x_3d**2 + y_3d**2 + z_3d**2) x_3d /= ray_length y_3d /= ray_length z_3d /= ray_length - + # Apply rotation for viewing direction # Rotate by yaw (around Y axis) cos_yaw, sin_yaw = math.cos(yaw), math.sin(yaw) x_rot = x_3d * cos_yaw - z_3d * sin_yaw z_rot = x_3d * sin_yaw + z_3d * cos_yaw - + # Rotate by pitch (around X axis) cos_pitch, sin_pitch = math.cos(pitch), math.sin(pitch) y_rot = y_3d * cos_pitch - z_rot * sin_pitch z_final = y_3d * sin_pitch + z_rot * cos_pitch - + # Convert 3D coordinates to spherical theta = np.arctan2(x_rot, z_final) phi = np.arcsin(np.clip(y_rot, -1, 1)) - + # Convert spherical to equirectangular coordinates u = (theta + np.pi) / (2 * np.pi) * equirect_width v = (np.pi/2 - phi) / np.pi * equirect_height - + # Clamp to image boundaries u = np.clip(u, 0, equirect_width - 1) v = np.clip(v, 0, equirect_height - 1) - + return u.astype(np.float32), v.astype(np.float32) def generate_360_sprite_thumbnails( @@ -310,19 +311,19 @@ class Thumbnail360Generator: sprite_file = output_dir / f"{video_id}_360_{viewing_angle}_sprite.jpg" webvtt_file = output_dir / f"{video_id}_360_{viewing_angle}_sprite.webvtt" frames_dir = output_dir / "frames_360" - + # Create frames directory frames_dir.mkdir(exist_ok=True) - + try: # Get video duration probe = ffmpeg.probe(str(video_path)) duration = float(probe["format"]["duration"]) - + # Generate frames at specified intervals interval = self.config.sprite_interval timestamps = list(range(0, int(duration), interval)) - + frame_paths = [] for i, timestamp in enumerate(timestamps): # Generate 360° thumbnail for this timestamp @@ -330,16 +331,16 @@ class Thumbnail360Generator: video_path, frames_dir, timestamp, f"{video_id}_frame_{i}", projection_type, [viewing_angle] ) - + if viewing_angle in thumbnails: frame_paths.append(thumbnails[viewing_angle]) - + # Create sprite sheet from frames if frame_paths: self._create_sprite_sheet(frame_paths, sprite_file, timestamps, webvtt_file) - + return sprite_file, webvtt_file - + finally: # Clean up frame files if frames_dir.exists(): @@ -358,58 +359,58 @@ class Thumbnail360Generator: """Create sprite sheet from individual frames.""" if not frame_paths: raise EncodingError("No frames available for sprite sheet creation") - + # Load first frame to get dimensions first_frame = cv2.imread(str(frame_paths[0])) if first_frame is None: raise EncodingError(f"Failed to load first frame: {frame_paths[0]}") - + frame_height, frame_width = first_frame.shape[:2] - + # Calculate sprite sheet layout cols = 10 # 10 thumbnails per row rows = math.ceil(len(frame_paths) / cols) - + sprite_width = cols * frame_width sprite_height = rows * frame_height - + # Create sprite sheet sprite_img = np.zeros((sprite_height, sprite_width, 3), dtype=np.uint8) - + # Create WebVTT content webvtt_content = ["WEBVTT", ""] - + # Place frames in sprite sheet and create WebVTT entries - for i, (frame_path, timestamp) in enumerate(zip(frame_paths, timestamps)): + for i, (frame_path, timestamp) in enumerate(zip(frame_paths, timestamps, strict=False)): frame = cv2.imread(str(frame_path)) if frame is None: continue - + # Calculate position in sprite col = i % cols row = i // cols - + x_start = col * frame_width y_start = row * frame_height x_end = x_start + frame_width y_end = y_start + frame_height - + # Place frame in sprite sprite_img[y_start:y_end, x_start:x_end] = frame - + # Create WebVTT entry start_time = f"{timestamp//3600:02d}:{(timestamp%3600)//60:02d}:{timestamp%60:02d}.000" end_time = f"{(timestamp+1)//3600:02d}:{((timestamp+1)%3600)//60:02d}:{(timestamp+1)%60:02d}.000" - + webvtt_content.extend([ f"{start_time} --> {end_time}", f"{sprite_file.name}#xywh={x_start},{y_start},{frame_width},{frame_height}", "" ]) - + # Save sprite sheet cv2.imwrite(str(sprite_file), sprite_img, [cv2.IMWRITE_JPEG_QUALITY, 85]) - + # Save WebVTT file with open(webvtt_file, 'w') as f: - f.write('\n'.join(webvtt_content)) \ No newline at end of file + f.write('\n'.join(webvtt_content)) diff --git a/src/video_processor/tasks/compat.py b/src/video_processor/tasks/compat.py new file mode 100644 index 0000000..f8cc989 --- /dev/null +++ b/src/video_processor/tasks/compat.py @@ -0,0 +1,190 @@ +""" +Procrastinate version compatibility layer. + +This module provides compatibility between Procrastinate 2.x and 3.x versions, +allowing the codebase to work with both versions during the migration period. +""" + +from typing import Any + +import procrastinate + + +def get_procrastinate_version() -> tuple[int, int, int]: + """Get the current Procrastinate version.""" + version_str = procrastinate.__version__ + # Handle version strings like "3.0.0", "3.0.0a1", etc. + version_parts = version_str.split('.') + major = int(version_parts[0]) + minor = int(version_parts[1]) + # Handle patch versions with alpha/beta suffixes + patch_str = version_parts[2] if len(version_parts) > 2 else "0" + patch = int(''.join(c for c in patch_str if c.isdigit()) or "0") + return (major, minor, patch) + + +# Check Procrastinate version for compatibility +PROCRASTINATE_VERSION = get_procrastinate_version() +IS_PROCRASTINATE_3_PLUS = PROCRASTINATE_VERSION[0] >= 3 + + +def get_connector_class(): + """Get the appropriate connector class based on Procrastinate version.""" + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x + try: + from procrastinate import PsycopgConnector + return PsycopgConnector + except ImportError: + # Fall back to AiopgConnector if PsycopgConnector not available + from procrastinate import AiopgConnector + return AiopgConnector + else: + # Procrastinate 2.x + from procrastinate import AiopgConnector + return AiopgConnector + + +def create_connector(database_url: str, **kwargs): + """Create a database connector compatible with the current Procrastinate version.""" + connector_class = get_connector_class() + + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x uses different parameter names + if connector_class.__name__ == "PsycopgConnector": + # PsycopgConnector uses 'conninfo' parameter (preferred in 3.5.x) + # Default to better pool settings for 3.5.2 + default_kwargs = { + "pool_size": 10, + "max_pool_size": 20, + } + default_kwargs.update(kwargs) + return connector_class(conninfo=database_url, **default_kwargs) + else: + # AiopgConnector fallback + return connector_class(conninfo=database_url, **kwargs) + else: + # Procrastinate 2.x (legacy support) + return connector_class(conninfo=database_url, **kwargs) + + +def create_app_with_connector(database_url: str, **connector_kwargs) -> procrastinate.App: + """Create a Procrastinate App with the appropriate connector.""" + connector = create_connector(database_url, **connector_kwargs) + return procrastinate.App(connector=connector) + + +class CompatJobContext: + """ + Job context compatibility wrapper to handle differences between versions. + """ + + def __init__(self, job_context): + self._context = job_context + self._version = PROCRASTINATE_VERSION + + def should_abort(self) -> bool: + """Check if the job should abort (compatible across versions).""" + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x + return self._context.should_abort() + else: + # Procrastinate 2.x + if hasattr(self._context, 'should_abort'): + return self._context.should_abort() + else: + # Fallback for older versions + return False + + async def should_abort_async(self) -> bool: + """Check if the job should abort asynchronously.""" + if IS_PROCRASTINATE_3_PLUS: + # In 3.x, should_abort() works for both sync and async + return self.should_abort() + else: + # Procrastinate 2.x + if hasattr(self._context, 'should_abort_async'): + return await self._context.should_abort_async() + else: + return self.should_abort() + + @property + def job(self): + """Access the job object.""" + return self._context.job + + @property + def task(self): + """Access the task object.""" + return self._context.task + + def __getattr__(self, name): + """Delegate other attributes to the wrapped context.""" + return getattr(self._context, name) + + +def get_migration_commands() -> dict[str, str]: + """Get migration commands for the current Procrastinate version.""" + if IS_PROCRASTINATE_3_PLUS: + return { + "pre_migrate": "procrastinate schema --apply --mode=pre", + "post_migrate": "procrastinate schema --apply --mode=post", + "check": "procrastinate schema --check", + } + else: + return { + "migrate": "procrastinate schema --apply", + "check": "procrastinate schema --check", + } + + +def get_worker_options_mapping() -> dict[str, str]: + """Get mapping of worker options between versions.""" + if IS_PROCRASTINATE_3_PLUS: + return { + "timeout": "fetch_job_polling_interval", # Renamed in 3.x + "remove_error": "remove_failed", # Renamed in 3.x + "include_error": "include_failed", # Renamed in 3.x + } + else: + return { + "timeout": "timeout", + "remove_error": "remove_error", + "include_error": "include_error", + } + + +def normalize_worker_kwargs(**kwargs) -> dict[str, Any]: + """Normalize worker keyword arguments for the current version.""" + mapping = get_worker_options_mapping() + normalized = {} + + for key, value in kwargs.items(): + # Map old names to new names if needed + normalized_key = mapping.get(key, key) + normalized[normalized_key] = value + + return normalized + + +# Version-specific feature flags +FEATURES = { + "graceful_shutdown": IS_PROCRASTINATE_3_PLUS, + "job_cancellation": IS_PROCRASTINATE_3_PLUS, + "pre_post_migrations": IS_PROCRASTINATE_3_PLUS, + "psycopg3_support": IS_PROCRASTINATE_3_PLUS, + "improved_performance": PROCRASTINATE_VERSION >= (3, 5, 0), # Performance improvements in 3.5+ + "schema_compatibility": PROCRASTINATE_VERSION >= (3, 5, 2), # Better schema support in 3.5.2 + "enhanced_indexing": PROCRASTINATE_VERSION >= (3, 5, 0), # Improved indexes in 3.5+ +} + + +def get_version_info() -> dict[str, Any]: + """Get version and feature information.""" + return { + "procrastinate_version": procrastinate.__version__, + "version_tuple": PROCRASTINATE_VERSION, + "is_v3_plus": IS_PROCRASTINATE_3_PLUS, + "features": FEATURES, + "migration_commands": get_migration_commands(), + } diff --git a/src/video_processor/tasks/migration.py b/src/video_processor/tasks/migration.py new file mode 100644 index 0000000..6afac17 --- /dev/null +++ b/src/video_processor/tasks/migration.py @@ -0,0 +1,253 @@ +""" +Procrastinate migration utilities for upgrading from 2.x to 3.x. + +This module provides utilities to help with database migrations and +version compatibility during the upgrade process. +""" + +import logging +import subprocess +import sys + +from .compat import ( + IS_PROCRASTINATE_3_PLUS, + get_migration_commands, + get_version_info, +) + +logger = logging.getLogger(__name__) + + +class ProcrastinateMigrationHelper: + """Helper class for managing Procrastinate migrations.""" + + def __init__(self, database_url: str): + self.database_url = database_url + self.version_info = get_version_info() + + def get_migration_steps(self) -> list[str]: + """Get the migration steps for the current version.""" + commands = get_migration_commands() + + if IS_PROCRASTINATE_3_PLUS: + return [ + "1. Apply pre-migrations before deploying new code", + f" Command: {commands['pre_migrate']}", + "2. Deploy new application code", + "3. Apply post-migrations after deployment", + f" Command: {commands['post_migrate']}", + "4. Verify schema is current", + f" Command: {commands['check']}", + ] + else: + return [ + "1. Apply database migrations", + f" Command: {commands['migrate']}", + "2. Verify schema is current", + f" Command: {commands['check']}", + ] + + def print_migration_plan(self) -> None: + """Print the migration plan for the current version.""" + print(f"Procrastinate Migration Plan (v{self.version_info['procrastinate_version']})") + print("=" * 60) + + for step in self.get_migration_steps(): + print(step) + + print("\nVersion Info:") + print(f" Current Version: {self.version_info['procrastinate_version']}") + print(f" Is 3.x+: {self.version_info['is_v3_plus']}") + print(f" Features Available: {list(self.version_info['features'].keys())}") + + def run_migration_command(self, command: str) -> bool: + """ + Run a migration command. + + Args: + command: The command to run + + Returns: + True if successful, False otherwise + """ + try: + logger.info(f"Running migration command: {command}") + + # Set environment variable for database URL + env = {"PROCRASTINATE_DATABASE_URL": self.database_url} + + result = subprocess.run( + command.split(), + env={**dict(sys.environ), **env}, + capture_output=True, + text=True, + check=True + ) + + if result.stdout: + logger.info(f"Migration output: {result.stdout}") + + logger.info("Migration command completed successfully") + return True + + except subprocess.CalledProcessError as e: + logger.error(f"Migration command failed: {e}") + if e.stdout: + logger.error(f"stdout: {e.stdout}") + if e.stderr: + logger.error(f"stderr: {e.stderr}") + return False + + def apply_pre_migration(self) -> bool: + """Apply pre-migration for Procrastinate 3.x.""" + if not IS_PROCRASTINATE_3_PLUS: + logger.warning("Pre-migration only applicable to Procrastinate 3.x+") + return True + + commands = get_migration_commands() + return self.run_migration_command(commands["pre_migrate"]) + + def apply_post_migration(self) -> bool: + """Apply post-migration for Procrastinate 3.x.""" + if not IS_PROCRASTINATE_3_PLUS: + logger.warning("Post-migration only applicable to Procrastinate 3.x+") + return True + + commands = get_migration_commands() + return self.run_migration_command(commands["post_migrate"]) + + def apply_legacy_migration(self) -> bool: + """Apply legacy migration for Procrastinate 2.x.""" + if IS_PROCRASTINATE_3_PLUS: + logger.warning("Legacy migration only applicable to Procrastinate 2.x") + return True + + commands = get_migration_commands() + return self.run_migration_command(commands["migrate"]) + + def check_schema(self) -> bool: + """Check if the database schema is current.""" + commands = get_migration_commands() + return self.run_migration_command(commands["check"]) + + +async def migrate_database( + database_url: str, + pre_migration_only: bool = False, + post_migration_only: bool = False, +) -> bool: + """ + Migrate the Procrastinate database schema. + + Args: + database_url: Database connection string + pre_migration_only: Only apply pre-migration (for 3.x) + post_migration_only: Only apply post-migration (for 3.x) + + Returns: + True if successful, False otherwise + """ + helper = ProcrastinateMigrationHelper(database_url) + + logger.info("Starting Procrastinate database migration") + helper.print_migration_plan() + + try: + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x migration process + if pre_migration_only: + success = helper.apply_pre_migration() + elif post_migration_only: + success = helper.apply_post_migration() + else: + # Apply both pre and post migrations + logger.warning( + "Applying both pre and post migrations. " + "In production, these should be run separately!" + ) + success = ( + helper.apply_pre_migration() and + helper.apply_post_migration() + ) + else: + # Procrastinate 2.x migration process + success = helper.apply_legacy_migration() + + if success: + # Verify schema is current + success = helper.check_schema() + + if success: + logger.info("Database migration completed successfully") + else: + logger.error("Database migration failed") + + return success + + except Exception as e: + logger.error(f"Migration error: {e}") + return False + + +def create_migration_script() -> str: + """Create a migration script for the current environment.""" + version_info = get_version_info() + + script = f"""#!/usr/bin/env python3 +\"\"\" +Procrastinate migration script for version {version_info['procrastinate_version']} + +This script helps migrate your Procrastinate database schema. +\"\"\" + +import asyncio +import os +import sys + +# Add the project root to Python path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from video_processor.tasks.migration import migrate_database + + +async def main(): + database_url = os.environ.get( + 'PROCRASTINATE_DATABASE_URL', + 'postgresql://localhost/procrastinate_dev' + ) + + print(f"Migrating database: {{database_url}}") + + # Parse command line arguments + pre_only = '--pre' in sys.argv + post_only = '--post' in sys.argv + + success = await migrate_database( + database_url=database_url, + pre_migration_only=pre_only, + post_migration_only=post_only, + ) + + if not success: + print("Migration failed!") + sys.exit(1) + + print("Migration completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) +""" + + return script + + +if __name__ == "__main__": + # Generate migration script when run directly + script_content = create_migration_script() + + with open("migrate_procrastinate.py", "w") as f: + f.write(script_content) + + print("Generated migration script: migrate_procrastinate.py") + print("Run with: python migrate_procrastinate.py [--pre|--post]") diff --git a/src/video_processor/tasks/procrastinate_tasks.py b/src/video_processor/tasks/procrastinate_tasks.py index 713f3ac..00ba28c 100644 --- a/src/video_processor/tasks/procrastinate_tasks.py +++ b/src/video_processor/tasks/procrastinate_tasks.py @@ -8,6 +8,11 @@ from procrastinate import App from ..config import ProcessorConfig from ..core.processor import VideoProcessor from ..exceptions import VideoProcessorError +from .compat import ( + create_app_with_connector, + get_version_info, + normalize_worker_kwargs, +) logger = logging.getLogger(__name__) @@ -15,24 +20,45 @@ logger = logging.getLogger(__name__) app = App(connector=None) # Connector will be set during setup -def setup_procrastinate(database_url: str) -> App: +def setup_procrastinate( + database_url: str, + connector_kwargs: dict | None = None, +) -> App: """ Set up Procrastinate with database connection. Args: database_url: PostgreSQL connection string + connector_kwargs: Additional connector configuration Returns: Configured Procrastinate app """ - from procrastinate import AiopgConnector + connector_kwargs = connector_kwargs or {} - connector = AiopgConnector(conninfo=database_url) - app.connector = connector + # Use compatibility layer to create app with appropriate connector + configured_app = create_app_with_connector(database_url, **connector_kwargs) + # Update the global app instance + app.connector = configured_app.connector + + logger.info(f"Procrastinate setup complete. Version info: {get_version_info()}") return app +def get_worker_kwargs(**kwargs) -> dict: + """ + Get normalized worker kwargs for the current Procrastinate version. + + Args: + **kwargs: Worker configuration options + + Returns: + Normalized kwargs for the current version + """ + return normalize_worker_kwargs(**kwargs) + + @app.task(queue="video_processing") def process_video_async( input_path: str, diff --git a/src/video_processor/tasks/worker_compatibility.py b/src/video_processor/tasks/worker_compatibility.py new file mode 100644 index 0000000..a48579e --- /dev/null +++ b/src/video_processor/tasks/worker_compatibility.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +""" +Worker compatibility module for Procrastinate 2.x and 3.x. + +Provides a unified worker interface that works across different Procrastinate versions. +""" + +import asyncio +import logging +import os +import sys +from typing import Optional + +from .compat import ( + IS_PROCRASTINATE_3_PLUS, + create_app_with_connector, + get_version_info, + map_worker_options, +) + +logger = logging.getLogger(__name__) + + +def setup_worker_app(database_url: str, connector_kwargs: Optional[dict] = None): + """Set up Procrastinate app for worker usage.""" + connector_kwargs = connector_kwargs or {} + + # Create app with proper connector + app = create_app_with_connector(database_url, **connector_kwargs) + + # Import tasks to register them + from . import procrastinate_tasks # noqa: F401 + + logger.info(f"Worker app setup complete. {get_version_info()}") + return app + + +async def run_worker_async( + database_url: str, + queues: Optional[list[str]] = None, + concurrency: int = 1, + **worker_kwargs, +): + """Run Procrastinate worker with version compatibility.""" + logger.info(f"Starting Procrastinate worker (v{get_version_info()['procrastinate_version']})") + + # Set up the app + app = setup_worker_app(database_url) + + # Map worker options for compatibility + mapped_options = map_worker_options(worker_kwargs) + + # Default queues + if queues is None: + queues = ["video_processing", "thumbnail_generation", "default"] + + logger.info(f"Worker config: queues={queues}, concurrency={concurrency}") + logger.info(f"Worker options: {mapped_options}") + + try: + if IS_PROCRASTINATE_3_PLUS: + # Procrastinate 3.x worker + async with app.open_async() as app_context: + worker = app_context.make_worker( + queues=queues, + concurrency=concurrency, + **mapped_options, + ) + await worker.async_run() + else: + # Procrastinate 2.x worker + worker = app.make_worker( + queues=queues, + concurrency=concurrency, + **mapped_options, + ) + await worker.async_run() + + except KeyboardInterrupt: + logger.info("Worker stopped by user") + except Exception as e: + logger.error(f"Worker error: {e}") + raise + + +def run_worker_sync( + database_url: str, + queues: Optional[list[str]] = None, + concurrency: int = 1, + **worker_kwargs, +): + """Synchronous wrapper for running the worker.""" + try: + asyncio.run( + run_worker_async( + database_url=database_url, + queues=queues, + concurrency=concurrency, + **worker_kwargs, + ) + ) + except KeyboardInterrupt: + logger.info("Worker interrupted") + sys.exit(0) + + +def main(): + """Main entry point for worker CLI.""" + import argparse + + parser = argparse.ArgumentParser(description="Procrastinate Worker") + parser.add_argument("command", choices=["worker"], help="Command to run") + parser.add_argument( + "--database-url", + default=os.environ.get("PROCRASTINATE_DATABASE_URL"), + help="Database URL", + ) + parser.add_argument( + "--queues", + nargs="*", + default=["video_processing", "thumbnail_generation", "default"], + help="Queue names to process", + ) + parser.add_argument( + "--concurrency", + type=int, + default=int(os.environ.get("WORKER_CONCURRENCY", "1")), + help="Worker concurrency", + ) + parser.add_argument( + "--timeout", + type=int, + default=int(os.environ.get("WORKER_TIMEOUT", "300")), + help="Worker timeout (maps to fetch_job_polling_interval in 3.x)", + ) + + args = parser.parse_args() + + if not args.database_url: + logger.error("Database URL is required (--database-url or PROCRASTINATE_DATABASE_URL)") + sys.exit(1) + + logger.info(f"Starting {args.command} with database: {args.database_url}") + + if args.command == "worker": + run_worker_sync( + database_url=args.database_url, + queues=args.queues, + concurrency=args.concurrency, + timeout=args.timeout, + ) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + main() \ No newline at end of file diff --git a/src/video_processor/utils/sprite_generator.py b/src/video_processor/utils/sprite_generator.py new file mode 100644 index 0000000..e2ef2a5 --- /dev/null +++ b/src/video_processor/utils/sprite_generator.py @@ -0,0 +1,184 @@ +"""Custom sprite generator that fixes msprites2 ImageMagick compatibility issues.""" + +import logging +import os +import subprocess +import time +from pathlib import Path + +logger = logging.getLogger(__name__) + + +class FixedSpriteGenerator: + """Fixed sprite generator with proper ImageMagick compatibility.""" + + def __init__( + self, + video_path: str | Path, + thumbnail_dir: str | Path, + ips: float = 1.0, + width: int = 160, + height: int = 90, + cols: int = 10, + rows: int = 10, + ): + self.video_path = str(video_path) + self.thumbnail_dir = str(thumbnail_dir) + self.ips = ips + self.width = width + self.height = height + self.cols = cols + self.rows = rows + self.filename_format = "%04d.jpg" + + # Create thumbnail directory if it doesn't exist + Path(self.thumbnail_dir).mkdir(parents=True, exist_ok=True) + + def generate_thumbnails(self) -> None: + """Generate individual thumbnail frames using ffmpeg.""" + output_pattern = os.path.join(self.thumbnail_dir, self.filename_format) + + # Use ffmpeg to extract thumbnails + cmd = [ + "ffmpeg", "-loglevel", "error", "-i", self.video_path, + "-r", f"1/{self.ips}", + "-vf", f"scale={self.width}:{self.height}", + "-y", # Overwrite existing files + output_pattern + ] + + logger.debug(f"Generating thumbnails with: {' '.join(cmd)}") + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + if result.returncode != 0: + raise RuntimeError(f"FFmpeg failed: {result.stderr}") + + def generate_sprite(self, sprite_file: str | Path) -> Path: + """Generate sprite sheet using ImageMagick montage.""" + sprite_file = Path(sprite_file) + + # Count available thumbnails + thumbnail_files = list(Path(self.thumbnail_dir).glob("*.jpg")) + if not thumbnail_files: + raise RuntimeError("No thumbnail files found to create sprite") + + # Sort thumbnails by name to ensure correct order + thumbnail_files.sort() + + # Limit number of thumbnails to avoid command line length issues + max_thumbnails = min(len(thumbnail_files), 100) # Limit to 100 thumbnails + thumbnail_files = thumbnail_files[:max_thumbnails] + + # Build montage command with correct syntax + cmd = [ + "magick", "montage", + "-background", "#336699", + "-tile", f"{self.cols}x{self.rows}", + "-geometry", f"{self.width}x{self.height}+0+0", + ] + + # Add thumbnail files + cmd.extend(str(f) for f in thumbnail_files) + cmd.append(str(sprite_file)) + + logger.debug(f"Generating sprite with {len(thumbnail_files)} thumbnails: {sprite_file}") + result = subprocess.run(cmd, check=False) + + if result.returncode != 0: + raise RuntimeError(f"ImageMagick montage failed with return code {result.returncode}") + + return sprite_file + + def generate_webvtt(self, webvtt_file: str | Path, sprite_filename: str) -> Path: + """Generate WebVTT file for seekbar thumbnails.""" + webvtt_file = Path(webvtt_file) + + # Count thumbnail files to determine timeline + thumbnail_files = list(Path(self.thumbnail_dir).glob("*.jpg")) + thumbnail_files.sort() + + content_lines = ["WEBVTT\n\n"] + + for i, _ in enumerate(thumbnail_files): + start_time = i * self.ips + end_time = (i + 1) * self.ips + + # Calculate position in sprite grid + row = i // self.cols + col = i % self.cols + x = col * self.width + y = row * self.height + + # Format timestamps + start_ts = self._seconds_to_timestamp(start_time) + end_ts = self._seconds_to_timestamp(end_time) + + content_lines.extend([ + f"{start_ts} --> {end_ts}\n", + f"{sprite_filename}#xywh={x},{y},{self.width},{self.height}\n\n" + ]) + + # Write WebVTT content + with open(webvtt_file, 'w') as f: + f.writelines(content_lines) + + return webvtt_file + + def _seconds_to_timestamp(self, seconds: float) -> str: + """Convert seconds to WebVTT timestamp format.""" + return time.strftime("%H:%M:%S", time.gmtime(seconds)) + + def cleanup_thumbnails(self) -> None: + """Remove temporary thumbnail files.""" + try: + thumbnail_files = list(Path(self.thumbnail_dir).glob("*.jpg")) + for thumb_file in thumbnail_files: + thumb_file.unlink() + + # Remove directory if empty + thumb_dir = Path(self.thumbnail_dir) + if thumb_dir.exists() and not any(thumb_dir.iterdir()): + thumb_dir.rmdir() + except Exception as e: + logger.warning(f"Failed to cleanup thumbnails: {e}") + + @classmethod + def create_sprite_sheet( + cls, + video_path: str | Path, + thumbnail_dir: str | Path, + sprite_file: str | Path, + webvtt_file: str | Path, + ips: float = 1.0, + width: int = 160, + height: int = 90, + cols: int = 10, + rows: int = 10, + cleanup: bool = True, + ) -> tuple[Path, Path]: + """ + Complete sprite sheet generation process. + + Returns: + Tuple of (sprite_file_path, webvtt_file_path) + """ + generator = cls( + video_path=video_path, + thumbnail_dir=thumbnail_dir, + ips=ips, + width=width, + height=height, + cols=cols, + rows=rows, + ) + + # Generate components + generator.generate_thumbnails() + sprite_path = generator.generate_sprite(sprite_file) + webvtt_path = generator.generate_webvtt(webvtt_file, Path(sprite_file).name) + + # Cleanup temporary thumbnails if requested (but not the final sprite/webvtt) + if cleanup: + generator.cleanup_thumbnails() + + return sprite_path, webvtt_path diff --git a/src/video_processor/utils/video_360.py b/src/video_processor/utils/video_360.py index cf17be7..6b1dc8a 100644 --- a/src/video_processor/utils/video_360.py +++ b/src/video_processor/utils/video_360.py @@ -1,6 +1,5 @@ """360° video detection and utility functions.""" -from pathlib import Path from typing import Any, Literal # Optional dependency handling @@ -38,7 +37,7 @@ StereoMode = Literal["mono", "top-bottom", "left-right", "unknown"] class Video360Detection: """Utilities for detecting and analyzing 360° videos.""" - + @staticmethod def detect_360_video(video_metadata: dict[str, Any]) -> dict[str, Any]: """ @@ -57,7 +56,7 @@ class Video360Detection: "confidence": 0.0, "detection_methods": [], } - + # Check for spherical video metadata (Google/YouTube standard) spherical_metadata = Video360Detection._check_spherical_metadata(video_metadata) if spherical_metadata["found"]: @@ -68,7 +67,7 @@ class Video360Detection: "confidence": 1.0, }) detection_result["detection_methods"].append("spherical_metadata") - + # Check aspect ratio for equirectangular projection aspect_ratio_check = Video360Detection._check_aspect_ratio(video_metadata) if aspect_ratio_check["is_likely_360"]: @@ -79,7 +78,7 @@ class Video360Detection: "confidence": aspect_ratio_check["confidence"], }) detection_result["detection_methods"].append("aspect_ratio") - + # Check filename patterns filename_check = Video360Detection._check_filename_patterns(video_metadata) if filename_check["is_likely_360"]: @@ -90,9 +89,9 @@ class Video360Detection: "confidence": filename_check["confidence"], }) detection_result["detection_methods"].append("filename") - + return detection_result - + @staticmethod def _check_spherical_metadata(metadata: dict[str, Any]) -> dict[str, Any]: """Check for spherical video metadata tags.""" @@ -101,14 +100,14 @@ class Video360Detection: "projection_type": "equirectangular", "stereo_mode": "mono", } - + # Check format tags for spherical metadata format_tags = metadata.get("format", {}).get("tags", {}) - + # Google spherical video standard if "spherical" in format_tags: result["found"] = True - + # Check for specific spherical video tags spherical_indicators = [ "Spherical", @@ -117,11 +116,11 @@ class Video360Detection: "ProjectionType", "projection_type", ] - + for tag_name, tag_value in format_tags.items(): if any(indicator.lower() in tag_name.lower() for indicator in spherical_indicators): result["found"] = True - + # Determine projection type from metadata if isinstance(tag_value, str): tag_lower = tag_value.lower() @@ -129,7 +128,7 @@ class Video360Detection: result["projection_type"] = "equirectangular" elif "cubemap" in tag_lower: result["projection_type"] = "cubemap" - + # Check for stereo mode indicators stereo_indicators = ["StereoMode", "stereo_mode", "StereoscopicMode"] for tag_name, tag_value in format_tags.items(): @@ -140,9 +139,9 @@ class Video360Detection: result["stereo_mode"] = "top-bottom" elif "left-right" in tag_lower or "lr" in tag_lower: result["stereo_mode"] = "left-right" - + return result - + @staticmethod def _check_aspect_ratio(metadata: dict[str, Any]) -> dict[str, Any]: """Check if aspect ratio suggests 360° video.""" @@ -150,28 +149,28 @@ class Video360Detection: "is_likely_360": False, "confidence": 0.0, } - + video_info = metadata.get("video", {}) if not video_info: return result - + width = video_info.get("width", 0) height = video_info.get("height", 0) - + if width <= 0 or height <= 0: return result - + aspect_ratio = width / height - + # Equirectangular videos typically have 2:1 aspect ratio if 1.9 <= aspect_ratio <= 2.1: result["is_likely_360"] = True result["confidence"] = 0.8 - + # Higher confidence for exact 2:1 ratio if 1.98 <= aspect_ratio <= 2.02: result["confidence"] = 0.9 - + # Some 360° videos use different aspect ratios elif 1.5 <= aspect_ratio <= 2.5: # Common resolutions for 360° video @@ -182,16 +181,16 @@ class Video360Detection: (4096, 2048), # Cinema 4K 360° (5760, 2880), # 6K 360° ] - + for res_width, res_height in common_360_resolutions: if (width == res_width and height == res_height) or \ (width == res_height and height == res_width): result["is_likely_360"] = True result["confidence"] = 0.7 break - + return result - + @staticmethod def _check_filename_patterns(metadata: dict[str, Any]) -> dict[str, Any]: """Check filename for 360° indicators.""" @@ -200,31 +199,31 @@ class Video360Detection: "projection_type": "equirectangular", "confidence": 0.0, } - + filename = metadata.get("filename", "").lower() if not filename: return result - + # Common 360° filename patterns patterns_360 = [ - "360", "vr", "spherical", "equirectangular", + "360", "vr", "spherical", "equirectangular", "panoramic", "immersive", "omnidirectional" ] - + # Projection type patterns projection_patterns = { "equirectangular": ["equirect", "equi", "spherical"], "cubemap": ["cube", "cubemap", "cubic"], "cylindrical": ["cylindrical", "cylinder"], } - + # Check for 360° indicators for pattern in patterns_360: if pattern in filename: result["is_likely_360"] = True result["confidence"] = 0.6 break - + # Check for specific projection types if result["is_likely_360"]: for projection, patterns in projection_patterns.items(): @@ -232,13 +231,13 @@ class Video360Detection: result["projection_type"] = projection result["confidence"] = 0.7 break - + return result class Video360Utils: """Utility functions for 360° video processing.""" - + @staticmethod def get_recommended_bitrate_multiplier(projection_type: ProjectionType) -> float: """ @@ -260,9 +259,9 @@ class Video360Utils: "stereographic": 2.2, # Good balance "unknown": 2.0, # Safe default } - + return multipliers.get(projection_type, 2.0) - + @staticmethod def get_optimal_resolutions(projection_type: ProjectionType) -> list[tuple[int, int]]: """ @@ -290,29 +289,29 @@ class Video360Utils: (4096, 4096), # 4K per face ], } - + return resolutions.get(projection_type, resolutions["equirectangular"]) - + @staticmethod def is_360_library_available() -> bool: """Check if 360° processing libraries are available.""" return HAS_360_SUPPORT - + @staticmethod def get_missing_dependencies() -> list[str]: """Get list of missing dependencies for 360° processing.""" missing = [] - + if not HAS_OPENCV: missing.append("opencv-python") - + if not HAS_NUMPY: missing.append("numpy") - + if not HAS_PY360CONVERT: missing.append("py360convert") - + if not HAS_EXIFREAD: missing.append("exifread") - - return missing \ No newline at end of file + + return missing diff --git a/tests/test_procrastinate_compat.py b/tests/test_procrastinate_compat.py new file mode 100644 index 0000000..8dcb6d7 --- /dev/null +++ b/tests/test_procrastinate_compat.py @@ -0,0 +1,314 @@ +"""Tests for Procrastinate compatibility layer.""" + +import pytest + +from video_processor.tasks.compat import ( + CompatJobContext, + FEATURES, + IS_PROCRASTINATE_3_PLUS, + PROCRASTINATE_VERSION, + create_app_with_connector, + create_connector, + get_migration_commands, + get_procrastinate_version, + get_version_info, + get_worker_options_mapping, + normalize_worker_kwargs, +) + + +class TestProcrastinateVersionDetection: + """Test version detection functionality.""" + + def test_version_parsing(self): + """Test version string parsing.""" + version = get_procrastinate_version() + assert isinstance(version, tuple) + assert len(version) == 3 + assert all(isinstance(v, int) for v in version) + assert version[0] >= 2 # Should be at least version 2.x + + def test_version_flags(self): + """Test version-specific flags.""" + assert isinstance(IS_PROCRASTINATE_3_PLUS, bool) + assert isinstance(PROCRASTINATE_VERSION, tuple) + + if PROCRASTINATE_VERSION[0] >= 3: + assert IS_PROCRASTINATE_3_PLUS is True + else: + assert IS_PROCRASTINATE_3_PLUS is False + + def test_version_info(self): + """Test version info structure.""" + info = get_version_info() + + required_keys = { + "procrastinate_version", + "version_tuple", + "is_v3_plus", + "features", + "migration_commands", + } + + assert set(info.keys()) == required_keys + assert isinstance(info["version_tuple"], tuple) + assert isinstance(info["is_v3_plus"], bool) + assert isinstance(info["features"], dict) + assert isinstance(info["migration_commands"], dict) + + def test_features(self): + """Test feature flags.""" + assert isinstance(FEATURES, dict) + + expected_features = { + "graceful_shutdown", + "job_cancellation", + "pre_post_migrations", + "psycopg3_support", + "improved_performance", + "schema_compatibility", + "enhanced_indexing", + } + + assert set(FEATURES.keys()) == expected_features + assert all(isinstance(v, bool) for v in FEATURES.values()) + + +class TestConnectorCreation: + """Test connector creation functionality.""" + + def test_connector_class_selection(self): + """Test that appropriate connector class is selected.""" + from video_processor.tasks.compat import get_connector_class + + connector_class = get_connector_class() + assert connector_class is not None + assert hasattr(connector_class, "__name__") + + if IS_PROCRASTINATE_3_PLUS: + # Should prefer PsycopgConnector in 3.x + assert connector_class.__name__ in ["PsycopgConnector", "AiopgConnector"] + else: + assert connector_class.__name__ == "AiopgConnector" + + def test_connector_creation(self): + """Test connector creation with various parameters.""" + database_url = "postgresql://test:test@localhost/test" + + # Test basic creation + connector = create_connector(database_url) + assert connector is not None + + # Test with additional kwargs + connector_with_kwargs = create_connector( + database_url, + pool_size=5, + max_pool_size=10, + ) + assert connector_with_kwargs is not None + + def test_app_creation(self): + """Test Procrastinate app creation.""" + database_url = "postgresql://test:test@localhost/test" + + app = create_app_with_connector(database_url) + assert app is not None + assert hasattr(app, 'connector') + assert app.connector is not None + + +class TestWorkerOptions: + """Test worker options compatibility.""" + + def test_option_mapping(self): + """Test worker option mapping between versions.""" + mapping = get_worker_options_mapping() + assert isinstance(mapping, dict) + + if IS_PROCRASTINATE_3_PLUS: + expected_mappings = { + "timeout": "fetch_job_polling_interval", + "remove_error": "remove_failed", + "include_error": "include_failed", + } + assert mapping == expected_mappings + else: + # In 2.x, mappings should be identity + assert mapping["timeout"] == "timeout" + assert mapping["remove_error"] == "remove_error" + + def test_kwargs_normalization(self): + """Test worker kwargs normalization.""" + test_kwargs = { + "concurrency": 4, + "timeout": 5, + "remove_error": True, + "include_error": False, + "name": "test-worker", + } + + normalized = normalize_worker_kwargs(**test_kwargs) + + assert isinstance(normalized, dict) + assert normalized["concurrency"] == 4 + assert normalized["name"] == "test-worker" + + if IS_PROCRASTINATE_3_PLUS: + assert "fetch_job_polling_interval" in normalized + assert "remove_failed" in normalized + assert "include_failed" in normalized + assert normalized["fetch_job_polling_interval"] == 5 + assert normalized["remove_failed"] is True + assert normalized["include_failed"] is False + else: + assert normalized["timeout"] == 5 + assert normalized["remove_error"] is True + assert normalized["include_error"] is False + + def test_kwargs_passthrough(self): + """Test that unknown kwargs are passed through unchanged.""" + test_kwargs = { + "custom_option": "value", + "another_option": 42, + } + + normalized = normalize_worker_kwargs(**test_kwargs) + assert normalized == test_kwargs + + +class TestMigrationCommands: + """Test migration command generation.""" + + def test_migration_commands_structure(self): + """Test migration command structure.""" + commands = get_migration_commands() + assert isinstance(commands, dict) + + if IS_PROCRASTINATE_3_PLUS: + expected_keys = {"pre_migrate", "post_migrate", "check"} + assert set(commands.keys()) == expected_keys + + assert "procrastinate schema --apply --mode=pre" in commands["pre_migrate"] + assert "procrastinate schema --apply --mode=post" in commands["post_migrate"] + else: + expected_keys = {"migrate", "check"} + assert set(commands.keys()) == expected_keys + + assert "procrastinate schema --apply" == commands["migrate"] + + assert "procrastinate schema --check" == commands["check"] + + +class TestJobContextCompat: + """Test job context compatibility wrapper.""" + + def test_compat_context_creation(self): + """Test creation of compatibility context.""" + # Create a mock context object + class MockContext: + def __init__(self): + self.job = "mock_job" + self.task = "mock_task" + + def should_abort(self): + return False + + async def should_abort_async(self): + return False + + mock_context = MockContext() + compat_context = CompatJobContext(mock_context) + + assert compat_context is not None + assert compat_context.job == "mock_job" + assert compat_context.task == "mock_task" + + def test_should_abort_methods(self): + """Test should_abort method compatibility.""" + class MockContext: + def should_abort(self): + return True + + async def should_abort_async(self): + return True + + mock_context = MockContext() + compat_context = CompatJobContext(mock_context) + + # Test synchronous method + assert compat_context.should_abort() is True + + @pytest.mark.asyncio + async def test_should_abort_async(self): + """Test async should_abort method.""" + class MockContext: + def should_abort(self): + return True + + async def should_abort_async(self): + return True + + mock_context = MockContext() + compat_context = CompatJobContext(mock_context) + + # Test asynchronous method + result = await compat_context.should_abort_async() + assert result is True + + def test_attribute_delegation(self): + """Test that unknown attributes are delegated to wrapped context.""" + class MockContext: + def __init__(self): + self.custom_attr = "custom_value" + + def custom_method(self): + return "custom_result" + + mock_context = MockContext() + compat_context = CompatJobContext(mock_context) + + assert compat_context.custom_attr == "custom_value" + assert compat_context.custom_method() == "custom_result" + + +class TestIntegration: + """Integration tests for compatibility features.""" + + def test_full_compatibility_workflow(self): + """Test complete compatibility workflow.""" + # Get version info + version_info = get_version_info() + assert version_info["is_v3_plus"] == IS_PROCRASTINATE_3_PLUS + + # Test worker options + worker_kwargs = normalize_worker_kwargs( + concurrency=2, + timeout=10, + remove_error=False, + ) + assert "concurrency" in worker_kwargs + + # Test migration commands + migration_commands = get_migration_commands() + assert "check" in migration_commands + + if IS_PROCRASTINATE_3_PLUS: + assert "pre_migrate" in migration_commands + assert "post_migrate" in migration_commands + else: + assert "migrate" in migration_commands + + def test_version_specific_behavior(self): + """Test that version-specific behavior is consistent.""" + version_info = get_version_info() + + if version_info["is_v3_plus"]: + # Test 3.x specific features + assert FEATURES["graceful_shutdown"] is True + assert FEATURES["job_cancellation"] is True + assert FEATURES["pre_post_migrations"] is True + else: + # Test 2.x behavior + assert FEATURES["graceful_shutdown"] is False + assert FEATURES["job_cancellation"] is False + assert FEATURES["pre_post_migrations"] is False \ No newline at end of file diff --git a/tests/test_procrastinate_migration.py b/tests/test_procrastinate_migration.py new file mode 100644 index 0000000..e4925a1 --- /dev/null +++ b/tests/test_procrastinate_migration.py @@ -0,0 +1,216 @@ +"""Tests for Procrastinate migration utilities.""" + +import pytest + +from video_processor.tasks.migration import ProcrastinateMigrationHelper, create_migration_script +from video_processor.tasks.compat import IS_PROCRASTINATE_3_PLUS + + +class TestProcrastinateMigrationHelper: + """Test migration helper functionality.""" + + def test_migration_helper_creation(self): + """Test migration helper initialization.""" + database_url = "postgresql://test:test@localhost/test" + helper = ProcrastinateMigrationHelper(database_url) + + assert helper.database_url == database_url + assert helper.version_info is not None + assert "procrastinate_version" in helper.version_info + + def test_migration_steps_generation(self): + """Test migration steps generation.""" + helper = ProcrastinateMigrationHelper("postgresql://fake/db") + steps = helper.get_migration_steps() + + assert isinstance(steps, list) + assert len(steps) > 0 + + if IS_PROCRASTINATE_3_PLUS: + # Should have pre/post migration steps + assert len(steps) >= 7 # Pre, deploy, post, verify + assert any("pre-migration" in step.lower() for step in steps) + assert any("post-migration" in step.lower() for step in steps) + else: + # Should have single migration step + assert len(steps) >= 2 # Migrate, verify + assert any("migration" in step.lower() for step in steps) + + def test_print_migration_plan(self, capsys): + """Test migration plan printing.""" + helper = ProcrastinateMigrationHelper("postgresql://fake/db") + helper.print_migration_plan() + + captured = capsys.readouterr() + assert "Procrastinate Migration Plan" in captured.out + assert "Version Info:" in captured.out + assert "Current Version:" in captured.out + + def test_migration_command_structure(self): + """Test that migration commands have correct structure.""" + helper = ProcrastinateMigrationHelper("postgresql://fake/db") + + # Test method availability + assert hasattr(helper, 'apply_pre_migration') + assert hasattr(helper, 'apply_post_migration') + assert hasattr(helper, 'apply_legacy_migration') + assert hasattr(helper, 'check_schema') + assert hasattr(helper, 'run_migration_command') + + def test_migration_command_validation(self): + """Test migration command validation without actually running.""" + helper = ProcrastinateMigrationHelper("postgresql://fake/db") + + # Test that methods return appropriate responses for invalid DB + if IS_PROCRASTINATE_3_PLUS: + # Pre-migration should be available + assert hasattr(helper, 'apply_pre_migration') + assert hasattr(helper, 'apply_post_migration') + else: + # Legacy migration should be available + assert hasattr(helper, 'apply_legacy_migration') + + +class TestMigrationScriptGeneration: + """Test migration script generation.""" + + def test_script_generation(self): + """Test that migration script is generated correctly.""" + script_content = create_migration_script() + + assert isinstance(script_content, str) + assert len(script_content) > 0 + + # Check for essential script components + assert "#!/usr/bin/env python3" in script_content + assert "Procrastinate migration script" in script_content + assert "migrate_database" in script_content + assert "asyncio" in script_content + + # Check for command line argument handling + assert "--pre" in script_content or "--post" in script_content + + def test_script_has_proper_structure(self): + """Test that generated script has proper Python structure.""" + script_content = create_migration_script() + + # Should have proper Python script structure + lines = script_content.split('\n') + + # Check shebang + assert lines[0] == "#!/usr/bin/env python3" + + # Check for main function + assert 'def main():' in script_content + + # Check for asyncio usage + assert 'asyncio.run(main())' in script_content + + +class TestMigrationWorkflow: + """Test complete migration workflow scenarios.""" + + def test_version_aware_migration_selection(self): + """Test that correct migration path is selected based on version.""" + helper = ProcrastinateMigrationHelper("postgresql://fake/db") + + if IS_PROCRASTINATE_3_PLUS: + # 3.x should use pre/post migrations + steps = helper.get_migration_steps() + step_text = ' '.join(steps).lower() + assert 'pre-migration' in step_text + assert 'post-migration' in step_text + else: + # 2.x should use legacy migration + steps = helper.get_migration_steps() + step_text = ' '.join(steps).lower() + assert 'migration' in step_text + assert 'pre-migration' not in step_text + + def test_migration_helper_consistency(self): + """Test that migration helper provides consistent information.""" + helper = ProcrastinateMigrationHelper("postgresql://fake/db") + + # Version info should be consistent + version_info = helper.version_info + steps = helper.get_migration_steps() + + assert version_info["is_v3_plus"] == IS_PROCRASTINATE_3_PLUS + + # Steps should match version + if version_info["is_v3_plus"]: + assert len(steps) > 4 # Should have multiple steps for 3.x + else: + assert len(steps) >= 2 # Should have basic steps for 2.x + + +@pytest.mark.asyncio +class TestAsyncMigration: + """Test async migration functionality.""" + + async def test_migrate_database_function_exists(self): + """Test that async migration function exists and is callable.""" + from video_processor.tasks.migration import migrate_database + + # Function should exist and be async + assert callable(migrate_database) + + # Should handle invalid database gracefully (don't actually run) + # Just test that it exists and has the right signature + import inspect + sig = inspect.signature(migrate_database) + + expected_params = ['database_url', 'pre_migration_only', 'post_migration_only'] + actual_params = list(sig.parameters.keys()) + + for param in expected_params: + assert param in actual_params + + +class TestRegressionPrevention: + """Tests to prevent regressions in migration functionality.""" + + def test_migration_helper_backwards_compatibility(self): + """Ensure migration helper maintains backwards compatibility.""" + helper = ProcrastinateMigrationHelper("postgresql://fake/db") + + # Essential methods should always exist + required_methods = [ + 'get_migration_steps', + 'print_migration_plan', + 'run_migration_command', + 'check_schema', + ] + + for method in required_methods: + assert hasattr(helper, method) + assert callable(getattr(helper, method)) + + def test_version_detection_stability(self): + """Test that version detection is stable and predictable.""" + from video_processor.tasks.compat import get_version_info, PROCRASTINATE_VERSION + + info1 = get_version_info() + info2 = get_version_info() + + # Should return consistent results + assert info1 == info2 + assert info1["version_tuple"] == PROCRASTINATE_VERSION + + def test_feature_flags_consistency(self): + """Test that feature flags are consistent with version.""" + from video_processor.tasks.compat import FEATURES, IS_PROCRASTINATE_3_PLUS + + # 3.x features should only be available in 3.x + v3_features = [ + "graceful_shutdown", + "job_cancellation", + "pre_post_migrations", + "psycopg3_support" + ] + + for feature in v3_features: + if IS_PROCRASTINATE_3_PLUS: + assert FEATURES[feature] is True, f"{feature} should be True in 3.x" + else: + assert FEATURES[feature] is False, f"{feature} should be False in 2.x" \ No newline at end of file