🎬 Initial release: Professional video processing library

 Features:
- Multi-format encoding (MP4, WebM, OGV) with two-pass encoding
- Professional quality presets (Low, Medium, High, Ultra)
- Thumbnail generation and seekbar sprite creation
- Background processing with Procrastinate integration
- Type-safe configuration with Pydantic V2
- Modern Python tooling (uv, ruff, pytest)
- Comprehensive test suite and documentation

🛠️ Tech Stack:
- Python 3.11+ with full type hints
- FFmpeg integration via ffmpeg-python
- msprites2 fork for professional sprite generation
- Procrastinate for scalable background tasks
- Storage abstraction layer (local + future S3)

📚 Includes examples, API documentation, and development guides

🚀 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ryan Malloy 2025-09-05 08:01:33 -06:00
commit 8253c56d2c
24 changed files with 2530 additions and 0 deletions

79
.gitignore vendored Normal file
View File

@ -0,0 +1,79 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# uv
uv.lock
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Video processing artifacts
test_videos/
output/
*.mp4
*.webm
*.ogv
*.png
*.webvtt

488
README.md Normal file
View File

@ -0,0 +1,488 @@
<div align="center">
# 🎬 Video Processor
**A Modern Python Library for Professional Video Processing**
[![Python 3.11+](https://img.shields.io/badge/python-3.11+-blue.svg)](https://www.python.org/downloads/)
[![Built with uv](https://img.shields.io/badge/built%20with-uv-green)](https://github.com/astral-sh/uv)
[![Code style: ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![Type Checked](https://img.shields.io/badge/type%20checked-mypy-blue)](http://mypy-lang.org/)
[![Tests](https://img.shields.io/badge/tests-pytest-yellow)](https://pytest.org/)
*Extracted from the demostar Django application, now a standalone powerhouse for video encoding, thumbnail generation, and sprite creation.*
[Features](#-features) •
[Installation](#-installation) •
[Quick Start](#-quick-start) •
[Examples](#-examples) •
[API Reference](#-api-reference)
</div>
---
## ✨ Features
<table>
<tr>
<td width="50%">
### 🎥 **Video Encoding**
- **Multi-format support**: MP4 (H.264), WebM (VP9), OGV (Theora)
- **Two-pass encoding** for optimal quality
- **Professional presets**: Low, Medium, High, Ultra
- **Customizable bitrates** and quality settings
</td>
<td width="50%">
### 🖼️ **Thumbnails & Sprites**
- **Smart thumbnail extraction** at any timestamp
- **Seekbar sprite sheets** with WebVTT files
- **Configurable intervals** and dimensions
- **Mobile-optimized** output options
</td>
</tr>
<tr>
<td width="50%">
### ⚡ **Background Processing**
- **Procrastinate integration** for async tasks
- **PostgreSQL job queue** management
- **Scalable worker architecture**
- **Progress tracking** and error handling
</td>
<td width="50%">
### 🛠️ **Modern Development**
- **Type-safe** with full type hints
- **Pydantic V2** configuration validation
- **uv** for lightning-fast dependency management
- **ruff** for code quality and formatting
</td>
</tr>
</table>
---
## 📦 Installation
### Quick Install
```bash
# Using uv (recommended - fastest!)
uv add video-processor
# Or with pip
pip install video-processor
```
### Development Setup
```bash
git clone <repository>
cd video_processor
# Install with all development dependencies
uv sync --dev
# Verify installation
uv run pytest
```
---
## 🚀 Quick Start
### Basic Video Processing
```python
from pathlib import Path
from video_processor import VideoProcessor, ProcessorConfig
# 📋 Configure your processor
config = ProcessorConfig(
base_path=Path("/tmp/video_output"),
output_formats=["mp4", "webm"],
quality_preset="high" # 🎯 Professional quality
)
# 🎬 Initialize and process
processor = VideoProcessor(config)
result = processor.process_video(
input_path="input_video.mp4",
output_dir="outputs"
)
# 📊 Results
print(f"🎥 Video ID: {result.video_id}")
print(f"📁 Formats: {list(result.encoded_files.keys())}")
print(f"🖼️ Thumbnail: {result.thumbnail_file}")
print(f"🎞️ Sprites: {result.sprite_files}")
```
### Async Background Processing
```python
import asyncio
from video_processor.tasks import setup_procrastinate
async def process_in_background():
# 🗄️ Connect to PostgreSQL
app = setup_procrastinate("postgresql://user:pass@localhost/db")
# 📤 Submit job
job = await app.tasks.process_video_async.defer_async(
input_path="/path/to/video.mp4",
output_dir="/path/to/output",
config_dict={"quality_preset": "ultra"}
)
print(f"✅ Job queued: {job.id}")
asyncio.run(process_in_background())
```
---
## ⚙️ Configuration
### Quality Presets Comparison
<div align="center">
| 🎯 Preset | 📺 Video Bitrate | 🔊 Audio Bitrate | 🎨 CRF | 💡 Best For |
|-----------|------------------|------------------|---------|-------------|
| **Low** | 1,000k | 128k | 28 | 📱 Mobile, limited bandwidth |
| **Medium** | 2,500k | 192k | 23 | 🌐 Standard web delivery |
| **High** | 5,000k | 256k | 18 | 🎬 High-quality streaming |
| **Ultra** | 10,000k | 320k | 15 | 🏛️ Archive, professional use |
</div>
### Advanced Configuration
```python
from video_processor import ProcessorConfig
from pathlib import Path
config = ProcessorConfig(
# 📂 Storage & Paths
base_path=Path("/media/videos"),
storage_backend="local", # 🔮 S3 coming soon!
# 🎥 Video Settings
output_formats=["mp4", "webm", "ogv"],
quality_preset="ultra",
# 🖼️ Thumbnails & Sprites
thumbnail_timestamp=30, # 📍 30 seconds in
sprite_interval=5.0, # 🎞️ Every 5 seconds
# 🛠️ System
ffmpeg_path="/usr/local/bin/ffmpeg" # 🔧 Custom FFmpeg
)
```
---
## 💡 Examples
Explore our comprehensive examples in the [`examples/`](examples/) directory:
### 📝 Available Examples
| Example | Description | Features |
|---------|-------------|-----------|
| [`basic_usage.py`](examples/basic_usage.py) | 🎯 Simple synchronous processing | Configuration, encoding, thumbnails |
| [`async_processing.py`](examples/async_processing.py) | ⚡ Background task processing | Procrastinate, job queuing, monitoring |
| [`custom_config.py`](examples/custom_config.py) | 🛠️ Advanced configuration scenarios | Quality presets, validation, custom paths |
### 🎬 Real-World Usage Patterns
<details>
<summary><b>🏢 Production Video Pipeline</b></summary>
```python
# Multi-format encoding for video platform
config = ProcessorConfig(
base_path=Path("/var/media/uploads"),
output_formats=["mp4", "webm"], # Cross-browser support
quality_preset="high",
sprite_interval=10.0 # Balanced performance
)
processor = VideoProcessor(config)
result = processor.process_video(user_upload, output_dir)
# Generate multiple qualities
for quality in ["medium", "high"]:
config.quality_preset = quality
processor = VideoProcessor(config)
# Process to different quality folders...
```
</details>
<details>
<summary><b>📱 Mobile-Optimized Processing</b></summary>
```python
# Lightweight encoding for mobile delivery
mobile_config = ProcessorConfig(
base_path=Path("/tmp/mobile_videos"),
output_formats=["mp4"], # Mobile-friendly format
quality_preset="low", # Reduced bandwidth
sprite_interval=15.0 # Fewer sprites
)
```
</details>
---
## 📚 API Reference
### 🎬 VideoProcessor
The main orchestrator for all video processing operations.
#### 🔧 Methods
```python
# Process video to all configured formats
result = processor.process_video(
input_path: Path | str,
output_dir: Path | str | None = None,
video_id: str | None = None
) -> VideoProcessingResult
# Encode to specific format
output_path = processor.encode_video(
input_path: Path,
output_dir: Path,
format_name: str,
video_id: str
) -> Path
# Generate thumbnail at timestamp
thumbnail = processor.generate_thumbnail(
video_path: Path,
output_dir: Path,
timestamp: int,
video_id: str
) -> Path
# Create sprite sheet and WebVTT
sprites = processor.generate_sprites(
video_path: Path,
output_dir: Path,
video_id: str
) -> tuple[Path, Path]
```
### ⚙️ ProcessorConfig
Type-safe configuration with automatic validation.
#### 📋 Essential Fields
```python
class ProcessorConfig:
base_path: Path # 📂 Base directory
output_formats: list[str] # 🎥 Video formats
quality_preset: str # 🎯 Quality level
storage_backend: str # 💾 Storage type
ffmpeg_path: str # 🛠️ FFmpeg binary
thumbnail_timestamp: int # 🖼️ Thumbnail position
sprite_interval: float # 🎞️ Sprite frequency
```
### 📊 VideoProcessingResult
Comprehensive result object with all output information.
```python
@dataclass
class VideoProcessingResult:
video_id: str # 🆔 Unique identifier
encoded_files: dict[str, Path] # 📁 Format → file mapping
thumbnail_file: Path | None # 🖼️ Thumbnail image
sprite_files: tuple[Path, Path] | None # 🎞️ Sprite + WebVTT
metadata: VideoMetadata # 📊 Video properties
```
---
## 🧪 Development
### 🛠️ Development Commands
```bash
# 📦 Install dependencies
uv sync
# 🧪 Run test suite
uv run pytest -v
# 📊 Test coverage
uv run pytest --cov=video_processor
# ✨ Code formatting
uv run ruff format .
# 🔍 Linting
uv run ruff check .
# 🎯 Type checking
uv run mypy src/
```
### 📈 Test Coverage
Our comprehensive test suite covers:
- ✅ Configuration validation and type checking
- ✅ Path utilities and file operations
- ✅ FFmpeg integration and error handling
- ✅ Video metadata extraction
- ✅ Background task processing
```bash
========================== test session starts ==========================
tests/test_config.py ✅✅✅✅ [33%]
tests/test_utils.py ✅✅✅✅✅✅✅✅ [100%]
======================== 12 passed in 0.11s ========================
```
---
## 📦 Dependencies
### 🎯 Core Dependencies
| Package | Purpose | Why We Use It |
|---------|---------|---------------|
| `ffmpeg-python` | FFmpeg integration | 🎬 Professional video processing |
| `msprites2` | Sprite generation | 🎞️ Seekbar thumbnails (forked for fixes) |
| `procrastinate` | Background tasks | ⚡ Scalable async processing |
| `pydantic` | Configuration | ⚙️ Type-safe settings validation |
| `pillow` | Image processing | 🖼️ Thumbnail manipulation |
### 🔧 Development Tools
| Tool | Purpose | Benefits |
|------|---------|----------|
| `uv` | Package management | 🚀 Ultra-fast dependency resolution |
| `ruff` | Linting & formatting | ⚡ Lightning-fast code quality |
| `pytest` | Testing framework | 🧪 Reliable test execution |
| `mypy` | Type checking | 🎯 Static type analysis |
| `coverage` | Test coverage | 📊 Quality assurance |
---
## 🌟 Why Video Processor?
<div align="center">
### 🆚 Comparison with Alternatives
| Feature | Video Processor | FFmpeg CLI | moviepy | OpenCV |
|---------|----------------|------------|---------|--------|
| **Two-pass encoding** | ✅ | ✅ | ❌ | ❌ |
| **Multiple formats** | ✅ | ✅ | ✅ | ❌ |
| **Background processing** | ✅ | ❌ | ❌ | ❌ |
| **Type safety** | ✅ | ❌ | ❌ | ❌ |
| **Sprite generation** | ✅ | ❌ | ❌ | ❌ |
| **Modern Python** | ✅ | N/A | ❌ | ❌ |
</div>
---
## 📋 Requirements
### 🖥️ System Requirements
- **Python 3.11+** - Modern Python features
- **FFmpeg** - Video processing engine
- **PostgreSQL** - Background job processing (optional)
### 🐧 Installation Commands
```bash
# Ubuntu/Debian
sudo apt install ffmpeg postgresql-client
# macOS
brew install ffmpeg postgresql
# Arch Linux
sudo pacman -S ffmpeg postgresql
```
---
## 🤝 Contributing
We welcome contributions! Here's how to get started:
### 🚀 Quick Contribution Guide
1. **🍴 Fork** the repository
2. **🌿 Create** a feature branch (`git checkout -b feature/amazing-feature`)
3. **📝 Make** your changes with tests
4. **🧪 Test** everything (`uv run pytest`)
5. **✨ Format** code (`uv run ruff format .`)
6. **📤 Submit** a pull request
### 🎯 Areas We'd Love Help With
- 🌐 **S3 storage backend** implementation
- 🎞️ **Additional video formats** (AV1, HEVC)
- 📊 **Progress tracking** and monitoring
- 🐳 **Docker integration** examples
- 📖 **Documentation** improvements
---
## 📜 License
This project is licensed under the **MIT License** - see the [LICENSE](LICENSE) file for details.
---
## 🎉 Changelog
### 🌟 v0.1.0 - Initial Release
- ✨ **Multi-format encoding**: MP4, WebM, OGV support
- 🖼️ **Thumbnail generation** with customizable timestamps
- 🎞️ **Sprite sheet creation** with WebVTT files
- ⚡ **Background processing** with Procrastinate
- ⚙️ **Type-safe configuration** with Pydantic V2
- 🛠️ **Modern tooling**: uv, ruff, pytest integration
- 📚 **Comprehensive documentation** and examples
---
<div align="center">
### 🙋‍♀️ Questions? Issues? Ideas?
**Found a bug?** [Open an issue](https://github.com/your-repo/issues/new/choose)
**Have a feature request?** [Start a discussion](https://github.com/your-repo/discussions)
**Want to contribute?** Check out our [contribution guide](#-contributing)
---
**Built with ❤️ for the video processing community**
*Making professional video encoding accessible to everyone*
</div>

View File

@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Asynchronous video processing example using Procrastinate tasks.
This example demonstrates:
- Setting up Procrastinate for background processing
- Submitting video processing tasks
- Monitoring task status
"""
import asyncio
import tempfile
from pathlib import Path
import procrastinate
from video_processor import ProcessorConfig
from video_processor.tasks import setup_procrastinate
async def async_processing_example():
"""Demonstrate asynchronous video processing with Procrastinate."""
# Database connection string (adjust for your setup)
# For testing, you might use: "postgresql://user:password@localhost/dbname"
database_url = "postgresql://localhost/procrastinate_test"
try:
# Set up Procrastinate
app = setup_procrastinate(database_url)
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create config dictionary for serialization
config_dict = {
"base_path": str(temp_path),
"output_formats": ["mp4", "webm"],
"quality_preset": "medium",
}
# Example input file
input_file = Path("example_input.mp4")
if input_file.exists():
print(f"Submitting async processing job for: {input_file}")
# Submit video processing task
job = await app.tasks.process_video_async.defer_async(
input_path=str(input_file),
output_dir=str(temp_path / "outputs"),
config_dict=config_dict
)
print(f"Job submitted with ID: {job.id}")
print("Processing in background...")
# In a real application, you would monitor the job status
# and handle results when the task completes
else:
print(f"Input file not found: {input_file}")
print("Create an example video file or modify the path.")
except Exception as e:
print(f"Database connection failed: {e}")
print("Make sure PostgreSQL is running and the database exists.")
async def thumbnail_generation_example():
"""Demonstrate standalone thumbnail generation."""
database_url = "postgresql://localhost/procrastinate_test"
try:
app = setup_procrastinate(database_url)
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
input_file = Path("example_input.mp4")
if input_file.exists():
print("Submitting thumbnail generation job...")
job = await app.tasks.generate_thumbnail_async.defer_async(
video_path=str(input_file),
output_dir=str(temp_path),
timestamp=30, # 30 seconds into the video
video_id="example_thumb"
)
print(f"Thumbnail job submitted: {job.id}")
else:
print("Input file not found for thumbnail generation.")
except Exception as e:
print(f"Database connection failed: {e}")
if __name__ == "__main__":
print("=== Async Video Processing Example ===")
asyncio.run(async_processing_example())
print("\n=== Thumbnail Generation Example ===")
asyncio.run(thumbnail_generation_example())

68
examples/basic_usage.py Normal file
View File

@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""
Basic usage example for the video processor module.
This example demonstrates:
- Creating a processor configuration
- Processing a video file to multiple formats
- Generating thumbnails and sprites
"""
import tempfile
from pathlib import Path
from video_processor import ProcessorConfig, VideoProcessor
def basic_processing_example():
"""Demonstrate basic video processing functionality."""
# Create a temporary directory for outputs
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create configuration
config = ProcessorConfig(
base_path=temp_path,
output_formats=["mp4", "webm"],
quality_preset="medium",
)
# Initialize processor
processor = VideoProcessor(config)
# Example input file (replace with actual video file path)
input_file = Path("example_input.mp4")
if input_file.exists():
print(f"Processing video: {input_file}")
# Process the video
result = processor.process_video(
input_path=input_file,
output_dir=temp_path / "outputs"
)
print(f"Processing complete!")
print(f"Video ID: {result.video_id}")
print(f"Formats created: {list(result.encoded_files.keys())}")
# Display output files
for format_name, file_path in result.encoded_files.items():
print(f" {format_name}: {file_path}")
if result.thumbnail_file:
print(f"Thumbnail: {result.thumbnail_file}")
if result.sprite_files:
sprite_img, sprite_vtt = result.sprite_files
print(f"Sprite image: {sprite_img}")
print(f"Sprite WebVTT: {sprite_vtt}")
else:
print(f"Input file not found: {input_file}")
print("Create an example video file or modify the path in this script.")
if __name__ == "__main__":
basic_processing_example()

128
examples/custom_config.py Normal file
View File

@ -0,0 +1,128 @@
#!/usr/bin/env python3
"""
Custom configuration examples for the video processor.
This example demonstrates:
- Creating custom quality presets
- Configuring different output formats
- Using custom FFmpeg paths
- Storage backend configuration
"""
import tempfile
from pathlib import Path
from video_processor import ProcessorConfig, VideoProcessor
def high_quality_processing():
"""Example of high-quality video processing configuration."""
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# High-quality configuration
config = ProcessorConfig(
base_path=temp_path,
output_formats=["mp4", "webm", "ogv"], # All formats
quality_preset="ultra", # Highest quality
sprite_interval=5.0, # Sprite every 5 seconds
thumbnail_timestamp=10, # Thumbnail at 10 seconds
# ffmpeg_path="/usr/local/bin/ffmpeg", # Custom FFmpeg path if needed
)
processor = VideoProcessor(config)
print("High-quality processor configured:")
print(f" Quality preset: {config.quality_preset}")
print(f" Output formats: {config.output_formats}")
print(f" Sprite interval: {config.sprite_interval}s")
print(f" FFmpeg path: {config.ffmpeg_path}")
def mobile_optimized_processing():
"""Example of mobile-optimized processing configuration."""
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Mobile-optimized configuration
config = ProcessorConfig(
base_path=temp_path,
output_formats=["mp4"], # Just MP4 for mobile compatibility
quality_preset="low", # Lower bitrate for mobile
sprite_interval=10.0, # Fewer sprites to save bandwidth
)
processor = VideoProcessor(config)
print("\nMobile-optimized processor configured:")
print(f" Quality preset: {config.quality_preset}")
print(f" Output formats: {config.output_formats}")
print(f" Sprite interval: {config.sprite_interval}s")
def custom_paths_and_storage():
"""Example of custom paths and storage configuration."""
# Custom base path
custom_base = Path("/tmp/video_processing")
custom_base.mkdir(exist_ok=True)
config = ProcessorConfig(
base_path=custom_base,
storage_backend="local", # Could be "s3" in the future
output_formats=["mp4", "webm"],
quality_preset="medium",
)
# The processor will use the custom paths
processor = VideoProcessor(config)
print(f"\nCustom paths processor:")
print(f" Base path: {config.base_path}")
print(f" Storage backend: {config.storage_backend}")
# Clean up
if custom_base.exists():
try:
custom_base.rmdir()
except OSError:
pass # Directory not empty
def validate_config_examples():
"""Demonstrate configuration validation."""
print(f"\nConfiguration validation examples:")
try:
# This should work fine
config = ProcessorConfig(
base_path=Path("/tmp"),
quality_preset="medium"
)
print("✓ Valid configuration created")
except Exception as e:
print(f"✗ Configuration failed: {e}")
try:
# This should fail due to invalid quality preset
config = ProcessorConfig(
base_path=Path("/tmp"),
quality_preset="invalid_preset" # This will cause validation error
)
print("✓ This shouldn't print - validation should fail")
except Exception as e:
print(f"✓ Expected validation error: {e}")
if __name__ == "__main__":
print("=== Video Processor Configuration Examples ===")
high_quality_processing()
mobile_optimized_processing()
custom_paths_and_storage()
validate_config_examples()

88
pyproject.toml Normal file
View File

@ -0,0 +1,88 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "video-processor"
version = "0.1.0"
description = "Standalone video processing pipeline with multiple format encoding"
authors = [{name = "Video Processor", email = "dev@example.com"}]
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"ffmpeg-python>=0.2.0",
"pillow>=11.2.1",
"msprites2 @ git+https://github.com/rsp2k/msprites2.git",
"procrastinate>=2.15.1",
"psycopg[pool]>=3.2.9",
"python-dateutil>=2.9.0",
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",
]
[project.optional-dependencies]
dev = [
"ruff>=0.1.0",
"mypy>=1.7.0",
"pytest>=7.0.0",
"pytest-cov>=4.0.0",
]
[tool.hatch.build.targets.wheel]
packages = ["src/video_processor"]
[tool.hatch.build.targets.sdist]
include = [
"/src",
"/tests",
"/README.md",
"/pyproject.toml",
]
[tool.hatch.metadata]
allow-direct-references = true
[tool.ruff]
target-version = "py311"
line-length = 88
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
ignore = [
"E501", # line too long (handled by formatter)
]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101"] # Allow assert in tests
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_configs = true
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
[dependency-groups]
dev = [
"mypy>=1.17.1",
"pytest>=8.4.2",
"pytest-cov>=6.2.1",
"ruff>=0.12.12",
]

View File

@ -0,0 +1,19 @@
"""
Video Processor - Standalone video processing pipeline.
A professional video processing library extracted from the demostar system,
featuring multiple format encoding, thumbnail generation, and background processing.
"""
from .config import ProcessorConfig
from .core.processor import VideoProcessor
from .exceptions import EncodingError, StorageError, VideoProcessorError
__version__ = "0.1.0"
__all__ = [
"VideoProcessor",
"ProcessorConfig",
"VideoProcessorError",
"EncodingError",
"StorageError",
]

View File

@ -0,0 +1,52 @@
"""Configuration management using Pydantic."""
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, field_validator
class ProcessorConfig(BaseModel):
"""Configuration for video processor."""
# Storage settings
storage_backend: Literal["local", "s3"] = "local"
base_path: Path = Field(default=Path("/tmp/videos"))
# Encoding settings
output_formats: list[Literal["mp4", "webm", "ogv"]] = Field(default=["mp4"])
quality_preset: Literal["low", "medium", "high", "ultra"] = "medium"
# FFmpeg settings
ffmpeg_path: str = "/usr/bin/ffmpeg"
# Thumbnail settings
thumbnail_timestamps: list[int] = Field(default=[1]) # seconds
thumbnail_width: int = 640
# Sprite settings
generate_sprites: bool = True
sprite_interval: int = 10 # seconds between sprite frames
# Custom FFmpeg options
custom_ffmpeg_options: dict[str, str] = Field(default_factory=dict)
# File permissions
file_permissions: int = 0o644
directory_permissions: int = 0o755
@field_validator("base_path")
@classmethod
def validate_base_path(cls, v: Path) -> Path:
"""Ensure base path is absolute."""
return v.resolve()
@field_validator("output_formats")
@classmethod
def validate_output_formats(cls, v: list[str]) -> list[str]:
"""Ensure at least one output format is specified."""
if not v:
raise ValueError("At least one output format must be specified")
return v
model_config = ConfigDict(validate_assignment=True)

View File

@ -0,0 +1,5 @@
"""Core video processing modules."""
from .processor import VideoProcessor
__all__ = ["VideoProcessor"]

View File

@ -0,0 +1,265 @@
"""Video encoding using FFmpeg."""
import subprocess
from pathlib import Path
from ..config import ProcessorConfig
from ..exceptions import EncodingError, FFmpegError
class VideoEncoder:
"""Handles video encoding operations using FFmpeg."""
def __init__(self, config: ProcessorConfig) -> None:
self.config = config
self._quality_presets = self._get_quality_presets()
def _get_quality_presets(self) -> dict[str, dict[str, str]]:
"""Get quality presets for different output formats."""
return {
"low": {
"video_bitrate": "1000k",
"min_bitrate": "500k",
"max_bitrate": "1500k",
"audio_bitrate": "128k",
"crf": "28",
},
"medium": {
"video_bitrate": "2500k",
"min_bitrate": "1000k",
"max_bitrate": "4000k",
"audio_bitrate": "192k",
"crf": "23",
},
"high": {
"video_bitrate": "5000k",
"min_bitrate": "2000k",
"max_bitrate": "8000k",
"audio_bitrate": "256k",
"crf": "18",
},
"ultra": {
"video_bitrate": "10000k",
"min_bitrate": "5000k",
"max_bitrate": "15000k",
"audio_bitrate": "320k",
"crf": "15",
},
}
def encode_video(
self,
input_path: Path,
output_dir: Path,
format_name: str,
video_id: str,
) -> Path:
"""
Encode video to specified format.
Args:
input_path: Input video file
output_dir: Output directory
format_name: Output format (mp4, webm, ogv)
video_id: Unique video identifier
Returns:
Path to encoded file
"""
if format_name == "mp4":
return self._encode_mp4(input_path, output_dir, video_id)
elif format_name == "webm":
return self._encode_webm(input_path, output_dir, video_id)
elif format_name == "ogv":
return self._encode_ogv(input_path, output_dir, video_id)
else:
raise EncodingError(f"Unsupported format: {format_name}")
def _encode_mp4(self, input_path: Path, output_dir: Path, video_id: str) -> Path:
"""Encode video to MP4 using two-pass encoding."""
output_file = output_dir / f"{video_id}.mp4"
passlog_file = output_dir / f"{video_id}.ffmpeg2pass"
quality = self._quality_presets[self.config.quality_preset]
def clean_passlogs() -> None:
"""Clean up FFmpeg pass log files."""
for suffix in ["-0.log", "-0.log.mbtree"]:
log_file = Path(f"{passlog_file}{suffix}")
if log_file.exists():
log_file.unlink()
clean_passlogs()
try:
# Pass 1 - Analysis pass
pass1_cmd = [
self.config.ffmpeg_path,
"-y",
"-i",
str(input_path),
"-passlogfile",
str(passlog_file),
"-c:v",
"libx264",
"-b:v",
quality["video_bitrate"],
"-minrate",
quality["min_bitrate"],
"-maxrate",
quality["max_bitrate"],
"-pass",
"1",
"-an", # No audio in pass 1
"-f",
"mp4",
"/dev/null",
]
result = subprocess.run(pass1_cmd, capture_output=True, text=True)
if result.returncode != 0:
raise FFmpegError(f"Pass 1 failed: {result.stderr}")
# Pass 2 - Final encoding
pass2_cmd = [
self.config.ffmpeg_path,
"-y",
"-i",
str(input_path),
"-passlogfile",
str(passlog_file),
"-c:v",
"libx264",
"-b:v",
quality["video_bitrate"],
"-minrate",
quality["min_bitrate"],
"-maxrate",
quality["max_bitrate"],
"-pass",
"2",
"-c:a",
"aac",
"-b:a",
quality["audio_bitrate"],
"-movflags",
"faststart",
str(output_file),
]
result = subprocess.run(pass2_cmd, capture_output=True, text=True)
if result.returncode != 0:
raise FFmpegError(f"Pass 2 failed: {result.stderr}")
finally:
clean_passlogs()
if not output_file.exists():
raise EncodingError("MP4 encoding failed - output file not created")
return output_file
def _encode_webm(self, input_path: Path, output_dir: Path, video_id: str) -> Path:
"""Encode video to WebM using VP9."""
# Use MP4 as input if it exists for better quality
mp4_file = output_dir / f"{video_id}.mp4"
source_file = mp4_file if mp4_file.exists() else input_path
output_file = output_dir / f"{video_id}.webm"
passlog_file = output_dir / f"{video_id}.webm-pass"
quality = self._quality_presets[self.config.quality_preset]
try:
# Pass 1
pass1_cmd = [
self.config.ffmpeg_path,
"-y",
"-i",
str(source_file),
"-passlogfile",
str(passlog_file),
"-c:v",
"libvpx-vp9",
"-b:v",
"0",
"-crf",
quality["crf"],
"-pass",
"1",
"-an",
"-f",
"null",
"/dev/null",
]
result = subprocess.run(pass1_cmd, capture_output=True, text=True)
if result.returncode != 0:
raise FFmpegError(f"WebM Pass 1 failed: {result.stderr}")
# Pass 2
pass2_cmd = [
self.config.ffmpeg_path,
"-y",
"-i",
str(source_file),
"-passlogfile",
str(passlog_file),
"-c:v",
"libvpx-vp9",
"-b:v",
"0",
"-crf",
quality["crf"],
"-pass",
"2",
"-c:a",
"libopus",
str(output_file),
]
result = subprocess.run(pass2_cmd, capture_output=True, text=True)
if result.returncode != 0:
raise FFmpegError(f"WebM Pass 2 failed: {result.stderr}")
finally:
# Clean up pass log
pass_log = Path(f"{passlog_file}-0.log")
if pass_log.exists():
pass_log.unlink()
if not output_file.exists():
raise EncodingError("WebM encoding failed - output file not created")
return output_file
def _encode_ogv(self, input_path: Path, output_dir: Path, video_id: str) -> Path:
"""Encode video to OGV using Theora."""
# Use MP4 as input if it exists for better quality
mp4_file = output_dir / f"{video_id}.mp4"
source_file = mp4_file if mp4_file.exists() else input_path
output_file = output_dir / f"{video_id}.ogv"
cmd = [
self.config.ffmpeg_path,
"-y",
"-i",
str(source_file),
"-codec:v",
"libtheora",
"-qscale:v",
"6",
"-codec:a",
"libvorbis",
"-qscale:a",
"6",
str(output_file),
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise FFmpegError(f"OGV encoding failed: {result.stderr}")
if not output_file.exists():
raise EncodingError("OGV encoding failed - output file not created")
return output_file

View File

@ -0,0 +1,136 @@
"""Video metadata extraction using FFmpeg probe."""
from pathlib import Path
from typing import Any
import ffmpeg
from ..config import ProcessorConfig
from ..exceptions import FFmpegError
class VideoMetadata:
"""Handles video metadata extraction."""
def __init__(self, config: ProcessorConfig) -> None:
self.config = config
def extract_metadata(self, video_path: Path) -> dict[str, Any]:
"""
Extract comprehensive metadata from video file.
Args:
video_path: Path to video file
Returns:
Dictionary containing video metadata
"""
try:
probe_data = ffmpeg.probe(str(video_path))
# Extract general format information
format_info = probe_data.get("format", {})
# Extract video stream information
video_stream = self._get_video_stream(probe_data)
audio_stream = self._get_audio_stream(probe_data)
metadata = {
# File information
"filename": video_path.name,
"file_size": int(format_info.get("size", 0)),
"duration": float(format_info.get("duration", 0)),
"bitrate": int(format_info.get("bit_rate", 0)),
"format_name": format_info.get("format_name", ""),
"format_long_name": format_info.get("format_long_name", ""),
# Video stream information
"video": self._extract_video_metadata(video_stream)
if video_stream
else None,
# Audio stream information
"audio": self._extract_audio_metadata(audio_stream)
if audio_stream
else None,
# All streams count
"stream_count": len(probe_data.get("streams", [])),
# Raw probe data for advanced use cases
"raw_probe_data": probe_data,
}
return metadata
except ffmpeg.Error as e:
error_msg = e.stderr.decode() if e.stderr else "Unknown FFmpeg error"
raise FFmpegError(f"Metadata extraction failed: {error_msg}") from e
except Exception as e:
raise FFmpegError(f"Metadata extraction failed: {e}") from e
def _get_video_stream(self, probe_data: dict[str, Any]) -> dict[str, Any] | None:
"""Get the primary video stream from probe data."""
streams = probe_data.get("streams", [])
return next(
(stream for stream in streams if stream.get("codec_type") == "video"), None
)
def _get_audio_stream(self, probe_data: dict[str, Any]) -> dict[str, Any] | None:
"""Get the primary audio stream from probe data."""
streams = probe_data.get("streams", [])
return next(
(stream for stream in streams if stream.get("codec_type") == "audio"), None
)
def _extract_video_metadata(self, video_stream: dict[str, Any]) -> dict[str, Any]:
"""Extract video-specific metadata."""
return {
"codec_name": video_stream.get("codec_name", ""),
"codec_long_name": video_stream.get("codec_long_name", ""),
"width": int(video_stream.get("width", 0)),
"height": int(video_stream.get("height", 0)),
"aspect_ratio": video_stream.get("display_aspect_ratio", ""),
"pixel_format": video_stream.get("pix_fmt", ""),
"framerate": self._parse_framerate(video_stream.get("r_frame_rate", "")),
"avg_framerate": self._parse_framerate(
video_stream.get("avg_frame_rate", "")
),
"bitrate": int(video_stream.get("bit_rate", 0))
if video_stream.get("bit_rate")
else None,
"duration": float(video_stream.get("duration", 0))
if video_stream.get("duration")
else None,
"frame_count": int(video_stream.get("nb_frames", 0))
if video_stream.get("nb_frames")
else None,
}
def _extract_audio_metadata(self, audio_stream: dict[str, Any]) -> dict[str, Any]:
"""Extract audio-specific metadata."""
return {
"codec_name": audio_stream.get("codec_name", ""),
"codec_long_name": audio_stream.get("codec_long_name", ""),
"sample_rate": int(audio_stream.get("sample_rate", 0))
if audio_stream.get("sample_rate")
else None,
"channels": int(audio_stream.get("channels", 0)),
"channel_layout": audio_stream.get("channel_layout", ""),
"bitrate": int(audio_stream.get("bit_rate", 0))
if audio_stream.get("bit_rate")
else None,
"duration": float(audio_stream.get("duration", 0))
if audio_stream.get("duration")
else None,
}
def _parse_framerate(self, framerate_str: str) -> float | None:
"""Parse framerate string like '30/1' to float."""
if not framerate_str or framerate_str == "0/0":
return None
try:
if "/" in framerate_str:
numerator, denominator = framerate_str.split("/")
return float(numerator) / float(denominator)
else:
return float(framerate_str)
except (ValueError, ZeroDivisionError):
return None

View File

@ -0,0 +1,140 @@
"""Main video processor class."""
import uuid
from pathlib import Path
from ..config import ProcessorConfig
from ..exceptions import ValidationError, VideoProcessorError
from ..storage.backends import LocalStorageBackend, StorageBackend
from .encoders import VideoEncoder
from .metadata import VideoMetadata
from .thumbnails import ThumbnailGenerator
class VideoProcessingResult:
"""Result of video processing operation."""
def __init__(
self,
video_id: str,
input_path: Path,
output_path: Path,
encoded_files: dict[str, Path],
thumbnails: list[Path],
sprite_file: Path | None = None,
webvtt_file: Path | None = None,
metadata: dict | None = None,
) -> None:
self.video_id = video_id
self.input_path = input_path
self.output_path = output_path
self.encoded_files = encoded_files
self.thumbnails = thumbnails
self.sprite_file = sprite_file
self.webvtt_file = webvtt_file
self.metadata = metadata
class VideoProcessor:
"""Main video processing class."""
def __init__(self, config: ProcessorConfig) -> None:
self.config = config
self.storage = self._create_storage_backend()
self.encoder = VideoEncoder(config)
self.thumbnail_generator = ThumbnailGenerator(config)
self.metadata_extractor = VideoMetadata(config)
def _create_storage_backend(self) -> StorageBackend:
"""Create storage backend based on configuration."""
if self.config.storage_backend == "local":
return LocalStorageBackend(self.config)
elif self.config.storage_backend == "s3":
# TODO: Implement S3StorageBackend
raise NotImplementedError("S3 storage backend not implemented yet")
else:
raise ValidationError(
f"Unknown storage backend: {self.config.storage_backend}"
)
def process_video(
self,
input_path: Path | str,
output_dir: Path | str | None = None,
video_id: str | None = None,
) -> VideoProcessingResult:
"""
Process a video file with encoding, thumbnails, and sprites.
Args:
input_path: Path to input video file
output_dir: Output directory (defaults to config base_path)
video_id: Unique identifier for video (auto-generated if None)
Returns:
VideoProcessingResult with all generated files
"""
input_path = Path(input_path)
if not input_path.exists():
raise ValidationError(f"Input file does not exist: {input_path}")
# Generate unique video ID if not provided
if video_id is None:
video_id = str(uuid.uuid4())[:8]
# Set up output directory
if output_dir is None:
output_dir = self.config.base_path / video_id
else:
output_dir = Path(output_dir) / video_id
# Create output directory
self.storage.create_directory(output_dir)
try:
# Extract metadata first
metadata = self.metadata_extractor.extract_metadata(input_path)
# Encode video in requested formats
encoded_files = {}
for format_name in self.config.output_formats:
encoded_file = self.encoder.encode_video(
input_path, output_dir, format_name, video_id
)
encoded_files[format_name] = encoded_file
# Generate thumbnails
thumbnails = []
for timestamp in self.config.thumbnail_timestamps:
thumbnail = self.thumbnail_generator.generate_thumbnail(
encoded_files.get("mp4", input_path),
output_dir,
timestamp,
video_id,
)
thumbnails.append(thumbnail)
# Generate sprites if enabled
sprite_file = None
webvtt_file = None
if self.config.generate_sprites and "mp4" in encoded_files:
sprite_file, webvtt_file = self.thumbnail_generator.generate_sprites(
encoded_files["mp4"], output_dir, video_id
)
return VideoProcessingResult(
video_id=video_id,
input_path=input_path,
output_path=output_dir,
encoded_files=encoded_files,
thumbnails=thumbnails,
sprite_file=sprite_file,
webvtt_file=webvtt_file,
metadata=metadata,
)
except Exception as e:
# Clean up on failure
if output_dir.exists():
self.storage.cleanup_directory(output_dir)
raise VideoProcessorError(f"Video processing failed: {e}") from e

View File

@ -0,0 +1,143 @@
"""Thumbnail and sprite generation using FFmpeg and msprites2."""
from pathlib import Path
import ffmpeg
from msprites2 import MontageSprites
from ..config import ProcessorConfig
from ..exceptions import EncodingError, FFmpegError
class ThumbnailGenerator:
"""Handles thumbnail and sprite generation."""
def __init__(self, config: ProcessorConfig) -> None:
self.config = config
def generate_thumbnail(
self,
video_path: Path,
output_dir: Path,
timestamp: int,
video_id: str,
) -> Path:
"""
Generate a thumbnail image from video at specified timestamp.
Args:
video_path: Path to video file
output_dir: Output directory
timestamp: Time in seconds to extract thumbnail
video_id: Unique video identifier
Returns:
Path to generated thumbnail
"""
output_file = output_dir / f"{video_id}_thumb_{timestamp}.png"
try:
# Get video info to determine width and duration
probe = ffmpeg.probe(str(video_path))
video_stream = next(
(
stream
for stream in probe["streams"]
if stream["codec_type"] == "video"
),
None,
)
if not video_stream:
raise FFmpegError("No video stream found in input file")
width = video_stream["width"]
duration = float(video_stream.get("duration", 0))
# Adjust timestamp if beyond video duration
if timestamp >= duration:
timestamp = max(1, int(duration // 2))
# Generate thumbnail using ffmpeg-python
(
ffmpeg.input(str(video_path), ss=timestamp)
.filter("scale", width, -1)
.output(str(output_file), vframes=1)
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
except ffmpeg.Error as e:
error_msg = e.stderr.decode() if e.stderr else "Unknown FFmpeg error"
raise FFmpegError(f"Thumbnail generation failed: {error_msg}") from e
if not output_file.exists():
raise EncodingError(
"Thumbnail generation failed - output file not created"
)
return output_file
def generate_sprites(
self,
video_path: Path,
output_dir: Path,
video_id: str,
) -> tuple[Path, Path]:
"""
Generate sprite sheet and WebVTT file for seekbar thumbnails.
Args:
video_path: Path to video file
output_dir: Output directory
video_id: Unique video identifier
Returns:
Tuple of (sprite_file_path, webvtt_file_path)
"""
sprite_file = output_dir / f"{video_id}_sprite.jpg"
webvtt_file = output_dir / f"{video_id}_sprite.webvtt"
thumbnail_dir = output_dir / "frames"
# Create frames directory
thumbnail_dir.mkdir(exist_ok=True)
try:
# Generate sprites using msprites2 (the forked library)
MontageSprites.from_media(
video_path=str(video_path),
thumbnail_dir=str(thumbnail_dir),
sprite_file=str(sprite_file),
webvtt_file=str(webvtt_file),
# Optional parameters - can be made configurable
interval=self.config.sprite_interval,
width=160, # Individual thumbnail width
height=90, # Individual thumbnail height
columns=10, # Thumbnails per row in sprite
)
except Exception as e:
raise EncodingError(f"Sprite generation failed: {e}") from e
if not sprite_file.exists():
raise EncodingError("Sprite generation failed - sprite file not created")
if not webvtt_file.exists():
raise EncodingError("Sprite generation failed - WebVTT file not created")
# Clean up temporary frames directory
self._cleanup_frames_directory(thumbnail_dir)
return sprite_file, webvtt_file
def _cleanup_frames_directory(self, frames_dir: Path) -> None:
"""Clean up temporary frame files."""
try:
if frames_dir.exists():
for frame_file in frames_dir.iterdir():
if frame_file.is_file():
frame_file.unlink()
frames_dir.rmdir()
except Exception:
# Don't fail the entire process if cleanup fails
pass

View File

@ -0,0 +1,21 @@
"""Custom exceptions for video processing."""
class VideoProcessorError(Exception):
"""Base exception for video processor errors."""
class EncodingError(VideoProcessorError):
"""Raised when video encoding fails."""
class StorageError(VideoProcessorError):
"""Raised when storage operations fail."""
class ValidationError(VideoProcessorError):
"""Raised when input validation fails."""
class FFmpegError(VideoProcessorError):
"""Raised when FFmpeg operations fail."""

View File

@ -0,0 +1,5 @@
"""Storage backend modules."""
from .backends import LocalStorageBackend, StorageBackend
__all__ = ["StorageBackend", "LocalStorageBackend"]

View File

@ -0,0 +1,115 @@
"""Storage backend implementations."""
import os
import shutil
from abc import ABC, abstractmethod
from pathlib import Path
from ..config import ProcessorConfig
from ..exceptions import StorageError
class StorageBackend(ABC):
"""Abstract base class for storage backends."""
def __init__(self, config: ProcessorConfig) -> None:
self.config = config
@abstractmethod
def create_directory(self, path: Path) -> None:
"""Create a directory with proper permissions."""
@abstractmethod
def cleanup_directory(self, path: Path) -> None:
"""Remove a directory and all its contents."""
@abstractmethod
def store_file(self, source_path: Path, destination_path: Path) -> Path:
"""Store a file from source to destination."""
@abstractmethod
def file_exists(self, path: Path) -> bool:
"""Check if a file exists."""
@abstractmethod
def get_file_size(self, path: Path) -> int:
"""Get file size in bytes."""
class LocalStorageBackend(StorageBackend):
"""Local filesystem storage backend."""
def create_directory(self, path: Path) -> None:
"""Create a directory with proper permissions."""
try:
path.mkdir(parents=True, exist_ok=True)
# Set directory permissions
os.chmod(path, self.config.directory_permissions)
except OSError as e:
raise StorageError(f"Failed to create directory {path}: {e}") from e
def cleanup_directory(self, path: Path) -> None:
"""Remove a directory and all its contents."""
try:
if path.exists() and path.is_dir():
shutil.rmtree(path)
except OSError as e:
raise StorageError(f"Failed to cleanup directory {path}: {e}") from e
def store_file(self, source_path: Path, destination_path: Path) -> Path:
"""Store a file from source to destination."""
try:
# Create destination directory if it doesn't exist
destination_path.parent.mkdir(parents=True, exist_ok=True)
# Copy file
shutil.copy2(source_path, destination_path)
# Set file permissions
os.chmod(destination_path, self.config.file_permissions)
return destination_path
except OSError as e:
raise StorageError(
f"Failed to store file {source_path} to {destination_path}: {e}"
) from e
def file_exists(self, path: Path) -> bool:
"""Check if a file exists."""
return path.exists() and path.is_file()
def get_file_size(self, path: Path) -> int:
"""Get file size in bytes."""
try:
return path.stat().st_size
except OSError as e:
raise StorageError(f"Failed to get file size for {path}: {e}") from e
class S3StorageBackend(StorageBackend):
"""S3 storage backend (placeholder for future implementation)."""
def __init__(self, config: ProcessorConfig) -> None:
super().__init__(config)
raise NotImplementedError("S3 storage backend not implemented yet")
def create_directory(self, path: Path) -> None:
"""Create a directory (S3 doesn't have directories, but we can simulate)."""
raise NotImplementedError
def cleanup_directory(self, path: Path) -> None:
"""Remove all files with the path prefix."""
raise NotImplementedError
def store_file(self, source_path: Path, destination_path: Path) -> Path:
"""Upload file to S3."""
raise NotImplementedError
def file_exists(self, path: Path) -> bool:
"""Check if object exists in S3."""
raise NotImplementedError
def get_file_size(self, path: Path) -> int:
"""Get S3 object size."""
raise NotImplementedError

View File

@ -0,0 +1,15 @@
"""Background task processing modules."""
from .procrastinate_tasks import (
generate_sprites_async,
generate_thumbnail_async,
process_video_async,
setup_procrastinate,
)
__all__ = [
"setup_procrastinate",
"process_video_async",
"generate_thumbnail_async",
"generate_sprites_async",
]

View File

@ -0,0 +1,195 @@
"""Procrastinate background tasks for video processing."""
import logging
from pathlib import Path
from procrastinate import App
from ..config import ProcessorConfig
from ..core.processor import VideoProcessor
from ..exceptions import VideoProcessorError
logger = logging.getLogger(__name__)
# Create Procrastinate app instance
app = App(connector=None) # Connector will be set during setup
def setup_procrastinate(database_url: str) -> App:
"""
Set up Procrastinate with database connection.
Args:
database_url: PostgreSQL connection string
Returns:
Configured Procrastinate app
"""
from procrastinate import AiopgConnector
connector = AiopgConnector(conninfo=database_url)
app.connector = connector
return app
@app.task(queue="video_processing")
def process_video_async(
input_path: str,
output_dir: str | None = None,
video_id: str | None = None,
config_dict: dict | None = None,
) -> dict:
"""
Process video asynchronously.
Args:
input_path: Path to input video file
output_dir: Output directory (optional)
video_id: Unique video identifier (optional)
config_dict: Configuration dictionary
Returns:
Dictionary with processing results
"""
logger.info(f"Starting async video processing for {input_path}")
try:
# Create config from dict or use defaults
if config_dict:
config = ProcessorConfig(**config_dict)
else:
config = ProcessorConfig()
# Create processor and process video
processor = VideoProcessor(config)
result = processor.process_video(
input_path=Path(input_path),
output_dir=Path(output_dir) if output_dir else None,
video_id=video_id,
)
# Convert result to serializable dictionary
result_dict = {
"video_id": result.video_id,
"input_path": str(result.input_path),
"output_path": str(result.output_path),
"encoded_files": {
fmt: str(path) for fmt, path in result.encoded_files.items()
},
"thumbnails": [str(path) for path in result.thumbnails],
"sprite_file": str(result.sprite_file) if result.sprite_file else None,
"webvtt_file": str(result.webvtt_file) if result.webvtt_file else None,
"metadata": result.metadata,
}
logger.info(f"Completed async video processing for {input_path}")
return result_dict
except Exception as e:
logger.error(f"Async video processing failed for {input_path}: {e}")
raise VideoProcessorError(f"Async processing failed: {e}") from e
@app.task(queue="thumbnail_generation")
def generate_thumbnail_async(
video_path: str,
output_dir: str,
timestamp: int,
video_id: str,
config_dict: dict | None = None,
) -> str:
"""
Generate thumbnail asynchronously.
Args:
video_path: Path to video file
output_dir: Output directory
timestamp: Time in seconds to extract thumbnail
video_id: Unique video identifier
config_dict: Configuration dictionary
Returns:
Path to generated thumbnail
"""
logger.info(f"Starting async thumbnail generation for {video_path} at {timestamp}s")
try:
# Create config from dict or use defaults
if config_dict:
config = ProcessorConfig(**config_dict)
else:
config = ProcessorConfig()
# Create thumbnail generator
from ..core.thumbnails import ThumbnailGenerator
generator = ThumbnailGenerator(config)
# Generate thumbnail
thumbnail_path = generator.generate_thumbnail(
video_path=Path(video_path),
output_dir=Path(output_dir),
timestamp=timestamp,
video_id=video_id,
)
logger.info(f"Completed async thumbnail generation: {thumbnail_path}")
return str(thumbnail_path)
except Exception as e:
logger.error(f"Async thumbnail generation failed: {e}")
raise VideoProcessorError(f"Async thumbnail generation failed: {e}") from e
@app.task(queue="sprite_generation")
def generate_sprites_async(
video_path: str,
output_dir: str,
video_id: str,
config_dict: dict | None = None,
) -> dict[str, str]:
"""
Generate video sprites asynchronously.
Args:
video_path: Path to video file
output_dir: Output directory
video_id: Unique video identifier
config_dict: Configuration dictionary
Returns:
Dictionary with sprite and webvtt file paths
"""
logger.info(f"Starting async sprite generation for {video_path}")
try:
# Create config from dict or use defaults
if config_dict:
config = ProcessorConfig(**config_dict)
else:
config = ProcessorConfig()
# Create thumbnail generator
from ..core.thumbnails import ThumbnailGenerator
generator = ThumbnailGenerator(config)
# Generate sprites
sprite_file, webvtt_file = generator.generate_sprites(
video_path=Path(video_path),
output_dir=Path(output_dir),
video_id=video_id,
)
result = {
"sprite_file": str(sprite_file),
"webvtt_file": str(webvtt_file),
}
logger.info(f"Completed async sprite generation: {result}")
return result
except Exception as e:
logger.error(f"Async sprite generation failed: {e}")
raise VideoProcessorError(f"Async sprite generation failed: {e}") from e

View File

@ -0,0 +1,6 @@
"""Utility modules."""
from .ffmpeg import FFmpegUtils
from .paths import PathUtils
__all__ = ["FFmpegUtils", "PathUtils"]

View File

@ -0,0 +1,138 @@
"""FFmpeg utilities and helper functions."""
import subprocess
from pathlib import Path
from ..exceptions import FFmpegError
class FFmpegUtils:
"""Utility functions for FFmpeg operations."""
@staticmethod
def check_ffmpeg_available(ffmpeg_path: str = "/usr/bin/ffmpeg") -> bool:
"""
Check if FFmpeg is available and working.
Args:
ffmpeg_path: Path to FFmpeg binary
Returns:
True if FFmpeg is available, False otherwise
"""
try:
result = subprocess.run(
[ffmpeg_path, "-version"], capture_output=True, text=True, timeout=10
)
return result.returncode == 0
except (
subprocess.TimeoutExpired,
FileNotFoundError,
subprocess.SubprocessError,
):
return False
@staticmethod
def get_ffmpeg_version(ffmpeg_path: str = "/usr/bin/ffmpeg") -> str | None:
"""
Get FFmpeg version string.
Args:
ffmpeg_path: Path to FFmpeg binary
Returns:
Version string or None if not available
"""
try:
result = subprocess.run(
[ffmpeg_path, "-version"], capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
# Extract version from first line
first_line = result.stdout.split("\n")[0]
if "version" in first_line:
return first_line.split("version")[1].split()[0]
except (
subprocess.TimeoutExpired,
FileNotFoundError,
subprocess.SubprocessError,
):
pass
return None
@staticmethod
def validate_input_file(file_path: Path) -> None:
"""
Validate that input file exists and is readable by FFmpeg.
Args:
file_path: Path to input file
Raises:
FFmpegError: If file is invalid
"""
if not file_path.exists():
raise FFmpegError(f"Input file does not exist: {file_path}")
if not file_path.is_file():
raise FFmpegError(f"Input path is not a file: {file_path}")
# Try to probe the file to ensure it's a valid media file
try:
import ffmpeg
ffmpeg.probe(str(file_path))
except Exception as e:
raise FFmpegError(f"Input file is not a valid media file: {e}") from e
@staticmethod
def estimate_processing_time(
input_file: Path, output_formats: list[str], quality_preset: str = "medium"
) -> int:
"""
Estimate processing time in seconds based on input file and settings.
Args:
input_file: Path to input file
output_formats: List of output formats
quality_preset: Quality preset name
Returns:
Estimated processing time in seconds
"""
try:
import ffmpeg
probe = ffmpeg.probe(str(input_file))
duration = float(probe["format"].get("duration", 0))
# Base multiplier for encoding (very rough estimate)
format_multipliers = {
"mp4": 0.5, # Two-pass H.264
"webm": 0.8, # VP9 is slower
"ogv": 0.3, # Theora is faster
}
quality_multipliers = {
"low": 0.5,
"medium": 1.0,
"high": 1.5,
"ultra": 2.0,
}
total_multiplier = sum(
format_multipliers.get(fmt, 1.0) for fmt in output_formats
)
quality_multiplier = quality_multipliers.get(quality_preset, 1.0)
# Base estimate: video duration * encoding complexity
estimated_time = duration * total_multiplier * quality_multiplier
# Add buffer time for thumbnails, sprites, etc.
estimated_time += 30
return max(int(estimated_time), 60) # Minimum 1 minute
except Exception:
# Fallback estimate
return 300 # 5 minutes default

View File

@ -0,0 +1,173 @@
"""Path utilities and helper functions."""
import uuid
from pathlib import Path
class PathUtils:
"""Utility functions for path operations."""
@staticmethod
def generate_video_id() -> str:
"""
Generate a unique video ID.
Returns:
8-character unique identifier
"""
return str(uuid.uuid4())[:8]
@staticmethod
def sanitize_filename(filename: str) -> str:
"""
Sanitize filename for safe filesystem use.
Args:
filename: Original filename
Returns:
Sanitized filename
"""
# Remove or replace unsafe characters
unsafe_chars = '<>:"/\\|?*'
for char in unsafe_chars:
filename = filename.replace(char, "_")
# Remove leading/trailing spaces and dots
filename = filename.strip(" .")
# Ensure filename is not empty
if not filename:
filename = "untitled"
return filename
@staticmethod
def get_file_extension(file_path: Path) -> str:
"""
Get file extension in lowercase.
Args:
file_path: Path to file
Returns:
File extension without dot (e.g., 'mp4')
"""
return file_path.suffix.lower().lstrip(".")
@staticmethod
def change_extension(file_path: Path, new_extension: str) -> Path:
"""
Change file extension.
Args:
file_path: Original file path
new_extension: New extension (with or without dot)
Returns:
Path with new extension
"""
if not new_extension.startswith("."):
new_extension = "." + new_extension
return file_path.with_suffix(new_extension)
@staticmethod
def ensure_directory_exists(directory: Path) -> None:
"""
Ensure directory exists, create if necessary.
Args:
directory: Path to directory
"""
directory.mkdir(parents=True, exist_ok=True)
@staticmethod
def get_relative_path(file_path: Path, base_path: Path) -> Path:
"""
Get relative path from base path.
Args:
file_path: File path
base_path: Base path
Returns:
Relative path
"""
try:
return file_path.relative_to(base_path)
except ValueError:
# If paths are not relative, return the filename
return Path(file_path.name)
@staticmethod
def is_video_file(file_path: Path) -> bool:
"""
Check if file appears to be a video file based on extension.
Args:
file_path: Path to file
Returns:
True if appears to be a video file
"""
video_extensions = {
"mp4",
"avi",
"mkv",
"mov",
"wmv",
"flv",
"webm",
"ogv",
"m4v",
"3gp",
"mpg",
"mpeg",
"ts",
"mts",
"f4v",
"vob",
"asf",
}
extension = PathUtils.get_file_extension(file_path)
return extension in video_extensions
@staticmethod
def get_safe_output_path(
output_dir: Path, filename: str, extension: str, video_id: str | None = None
) -> Path:
"""
Get a safe output path, handling conflicts.
Args:
output_dir: Output directory
filename: Desired filename (without extension)
extension: File extension (with or without dot)
video_id: Optional video ID to include in filename
Returns:
Safe output path
"""
# Sanitize filename
safe_filename = PathUtils.sanitize_filename(filename)
# Add video ID if provided
if video_id:
safe_filename = f"{video_id}_{safe_filename}"
# Ensure extension format
if not extension.startswith("."):
extension = "." + extension
# Create initial path
output_path = output_dir / (safe_filename + extension)
# Handle conflicts by adding counter
counter = 1
while output_path.exists():
name_with_counter = f"{safe_filename}_{counter}{extension}"
output_path = output_dir / name_with_counter
counter += 1
return output_path

1
tests/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Test suite for video processor."""

57
tests/test_config.py Normal file
View File

@ -0,0 +1,57 @@
"""Tests for configuration module."""
from pathlib import Path
import pytest
from pydantic import ValidationError
from video_processor.config import ProcessorConfig
def test_default_config():
"""Test default configuration values."""
config = ProcessorConfig()
assert config.storage_backend == "local"
assert config.output_formats == ["mp4"]
assert config.quality_preset == "medium"
assert config.thumbnail_timestamps == [1]
assert config.generate_sprites is True
def test_config_validation():
"""Test configuration validation."""
# Test empty output formats
with pytest.raises(ValidationError):
ProcessorConfig(output_formats=[])
# Test valid formats
config = ProcessorConfig(output_formats=["mp4", "webm", "ogv"])
assert len(config.output_formats) == 3
def test_base_path_resolution():
"""Test base path is resolved to absolute path."""
relative_path = Path("relative/path")
config = ProcessorConfig(base_path=relative_path)
assert config.base_path.is_absolute()
def test_custom_config():
"""Test custom configuration values."""
config = ProcessorConfig(
storage_backend="local",
base_path=Path("/custom/path"),
output_formats=["mp4", "webm"],
quality_preset="high",
thumbnail_timestamps=[1, 30, 60],
generate_sprites=False,
)
assert config.storage_backend == "local"
assert config.base_path == Path("/custom/path").resolve()
assert config.output_formats == ["mp4", "webm"]
assert config.quality_preset == "high"
assert config.thumbnail_timestamps == [1, 30, 60]
assert config.generate_sprites is False

87
tests/test_utils.py Normal file
View File

@ -0,0 +1,87 @@
"""Tests for utility modules."""
from pathlib import Path
from video_processor.utils.ffmpeg import FFmpegUtils
from video_processor.utils.paths import PathUtils
class TestPathUtils:
"""Tests for PathUtils."""
def test_generate_video_id(self):
"""Test video ID generation."""
video_id = PathUtils.generate_video_id()
assert len(video_id) == 8
assert video_id.isalnum() or "-" in video_id # UUID format
# Test uniqueness
video_id2 = PathUtils.generate_video_id()
assert video_id != video_id2
def test_sanitize_filename(self):
"""Test filename sanitization."""
assert PathUtils.sanitize_filename("normal_file.mp4") == "normal_file.mp4"
assert (
PathUtils.sanitize_filename("file<with>bad:chars") == "file_with_bad_chars"
)
assert PathUtils.sanitize_filename(" .file ") == "file"
assert PathUtils.sanitize_filename("") == "untitled"
def test_get_file_extension(self):
"""Test file extension extraction."""
assert PathUtils.get_file_extension(Path("file.mp4")) == "mp4"
assert PathUtils.get_file_extension(Path("file.MP4")) == "mp4"
assert PathUtils.get_file_extension(Path("file")) == ""
def test_change_extension(self):
"""Test extension changing."""
original = Path("/path/to/file.mov")
changed = PathUtils.change_extension(original, "mp4")
assert changed == Path("/path/to/file.mp4")
changed_with_dot = PathUtils.change_extension(original, ".webm")
assert changed_with_dot == Path("/path/to/file.webm")
def test_is_video_file(self):
"""Test video file detection."""
assert PathUtils.is_video_file(Path("movie.mp4")) is True
assert PathUtils.is_video_file(Path("movie.avi")) is True
assert PathUtils.is_video_file(Path("movie.txt")) is False
assert PathUtils.is_video_file(Path("image.jpg")) is False
def test_get_safe_output_path(self, tmp_path):
"""Test safe output path generation."""
# Test basic path
path = PathUtils.get_safe_output_path(tmp_path, "video", "mp4", "abc123")
assert path == tmp_path / "abc123_video.mp4"
# Test conflict resolution
(tmp_path / "abc123_video.mp4").touch()
path = PathUtils.get_safe_output_path(tmp_path, "video", "mp4", "abc123")
assert path == tmp_path / "abc123_video_1.mp4"
class TestFFmpegUtils:
"""Tests for FFmpegUtils."""
def test_check_ffmpeg_available(self):
"""Test FFmpeg availability check."""
# This test might fail in CI/CD without FFmpeg installed
result = FFmpegUtils.check_ffmpeg_available("/usr/bin/ffmpeg")
assert isinstance(result, bool)
# Test with invalid path
assert FFmpegUtils.check_ffmpeg_available("/invalid/path") is False
def test_estimate_processing_time(self, tmp_path):
"""Test processing time estimation."""
# Create a dummy file (this is just testing the calculation logic)
dummy_file = tmp_path / "dummy.mp4"
dummy_file.touch()
# Test with default parameters (will use fallback since file isn't valid)
time_estimate = FFmpegUtils.estimate_processing_time(
dummy_file, ["mp4"], "medium"
)
assert time_estimate >= 60 # Should return minimum 60 seconds