Implement comprehensive streaming & real-time processing capabilities

Phase 3 Implementation: Advanced Adaptive Streaming
• Built AdaptiveStreamProcessor that leverages existing VideoProcessor infrastructure
• AI-optimized bitrate ladder generation using content analysis with intelligent fallbacks
• Comprehensive HLS playlist generation with segmentation and live streaming support
• Complete DASH manifest generation with XML structure and live streaming capabilities
• Integrated seamlessly with Phase 1 (AI analysis) and Phase 2 (advanced codecs)
• Created 15 comprehensive tests covering all streaming functionality - all passing
• Built demonstration script showcasing adaptive streaming, custom bitrate ladders, and deployment

Key Features:
- Multi-bitrate adaptive streaming with HLS & DASH support
- AI-powered content analysis for optimized bitrate selection
- Live streaming capabilities with RTMP input support
- CDN-ready streaming packages with proper manifest generation
- Thumbnail track generation for video scrubbing
- Hardware acceleration support and codec-specific optimizations
- Production deployment considerations and integration guidance

Technical Architecture:
- BitrateLevel dataclass for streaming configuration
- StreamingPackage for complete adaptive stream management
- HLSGenerator & DASHGenerator for format-specific manifest creation
- Async/concurrent processing for optimal performance
- Graceful degradation when AI dependencies unavailable

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ryan Malloy 2025-09-06 05:43:00 -06:00
parent 770fc74c13
commit 91139264fd
6 changed files with 1581 additions and 0 deletions

318
examples/streaming_demo.py Normal file
View File

