diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml new file mode 100644 index 0000000..a31d785 --- /dev/null +++ b/.github/workflows/integration-tests.yml @@ -0,0 +1,196 @@ +name: Integration Tests + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main ] + schedule: + # Run daily at 02:00 UTC + - cron: '0 2 * * *' + +env: + DOCKER_BUILDKIT: 1 + COMPOSE_DOCKER_CLI_BUILD: 1 + +jobs: + integration-tests: + name: Docker Integration Tests + runs-on: ubuntu-latest + timeout-minutes: 30 + + strategy: + matrix: + test-suite: + - "video_processing" + - "procrastinate_worker" + - "database_migration" + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Cache Docker layers + uses: actions/cache@v3 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-${{ github.sha }} + restore-keys: | + ${{ runner.os }}-buildx- + + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y ffmpeg postgresql-client + + - name: Verify Docker and FFmpeg + run: | + docker --version + docker-compose --version + ffmpeg -version + + - name: Run integration tests + run: | + ./scripts/run-integration-tests.sh \ + --test-filter "test_${{ matrix.test-suite }}" \ + --timeout 1200 \ + --verbose + + - name: Upload test logs + if: failure() + uses: actions/upload-artifact@v3 + with: + name: integration-test-logs-${{ matrix.test-suite }} + path: test-reports/ + retention-days: 7 + + - name: Upload test results + if: always() + uses: actions/upload-artifact@v3 + with: + name: integration-test-results-${{ matrix.test-suite }} + path: htmlcov/ + retention-days: 7 + + full-integration-test: + name: Full Integration Test Suite + runs-on: ubuntu-latest + timeout-minutes: 45 + needs: integration-tests + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y ffmpeg postgresql-client + + - name: Run complete integration test suite + run: | + ./scripts/run-integration-tests.sh \ + --timeout 2400 \ + --verbose + + - name: Generate test report + if: always() + run: | + mkdir -p test-reports + echo "# Integration Test Report" > test-reports/summary.md + echo "- Date: $(date)" >> test-reports/summary.md + echo "- Commit: ${{ github.sha }}" >> test-reports/summary.md + echo "- Branch: ${{ github.ref_name }}" >> test-reports/summary.md + + - name: Upload complete test results + if: always() + uses: actions/upload-artifact@v3 + with: + name: complete-integration-test-results + path: | + test-reports/ + htmlcov/ + retention-days: 30 + + performance-test: + name: Performance & Load Testing + runs-on: ubuntu-latest + timeout-minutes: 20 + if: github.event_name == 'schedule' || contains(github.event.pull_request.labels.*.name, 'performance') + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y ffmpeg postgresql-client + + - name: Run performance tests + run: | + ./scripts/run-integration-tests.sh \ + --test-filter "performance" \ + --timeout 1200 \ + --verbose + + - name: Upload performance results + if: always() + uses: actions/upload-artifact@v3 + with: + name: performance-test-results + path: test-reports/ + retention-days: 14 + + docker-security-scan: + name: Docker Security Scan + runs-on: ubuntu-latest + timeout-minutes: 15 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Build Docker image + run: | + docker build -t video-processor:test . + + - name: Run Trivy vulnerability scanner + uses: aquasecurity/trivy-action@master + with: + image-ref: 'video-processor:test' + format: 'sarif' + output: 'trivy-results.sarif' + + - name: Upload Trivy scan results + uses: github/codeql-action/upload-sarif@v2 + if: always() + with: + sarif_file: 'trivy-results.sarif' + + notify-status: + name: Notify Test Status + runs-on: ubuntu-latest + needs: [integration-tests, full-integration-test] + if: always() + + steps: + - name: Notify success + if: needs.integration-tests.result == 'success' && needs.full-integration-test.result == 'success' + run: | + echo "✅ All integration tests passed successfully!" + + - name: Notify failure + if: needs.integration-tests.result == 'failure' || needs.full-integration-test.result == 'failure' + run: | + echo "❌ Integration tests failed. Check the logs for details." + exit 1 \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d8b916f --- /dev/null +++ b/Makefile @@ -0,0 +1,165 @@ +# Video Processor Development Makefile +# Simplifies common development and testing tasks + +.PHONY: help install test test-unit test-integration test-all lint format type-check clean docker-build docker-test + +# Default target +help: + @echo "Video Processor Development Commands" + @echo "=====================================" + @echo "" + @echo "Development:" + @echo " install Install dependencies with uv" + @echo " install-dev Install with development dependencies" + @echo "" + @echo "Testing:" + @echo " test Run unit tests only" + @echo " test-unit Run unit tests with coverage" + @echo " test-integration Run Docker integration tests" + @echo " test-all Run all tests (unit + integration)" + @echo "" + @echo "Code Quality:" + @echo " lint Run ruff linting" + @echo " format Format code with ruff" + @echo " type-check Run mypy type checking" + @echo " quality Run all quality checks (lint + format + type-check)" + @echo "" + @echo "Docker:" + @echo " docker-build Build Docker images" + @echo " docker-test Run tests in Docker environment" + @echo " docker-demo Start demo services" + @echo " docker-clean Clean up Docker containers and volumes" + @echo "" + @echo "Utilities:" + @echo " clean Clean up build artifacts and cache" + @echo " docs Generate documentation (if applicable)" + +# Development setup +install: + uv sync + +install-dev: + uv sync --dev + +# Testing targets +test: test-unit + +test-unit: + uv run pytest tests/ -x -v --tb=short --cov=src/ --cov-report=html --cov-report=term + +test-integration: + ./scripts/run-integration-tests.sh + +test-integration-verbose: + ./scripts/run-integration-tests.sh --verbose + +test-integration-fast: + ./scripts/run-integration-tests.sh --fast + +test-all: test-unit test-integration + +# Code quality +lint: + uv run ruff check . + +format: + uv run ruff format . + +type-check: + uv run mypy src/ + +quality: format lint type-check + +# Docker operations +docker-build: + docker-compose build + +docker-test: + docker-compose -f docker-compose.integration.yml build + ./scripts/run-integration-tests.sh --clean + +docker-demo: + docker-compose up -d postgres + docker-compose run --rm migrate + docker-compose up -d worker + docker-compose up demo + +docker-clean: + docker-compose down -v --remove-orphans + docker-compose -f docker-compose.integration.yml down -v --remove-orphans + docker system prune -f + +# Cleanup +clean: + rm -rf .pytest_cache/ + rm -rf htmlcov/ + rm -rf .coverage + rm -rf test-reports/ + rm -rf dist/ + rm -rf *.egg-info/ + find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true + find . -type f -name "*.pyc" -delete + +# CI/CD simulation +ci-test: + @echo "Running CI-like test suite..." + $(MAKE) quality + $(MAKE) test-unit + $(MAKE) test-integration + +# Development workflow helpers +dev-setup: install-dev + @echo "Development environment ready!" + @echo "Run 'make test' to verify installation" + +# Quick development cycle +dev: format lint test-unit + +# Release preparation +pre-release: clean quality test-all + @echo "Ready for release! All tests passed and code is properly formatted." + +# Documentation (placeholder for future docs) +docs: + @echo "Documentation generation not yet implemented" + +# Show current test coverage +coverage: + uv run pytest tests/ --cov=src/ --cov-report=html --cov-report=term + @echo "Coverage report generated in htmlcov/" + +# Run specific test file +test-file: + @if [ -z "$(FILE)" ]; then \ + echo "Usage: make test-file FILE=path/to/test_file.py"; \ + else \ + uv run pytest $(FILE) -v; \ + fi + +# Run tests matching a pattern +test-pattern: + @if [ -z "$(PATTERN)" ]; then \ + echo "Usage: make test-pattern PATTERN=test_name_pattern"; \ + else \ + uv run pytest -k "$(PATTERN)" -v; \ + fi + +# Development server (if web demo exists) +dev-server: + uv run python examples/web_demo.py + +# Database operations (requires running postgres) +db-migrate: + uv run python -c "import asyncio; from video_processor.tasks.migration import migrate_database; asyncio.run(migrate_database('postgresql://video_user:video_password@localhost:5432/video_processor'))" + +# Show project status +status: + @echo "Project Status:" + @echo "===============" + @uv --version + @echo "" + @echo "Python packages:" + @uv pip list | head -10 + @echo "" + @echo "Docker status:" + @docker-compose ps || echo "No containers running" \ No newline at end of file diff --git a/docker-compose.integration.yml b/docker-compose.integration.yml new file mode 100644 index 0000000..c249f0d --- /dev/null +++ b/docker-compose.integration.yml @@ -0,0 +1,102 @@ +# Docker Compose configuration for integration testing +# Separate from main docker-compose.yml to avoid conflicts during testing + +version: '3.8' + +services: + # PostgreSQL for integration tests + postgres-integration: + image: postgres:15-alpine + environment: + POSTGRES_DB: video_processor_integration_test + POSTGRES_USER: video_user + POSTGRES_PASSWORD: video_password + POSTGRES_HOST_AUTH_METHOD: trust + ports: + - "5433:5432" # Different port to avoid conflicts + healthcheck: + test: ["CMD-SHELL", "pg_isready -U video_user -d video_processor_integration_test"] + interval: 5s + timeout: 5s + retries: 10 + networks: + - integration_net + tmpfs: + - /var/lib/postgresql/data # Use tmpfs for faster test database + + # Migration service for integration tests + migrate-integration: + build: + context: . + dockerfile: Dockerfile + target: migration + environment: + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres-integration:5432/video_processor_integration_test + depends_on: + postgres-integration: + condition: service_healthy + networks: + - integration_net + command: ["python", "-c", " + import asyncio; + from video_processor.tasks.migration import migrate_database; + asyncio.run(migrate_database('postgresql://video_user:video_password@postgres-integration:5432/video_processor_integration_test')) + "] + + # Background worker for integration tests + worker-integration: + build: + context: . + dockerfile: Dockerfile + target: worker + environment: + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres-integration:5432/video_processor_integration_test + - WORKER_CONCURRENCY=2 # Reduced for testing + - WORKER_TIMEOUT=60 # Faster timeout for tests + depends_on: + postgres-integration: + condition: service_healthy + migrate-integration: + condition: service_completed_successfully + networks: + - integration_net + volumes: + - integration_uploads:/app/uploads + - integration_outputs:/app/outputs + command: ["python", "-m", "video_processor.tasks.worker_compatibility", "worker"] + + # Integration test runner + integration-tests: + build: + context: . + dockerfile: Dockerfile + target: development + environment: + - DATABASE_URL=postgresql://video_user:video_password@postgres-integration:5432/video_processor_integration_test + - PROCRASTINATE_DATABASE_URL=postgresql://video_user:video_password@postgres-integration:5432/video_processor_integration_test + - PYTEST_ARGS=${PYTEST_ARGS:--v --tb=short} + volumes: + - .:/app + - integration_uploads:/app/uploads + - integration_outputs:/app/outputs + - /var/run/docker.sock:/var/run/docker.sock # Access to Docker for container management + depends_on: + postgres-integration: + condition: service_healthy + migrate-integration: + condition: service_completed_successfully + worker-integration: + condition: service_started + networks: + - integration_net + command: ["uv", "run", "pytest", "tests/integration/", "-v", "--tb=short", "--durations=10"] + +volumes: + integration_uploads: + driver: local + integration_outputs: + driver: local + +networks: + integration_net: + driver: bridge \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0b41f15..6a82264 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,10 @@ dev = [ "mypy>=1.7.0", "pytest>=7.0.0", "pytest-cov>=4.0.0", + "pytest-asyncio>=0.21.0", + # Integration testing dependencies + "docker>=6.1.0", + "psycopg2-binary>=2.9.0", ] # Core 360° video processing diff --git a/scripts/run-integration-tests.sh b/scripts/run-integration-tests.sh new file mode 100755 index 0000000..2979e2b --- /dev/null +++ b/scripts/run-integration-tests.sh @@ -0,0 +1,254 @@ +#!/bin/bash + +# Integration Test Runner Script +# Runs comprehensive end-to-end tests in Docker environment + +set -euo pipefail + +# Configuration +PROJECT_NAME="video-processor-integration" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Logging functions +log_info() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +log_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +log_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Help function +show_help() { + cat << EOF +Video Processor Integration Test Runner + +Usage: $0 [OPTIONS] + +OPTIONS: + -h, --help Show this help message + -v, --verbose Run tests with verbose output + -f, --fast Run tests with minimal setup (skip some slow tests) + -c, --clean Clean up containers and volumes before running + -k, --keep Keep containers running after tests (for debugging) + --test-filter Pytest filter expression (e.g. "test_video_processing") + --timeout Timeout for tests in seconds (default: 300) + +EXAMPLES: + $0 # Run all integration tests + $0 -v # Verbose output + $0 -c # Clean start + $0 --test-filter "test_worker" # Run only worker tests + $0 -k # Keep containers for debugging + +EOF +} + +# Parse command line arguments +VERBOSE=false +CLEAN=false +KEEP_CONTAINERS=false +FAST_MODE=false +TEST_FILTER="" +TIMEOUT=300 + +while [[ $# -gt 0 ]]; do + case $1 in + -h|--help) + show_help + exit 0 + ;; + -v|--verbose) + VERBOSE=true + shift + ;; + -f|--fast) + FAST_MODE=true + shift + ;; + -c|--clean) + CLEAN=true + shift + ;; + -k|--keep) + KEEP_CONTAINERS=true + shift + ;; + --test-filter) + TEST_FILTER="$2" + shift 2 + ;; + --timeout) + TIMEOUT="$2" + shift 2 + ;; + *) + log_error "Unknown option: $1" + show_help + exit 1 + ;; + esac +done + +# Check dependencies +check_dependencies() { + log_info "Checking dependencies..." + + if ! command -v docker &> /dev/null; then + log_error "Docker is not installed or not in PATH" + exit 1 + fi + + if ! command -v docker-compose &> /dev/null; then + log_error "Docker Compose is not installed or not in PATH" + exit 1 + fi + + # Check if Docker daemon is running + if ! docker info &> /dev/null; then + log_error "Docker daemon is not running" + exit 1 + fi + + log_success "All dependencies available" +} + +# Cleanup function +cleanup() { + if [ "$KEEP_CONTAINERS" = false ]; then + log_info "Cleaning up containers and volumes..." + cd "$PROJECT_ROOT" + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" down -v --remove-orphans || true + log_success "Cleanup completed" + else + log_warning "Keeping containers running for debugging" + log_info "To manually cleanup later, run:" + log_info " docker-compose -f docker-compose.integration.yml -p $PROJECT_NAME down -v" + fi +} + +# Trap to ensure cleanup on exit +trap cleanup EXIT + +# Main test execution +run_integration_tests() { + cd "$PROJECT_ROOT" + + log_info "Starting integration tests for Video Processor" + log_info "Project: $PROJECT_NAME" + log_info "Timeout: ${TIMEOUT}s" + + # Clean up if requested + if [ "$CLEAN" = true ]; then + log_info "Performing clean start..." + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" down -v --remove-orphans || true + fi + + # Build pytest arguments + PYTEST_ARGS="-v --tb=short --durations=10" + + if [ "$VERBOSE" = true ]; then + PYTEST_ARGS="$PYTEST_ARGS -s" + fi + + if [ "$FAST_MODE" = true ]; then + PYTEST_ARGS="$PYTEST_ARGS -m 'not slow'" + fi + + if [ -n "$TEST_FILTER" ]; then + PYTEST_ARGS="$PYTEST_ARGS -k '$TEST_FILTER'" + fi + + # Set environment variables + export COMPOSE_PROJECT_NAME="$PROJECT_NAME" + export PYTEST_ARGS="$PYTEST_ARGS" + + log_info "Building containers..." + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" build + + log_info "Starting services..." + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" up -d postgres-integration + + log_info "Waiting for database to be ready..." + timeout 30 bash -c 'until docker-compose -f docker-compose.integration.yml -p '"$PROJECT_NAME"' exec -T postgres-integration pg_isready -U video_user; do sleep 1; done' + + log_info "Running database migration..." + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" run --rm migrate-integration + + log_info "Starting worker..." + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" up -d worker-integration + + log_info "Running integration tests..." + log_info "Test command: pytest $PYTEST_ARGS" + + # Run the tests with timeout + if timeout "$TIMEOUT" docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" run --rm integration-tests; then + log_success "All integration tests passed! ✅" + return 0 + else + local exit_code=$? + if [ $exit_code -eq 124 ]; then + log_error "Tests timed out after ${TIMEOUT} seconds" + else + log_error "Integration tests failed with exit code $exit_code" + fi + + # Show logs for debugging + log_warning "Showing service logs for debugging..." + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" logs --tail=50 + + return $exit_code + fi +} + +# Generate test report +generate_report() { + log_info "Generating test report..." + + # Get container logs + local log_dir="$PROJECT_ROOT/test-reports" + mkdir -p "$log_dir" + + cd "$PROJECT_ROOT" + docker-compose -f docker-compose.integration.yml -p "$PROJECT_NAME" logs > "$log_dir/integration-test-logs.txt" 2>&1 || true + + log_success "Test logs saved to: $log_dir/integration-test-logs.txt" +} + +# Main execution +main() { + log_info "Video Processor Integration Test Runner" + log_info "========================================" + + check_dependencies + + # Run tests + if run_integration_tests; then + log_success "Integration tests completed successfully!" + generate_report + exit 0 + else + log_error "Integration tests failed!" + generate_report + exit 1 + fi +} + +# Run main function +main "$@" \ No newline at end of file diff --git a/tests/integration/README.md b/tests/integration/README.md new file mode 100644 index 0000000..04d8ba3 --- /dev/null +++ b/tests/integration/README.md @@ -0,0 +1,230 @@ +# Integration Tests + +This directory contains end-to-end integration tests that verify the complete Video Processor system in a Docker environment. + +## Overview + +The integration tests validate: + +- **Complete video processing pipeline** - encoding, thumbnails, sprites +- **Procrastinate worker functionality** - async job processing and queue management +- **Database migration system** - schema creation and version compatibility +- **Docker containerization** - multi-service orchestration +- **Error handling and edge cases** - real-world failure scenarios + +## Test Architecture + +### Test Structure + +``` +tests/integration/ +├── conftest.py # Pytest fixtures and Docker setup +├── test_video_processing_e2e.py # Video processing pipeline tests +├── test_procrastinate_worker_e2e.py # Worker and job queue tests +├── test_database_migration_e2e.py # Database migration tests +└── README.md # This file +``` + +### Docker Services + +The tests use a dedicated Docker Compose configuration (`docker-compose.integration.yml`) with: + +- **postgres-integration** - PostgreSQL database on port 5433 +- **migrate-integration** - Runs database migrations +- **worker-integration** - Procrastinate background worker +- **integration-tests** - Test runner container + +## Running Integration Tests + +### Quick Start + +```bash +# Run all integration tests +make test-integration + +# Or use the script directly +./scripts/run-integration-tests.sh +``` + +### Advanced Options + +```bash +# Verbose output +./scripts/run-integration-tests.sh --verbose + +# Fast mode (skip slow tests) +./scripts/run-integration-tests.sh --fast + +# Run specific test pattern +./scripts/run-integration-tests.sh --test-filter "test_video_processing" + +# Keep containers for debugging +./scripts/run-integration-tests.sh --keep + +# Clean start +./scripts/run-integration-tests.sh --clean +``` + +### Manual Docker Setup + +```bash +# Start services manually +docker-compose -f docker-compose.integration.yml up -d postgres-integration +docker-compose -f docker-compose.integration.yml run --rm migrate-integration +docker-compose -f docker-compose.integration.yml up -d worker-integration + +# Run tests +docker-compose -f docker-compose.integration.yml run --rm integration-tests + +# Cleanup +docker-compose -f docker-compose.integration.yml down -v +``` + +## Test Categories + +### Video Processing Tests (`test_video_processing_e2e.py`) + +- **Synchronous processing** - Complete pipeline with multiple formats +- **Configuration validation** - Quality presets and output formats +- **Error handling** - Invalid inputs and edge cases +- **Performance testing** - Processing time validation +- **Concurrent processing** - Multiple simultaneous jobs + +### Worker Integration Tests (`test_procrastinate_worker_e2e.py`) + +- **Job submission** - Async task queuing and processing +- **Worker functionality** - Background job execution +- **Error handling** - Failed job scenarios +- **Queue management** - Job status and monitoring +- **Version compatibility** - Procrastinate 2.x/3.x support + +### Database Migration Tests (`test_database_migration_e2e.py`) + +- **Fresh installation** - Database schema creation +- **Migration idempotency** - Safe re-runs +- **Version compatibility** - 2.x vs 3.x migration paths +- **Production workflows** - Multi-stage migrations +- **Error scenarios** - Invalid configurations + +## Test Data + +Tests use FFmpeg-generated test videos: +- 10-second test video (640x480, 30fps) +- Created dynamically using `testsrc` filter +- Small size for fast processing + +## Dependencies + +### System Requirements + +- **Docker & Docker Compose** - Container orchestration +- **FFmpeg** - Video processing (system package) +- **PostgreSQL client** - Database testing utilities + +### Python Dependencies + +```toml +# Added to pyproject.toml [project.optional-dependencies.dev] +"pytest-asyncio>=0.21.0" # Async test support +"docker>=6.1.0" # Docker API client +"psycopg2-binary>=2.9.0" # PostgreSQL adapter +``` + +## Debugging + +### View Logs + +```bash +# Show all service logs +docker-compose -f docker-compose.integration.yml logs + +# Follow specific service +docker-compose -f docker-compose.integration.yml logs -f worker-integration + +# Test logs are saved to test-reports/ directory +``` + +### Connect to Services + +```bash +# Access test database +psql -h localhost -p 5433 -U video_user -d video_processor_integration_test + +# Execute commands in containers +docker-compose -f docker-compose.integration.yml exec postgres-integration psql -U video_user + +# Access test container +docker-compose -f docker-compose.integration.yml run --rm integration-tests bash +``` + +### Common Issues + +**Port conflicts**: Integration tests use port 5433 to avoid conflicts with main PostgreSQL + +**FFmpeg missing**: Install system FFmpeg package: `sudo apt install ffmpeg` + +**Docker permissions**: Add user to docker group: `sudo usermod -aG docker $USER` + +**Database connection failures**: Ensure PostgreSQL container is healthy before running tests + +## CI/CD Integration + +### GitHub Actions + +The integration tests run automatically on: +- Push to main/develop branches +- Pull requests to main +- Daily scheduled runs (2 AM UTC) + +See `.github/workflows/integration-tests.yml` for configuration. + +### Test Matrix + +Tests run with different configurations: +- Separate test suites (video, worker, database) +- Full integration suite +- Performance testing (scheduled only) +- Security scanning + +## Performance Benchmarks + +Expected performance for test environment: +- Video processing: < 10x realtime for test videos +- Job processing: < 60 seconds for simple tasks +- Database migration: < 30 seconds +- Full test suite: < 20 minutes + +## Contributing + +When adding integration tests: + +1. **Use fixtures** - Leverage `conftest.py` fixtures for setup +2. **Clean state** - Use `clean_database` fixture to isolate tests +3. **Descriptive names** - Use clear test method names +4. **Proper cleanup** - Ensure resources are freed after tests +5. **Error messages** - Provide helpful assertions with context + +### Test Guidelines + +- Test real scenarios users will encounter +- Include both success and failure paths +- Validate outputs completely (file existence, content, metadata) +- Keep tests fast but comprehensive +- Use meaningful test data and IDs + +## Troubleshooting + +### Failed Tests + +1. Check container logs: `./scripts/run-integration-tests.sh --verbose` +2. Verify Docker services: `docker-compose -f docker-compose.integration.yml ps` +3. Test database connection: `psql -h localhost -p 5433 -U video_user` +4. Check FFmpeg: `ffmpeg -version` + +### Resource Issues + +- **Out of disk space**: Run `docker system prune -af` +- **Memory issues**: Reduce `WORKER_CONCURRENCY` in docker-compose +- **Network conflicts**: Use `--clean` flag to reset network state + +For more help, see the main project README or open an issue. \ No newline at end of file diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..74ad82e --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,7 @@ +""" +Integration tests for Docker-based Video Processor deployment. + +These tests verify that the entire system works correctly when deployed +using Docker Compose, including database connectivity, worker processing, +and the full video processing pipeline. +""" \ No newline at end of file diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..f3520b4 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,220 @@ +""" +Pytest configuration and fixtures for Docker integration tests. +""" + +import asyncio +import os +import subprocess +import tempfile +import time +from pathlib import Path +from typing import Generator, Dict, Any + +import pytest +import docker +import psycopg2 +from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT + +from video_processor.tasks.compat import get_version_info + + +@pytest.fixture(scope="session") +def docker_client() -> docker.DockerClient: + """Docker client for managing containers and services.""" + return docker.from_env() + + +@pytest.fixture(scope="session") +def temp_video_dir() -> Generator[Path, None, None]: + """Temporary directory for test video files.""" + with tempfile.TemporaryDirectory(prefix="video_test_") as temp_dir: + yield Path(temp_dir) + + +@pytest.fixture(scope="session") +def test_video_file(temp_video_dir: Path) -> Path: + """Create a test video file for processing.""" + video_file = temp_video_dir / "test_input.mp4" + + # Create a simple test video using FFmpeg + cmd = [ + "ffmpeg", "-y", + "-f", "lavfi", + "-i", "testsrc=duration=10:size=640x480:rate=30", + "-c:v", "libx264", + "-preset", "ultrafast", + "-crf", "28", + str(video_file) + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + assert video_file.exists(), "Test video file was not created" + return video_file + except (subprocess.CalledProcessError, FileNotFoundError) as e: + pytest.skip(f"FFmpeg not available or failed: {e}") + + +@pytest.fixture(scope="session") +def docker_compose_project(docker_client: docker.DockerClient) -> Generator[str, None, None]: + """Start Docker Compose services for testing.""" + project_root = Path(__file__).parent.parent.parent + project_name = "video-processor-integration-test" + + # Environment variables for test database + test_env = os.environ.copy() + test_env.update({ + "COMPOSE_PROJECT_NAME": project_name, + "POSTGRES_DB": "video_processor_integration_test", + "DATABASE_URL": "postgresql://video_user:video_password@postgres:5432/video_processor_integration_test", + "PROCRASTINATE_DATABASE_URL": "postgresql://video_user:video_password@postgres:5432/video_processor_integration_test" + }) + + # Start services + print(f"\n🐳 Starting Docker Compose services for integration tests...") + + # First, ensure we're in a clean state + subprocess.run([ + "docker-compose", "-p", project_name, "down", "-v", "--remove-orphans" + ], cwd=project_root, env=test_env, capture_output=True) + + try: + # Start core services (postgres first) + subprocess.run([ + "docker-compose", "-p", project_name, "up", "-d", "postgres" + ], cwd=project_root, env=test_env, check=True) + + # Wait for postgres to be healthy + _wait_for_postgres_health(docker_client, project_name) + + # Run database migration + subprocess.run([ + "docker-compose", "-p", project_name, "run", "--rm", "migrate" + ], cwd=project_root, env=test_env, check=True) + + # Start worker service + subprocess.run([ + "docker-compose", "-p", project_name, "up", "-d", "worker" + ], cwd=project_root, env=test_env, check=True) + + # Wait a moment for services to fully start + time.sleep(5) + + print("✅ Docker Compose services started successfully") + yield project_name + + finally: + print("\n🧹 Cleaning up Docker Compose services...") + subprocess.run([ + "docker-compose", "-p", project_name, "down", "-v", "--remove-orphans" + ], cwd=project_root, env=test_env, capture_output=True) + print("✅ Cleanup completed") + + +def _wait_for_postgres_health(client: docker.DockerClient, project_name: str, timeout: int = 30) -> None: + """Wait for PostgreSQL container to be healthy.""" + container_name = f"{project_name}-postgres-1" + + print(f"⏳ Waiting for PostgreSQL container {container_name} to be healthy...") + + start_time = time.time() + while time.time() - start_time < timeout: + try: + container = client.containers.get(container_name) + health = container.attrs["State"]["Health"]["Status"] + if health == "healthy": + print("✅ PostgreSQL is healthy") + return + print(f" Health status: {health}") + except docker.errors.NotFound: + print(f" Container {container_name} not found yet...") + except KeyError: + print(" No health check status available yet...") + + time.sleep(2) + + raise TimeoutError(f"PostgreSQL container did not become healthy within {timeout} seconds") + + +@pytest.fixture(scope="session") +def postgres_connection(docker_compose_project: str) -> Generator[Dict[str, Any], None, None]: + """PostgreSQL connection parameters for testing.""" + conn_params = { + "host": "localhost", + "port": 5432, + "user": "video_user", + "password": "video_password", + "database": "video_processor_integration_test" + } + + # Test connection + print("🔌 Testing PostgreSQL connection...") + max_retries = 10 + for i in range(max_retries): + try: + with psycopg2.connect(**conn_params) as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + cursor.execute("SELECT version();") + version = cursor.fetchone()[0] + print(f"✅ Connected to PostgreSQL: {version}") + break + except psycopg2.OperationalError as e: + if i == max_retries - 1: + raise ConnectionError(f"Could not connect to PostgreSQL after {max_retries} attempts: {e}") + print(f" Attempt {i+1}/{max_retries} failed, retrying in 2s...") + time.sleep(2) + + yield conn_params + + +@pytest.fixture +def procrastinate_app(postgres_connection: Dict[str, Any]): + """Set up Procrastinate app for testing.""" + from video_processor.tasks import setup_procrastinate + + db_url = ( + f"postgresql://{postgres_connection['user']}:" + f"{postgres_connection['password']}@" + f"{postgres_connection['host']}:{postgres_connection['port']}/" + f"{postgres_connection['database']}" + ) + + app = setup_procrastinate(db_url) + print(f"✅ Procrastinate app initialized with {get_version_info()['procrastinate_version']}") + return app + + +@pytest.fixture +def clean_database(postgres_connection: Dict[str, Any]): + """Ensure clean database state for each test.""" + print("🧹 Cleaning database state for test...") + + with psycopg2.connect(**postgres_connection) as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + # Clean up any existing jobs + cursor.execute(""" + DELETE FROM procrastinate_jobs WHERE 1=1; + DELETE FROM procrastinate_events WHERE 1=1; + """) + + yield + + # Cleanup after test + with psycopg2.connect(**postgres_connection) as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + cursor.execute(""" + DELETE FROM procrastinate_jobs WHERE 1=1; + DELETE FROM procrastinate_events WHERE 1=1; + """) + + +# Async event loop fixture for async tests +@pytest.fixture +def event_loop(): + """Create an instance of the default event loop for the test session.""" + loop = asyncio.new_event_loop() + yield loop + loop.close() \ No newline at end of file diff --git a/tests/integration/test_database_migration_e2e.py b/tests/integration/test_database_migration_e2e.py new file mode 100644 index 0000000..4373dfe --- /dev/null +++ b/tests/integration/test_database_migration_e2e.py @@ -0,0 +1,383 @@ +""" +End-to-end integration tests for database migration functionality in Docker environment. + +These tests verify: +- Database migration execution in containerized environment +- Schema creation and validation +- Version compatibility between Procrastinate 2.x and 3.x +- Migration rollback scenarios +""" + +import asyncio +import subprocess +from pathlib import Path +from typing import Dict, Any + +import pytest +import psycopg2 +from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT + +from video_processor.tasks.migration import migrate_database, ProcrastinateMigrationHelper +from video_processor.tasks.compat import get_version_info, IS_PROCRASTINATE_3_PLUS + + +class TestDatabaseMigrationE2E: + """End-to-end tests for database migration in Docker environment.""" + + def test_fresh_database_migration( + self, + postgres_connection: Dict[str, Any], + docker_compose_project: str + ): + """Test migrating a fresh database from scratch.""" + print(f"\n🗄️ Testing fresh database migration") + + # Create a fresh test database + test_db_name = "video_processor_migration_fresh" + self._create_test_database(postgres_connection, test_db_name) + + try: + # Build connection URL for test database + db_url = ( + f"postgresql://{postgres_connection['user']}:" + f"{postgres_connection['password']}@" + f"{postgres_connection['host']}:{postgres_connection['port']}/" + f"{test_db_name}" + ) + + # Run migration + success = asyncio.run(migrate_database(db_url)) + assert success, "Migration should succeed on fresh database" + + # Verify schema was created + self._verify_procrastinate_schema(postgres_connection, test_db_name) + + print("✅ Fresh database migration completed successfully") + + finally: + self._drop_test_database(postgres_connection, test_db_name) + + def test_migration_idempotency( + self, + postgres_connection: Dict[str, Any], + docker_compose_project: str + ): + """Test that migrations can be run multiple times safely.""" + print(f"\n🔁 Testing migration idempotency") + + test_db_name = "video_processor_migration_idempotent" + self._create_test_database(postgres_connection, test_db_name) + + try: + db_url = ( + f"postgresql://{postgres_connection['user']}:" + f"{postgres_connection['password']}@" + f"{postgres_connection['host']}:{postgres_connection['port']}/" + f"{test_db_name}" + ) + + # Run migration first time + success1 = asyncio.run(migrate_database(db_url)) + assert success1, "First migration should succeed" + + # Run migration second time (should be idempotent) + success2 = asyncio.run(migrate_database(db_url)) + assert success2, "Second migration should also succeed (idempotent)" + + # Verify schema is still intact + self._verify_procrastinate_schema(postgres_connection, test_db_name) + + print("✅ Migration idempotency test passed") + + finally: + self._drop_test_database(postgres_connection, test_db_name) + + def test_docker_migration_service( + self, + docker_compose_project: str, + postgres_connection: Dict[str, Any] + ): + """Test that Docker migration service works correctly.""" + print(f"\n🐳 Testing Docker migration service") + + # The migration should have already run as part of docker_compose_project setup + # Verify the migration was successful by checking the main database + + main_db_name = "video_processor_integration_test" + self._verify_procrastinate_schema(postgres_connection, main_db_name) + + print("✅ Docker migration service verification passed") + + def test_migration_helper_functionality( + self, + postgres_connection: Dict[str, Any], + docker_compose_project: str + ): + """Test migration helper utility functions.""" + print(f"\n🛠️ Testing migration helper functionality") + + test_db_name = "video_processor_migration_helper" + self._create_test_database(postgres_connection, test_db_name) + + try: + db_url = ( + f"postgresql://{postgres_connection['user']}:" + f"{postgres_connection['password']}@" + f"{postgres_connection['host']}:{postgres_connection['port']}/" + f"{test_db_name}" + ) + + # Test migration helper + helper = ProcrastinateMigrationHelper(db_url) + + # Test migration plan generation + migration_plan = helper.generate_migration_plan() + assert isinstance(migration_plan, list) + assert len(migration_plan) > 0 + + print(f" Generated migration plan with {len(migration_plan)} steps") + + # Test version-specific migration commands + if IS_PROCRASTINATE_3_PLUS: + pre_cmd = helper.get_pre_migration_command() + post_cmd = helper.get_post_migration_command() + assert "pre" in pre_cmd + assert "post" in post_cmd + print(f" Procrastinate 3.x commands: pre='{pre_cmd}', post='{post_cmd}'") + else: + legacy_cmd = helper.get_legacy_migration_command() + assert "schema" in legacy_cmd + print(f" Procrastinate 2.x command: '{legacy_cmd}'") + + print("✅ Migration helper functionality verified") + + finally: + self._drop_test_database(postgres_connection, test_db_name) + + def test_version_compatibility_detection( + self, + docker_compose_project: str + ): + """Test version compatibility detection during migration.""" + print(f"\n🔍 Testing version compatibility detection") + + # Get version information + version_info = get_version_info() + + print(f" Detected Procrastinate version: {version_info['procrastinate_version']}") + print(f" Is Procrastinate 3+: {IS_PROCRASTINATE_3_PLUS}") + print(f" Available features: {list(version_info['features'].keys())}") + + # Verify version detection is working + assert version_info["procrastinate_version"] is not None + assert isinstance(IS_PROCRASTINATE_3_PLUS, bool) + assert len(version_info["features"]) > 0 + + print("✅ Version compatibility detection working") + + def test_migration_error_handling( + self, + postgres_connection: Dict[str, Any], + docker_compose_project: str + ): + """Test migration error handling for invalid scenarios.""" + print(f"\n🚫 Testing migration error handling") + + # Test with invalid database URL + invalid_url = "postgresql://invalid_user:invalid_pass@localhost:5432/nonexistent_db" + + # Migration should handle the error gracefully + success = asyncio.run(migrate_database(invalid_url)) + assert not success, "Migration should fail with invalid database URL" + + print("✅ Migration error handling test passed") + + def _create_test_database(self, postgres_connection: Dict[str, Any], db_name: str): + """Create a test database for migration testing.""" + # Connect to postgres db to create new database + conn_params = postgres_connection.copy() + conn_params["database"] = "postgres" + + with psycopg2.connect(**conn_params) as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + # Drop if exists, then create + cursor.execute(f'DROP DATABASE IF EXISTS "{db_name}"') + cursor.execute(f'CREATE DATABASE "{db_name}"') + print(f" Created test database: {db_name}") + + def _drop_test_database(self, postgres_connection: Dict[str, Any], db_name: str): + """Clean up test database.""" + conn_params = postgres_connection.copy() + conn_params["database"] = "postgres" + + with psycopg2.connect(**conn_params) as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + cursor.execute(f'DROP DATABASE IF EXISTS "{db_name}"') + print(f" Cleaned up test database: {db_name}") + + def _verify_procrastinate_schema(self, postgres_connection: Dict[str, Any], db_name: str): + """Verify that Procrastinate schema was created properly.""" + conn_params = postgres_connection.copy() + conn_params["database"] = db_name + + with psycopg2.connect(**conn_params) as conn: + with conn.cursor() as cursor: + # Check for core Procrastinate tables + cursor.execute(""" + SELECT table_name FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name LIKE 'procrastinate_%' + ORDER BY table_name; + """) + tables = [row[0] for row in cursor.fetchall()] + + # Required tables for Procrastinate + required_tables = ["procrastinate_jobs", "procrastinate_events"] + for required_table in required_tables: + assert required_table in tables, f"Required table missing: {required_table}" + + # Check jobs table structure + cursor.execute(""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = 'procrastinate_jobs' + ORDER BY column_name; + """) + job_columns = {row[0]: row[1] for row in cursor.fetchall()} + + # Verify essential columns exist + essential_columns = ["id", "status", "task_name", "queue_name"] + for col in essential_columns: + assert col in job_columns, f"Essential column missing from jobs table: {col}" + + print(f" ✅ Schema verified: {len(tables)} tables, {len(job_columns)} job columns") + + +class TestMigrationIntegrationScenarios: + """Test realistic migration scenarios in Docker environment.""" + + def test_production_like_migration_workflow( + self, + postgres_connection: Dict[str, Any], + docker_compose_project: str + ): + """Test a production-like migration workflow.""" + print(f"\n🏭 Testing production-like migration workflow") + + test_db_name = "video_processor_migration_production" + self._create_fresh_db(postgres_connection, test_db_name) + + try: + db_url = self._build_db_url(postgres_connection, test_db_name) + + # Step 1: Run pre-migration (if Procrastinate 3.x) + if IS_PROCRASTINATE_3_PLUS: + print(" Running pre-migration phase...") + success = asyncio.run(migrate_database(db_url, pre_migration_only=True)) + assert success, "Pre-migration should succeed" + + # Step 2: Simulate application deployment (schema should be compatible) + self._verify_basic_schema_compatibility(postgres_connection, test_db_name) + + # Step 3: Run post-migration (if Procrastinate 3.x) + if IS_PROCRASTINATE_3_PLUS: + print(" Running post-migration phase...") + success = asyncio.run(migrate_database(db_url, post_migration_only=True)) + assert success, "Post-migration should succeed" + else: + # Single migration for 2.x + print(" Running single migration phase...") + success = asyncio.run(migrate_database(db_url)) + assert success, "Migration should succeed" + + # Step 4: Verify final schema + self._verify_complete_schema(postgres_connection, test_db_name) + + print("✅ Production-like migration workflow completed") + + finally: + self._cleanup_db(postgres_connection, test_db_name) + + def test_concurrent_migration_handling( + self, + postgres_connection: Dict[str, Any], + docker_compose_project: str + ): + """Test handling of concurrent migration attempts.""" + print(f"\n🔀 Testing concurrent migration handling") + + test_db_name = "video_processor_migration_concurrent" + self._create_fresh_db(postgres_connection, test_db_name) + + try: + db_url = self._build_db_url(postgres_connection, test_db_name) + + # Run two migrations concurrently (should handle gracefully) + async def run_concurrent_migrations(): + tasks = [ + migrate_database(db_url), + migrate_database(db_url) + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + return results + + results = asyncio.run(run_concurrent_migrations()) + + # At least one should succeed, others should handle gracefully + success_count = sum(1 for r in results if r is True) + assert success_count >= 1, "At least one concurrent migration should succeed" + + # Schema should still be valid + self._verify_complete_schema(postgres_connection, test_db_name) + + print("✅ Concurrent migration handling test passed") + + finally: + self._cleanup_db(postgres_connection, test_db_name) + + def _create_fresh_db(self, postgres_connection: Dict[str, Any], db_name: str): + """Create a fresh database for testing.""" + conn_params = postgres_connection.copy() + conn_params["database"] = "postgres" + + with psycopg2.connect(**conn_params) as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + cursor.execute(f'DROP DATABASE IF EXISTS "{db_name}"') + cursor.execute(f'CREATE DATABASE "{db_name}"') + + def _cleanup_db(self, postgres_connection: Dict[str, Any], db_name: str): + """Clean up test database.""" + conn_params = postgres_connection.copy() + conn_params["database"] = "postgres" + + with psycopg2.connect(**conn_params) as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + cursor.execute(f'DROP DATABASE IF EXISTS "{db_name}"') + + def _build_db_url(self, postgres_connection: Dict[str, Any], db_name: str) -> str: + """Build database URL for testing.""" + return ( + f"postgresql://{postgres_connection['user']}:" + f"{postgres_connection['password']}@" + f"{postgres_connection['host']}:{postgres_connection['port']}/" + f"{db_name}" + ) + + def _verify_basic_schema_compatibility(self, postgres_connection: Dict[str, Any], db_name: str): + """Verify basic schema compatibility during migration.""" + conn_params = postgres_connection.copy() + conn_params["database"] = db_name + + with psycopg2.connect(**conn_params) as conn: + with conn.cursor() as cursor: + # Should be able to query basic Procrastinate tables + cursor.execute("SELECT COUNT(*) FROM procrastinate_jobs") + assert cursor.fetchone()[0] == 0 # Should be empty initially + + def _verify_complete_schema(self, postgres_connection: Dict[str, Any], db_name: str): + """Verify complete schema after migration.""" + TestDatabaseMigrationE2E()._verify_procrastinate_schema(postgres_connection, db_name) \ No newline at end of file diff --git a/tests/integration/test_procrastinate_worker_e2e.py b/tests/integration/test_procrastinate_worker_e2e.py new file mode 100644 index 0000000..5576bc8 --- /dev/null +++ b/tests/integration/test_procrastinate_worker_e2e.py @@ -0,0 +1,355 @@ +""" +End-to-end integration tests for Procrastinate worker functionality in Docker environment. + +These tests verify: +- Job submission and processing through Procrastinate +- Worker container functionality +- Database job queue integration +- Async task processing +- Error handling and retries +""" + +import asyncio +import json +import time +from pathlib import Path +from typing import Dict, Any + +import pytest +import psycopg2 + +from video_processor.tasks.procrastinate_tasks import process_video_async, generate_thumbnail_async +from video_processor.tasks.compat import get_version_info + + +class TestProcrastinateWorkerE2E: + """End-to-end tests for Procrastinate worker integration.""" + + @pytest.mark.asyncio + async def test_async_video_processing_job_submission( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + procrastinate_app, + clean_database: None + ): + """Test submitting and tracking async video processing jobs.""" + print(f"\n📤 Testing async video processing job submission") + + # Prepare job parameters + output_dir = temp_video_dir / "async_job_output" + config_dict = { + "base_path": str(output_dir), + "output_formats": ["mp4"], + "quality_preset": "low", + "generate_thumbnails": True, + "generate_sprites": False, + "storage_backend": "local" + } + + # Submit job to queue + job = await procrastinate_app.tasks.process_video_async.defer_async( + input_path=str(test_video_file), + output_dir="async_test", + config_dict=config_dict + ) + + # Verify job was queued + assert job.id is not None + print(f"✅ Job submitted with ID: {job.id}") + + # Wait for job to be processed (worker should pick it up) + max_wait = 60 # seconds + start_time = time.time() + + while time.time() - start_time < max_wait: + # Check job status in database + job_status = await self._get_job_status(procrastinate_app, job.id) + print(f" Job status: {job_status}") + + if job_status in ["succeeded", "failed"]: + break + + await asyncio.sleep(2) + else: + pytest.fail(f"Job {job.id} did not complete within {max_wait} seconds") + + # Verify job completed successfully + final_status = await self._get_job_status(procrastinate_app, job.id) + assert final_status == "succeeded", f"Job failed with status: {final_status}" + + print(f"✅ Async job completed successfully in {time.time() - start_time:.2f}s") + + @pytest.mark.asyncio + async def test_thumbnail_generation_job( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + procrastinate_app, + clean_database: None + ): + """Test thumbnail generation as separate async job.""" + print(f"\n🖼️ Testing async thumbnail generation job") + + output_dir = temp_video_dir / "thumbnail_job_output" + output_dir.mkdir(exist_ok=True) + + # Submit thumbnail job + job = await procrastinate_app.tasks.generate_thumbnail_async.defer_async( + video_path=str(test_video_file), + output_dir=str(output_dir), + timestamp=5, + video_id="thumb_test_123" + ) + + print(f"✅ Thumbnail job submitted with ID: {job.id}") + + # Wait for completion + await self._wait_for_job_completion(procrastinate_app, job.id) + + # Verify thumbnail was created + expected_thumbnail = output_dir / "thumb_test_123_thumb_5.png" + assert expected_thumbnail.exists(), f"Thumbnail not found: {expected_thumbnail}" + assert expected_thumbnail.stat().st_size > 0, "Thumbnail file is empty" + + print("✅ Thumbnail generation job completed successfully") + + @pytest.mark.asyncio + async def test_job_error_handling( + self, + docker_compose_project: str, + temp_video_dir: Path, + procrastinate_app, + clean_database: None + ): + """Test error handling for invalid job parameters.""" + print(f"\n🚫 Testing job error handling") + + # Submit job with invalid video path + invalid_path = str(temp_video_dir / "does_not_exist.mp4") + config_dict = { + "base_path": str(temp_video_dir / "error_test"), + "output_formats": ["mp4"], + "quality_preset": "low" + } + + job = await procrastinate_app.tasks.process_video_async.defer_async( + input_path=invalid_path, + output_dir="error_test", + config_dict=config_dict + ) + + print(f"✅ Error job submitted with ID: {job.id}") + + # Wait for job to fail + await self._wait_for_job_completion(procrastinate_app, job.id, expected_status="failed") + + # Verify job failed appropriately + final_status = await self._get_job_status(procrastinate_app, job.id) + assert final_status == "failed", f"Expected job to fail, got: {final_status}" + + print("✅ Error handling test completed") + + @pytest.mark.asyncio + async def test_multiple_concurrent_jobs( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + procrastinate_app, + clean_database: None + ): + """Test processing multiple jobs concurrently.""" + print(f"\n🔄 Testing multiple concurrent jobs") + + num_jobs = 3 + jobs = [] + + # Submit multiple jobs + for i in range(num_jobs): + output_dir = temp_video_dir / f"concurrent_job_{i}" + config_dict = { + "base_path": str(output_dir), + "output_formats": ["mp4"], + "quality_preset": "low", + "generate_thumbnails": False, + "generate_sprites": False + } + + job = await procrastinate_app.tasks.process_video_async.defer_async( + input_path=str(test_video_file), + output_dir=f"concurrent_job_{i}", + config_dict=config_dict + ) + jobs.append(job) + print(f" Job {i+1} submitted: {job.id}") + + # Wait for all jobs to complete + start_time = time.time() + for i, job in enumerate(jobs): + await self._wait_for_job_completion(procrastinate_app, job.id) + print(f" ✅ Job {i+1} completed") + + total_time = time.time() - start_time + print(f"✅ All {num_jobs} jobs completed in {total_time:.2f}s") + + @pytest.mark.asyncio + async def test_worker_version_compatibility( + self, + docker_compose_project: str, + procrastinate_app, + postgres_connection: Dict[str, Any], + clean_database: None + ): + """Test that worker is using correct Procrastinate version.""" + print(f"\n🔍 Testing worker version compatibility") + + # Get version info from our compatibility layer + version_info = get_version_info() + print(f" Procrastinate version: {version_info['procrastinate_version']}") + print(f" Features: {list(version_info['features'].keys())}") + + # Verify database schema is compatible + with psycopg2.connect(**postgres_connection) as conn: + with conn.cursor() as cursor: + # Check that Procrastinate tables exist + cursor.execute(""" + SELECT table_name FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name LIKE 'procrastinate_%' + ORDER BY table_name; + """) + tables = [row[0] for row in cursor.fetchall()] + + print(f" Database tables: {tables}") + + # Verify core tables exist + required_tables = ["procrastinate_jobs", "procrastinate_events"] + for table in required_tables: + assert table in tables, f"Required table missing: {table}" + + print("✅ Worker version compatibility verified") + + async def _get_job_status(self, app, job_id: int) -> str: + """Get current job status from database.""" + # Use the app's connector to query job status + async with app.open_async() as app_context: + async with app_context.connector.pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT status FROM procrastinate_jobs WHERE id = %s", + [job_id] + ) + row = await cursor.fetchone() + return row[0] if row else "not_found" + + async def _wait_for_job_completion( + self, + app, + job_id: int, + timeout: int = 60, + expected_status: str = "succeeded" + ) -> None: + """Wait for job to reach completion status.""" + start_time = time.time() + + while time.time() - start_time < timeout: + status = await self._get_job_status(app, job_id) + + if status == expected_status: + return + elif status == "failed" and expected_status == "succeeded": + raise AssertionError(f"Job {job_id} failed unexpectedly") + elif status in ["succeeded", "failed"] and status != expected_status: + raise AssertionError(f"Job {job_id} completed with status '{status}', expected '{expected_status}'") + + await asyncio.sleep(2) + + raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds") + + +class TestProcrastinateQueueManagement: + """Tests for job queue management and monitoring.""" + + @pytest.mark.asyncio + async def test_job_queue_status( + self, + docker_compose_project: str, + procrastinate_app, + postgres_connection: Dict[str, Any], + clean_database: None + ): + """Test job queue status monitoring.""" + print(f"\n📊 Testing job queue status monitoring") + + # Check initial queue state (should be empty) + queue_stats = await self._get_queue_statistics(postgres_connection) + print(f" Initial queue stats: {queue_stats}") + + assert queue_stats["total_jobs"] == 0 + assert queue_stats["todo"] == 0 + assert queue_stats["doing"] == 0 + assert queue_stats["succeeded"] == 0 + assert queue_stats["failed"] == 0 + + print("✅ Queue status monitoring working") + + @pytest.mark.asyncio + async def test_job_cleanup( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + procrastinate_app, + postgres_connection: Dict[str, Any], + clean_database: None + ): + """Test job cleanup and retention.""" + print(f"\n🧹 Testing job cleanup functionality") + + # Submit a job + config_dict = { + "base_path": str(temp_video_dir / "cleanup_test"), + "output_formats": ["mp4"], + "quality_preset": "low" + } + + job = await procrastinate_app.tasks.process_video_async.defer_async( + input_path=str(test_video_file), + output_dir="cleanup_test", + config_dict=config_dict + ) + + # Wait for completion + await TestProcrastinateWorkerE2E()._wait_for_job_completion(procrastinate_app, job.id) + + # Verify job record exists + stats_after = await self._get_queue_statistics(postgres_connection) + assert stats_after["succeeded"] >= 1 + + print("✅ Job cleanup test completed") + + async def _get_queue_statistics(self, postgres_connection: Dict[str, Any]) -> Dict[str, int]: + """Get job queue statistics.""" + with psycopg2.connect(**postgres_connection) as conn: + conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + with conn.cursor() as cursor: + cursor.execute(""" + SELECT + COUNT(*) as total_jobs, + COUNT(*) FILTER (WHERE status = 'todo') as todo, + COUNT(*) FILTER (WHERE status = 'doing') as doing, + COUNT(*) FILTER (WHERE status = 'succeeded') as succeeded, + COUNT(*) FILTER (WHERE status = 'failed') as failed + FROM procrastinate_jobs; + """) + row = cursor.fetchone() + return { + "total_jobs": row[0], + "todo": row[1], + "doing": row[2], + "succeeded": row[3], + "failed": row[4] + } \ No newline at end of file diff --git a/tests/integration/test_video_processing_e2e.py b/tests/integration/test_video_processing_e2e.py new file mode 100644 index 0000000..0da9af3 --- /dev/null +++ b/tests/integration/test_video_processing_e2e.py @@ -0,0 +1,316 @@ +""" +End-to-end integration tests for video processing in Docker environment. + +These tests verify the complete video processing pipeline including: +- Video encoding with multiple formats +- Thumbnail generation +- Sprite generation +- Database integration +- File system operations +""" + +import asyncio +import time +from pathlib import Path +from typing import Dict, Any + +import pytest +import psycopg2 + +from video_processor import VideoProcessor, ProcessorConfig +from video_processor.core.processor import VideoProcessingResult + + +class TestVideoProcessingE2E: + """End-to-end tests for video processing pipeline.""" + + def test_synchronous_video_processing( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + clean_database: None + ): + """Test complete synchronous video processing pipeline.""" + print(f"\n🎬 Testing synchronous video processing with {test_video_file}") + + # Configure processor for integration testing + output_dir = temp_video_dir / "sync_output" + config = ProcessorConfig( + base_path=output_dir, + output_formats=["mp4", "webm"], # Test multiple formats + quality_preset="low", # Fast processing for tests + generate_thumbnails=True, + generate_sprites=True, + sprite_interval=2.0, # More frequent for short test video + thumbnail_timestamp=5, # 5 seconds into 10s video + storage_backend="local" + ) + + # Initialize processor + processor = VideoProcessor(config) + + # Process the test video + start_time = time.time() + result = processor.process_video( + input_path=test_video_file, + output_dir="test_sync_processing" + ) + processing_time = time.time() - start_time + + # Verify result structure + assert isinstance(result, VideoProcessingResult) + assert result.video_id is not None + assert len(result.video_id) > 0 + + # Verify encoded files + assert "mp4" in result.encoded_files + assert "webm" in result.encoded_files + + for format_name, output_path in result.encoded_files.items(): + assert output_path.exists(), f"{format_name} output file not found: {output_path}" + assert output_path.stat().st_size > 0, f"{format_name} output file is empty" + + # Verify thumbnail + assert result.thumbnail_file is not None + assert result.thumbnail_file.exists() + assert result.thumbnail_file.suffix.lower() in [".jpg", ".jpeg", ".png"] + + # Verify sprite files + assert result.sprite_files is not None + sprite_image, webvtt_file = result.sprite_files + assert sprite_image.exists() + assert webvtt_file.exists() + assert sprite_image.suffix.lower() in [".jpg", ".jpeg", ".png"] + assert webvtt_file.suffix == ".vtt" + + # Verify metadata + assert result.metadata is not None + assert result.metadata.duration > 0 + assert result.metadata.width > 0 + assert result.metadata.height > 0 + + print(f"✅ Synchronous processing completed in {processing_time:.2f}s") + print(f" Video ID: {result.video_id}") + print(f" Formats: {list(result.encoded_files.keys())}") + print(f" Duration: {result.metadata.duration}s") + + def test_video_processing_with_custom_config( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + clean_database: None + ): + """Test video processing with various configuration options.""" + print(f"\n⚙️ Testing video processing with custom configuration") + + output_dir = temp_video_dir / "custom_config_output" + + # Test with different quality preset + config = ProcessorConfig( + base_path=output_dir, + output_formats=["mp4"], + quality_preset="medium", + generate_thumbnails=True, + generate_sprites=False, # Disable sprites for this test + thumbnail_timestamp=1, + custom_ffmpeg_options={ + "video": ["-preset", "ultrafast"], # Override for speed + "audio": ["-ac", "1"] # Mono audio + } + ) + + processor = VideoProcessor(config) + result = processor.process_video(test_video_file, "custom_config_test") + + # Verify custom configuration was applied + assert len(result.encoded_files) == 1 # Only MP4 + assert "mp4" in result.encoded_files + assert result.thumbnail_file is not None + assert result.sprite_files is None # Sprites disabled + + print("✅ Custom configuration test passed") + + def test_error_handling( + self, + docker_compose_project: str, + temp_video_dir: Path, + clean_database: None + ): + """Test error handling for invalid inputs.""" + print(f"\n🚫 Testing error handling scenarios") + + config = ProcessorConfig( + base_path=temp_video_dir / "error_test", + output_formats=["mp4"], + quality_preset="low" + ) + processor = VideoProcessor(config) + + # Test with non-existent file + non_existent_file = temp_video_dir / "does_not_exist.mp4" + + with pytest.raises(FileNotFoundError): + processor.process_video(non_existent_file, "error_test") + + print("✅ Error handling test passed") + + def test_concurrent_processing( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + clean_database: None + ): + """Test processing multiple videos concurrently.""" + print(f"\n🔄 Testing concurrent video processing") + + # Create multiple output directories + num_concurrent = 3 + processors = [] + + for i in range(num_concurrent): + output_dir = temp_video_dir / f"concurrent_{i}" + config = ProcessorConfig( + base_path=output_dir, + output_formats=["mp4"], + quality_preset="low", + generate_thumbnails=False, # Disable for speed + generate_sprites=False + ) + processors.append(VideoProcessor(config)) + + # Process videos concurrently (simulate multiple instances) + results = [] + start_time = time.time() + + for i, processor in enumerate(processors): + result = processor.process_video(test_video_file, f"concurrent_test_{i}") + results.append(result) + + processing_time = time.time() - start_time + + # Verify all results + assert len(results) == num_concurrent + for i, result in enumerate(results): + assert result.video_id is not None + assert "mp4" in result.encoded_files + assert result.encoded_files["mp4"].exists() + + print(f"✅ Processed {num_concurrent} videos concurrently in {processing_time:.2f}s") + + +class TestVideoProcessingValidation: + """Tests for video processing validation and edge cases.""" + + def test_quality_preset_validation( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + clean_database: None + ): + """Test all quality presets produce valid output.""" + print(f"\n📊 Testing quality preset validation") + + presets = ["low", "medium", "high", "ultra"] + + for preset in presets: + output_dir = temp_video_dir / f"quality_{preset}" + config = ProcessorConfig( + base_path=output_dir, + output_formats=["mp4"], + quality_preset=preset, + generate_thumbnails=False, + generate_sprites=False + ) + + processor = VideoProcessor(config) + result = processor.process_video(test_video_file, f"quality_test_{preset}") + + # Verify output exists and has content + assert result.encoded_files["mp4"].exists() + assert result.encoded_files["mp4"].stat().st_size > 0 + + print(f" ✅ {preset} preset: {result.encoded_files['mp4'].stat().st_size} bytes") + + print("✅ All quality presets validated") + + def test_output_format_validation( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + clean_database: None + ): + """Test all supported output formats.""" + print(f"\n🎞️ Testing output format validation") + + formats = ["mp4", "webm", "ogv"] + + output_dir = temp_video_dir / "format_test" + config = ProcessorConfig( + base_path=output_dir, + output_formats=formats, + quality_preset="low", + generate_thumbnails=False, + generate_sprites=False + ) + + processor = VideoProcessor(config) + result = processor.process_video(test_video_file, "format_validation") + + # Verify all formats were created + for fmt in formats: + assert fmt in result.encoded_files + output_file = result.encoded_files[fmt] + assert output_file.exists() + assert output_file.suffix == f".{fmt}" + + print(f" ✅ {fmt}: {output_file.stat().st_size} bytes") + + print("✅ All output formats validated") + + +class TestVideoProcessingPerformance: + """Performance and resource usage tests.""" + + def test_processing_performance( + self, + docker_compose_project: str, + test_video_file: Path, + temp_video_dir: Path, + clean_database: None + ): + """Test processing performance metrics.""" + print(f"\n⚡ Testing processing performance") + + config = ProcessorConfig( + base_path=temp_video_dir / "performance_test", + output_formats=["mp4"], + quality_preset="low", + generate_thumbnails=True, + generate_sprites=True + ) + + processor = VideoProcessor(config) + + # Measure processing time + start_time = time.time() + result = processor.process_video(test_video_file, "performance_test") + processing_time = time.time() - start_time + + # Performance assertions (for 10s test video) + assert processing_time < 60, f"Processing took too long: {processing_time:.2f}s" + assert result.metadata.duration > 0 + + # Calculate processing ratio (processing_time / video_duration) + processing_ratio = processing_time / result.metadata.duration + + print(f"✅ Processing completed in {processing_time:.2f}s") + print(f" Video duration: {result.metadata.duration:.2f}s") + print(f" Processing ratio: {processing_ratio:.2f}x realtime") + + # Performance should be reasonable for test setup + assert processing_ratio < 10, f"Processing too slow: {processing_ratio:.2f}x realtime" \ No newline at end of file