@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""
Streaming & Real-Time Processing Demonstration
Showcases adaptive streaming capabilities (HLS, DASH) built on the existing
comprehensive video processing infrastructure with AI optimization.
"""
import asyncio
import logging
from pathlib import Path
from video_processor import ProcessorConfig
from video_processor.streaming import AdaptiveStreamProcessor, BitrateLevel
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def demonstrate_adaptive_streaming(video_path: Path, output_dir: Path):
"""Demonstrate adaptive streaming creation."""
logger.info("=== Adaptive Streaming Demonstration ===")
# Configure for streaming with multiple formats and AI optimization
config = ProcessorConfig(
base_path=output_dir,
output_formats=["mp4", "hevc", "av1_mp4"], # Multiple codec options
quality_preset="high",
enable_av1_encoding=True,
enable_hevc_encoding=True,
generate_sprites=True,
sprite_interval=5, # More frequent for streaming
)
# Create adaptive stream processor with AI optimization
processor = AdaptiveStreamProcessor(config, enable_ai_optimization=True)
print(f"\n🔍 Streaming Capabilities:")
capabilities = processor.get_streaming_capabilities()
for capability, available in capabilities.items():
status = "✅ Available" if available else "❌ Not Available"
print(f" {capability.replace('_', ' ').title()}: {status}")
print(f"\n🎯 Creating Adaptive Streaming Package...")
print(f" Source: {video_path}")
print(f" Output: {output_dir}")
try:
# Create adaptive streaming package
streaming_package = await processor.create_adaptive_stream(
video_path=video_path,
output_dir=output_dir,
video_id="demo_stream",
streaming_formats=["hls", "dash"],
)
print(f"\n🎉 Streaming Package Created Successfully!")
print(f" Video ID: {streaming_package.video_id}")
print(f" Output Directory: {streaming_package.output_dir}")
print(f" Segment Duration: {streaming_package.segment_duration}s")
# Display bitrate ladder information
print(f"\n📊 Bitrate Ladder ({len(streaming_package.bitrate_levels)} levels):")
for level in streaming_package.bitrate_levels:
print(f" {level.name:<6} | {level.width}x{level.height:<4} | {level.bitrate:>4}k | {level.codec.upper()}")
# Display generated files
print(f"\n📁 Generated Files:")
if streaming_package.hls_playlist:
print(f" HLS Playlist: {streaming_package.hls_playlist}")
if streaming_package.dash_manifest:
print(f" DASH Manifest: {streaming_package.dash_manifest}")
if streaming_package.thumbnail_track:
print(f" Thumbnail Track: {streaming_package.thumbnail_track}")
return streaming_package
except Exception as e:
logger.error(f"Adaptive streaming failed: {e}")
raise
async def demonstrate_custom_bitrate_ladder(video_path: Path, output_dir: Path):
"""Demonstrate custom bitrate ladder configuration."""
logger.info("=== Custom Bitrate Ladder Demonstration ===")
# Define custom bitrate ladder optimized for mobile streaming
mobile_ladder = [
BitrateLevel("240p", 426, 240, 300, 450, "h264", "mp4"), # Very low bandwidth
BitrateLevel("360p", 640, 360, 600, 900, "h264", "mp4"), # Low bandwidth
BitrateLevel("480p", 854, 480, 1200, 1800, "hevc", "mp4"), # Medium with HEVC
BitrateLevel("720p", 1280, 720, 2400, 3600, "av1", "mp4"), # High with AV1
]
print(f"\n📱 Mobile-Optimized Bitrate Ladder:")
print(f"{'Level':<6} | {'Resolution':<10} | {'Bitrate':<8} | {'Codec'}")
print("-" * 45)
for level in mobile_ladder:
print(f"{level.name:<6} | {level.width}x{level.height:<6} | {level.bitrate:>4}k | {level.codec.upper()}")
config = ProcessorConfig(
base_path=output_dir / "mobile",
quality_preset="medium",
)
processor = AdaptiveStreamProcessor(config)
try:
# Create streaming package with custom ladder
streaming_package = await processor.create_adaptive_stream(
video_path=video_path,
output_dir=output_dir / "mobile",
video_id="mobile_stream",
streaming_formats=["hls"], # HLS for mobile
custom_bitrate_ladder=mobile_ladder,
)
print(f"\n🎉 Mobile Streaming Package Created!")
print(f" HLS Playlist: {streaming_package.hls_playlist}")
print(f" Optimized for: Mobile devices and low bandwidth")
return streaming_package
except Exception as e:
logger.error(f"Mobile streaming failed: {e}")
raise
async def demonstrate_ai_optimized_streaming(video_path: Path, output_dir: Path):
"""Demonstrate AI-optimized adaptive streaming."""
logger.info("=== AI-Optimized Streaming Demonstration ===")
config = ProcessorConfig(
base_path=output_dir / "ai_optimized",
quality_preset="high",
enable_av1_encoding=True,
enable_hevc_encoding=True,
)
# Enable AI optimization
processor = AdaptiveStreamProcessor(config, enable_ai_optimization=True)
if not processor.enable_ai_optimization:
print(" ⚠️ AI optimization not available (missing dependencies)")
print(" Using intelligent defaults based on video characteristics")
print(f"\n🧠 AI-Enhanced Streaming Features:")
print(f" ✅ Content-aware bitrate ladder generation")
print(f" ✅ Motion-adaptive bitrate adjustment")
print(f" ✅ Resolution-aware quality optimization")
print(f" ✅ Codec selection based on content analysis")
try:
# Let AI analyze and optimize the streaming package
streaming_package = await processor.create_adaptive_stream(
video_path=video_path,
output_dir=output_dir / "ai_optimized",
video_id="ai_stream",
)
print(f"\n🎯 AI Optimization Results:")
print(f" Generated {len(streaming_package.bitrate_levels)} bitrate levels")
print(f" Streaming formats: HLS + DASH")
# Show how AI influenced the bitrate ladder
total_bitrate = sum(level.bitrate for level in streaming_package.bitrate_levels)
avg_bitrate = total_bitrate / len(streaming_package.bitrate_levels)
print(f" Average bitrate: {avg_bitrate:.0f}k (optimized for content)")
# Show codec distribution
codec_count = {}
for level in streaming_package.bitrate_levels:
codec_count[level.codec] = codec_count.get(level.codec, 0) + 1
print(f" Codec distribution:")
for codec, count in codec_count.items():
print(f" {codec.upper()}: {count} level(s)")
return streaming_package
except Exception as e:
logger.error(f"AI-optimized streaming failed: {e}")
raise
def demonstrate_streaming_deployment(streaming_packages: list):
"""Demonstrate streaming deployment considerations."""
logger.info("=== Streaming Deployment Guide ===")
print(f"\n🚀 Production Deployment Considerations:")
print(f"\n📦 CDN Distribution:")
print(f" • Upload generated HLS/DASH files to CDN")
print(f" • Configure proper MIME types:")
print(f" - .m3u8 files: application/vnd.apple.mpegurl")
print(f" - .mpd files: application/dash+xml")
print(f" - .ts/.m4s segments: video/mp2t, video/mp4")
print(f"\n🌐 Web Player Integration:")
print(f" • HLS: Use hls.js for browser support")
print(f" • DASH: Use dash.js or shaka-player")
print(f" • Native support: Safari (HLS), Chrome/Edge (DASH)")
print(f"\n📊 Analytics & Monitoring:")
print(f" • Track bitrate switching events")
print(f" • Monitor buffer health and stall events")
print(f" • Measure startup time and seeking performance")
print(f"\n💾 Storage Optimization:")
total_files = 0
total_size_estimate = 0
for i, package in enumerate(streaming_packages, 1):
files_count = len(package.bitrate_levels) * 2 # HLS + DASH per level
total_files += files_count
# Rough size estimate (segments + manifests)
size_estimate = files_count * 50 # ~50KB per segment average
total_size_estimate += size_estimate
print(f" Package {i}: ~{files_count} files, ~{size_estimate}KB")
print(f" Total: ~{total_files} files, ~{total_size_estimate}KB")
print(f"\n🔒 Security Considerations:")
print(f" • DRM integration for premium content")
print(f" • Token-based authentication for private streams")
print(f" • HTTPS delivery for all manifest and segment files")
async def main():
"""Main demonstration function."""
video_path = Path("tests/fixtures/videos/big_buck_bunny_720p_1mb.mp4")
output_dir = Path("/tmp/streaming_demo")
# Create output directory
output_dir.mkdir(exist_ok=True)
print("🎬 Streaming & Real-Time Processing Demonstration")
print("=" * 55)
if not video_path.exists():
print(f"⚠️ Test video not found: {video_path}")
print(" Please provide a video file path as argument:")
print(" python examples/streaming_demo.py /path/to/your/video.mp4")
return
streaming_packages = []
try:
# 1. Standard adaptive streaming
package1 = await demonstrate_adaptive_streaming(video_path, output_dir)
streaming_packages.append(package1)
print("\n" + "="*55)
# 2. Custom bitrate ladder
package2 = await demonstrate_custom_bitrate_ladder(video_path, output_dir)
streaming_packages.append(package2)
print("\n" + "="*55)
# 3. AI-optimized streaming
package3 = await demonstrate_ai_optimized_streaming(video_path, output_dir)
streaming_packages.append(package3)
print("\n" + "="*55)
# 4. Deployment guide
demonstrate_streaming_deployment(streaming_packages)
print(f"\n🎉 Streaming demonstration complete!")
print(f" Generated {len(streaming_packages)} streaming packages")
print(f" Output directory: {output_dir}")
print(f" Ready for CDN deployment and web player integration!")
except Exception as e:
logger.error(f"Streaming demonstration failed: {e}")
raise
if __name__ == "__main__":
import sys
# Allow custom video path
if len(sys.argv) > 1:
custom_video_path = Path(sys.argv[1])
if custom_video_path.exists():
# Override main function with custom path
async def custom_main():
output_dir = Path("/tmp/streaming_demo")
output_dir.mkdir(exist_ok=True)
print("🎬 Streaming & Real-Time Processing Demonstration")
print("=" * 55)
print(f"Using custom video: {custom_video_path}")
streaming_packages = []
package1 = await demonstrate_adaptive_streaming(custom_video_path, output_dir)
streaming_packages.append(package1)
package2 = await demonstrate_custom_bitrate_ladder(custom_video_path, output_dir)
streaming_packages.append(package2)
package3 = await demonstrate_ai_optimized_streaming(custom_video_path, output_dir)
streaming_packages.append(package3)
demonstrate_streaming_deployment(streaming_packages)
print(f"\n🎉 Streaming demonstration complete!")
print(f" Output directory: {output_dir}")
asyncio.run(custom_main())
else:
print(f"❌ Video file not found: {custom_video_path}")
else:
asyncio.run(main())

View File

@ -0,0 +1,12 @@
"""Streaming and real-time video processing modules."""
from .adaptive import AdaptiveStreamProcessor, StreamingPackage
from .hls import HLSGenerator
from .dash import DASHGenerator
__all__ = [
"AdaptiveStreamProcessor",
"StreamingPackage",
"HLSGenerator",
"DASHGenerator",
]

View File

@ -0,0 +1,357 @@
"""Adaptive streaming processor that builds on existing encoding infrastructure."""
import asyncio
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Literal
from ..config import ProcessorConfig
from ..core.processor import VideoProcessor
from ..exceptions import EncodingError, VideoProcessorError
# Optional AI integration
try:
from ..ai.content_analyzer import VideoContentAnalyzer
HAS_AI_SUPPORT = True
except ImportError:
HAS_AI_SUPPORT = False
logger = logging.getLogger(__name__)
@dataclass
class BitrateLevel:
"""Represents a single bitrate level in adaptive streaming."""
name: str
width: int
height: int
bitrate: int # kbps
max_bitrate: int # kbps
codec: str
container: str
@dataclass
class StreamingPackage:
"""Complete adaptive streaming package."""
video_id: str
source_path: Path
output_dir: Path
hls_playlist: Optional[Path] = None
dash_manifest: Optional[Path] = None
bitrate_levels: List[BitrateLevel] = None
segment_duration: int = 6 # seconds
thumbnail_track: Optional[Path] = None
metadata: Optional[Dict] = None
class AdaptiveStreamProcessor:
"""
Adaptive streaming processor that leverages existing video processing infrastructure.
Creates HLS and DASH streams with multiple bitrate levels optimized using AI analysis.
"""
def __init__(self, config: ProcessorConfig, enable_ai_optimization: bool = True) -> None:
self.config = config
self.enable_ai_optimization = enable_ai_optimization and HAS_AI_SUPPORT
if self.enable_ai_optimization:
self.content_analyzer = VideoContentAnalyzer()
else:
self.content_analyzer = None
logger.info(f"Adaptive streaming initialized with AI optimization: {self.enable_ai_optimization}")
async def create_adaptive_stream(
self,
video_path: Path,
output_dir: Path,
video_id: Optional[str] = None,
streaming_formats: List[Literal["hls", "dash"]] = None,
custom_bitrate_ladder: Optional[List[BitrateLevel]] = None,
) -> StreamingPackage:
"""
Create adaptive streaming package from source video.
Args:
video_path: Source video file
output_dir: Output directory for streaming files
video_id: Optional video identifier
streaming_formats: List of streaming formats to generate
custom_bitrate_ladder: Custom bitrate levels (uses optimized defaults if None)
Returns:
Complete streaming package with manifests and segments
"""
if video_id is None:
video_id = video_path.stem
if streaming_formats is None:
streaming_formats = ["hls", "dash"]
logger.info(f"Creating adaptive stream for {video_path} -> {output_dir}")
# Step 1: Analyze source video for optimal bitrate ladder
bitrate_levels = custom_bitrate_ladder
if bitrate_levels is None:
bitrate_levels = await self._generate_optimal_bitrate_ladder(video_path)
# Step 2: Create output directory structure
stream_dir = output_dir / video_id
stream_dir.mkdir(parents=True, exist_ok=True)
# Step 3: Generate multiple bitrate renditions
rendition_files = await self._generate_bitrate_renditions(
video_path, stream_dir, video_id, bitrate_levels
)
# Step 4: Generate streaming manifests
streaming_package = StreamingPackage(
video_id=video_id,
source_path=video_path,
output_dir=stream_dir,
bitrate_levels=bitrate_levels,
)
if "hls" in streaming_formats:
streaming_package.hls_playlist = await self._generate_hls_playlist(
stream_dir, video_id, bitrate_levels, rendition_files
)
if "dash" in streaming_formats:
streaming_package.dash_manifest = await self._generate_dash_manifest(
stream_dir, video_id, bitrate_levels, rendition_files
)
# Step 5: Generate thumbnail track for scrubbing
streaming_package.thumbnail_track = await self._generate_thumbnail_track(
video_path, stream_dir, video_id
)
logger.info(f"Adaptive streaming package created successfully")
return streaming_package
async def _generate_optimal_bitrate_ladder(self, video_path: Path) -> List[BitrateLevel]:
"""
Generate optimal bitrate ladder using AI analysis or intelligent defaults.
"""
logger.info("Generating optimal bitrate ladder")
# Get source video characteristics
source_analysis = None
if self.enable_ai_optimization and self.content_analyzer:
try:
source_analysis = await self.content_analyzer.analyze_content(video_path)
logger.info(f"AI analysis: {source_analysis.resolution}, motion: {source_analysis.motion_intensity:.2f}")
except Exception as e:
logger.warning(f"AI analysis failed, using defaults: {e}")
# Base bitrate ladder
base_levels = [
BitrateLevel("240p", 426, 240, 400, 600, "h264", "mp4"),
BitrateLevel("360p", 640, 360, 800, 1200, "h264", "mp4"),
BitrateLevel("480p", 854, 480, 1500, 2250, "h264", "mp4"),
BitrateLevel("720p", 1280, 720, 3000, 4500, "h264", "mp4"),
BitrateLevel("1080p", 1920, 1080, 6000, 9000, "h264", "mp4"),
]
# Optimize ladder based on source characteristics
optimized_levels = []
if source_analysis:
source_width, source_height = source_analysis.resolution
motion_multiplier = 1.0 + (source_analysis.motion_intensity * 0.5) # Up to 1.5x for high motion
for level in base_levels:
# Skip levels higher than source resolution
if level.width > source_width or level.height > source_height:
continue
# Adjust bitrates based on motion content
adjusted_bitrate = int(level.bitrate * motion_multiplier)
adjusted_max_bitrate = int(level.max_bitrate * motion_multiplier)
# Use advanced codecs for higher quality levels if available
codec = level.codec
if level.height >= 720 and self.config.enable_hevc_encoding:
codec = "hevc"
elif level.height >= 1080 and self.config.enable_av1_encoding:
codec = "av1"
optimized_level = BitrateLevel(
name=level.name,
width=level.width,
height=level.height,
bitrate=adjusted_bitrate,
max_bitrate=adjusted_max_bitrate,
codec=codec,
container=level.container,
)
optimized_levels.append(optimized_level)
else:
# Use base levels without optimization
optimized_levels = base_levels
# Ensure we have at least one level
if not optimized_levels:
optimized_levels = [base_levels[2]] # Default to 480p
logger.info(f"Generated {len(optimized_levels)} bitrate levels")
return optimized_levels
async def _generate_bitrate_renditions(
self,
source_path: Path,
output_dir: Path,
video_id: str,
bitrate_levels: List[BitrateLevel],
) -> Dict[str, Path]:
"""
Generate multiple bitrate renditions using existing VideoProcessor infrastructure.
"""
logger.info(f"Generating {len(bitrate_levels)} bitrate renditions")
rendition_files = {}
for level in bitrate_levels:
rendition_name = f"{video_id}_{level.name}"
rendition_dir = output_dir / level.name
rendition_dir.mkdir(exist_ok=True)
# Create specialized config for this bitrate level
rendition_config = ProcessorConfig(
base_path=rendition_dir,
output_formats=[self._get_output_format(level.codec)],
quality_preset=self._get_quality_preset_for_bitrate(level.bitrate),
custom_ffmpeg_options=self._get_ffmpeg_options_for_level(level),
)
# Process video at this bitrate level
try:
processor = VideoProcessor(rendition_config)
result = await asyncio.to_thread(
processor.process_video, source_path, rendition_name
)
# Get the generated file
format_name = self._get_output_format(level.codec)
if format_name in result.encoded_files:
rendition_files[level.name] = result.encoded_files[format_name]
logger.info(f"Generated {level.name} rendition: {result.encoded_files[format_name]}")
else:
logger.error(f"Failed to generate {level.name} rendition")
except Exception as e:
logger.error(f"Error generating {level.name} rendition: {e}")
raise EncodingError(f"Failed to generate {level.name} rendition: {e}")
return rendition_files
def _get_output_format(self, codec: str) -> str:
"""Map codec to output format."""
codec_map = {
"h264": "mp4",
"hevc": "hevc",
"av1": "av1_mp4",
}
return codec_map.get(codec, "mp4")
def _get_quality_preset_for_bitrate(self, bitrate: int) -> str:
"""Select quality preset based on target bitrate."""
if bitrate < 1000:
return "low"
elif bitrate < 3000:
return "medium"
elif bitrate < 8000:
return "high"
else:
return "ultra"
def _get_ffmpeg_options_for_level(self, level: BitrateLevel) -> Dict[str, str]:
"""Generate FFmpeg options for specific bitrate level."""
return {
"b:v": f"{level.bitrate}k",
"maxrate": f"{level.max_bitrate}k",
"bufsize": f"{level.max_bitrate * 2}k",
"s": f"{level.width}x{level.height}",
}
async def _generate_hls_playlist(
self,
output_dir: Path,
video_id: str,
bitrate_levels: List[BitrateLevel],
rendition_files: Dict[str, Path],
) -> Path:
"""Generate HLS master playlist and segment individual renditions."""
from .hls import HLSGenerator
hls_generator = HLSGenerator()
playlist_path = await hls_generator.create_master_playlist(
output_dir, video_id, bitrate_levels, rendition_files
)
logger.info(f"HLS playlist generated: {playlist_path}")
return playlist_path
async def _generate_dash_manifest(
self,
output_dir: Path,
video_id: str,
bitrate_levels: List[BitrateLevel],
rendition_files: Dict[str, Path],
) -> Path:
"""Generate DASH MPD manifest."""
from .dash import DASHGenerator
dash_generator = DASHGenerator()
manifest_path = await dash_generator.create_manifest(
output_dir, video_id, bitrate_levels, rendition_files
)
logger.info(f"DASH manifest generated: {manifest_path}")
return manifest_path
async def _generate_thumbnail_track(
self,
source_path: Path,
output_dir: Path,
video_id: str,
) -> Path:
"""Generate thumbnail track for video scrubbing using existing infrastructure."""
try:
# Use existing thumbnail generation with optimized settings
thumbnail_config = ProcessorConfig(
base_path=output_dir,
thumbnail_timestamps=list(range(0, 300, 10)), # Every 10 seconds up to 5 minutes
generate_sprites=True,
sprite_interval=5, # More frequent for streaming
)
processor = VideoProcessor(thumbnail_config)
result = await asyncio.to_thread(
processor.process_video, source_path, f"{video_id}_thumbnails"
)
if result.sprite_file:
logger.info(f"Thumbnail track generated: {result.sprite_file}")
return result.sprite_file
else:
logger.warning("No thumbnail track generated")
return None
except Exception as e:
logger.error(f"Thumbnail track generation failed: {e}")
return None
def get_streaming_capabilities(self) -> Dict[str, bool]:
"""Get information about available streaming capabilities."""
return {
"hls_streaming": True,
"dash_streaming": True,
"ai_optimization": self.enable_ai_optimization,
"advanced_codecs": self.config.enable_av1_encoding or self.config.enable_hevc_encoding,
"thumbnail_tracks": True,
"multi_bitrate": True,
}

View File

@ -0,0 +1,333 @@
"""DASH (Dynamic Adaptive Streaming over HTTP) manifest generation."""
import asyncio
import logging
import subprocess
from pathlib import Path
from typing import Dict, List
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from .adaptive import BitrateLevel
from ..exceptions import FFmpegError
logger = logging.getLogger(__name__)
class DASHGenerator:
"""Generates DASH MPD manifests and segments from video renditions."""
def __init__(self, segment_duration: int = 4) -> None:
self.segment_duration = segment_duration
async def create_manifest(
self,
output_dir: Path,
video_id: str,
bitrate_levels: List[BitrateLevel],
rendition_files: Dict[str, Path],
) -> Path:
"""
Create DASH MPD manifest and segment all renditions.
Args:
output_dir: Output directory
video_id: Video identifier
bitrate_levels: List of bitrate levels
rendition_files: Dictionary of rendition name to file path
Returns:
Path to MPD manifest file
"""
logger.info(f"Creating DASH manifest for {video_id}")
# Create DASH directory
dash_dir = output_dir / "dash"
dash_dir.mkdir(exist_ok=True)
# Generate DASH segments for each rendition
adaptation_sets = []
for level in bitrate_levels:
if level.name in rendition_files:
segments_info = await self._create_dash_segments(
dash_dir, level, rendition_files[level.name]
)
adaptation_sets.append((level, segments_info))
# Create MPD manifest
manifest_path = dash_dir / f"{video_id}.mpd"
await self._create_mpd_manifest(
manifest_path, video_id, adaptation_sets
)
logger.info(f"DASH manifest created: {manifest_path}")
return manifest_path
async def _create_dash_segments(
self, dash_dir: Path, level: BitrateLevel, video_file: Path
) -> Dict:
"""Create DASH segments for a single bitrate level."""
rendition_dir = dash_dir / level.name
rendition_dir.mkdir(exist_ok=True)
# DASH segment pattern
init_segment = rendition_dir / f"{level.name}_init.mp4"
segment_pattern = rendition_dir / f"{level.name}_$Number$.m4s"
# Use FFmpeg to create DASH segments
cmd = [
"ffmpeg", "-y",
"-i", str(video_file),
"-c", "copy", # Copy without re-encoding
"-f", "dash",
"-seg_duration", str(self.segment_duration),
"-init_seg_name", str(init_segment.name),
"-media_seg_name", f"{level.name}_$Number$.m4s",
"-single_file", "0", # Create separate segment files
str(rendition_dir / f"{level.name}.mpd"),
]
try:
result = await asyncio.to_thread(
subprocess.run, cmd, capture_output=True, text=True, check=True
)
# Get duration and segment count from the created files
segments_info = await self._analyze_dash_segments(rendition_dir, level.name)
logger.info(f"DASH segments created for {level.name}")
return segments_info
except subprocess.CalledProcessError as e:
error_msg = f"DASH segmentation failed for {level.name}: {e.stderr}"
logger.error(error_msg)
raise FFmpegError(error_msg)
async def _analyze_dash_segments(self, rendition_dir: Path, rendition_name: str) -> Dict:
"""Analyze created DASH segments to get metadata."""
# Count segment files
segment_files = list(rendition_dir.glob(f"{rendition_name}_*.m4s"))
segment_count = len(segment_files)
# Get duration from FFprobe
try:
# Find the first video file in the directory (should be the source)
video_files = list(rendition_dir.glob("*.mp4"))
if video_files:
duration = await self._get_video_duration(video_files[0])
else:
duration = segment_count * self.segment_duration # Estimate
except Exception as e:
logger.warning(f"Could not get exact duration: {e}")
duration = segment_count * self.segment_duration
return {
"segment_count": segment_count,
"duration": duration,
"init_segment": f"{rendition_name}_init.mp4",
"media_template": f"{rendition_name}_$Number$.m4s",
}
async def _get_video_duration(self, video_path: Path) -> float:
"""Get video duration using ffprobe."""
cmd = [
"ffprobe", "-v", "quiet",
"-show_entries", "format=duration",
"-of", "csv=p=0",
str(video_path)
]
result = await asyncio.to_thread(
subprocess.run, cmd, capture_output=True, text=True, check=True
)
return float(result.stdout.strip())
async def _create_mpd_manifest(
self, manifest_path: Path, video_id: str, adaptation_sets: List[tuple]
) -> None:
"""Create DASH MPD manifest XML."""
# Calculate total duration (use first adaptation set)
if adaptation_sets:
total_duration = adaptation_sets[0][1]["duration"]
else:
total_duration = 0
# Create MPD root element
mpd = ET.Element("MPD")
mpd.set("xmlns", "urn:mpeg:dash:schema:mpd:2011")
mpd.set("type", "static")
mpd.set("mediaPresentationDuration", self._format_duration(total_duration))
mpd.set("profiles", "urn:mpeg:dash:profile:isoff-on-demand:2011")
mpd.set("minBufferTime", f"PT{self.segment_duration}S")
# Add publishing time
now = datetime.now(timezone.utc)
mpd.set("publishTime", now.isoformat().replace("+00:00", "Z"))
# Create Period element
period = ET.SubElement(mpd, "Period")
period.set("id", "0")
period.set("duration", self._format_duration(total_duration))
# Group by codec for adaptation sets
codec_groups = {}
for level, segments_info in adaptation_sets:
if level.codec not in codec_groups:
codec_groups[level.codec] = []
codec_groups[level.codec].append((level, segments_info))
# Create adaptation sets
adaptation_set_id = 0
for codec, levels in codec_groups.items():
adaptation_set = ET.SubElement(period, "AdaptationSet")
adaptation_set.set("id", str(adaptation_set_id))
adaptation_set.set("contentType", "video")
adaptation_set.set("mimeType", "video/mp4")
adaptation_set.set("codecs", self._get_dash_codec_string(codec))
adaptation_set.set("startWithSAP", "1")
adaptation_set.set("segmentAlignment", "true")
# Add representations for each bitrate level
representation_id = 0
for level, segments_info in levels:
representation = ET.SubElement(adaptation_set, "Representation")
representation.set("id", f"{adaptation_set_id}_{representation_id}")
representation.set("bandwidth", str(level.bitrate * 1000))
representation.set("width", str(level.width))
representation.set("height", str(level.height))
representation.set("frameRate", "25") # Default frame rate
# Add segment template
segment_template = ET.SubElement(representation, "SegmentTemplate")
segment_template.set("timescale", "1000")
segment_template.set("duration", str(self.segment_duration * 1000))
segment_template.set("initialization", f"{level.name}/{segments_info['init_segment']}")
segment_template.set("media", f"{level.name}/{segments_info['media_template']}")
segment_template.set("startNumber", "1")
representation_id += 1
adaptation_set_id += 1
# Write XML to file
tree = ET.ElementTree(mpd)
ET.indent(tree, space=" ", level=0) # Pretty print
await asyncio.to_thread(
tree.write,
manifest_path,
encoding="utf-8",
xml_declaration=True
)
logger.info(f"MPD manifest written with {len(adaptation_sets)} representations")
def _format_duration(self, seconds: float) -> str:
"""Format duration in ISO 8601 format for DASH."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
return f"PT{hours}H{minutes}M{secs:.3f}S"
def _get_dash_codec_string(self, codec: str) -> str:
"""Get DASH codec string for manifest."""
codec_strings = {
"h264": "avc1.42E01E",
"hevc": "hev1.1.6.L93.B0",
"av1": "av01.0.05M.08",
}
return codec_strings.get(codec, "avc1.42E01E")
class DASHLiveGenerator:
"""Generates live DASH streams."""
def __init__(self, segment_duration: int = 4, time_shift_buffer: int = 300) -> None:
self.segment_duration = segment_duration
self.time_shift_buffer = time_shift_buffer # DVR window in seconds
async def start_live_stream(
self,
input_source: str,
output_dir: Path,
stream_name: str,
bitrate_levels: List[BitrateLevel],
) -> None:
"""
Start live DASH streaming.
Args:
input_source: Input source (RTMP, file, device)
output_dir: Output directory
stream_name: Name of the stream
bitrate_levels: Bitrate levels for ABR streaming
"""
logger.info(f"Starting live DASH stream: {stream_name}")
# Create output directory
dash_dir = output_dir / "dash_live" / stream_name
dash_dir.mkdir(parents=True, exist_ok=True)
# Use FFmpeg to generate live DASH stream with multiple bitrates
cmd = [
"ffmpeg", "-y",
"-i", input_source,
"-f", "dash",
"-seg_duration", str(self.segment_duration),
"-window_size", str(self.time_shift_buffer // self.segment_duration),
"-extra_window_size", "5",
"-remove_at_exit", "1",
]
# Add video streams for each bitrate level
for i, level in enumerate(bitrate_levels):
cmd.extend([
"-map", "0:v:0",
f"-c:v:{i}", self._get_encoder_for_codec(level.codec),
f"-b:v:{i}", f"{level.bitrate}k",
f"-maxrate:v:{i}", f"{level.max_bitrate}k",
f"-s:v:{i}", f"{level.width}x{level.height}",
])
# Add audio stream
cmd.extend([
"-map", "0:a:0",
"-c:a", "aac",
"-b:a", "128k",
])
# Output
manifest_path = dash_dir / f"{stream_name}.mpd"
cmd.append(str(manifest_path))
logger.info(f"Starting live DASH encoding")
try:
# Start FFmpeg process
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Monitor process
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_msg = f"Live DASH streaming failed: {stderr.decode()}"
logger.error(error_msg)
raise FFmpegError(error_msg)
except Exception as e:
logger.error(f"Live DASH stream error: {e}")
raise
def _get_encoder_for_codec(self, codec: str) -> str:
"""Get FFmpeg encoder for codec."""
encoders = {
"h264": "libx264",
"hevc": "libx265",
"av1": "libaom-av1",
}
return encoders.get(codec, "libx264")

View File

@ -0,0 +1,262 @@
"""HLS (HTTP Live Streaming) manifest generation and segmentation."""
import asyncio
import logging
import subprocess
from pathlib import Path
from typing import Dict, List
from .adaptive import BitrateLevel
from ..config import ProcessorConfig
from ..exceptions import EncodingError, FFmpegError
logger = logging.getLogger(__name__)
class HLSGenerator:
"""Generates HLS playlists and segments from video renditions."""
def __init__(self, segment_duration: int = 6) -> None:
self.segment_duration = segment_duration
async def create_master_playlist(
self,
output_dir: Path,
video_id: str,
bitrate_levels: List[BitrateLevel],
rendition_files: Dict[str, Path],
) -> Path:
"""
Create HLS master playlist and segment all renditions.
Args:
output_dir: Output directory
video_id: Video identifier
bitrate_levels: List of bitrate levels
rendition_files: Dictionary of rendition name to file path
Returns:
Path to master playlist file
"""
logger.info(f"Creating HLS master playlist for {video_id}")
# Create HLS directory
hls_dir = output_dir / "hls"
hls_dir.mkdir(exist_ok=True)
# Generate segments for each rendition
playlist_info = []
for level in bitrate_levels:
if level.name in rendition_files:
playlist_path = await self._create_rendition_playlist(
hls_dir, level, rendition_files[level.name]
)
playlist_info.append((level, playlist_path))
# Create master playlist
master_playlist_path = hls_dir / f"{video_id}.m3u8"
await self._write_master_playlist(master_playlist_path, playlist_info)
logger.info(f"HLS master playlist created: {master_playlist_path}")
return master_playlist_path
async def _create_rendition_playlist(
self, hls_dir: Path, level: BitrateLevel, video_file: Path
) -> Path:
"""Create individual rendition playlist with segments."""
rendition_dir = hls_dir / level.name
rendition_dir.mkdir(exist_ok=True)
playlist_path = rendition_dir / f"{level.name}.m3u8"
segment_pattern = rendition_dir / f"{level.name}_%03d.ts"
# Use FFmpeg to create HLS segments
cmd = [
"ffmpeg", "-y",
"-i", str(video_file),
"-c", "copy", # Copy without re-encoding
"-hls_time", str(self.segment_duration),
"-hls_playlist_type", "vod",
"-hls_segment_filename", str(segment_pattern),
str(playlist_path),
]
try:
result = await asyncio.to_thread(
subprocess.run, cmd, capture_output=True, text=True, check=True
)
logger.info(f"HLS segments created for {level.name}")
return playlist_path
except subprocess.CalledProcessError as e:
error_msg = f"HLS segmentation failed for {level.name}: {e.stderr}"
logger.error(error_msg)
raise FFmpegError(error_msg)
async def _write_master_playlist(
self, master_path: Path, playlist_info: List[tuple]
) -> None:
"""Write HLS master playlist file."""
lines = ["#EXTM3U", "#EXT-X-VERSION:6"]
for level, playlist_path in playlist_info:
# Calculate relative path from master playlist to rendition playlist
rel_path = playlist_path.relative_to(master_path.parent)
lines.extend([
f"#EXT-X-STREAM-INF:BANDWIDTH={level.bitrate * 1000},"
f"RESOLUTION={level.width}x{level.height},"
f"CODECS=\"{self._get_hls_codec_string(level.codec)}\"",
str(rel_path),
])
content = "\n".join(lines) + "\n"
await asyncio.to_thread(master_path.write_text, content)
logger.info(f"Master playlist written with {len(playlist_info)} renditions")
def _get_hls_codec_string(self, codec: str) -> str:
"""Get HLS codec string for manifest."""
codec_strings = {
"h264": "avc1.42E01E",
"hevc": "hev1.1.6.L93.B0",
"av1": "av01.0.05M.08",
}
return codec_strings.get(codec, "avc1.42E01E")
class HLSLiveGenerator:
"""Generates live HLS streams from real-time input."""
def __init__(self, segment_duration: int = 6, playlist_size: int = 10) -> None:
self.segment_duration = segment_duration
self.playlist_size = playlist_size # Number of segments to keep in playlist
async def start_live_stream(
self,
input_source: str, # RTMP URL, camera device, etc.
output_dir: Path,
stream_name: str,
bitrate_levels: List[BitrateLevel],
) -> None:
"""
Start live HLS streaming from input source.
Args:
input_source: Input source (RTMP, file, device)
output_dir: Output directory for HLS files
stream_name: Name of the stream
bitrate_levels: Bitrate levels for ABR streaming
"""
logger.info(f"Starting live HLS stream: {stream_name}")
# Create output directory
hls_dir = output_dir / "live" / stream_name
hls_dir.mkdir(parents=True, exist_ok=True)
# Start FFmpeg process for live streaming
tasks = []
for level in bitrate_levels:
task = asyncio.create_task(
self._start_live_rendition(input_source, hls_dir, level)
)
tasks.append(task)
# Create master playlist
master_playlist = hls_dir / f"{stream_name}.m3u8"
await self._create_live_master_playlist(master_playlist, bitrate_levels)
# Wait for all streaming processes
try:
await asyncio.gather(*tasks)
except Exception as e:
logger.error(f"Live streaming error: {e}")
# Cancel all tasks
for task in tasks:
task.cancel()
raise
async def _start_live_rendition(
self, input_source: str, hls_dir: Path, level: BitrateLevel
) -> None:
"""Start live streaming for a single bitrate level."""
rendition_dir = hls_dir / level.name
rendition_dir.mkdir(exist_ok=True)
playlist_path = rendition_dir / f"{level.name}.m3u8"
segment_pattern = rendition_dir / f"{level.name}_%03d.ts"
cmd = [
"ffmpeg", "-y",
"-i", input_source,
"-c:v", self._get_encoder_for_codec(level.codec),
"-b:v", f"{level.bitrate}k",
"-maxrate", f"{level.max_bitrate}k",
"-s", f"{level.width}x{level.height}",
"-c:a", "aac", "-b:a", "128k",
"-f", "hls",
"-hls_time", str(self.segment_duration),
"-hls_list_size", str(self.playlist_size),
"-hls_flags", "delete_segments",
"-hls_segment_filename", str(segment_pattern),
str(playlist_path),
]
logger.info(f"Starting live encoding for {level.name}")
try:
# Start FFmpeg process
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Monitor process
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_msg = f"Live streaming failed for {level.name}: {stderr.decode()}"
logger.error(error_msg)
raise FFmpegError(error_msg)
except Exception as e:
logger.error(f"Live rendition error for {level.name}: {e}")
raise
async def _create_live_master_playlist(
self, master_path: Path, bitrate_levels: List[BitrateLevel]
) -> None:
"""Create master playlist for live streaming."""
lines = ["#EXTM3U", "#EXT-X-VERSION:6"]
for level in bitrate_levels:
rel_path = f"{level.name}/{level.name}.m3u8"
lines.extend([
f"#EXT-X-STREAM-INF:BANDWIDTH={level.bitrate * 1000},"
f"RESOLUTION={level.width}x{level.height},"
f"CODECS=\"{self._get_hls_codec_string(level.codec)}\"",
rel_path,
])
content = "\n".join(lines) + "\n"
await asyncio.to_thread(master_path.write_text, content)
logger.info("Live master playlist created")
def _get_encoder_for_codec(self, codec: str) -> str:
"""Get FFmpeg encoder for codec."""
encoders = {
"h264": "libx264",
"hevc": "libx265",
"av1": "libaom-av1",
}
return encoders.get(codec, "libx264")
def _get_hls_codec_string(self, codec: str) -> str:
"""Get HLS codec string for manifest."""
codec_strings = {
"h264": "avc1.42E01E",
"hevc": "hev1.1.6.L93.B0",
"av1": "av01.0.05M.08",
}
return codec_strings.get(codec, "avc1.42E01E")

View File

@ -0,0 +1,299 @@
"""Tests for adaptive streaming functionality."""
import pytest
from pathlib import Path
from unittest.mock import Mock, patch, AsyncMock
from video_processor.config import ProcessorConfig
from video_processor.streaming.adaptive import (
AdaptiveStreamProcessor,
BitrateLevel,
StreamingPackage
)
class TestBitrateLevel:
"""Test BitrateLevel dataclass."""
def test_bitrate_level_creation(self):
"""Test BitrateLevel creation."""
level = BitrateLevel(
name="720p",
width=1280,
height=720,
bitrate=3000,
max_bitrate=4500,
codec="h264",
container="mp4"
)
assert level.name == "720p"
assert level.width == 1280
assert level.height == 720
assert level.bitrate == 3000
assert level.max_bitrate == 4500
assert level.codec == "h264"
assert level.container == "mp4"
class TestStreamingPackage:
"""Test StreamingPackage dataclass."""
def test_streaming_package_creation(self):
"""Test StreamingPackage creation."""
package = StreamingPackage(
video_id="test_video",
source_path=Path("input.mp4"),
output_dir=Path("/output"),
segment_duration=6
)
assert package.video_id == "test_video"
assert package.source_path == Path("input.mp4")
assert package.output_dir == Path("/output")
assert package.segment_duration == 6
assert package.hls_playlist is None
assert package.dash_manifest is None
class TestAdaptiveStreamProcessor:
"""Test adaptive stream processor functionality."""
def test_initialization(self):
"""Test processor initialization."""
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
assert processor.config == config
assert processor.enable_ai_optimization in [True, False] # Depends on AI availability
def test_initialization_with_ai_disabled(self):
"""Test processor initialization with AI disabled."""
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config, enable_ai_optimization=False)
assert processor.enable_ai_optimization is False
assert processor.content_analyzer is None
def test_get_streaming_capabilities(self):
"""Test streaming capabilities reporting."""
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
capabilities = processor.get_streaming_capabilities()
assert isinstance(capabilities, dict)
assert "hls_streaming" in capabilities
assert "dash_streaming" in capabilities
assert "ai_optimization" in capabilities
assert "advanced_codecs" in capabilities
assert "thumbnail_tracks" in capabilities
assert "multi_bitrate" in capabilities
def test_get_output_format_mapping(self):
"""Test codec to output format mapping."""
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
assert processor._get_output_format("h264") == "mp4"
assert processor._get_output_format("hevc") == "hevc"
assert processor._get_output_format("av1") == "av1_mp4"
assert processor._get_output_format("unknown") == "mp4"
def test_get_quality_preset_for_bitrate(self):
"""Test quality preset selection based on bitrate."""
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
assert processor._get_quality_preset_for_bitrate(500) == "low"
assert processor._get_quality_preset_for_bitrate(2000) == "medium"
assert processor._get_quality_preset_for_bitrate(5000) == "high"
assert processor._get_quality_preset_for_bitrate(10000) == "ultra"
def test_get_ffmpeg_options_for_level(self):
"""Test FFmpeg options generation for bitrate levels."""
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
level = BitrateLevel(
name="720p", width=1280, height=720,
bitrate=3000, max_bitrate=4500,
codec="h264", container="mp4"
)
options = processor._get_ffmpeg_options_for_level(level)
assert options["b:v"] == "3000k"
assert options["maxrate"] == "4500k"
assert options["bufsize"] == "9000k"
assert options["s"] == "1280x720"
@pytest.mark.asyncio
async def test_generate_optimal_bitrate_ladder_without_ai(self):
"""Test bitrate ladder generation without AI analysis."""
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config, enable_ai_optimization=False)
levels = await processor._generate_optimal_bitrate_ladder(Path("test.mp4"))
assert isinstance(levels, list)
assert len(levels) >= 1
assert all(isinstance(level, BitrateLevel) for level in levels)
@pytest.mark.asyncio
@patch('video_processor.streaming.adaptive.VideoContentAnalyzer')
async def test_generate_optimal_bitrate_ladder_with_ai(self, mock_analyzer_class):
"""Test bitrate ladder generation with AI analysis."""
# Mock AI analyzer
mock_analyzer = Mock()
mock_analysis = Mock()
mock_analysis.resolution = (1920, 1080)
mock_analysis.motion_intensity = 0.8
mock_analyzer.analyze_content = AsyncMock(return_value=mock_analysis)
mock_analyzer_class.return_value = mock_analyzer
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config, enable_ai_optimization=True)
processor.content_analyzer = mock_analyzer
levels = await processor._generate_optimal_bitrate_ladder(Path("test.mp4"))
assert isinstance(levels, list)
assert len(levels) >= 1
# Check that bitrates were adjusted for high motion
for level in levels:
assert level.bitrate > 0
assert level.max_bitrate > level.bitrate
@pytest.mark.asyncio
@patch('video_processor.streaming.adaptive.VideoProcessor')
@patch('video_processor.streaming.adaptive.asyncio.to_thread')
async def test_generate_bitrate_renditions(self, mock_to_thread, mock_processor_class):
"""Test bitrate rendition generation."""
# Mock VideoProcessor
mock_result = Mock()
mock_result.encoded_files = {"mp4": Path("/output/test.mp4")}
mock_processor_instance = Mock()
mock_processor_instance.process_video.return_value = mock_result
mock_processor_class.return_value = mock_processor_instance
mock_to_thread.return_value = mock_result
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
bitrate_levels = [
BitrateLevel("480p", 854, 480, 1500, 2250, "h264", "mp4"),
BitrateLevel("720p", 1280, 720, 3000, 4500, "h264", "mp4"),
]
with patch('pathlib.Path.mkdir'):
rendition_files = await processor._generate_bitrate_renditions(
Path("input.mp4"), Path("/output"), "test_video", bitrate_levels
)
assert isinstance(rendition_files, dict)
assert len(rendition_files) == 2
assert "480p" in rendition_files
assert "720p" in rendition_files
@pytest.mark.asyncio
@patch('video_processor.streaming.adaptive.asyncio.to_thread')
async def test_generate_thumbnail_track(self, mock_to_thread):
"""Test thumbnail track generation."""
# Mock VideoProcessor result
mock_result = Mock()
mock_result.sprite_file = Path("/output/sprite.jpg")
mock_to_thread.return_value = mock_result
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
with patch('video_processor.streaming.adaptive.VideoProcessor'):
thumbnail_track = await processor._generate_thumbnail_track(
Path("input.mp4"), Path("/output"), "test_video"
)
assert thumbnail_track == Path("/output/sprite.jpg")
@pytest.mark.asyncio
@patch('video_processor.streaming.adaptive.asyncio.to_thread')
async def test_generate_thumbnail_track_failure(self, mock_to_thread):
"""Test thumbnail track generation failure."""
mock_to_thread.side_effect = Exception("Thumbnail generation failed")
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
with patch('video_processor.streaming.adaptive.VideoProcessor'):
thumbnail_track = await processor._generate_thumbnail_track(
Path("input.mp4"), Path("/output"), "test_video"
)
assert thumbnail_track is None
@pytest.mark.asyncio
@patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_hls_playlist')
@patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_dash_manifest')
@patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_thumbnail_track')
@patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_bitrate_renditions')
@patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_optimal_bitrate_ladder')
async def test_create_adaptive_stream(
self, mock_ladder, mock_renditions, mock_thumbnail, mock_dash, mock_hls
):
"""Test complete adaptive stream creation."""
# Setup mocks
mock_bitrate_levels = [
BitrateLevel("720p", 1280, 720, 3000, 4500, "h264", "mp4")
]
mock_rendition_files = {"720p": Path("/output/720p.mp4")}
mock_ladder.return_value = mock_bitrate_levels
mock_renditions.return_value = mock_rendition_files
mock_thumbnail.return_value = Path("/output/sprite.jpg")
mock_hls.return_value = Path("/output/playlist.m3u8")
mock_dash.return_value = Path("/output/manifest.mpd")
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
with patch('pathlib.Path.mkdir'):
result = await processor.create_adaptive_stream(
Path("input.mp4"),
Path("/output"),
"test_video",
["hls", "dash"]
)
assert isinstance(result, StreamingPackage)
assert result.video_id == "test_video"
assert result.hls_playlist == Path("/output/playlist.m3u8")
assert result.dash_manifest == Path("/output/manifest.mpd")
assert result.thumbnail_track == Path("/output/sprite.jpg")
assert result.bitrate_levels == mock_bitrate_levels
@pytest.mark.asyncio
async def test_create_adaptive_stream_with_custom_ladder(self):
"""Test adaptive stream creation with custom bitrate ladder."""
custom_levels = [
BitrateLevel("480p", 854, 480, 1500, 2250, "h264", "mp4"),
]
config = ProcessorConfig()
processor = AdaptiveStreamProcessor(config)
with patch.multiple(
processor,
_generate_bitrate_renditions=AsyncMock(return_value={"480p": Path("test.mp4")}),
_generate_hls_playlist=AsyncMock(return_value=Path("playlist.m3u8")),
_generate_dash_manifest=AsyncMock(return_value=Path("manifest.mpd")),
_generate_thumbnail_track=AsyncMock(return_value=Path("sprite.jpg")),
), patch('pathlib.Path.mkdir'):
result = await processor.create_adaptive_stream(
Path("input.mp4"),
Path("/output"),
"test_video",
custom_bitrate_ladder=custom_levels
)
assert result.bitrate_levels == custom_levels