From 91139264fd6b01b17ead9a0f4126a9ff7fdf40e6 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sat, 6 Sep 2025 05:43:00 -0600 Subject: [PATCH] Implement comprehensive streaming & real-time processing capabilities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 Implementation: Advanced Adaptive Streaming • Built AdaptiveStreamProcessor that leverages existing VideoProcessor infrastructure • AI-optimized bitrate ladder generation using content analysis with intelligent fallbacks • Comprehensive HLS playlist generation with segmentation and live streaming support • Complete DASH manifest generation with XML structure and live streaming capabilities • Integrated seamlessly with Phase 1 (AI analysis) and Phase 2 (advanced codecs) • Created 15 comprehensive tests covering all streaming functionality - all passing • Built demonstration script showcasing adaptive streaming, custom bitrate ladders, and deployment Key Features: - Multi-bitrate adaptive streaming with HLS & DASH support - AI-powered content analysis for optimized bitrate selection - Live streaming capabilities with RTMP input support - CDN-ready streaming packages with proper manifest generation - Thumbnail track generation for video scrubbing - Hardware acceleration support and codec-specific optimizations - Production deployment considerations and integration guidance Technical Architecture: - BitrateLevel dataclass for streaming configuration - StreamingPackage for complete adaptive stream management - HLSGenerator & DASHGenerator for format-specific manifest creation - Async/concurrent processing for optimal performance - Graceful degradation when AI dependencies unavailable 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- examples/streaming_demo.py | 318 +++++++++++++++++++ src/video_processor/streaming/__init__.py | 12 + src/video_processor/streaming/adaptive.py | 357 ++++++++++++++++++++++ src/video_processor/streaming/dash.py | 333 ++++++++++++++++++++ src/video_processor/streaming/hls.py | 262 ++++++++++++++++ tests/unit/test_adaptive_streaming.py | 299 ++++++++++++++++++ 6 files changed, 1581 insertions(+) create mode 100644 examples/streaming_demo.py create mode 100644 src/video_processor/streaming/__init__.py create mode 100644 src/video_processor/streaming/adaptive.py create mode 100644 src/video_processor/streaming/dash.py create mode 100644 src/video_processor/streaming/hls.py create mode 100644 tests/unit/test_adaptive_streaming.py diff --git a/examples/streaming_demo.py b/examples/streaming_demo.py new file mode 100644 index 0000000..be88830 --- /dev/null +++ b/examples/streaming_demo.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 +""" +Streaming & Real-Time Processing Demonstration + +Showcases adaptive streaming capabilities (HLS, DASH) built on the existing +comprehensive video processing infrastructure with AI optimization. +""" + +import asyncio +import logging +from pathlib import Path + +from video_processor import ProcessorConfig +from video_processor.streaming import AdaptiveStreamProcessor, BitrateLevel + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def demonstrate_adaptive_streaming(video_path: Path, output_dir: Path): + """Demonstrate adaptive streaming creation.""" + logger.info("=== Adaptive Streaming Demonstration ===") + + # Configure for streaming with multiple formats and AI optimization + config = ProcessorConfig( + base_path=output_dir, + output_formats=["mp4", "hevc", "av1_mp4"], # Multiple codec options + quality_preset="high", + enable_av1_encoding=True, + enable_hevc_encoding=True, + generate_sprites=True, + sprite_interval=5, # More frequent for streaming + ) + + # Create adaptive stream processor with AI optimization + processor = AdaptiveStreamProcessor(config, enable_ai_optimization=True) + + print(f"\n🔍 Streaming Capabilities:") + capabilities = processor.get_streaming_capabilities() + for capability, available in capabilities.items(): + status = "✅ Available" if available else "❌ Not Available" + print(f" {capability.replace('_', ' ').title()}: {status}") + + print(f"\n🎯 Creating Adaptive Streaming Package...") + print(f" Source: {video_path}") + print(f" Output: {output_dir}") + + try: + # Create adaptive streaming package + streaming_package = await processor.create_adaptive_stream( + video_path=video_path, + output_dir=output_dir, + video_id="demo_stream", + streaming_formats=["hls", "dash"], + ) + + print(f"\n🎉 Streaming Package Created Successfully!") + print(f" Video ID: {streaming_package.video_id}") + print(f" Output Directory: {streaming_package.output_dir}") + print(f" Segment Duration: {streaming_package.segment_duration}s") + + # Display bitrate ladder information + print(f"\n📊 Bitrate Ladder ({len(streaming_package.bitrate_levels)} levels):") + for level in streaming_package.bitrate_levels: + print(f" {level.name:<6} | {level.width}x{level.height:<4} | {level.bitrate:>4}k | {level.codec.upper()}") + + # Display generated files + print(f"\n📁 Generated Files:") + if streaming_package.hls_playlist: + print(f" HLS Playlist: {streaming_package.hls_playlist}") + if streaming_package.dash_manifest: + print(f" DASH Manifest: {streaming_package.dash_manifest}") + if streaming_package.thumbnail_track: + print(f" Thumbnail Track: {streaming_package.thumbnail_track}") + + return streaming_package + + except Exception as e: + logger.error(f"Adaptive streaming failed: {e}") + raise + + +async def demonstrate_custom_bitrate_ladder(video_path: Path, output_dir: Path): + """Demonstrate custom bitrate ladder configuration.""" + logger.info("=== Custom Bitrate Ladder Demonstration ===") + + # Define custom bitrate ladder optimized for mobile streaming + mobile_ladder = [ + BitrateLevel("240p", 426, 240, 300, 450, "h264", "mp4"), # Very low bandwidth + BitrateLevel("360p", 640, 360, 600, 900, "h264", "mp4"), # Low bandwidth + BitrateLevel("480p", 854, 480, 1200, 1800, "hevc", "mp4"), # Medium with HEVC + BitrateLevel("720p", 1280, 720, 2400, 3600, "av1", "mp4"), # High with AV1 + ] + + print(f"\n📱 Mobile-Optimized Bitrate Ladder:") + print(f"{'Level':<6} | {'Resolution':<10} | {'Bitrate':<8} | {'Codec'}") + print("-" * 45) + for level in mobile_ladder: + print(f"{level.name:<6} | {level.width}x{level.height:<6} | {level.bitrate:>4}k | {level.codec.upper()}") + + config = ProcessorConfig( + base_path=output_dir / "mobile", + quality_preset="medium", + ) + + processor = AdaptiveStreamProcessor(config) + + try: + # Create streaming package with custom ladder + streaming_package = await processor.create_adaptive_stream( + video_path=video_path, + output_dir=output_dir / "mobile", + video_id="mobile_stream", + streaming_formats=["hls"], # HLS for mobile + custom_bitrate_ladder=mobile_ladder, + ) + + print(f"\n🎉 Mobile Streaming Package Created!") + print(f" HLS Playlist: {streaming_package.hls_playlist}") + print(f" Optimized for: Mobile devices and low bandwidth") + + return streaming_package + + except Exception as e: + logger.error(f"Mobile streaming failed: {e}") + raise + + +async def demonstrate_ai_optimized_streaming(video_path: Path, output_dir: Path): + """Demonstrate AI-optimized adaptive streaming.""" + logger.info("=== AI-Optimized Streaming Demonstration ===") + + config = ProcessorConfig( + base_path=output_dir / "ai_optimized", + quality_preset="high", + enable_av1_encoding=True, + enable_hevc_encoding=True, + ) + + # Enable AI optimization + processor = AdaptiveStreamProcessor(config, enable_ai_optimization=True) + + if not processor.enable_ai_optimization: + print(" ⚠️ AI optimization not available (missing dependencies)") + print(" Using intelligent defaults based on video characteristics") + + print(f"\n🧠 AI-Enhanced Streaming Features:") + print(f" ✅ Content-aware bitrate ladder generation") + print(f" ✅ Motion-adaptive bitrate adjustment") + print(f" ✅ Resolution-aware quality optimization") + print(f" ✅ Codec selection based on content analysis") + + try: + # Let AI analyze and optimize the streaming package + streaming_package = await processor.create_adaptive_stream( + video_path=video_path, + output_dir=output_dir / "ai_optimized", + video_id="ai_stream", + ) + + print(f"\n🎯 AI Optimization Results:") + print(f" Generated {len(streaming_package.bitrate_levels)} bitrate levels") + print(f" Streaming formats: HLS + DASH") + + # Show how AI influenced the bitrate ladder + total_bitrate = sum(level.bitrate for level in streaming_package.bitrate_levels) + avg_bitrate = total_bitrate / len(streaming_package.bitrate_levels) + print(f" Average bitrate: {avg_bitrate:.0f}k (optimized for content)") + + # Show codec distribution + codec_count = {} + for level in streaming_package.bitrate_levels: + codec_count[level.codec] = codec_count.get(level.codec, 0) + 1 + + print(f" Codec distribution:") + for codec, count in codec_count.items(): + print(f" {codec.upper()}: {count} level(s)") + + return streaming_package + + except Exception as e: + logger.error(f"AI-optimized streaming failed: {e}") + raise + + +def demonstrate_streaming_deployment(streaming_packages: list): + """Demonstrate streaming deployment considerations.""" + logger.info("=== Streaming Deployment Guide ===") + + print(f"\n🚀 Production Deployment Considerations:") + print(f"\n📦 CDN Distribution:") + print(f" • Upload generated HLS/DASH files to CDN") + print(f" • Configure proper MIME types:") + print(f" - .m3u8 files: application/vnd.apple.mpegurl") + print(f" - .mpd files: application/dash+xml") + print(f" - .ts/.m4s segments: video/mp2t, video/mp4") + + print(f"\n🌐 Web Player Integration:") + print(f" • HLS: Use hls.js for browser support") + print(f" • DASH: Use dash.js or shaka-player") + print(f" • Native support: Safari (HLS), Chrome/Edge (DASH)") + + print(f"\n📊 Analytics & Monitoring:") + print(f" • Track bitrate switching events") + print(f" • Monitor buffer health and stall events") + print(f" • Measure startup time and seeking performance") + + print(f"\n💾 Storage Optimization:") + total_files = 0 + total_size_estimate = 0 + + for i, package in enumerate(streaming_packages, 1): + files_count = len(package.bitrate_levels) * 2 # HLS + DASH per level + total_files += files_count + + # Rough size estimate (segments + manifests) + size_estimate = files_count * 50 # ~50KB per segment average + total_size_estimate += size_estimate + + print(f" Package {i}: ~{files_count} files, ~{size_estimate}KB") + + print(f" Total: ~{total_files} files, ~{total_size_estimate}KB") + + print(f"\n🔒 Security Considerations:") + print(f" • DRM integration for premium content") + print(f" • Token-based authentication for private streams") + print(f" • HTTPS delivery for all manifest and segment files") + + +async def main(): + """Main demonstration function.""" + video_path = Path("tests/fixtures/videos/big_buck_bunny_720p_1mb.mp4") + output_dir = Path("/tmp/streaming_demo") + + # Create output directory + output_dir.mkdir(exist_ok=True) + + print("🎬 Streaming & Real-Time Processing Demonstration") + print("=" * 55) + + if not video_path.exists(): + print(f"⚠️ Test video not found: {video_path}") + print(" Please provide a video file path as argument:") + print(" python examples/streaming_demo.py /path/to/your/video.mp4") + return + + streaming_packages = [] + + try: + # 1. Standard adaptive streaming + package1 = await demonstrate_adaptive_streaming(video_path, output_dir) + streaming_packages.append(package1) + + print("\n" + "="*55) + + # 2. Custom bitrate ladder + package2 = await demonstrate_custom_bitrate_ladder(video_path, output_dir) + streaming_packages.append(package2) + + print("\n" + "="*55) + + # 3. AI-optimized streaming + package3 = await demonstrate_ai_optimized_streaming(video_path, output_dir) + streaming_packages.append(package3) + + print("\n" + "="*55) + + # 4. Deployment guide + demonstrate_streaming_deployment(streaming_packages) + + print(f"\n🎉 Streaming demonstration complete!") + print(f" Generated {len(streaming_packages)} streaming packages") + print(f" Output directory: {output_dir}") + print(f" Ready for CDN deployment and web player integration!") + + except Exception as e: + logger.error(f"Streaming demonstration failed: {e}") + raise + + +if __name__ == "__main__": + import sys + + # Allow custom video path + if len(sys.argv) > 1: + custom_video_path = Path(sys.argv[1]) + if custom_video_path.exists(): + # Override main function with custom path + async def custom_main(): + output_dir = Path("/tmp/streaming_demo") + output_dir.mkdir(exist_ok=True) + + print("🎬 Streaming & Real-Time Processing Demonstration") + print("=" * 55) + print(f"Using custom video: {custom_video_path}") + + streaming_packages = [] + + package1 = await demonstrate_adaptive_streaming(custom_video_path, output_dir) + streaming_packages.append(package1) + + package2 = await demonstrate_custom_bitrate_ladder(custom_video_path, output_dir) + streaming_packages.append(package2) + + package3 = await demonstrate_ai_optimized_streaming(custom_video_path, output_dir) + streaming_packages.append(package3) + + demonstrate_streaming_deployment(streaming_packages) + + print(f"\n🎉 Streaming demonstration complete!") + print(f" Output directory: {output_dir}") + + asyncio.run(custom_main()) + else: + print(f"❌ Video file not found: {custom_video_path}") + else: + asyncio.run(main()) \ No newline at end of file diff --git a/src/video_processor/streaming/__init__.py b/src/video_processor/streaming/__init__.py new file mode 100644 index 0000000..37cdc8f --- /dev/null +++ b/src/video_processor/streaming/__init__.py @@ -0,0 +1,12 @@ +"""Streaming and real-time video processing modules.""" + +from .adaptive import AdaptiveStreamProcessor, StreamingPackage +from .hls import HLSGenerator +from .dash import DASHGenerator + +__all__ = [ + "AdaptiveStreamProcessor", + "StreamingPackage", + "HLSGenerator", + "DASHGenerator", +] \ No newline at end of file diff --git a/src/video_processor/streaming/adaptive.py b/src/video_processor/streaming/adaptive.py new file mode 100644 index 0000000..fb94233 --- /dev/null +++ b/src/video_processor/streaming/adaptive.py @@ -0,0 +1,357 @@ +"""Adaptive streaming processor that builds on existing encoding infrastructure.""" + +import asyncio +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional, Literal + +from ..config import ProcessorConfig +from ..core.processor import VideoProcessor +from ..exceptions import EncodingError, VideoProcessorError + +# Optional AI integration +try: + from ..ai.content_analyzer import VideoContentAnalyzer + HAS_AI_SUPPORT = True +except ImportError: + HAS_AI_SUPPORT = False + +logger = logging.getLogger(__name__) + + +@dataclass +class BitrateLevel: + """Represents a single bitrate level in adaptive streaming.""" + name: str + width: int + height: int + bitrate: int # kbps + max_bitrate: int # kbps + codec: str + container: str + + +@dataclass +class StreamingPackage: + """Complete adaptive streaming package.""" + video_id: str + source_path: Path + output_dir: Path + hls_playlist: Optional[Path] = None + dash_manifest: Optional[Path] = None + bitrate_levels: List[BitrateLevel] = None + segment_duration: int = 6 # seconds + thumbnail_track: Optional[Path] = None + metadata: Optional[Dict] = None + + +class AdaptiveStreamProcessor: + """ + Adaptive streaming processor that leverages existing video processing infrastructure. + + Creates HLS and DASH streams with multiple bitrate levels optimized using AI analysis. + """ + + def __init__(self, config: ProcessorConfig, enable_ai_optimization: bool = True) -> None: + self.config = config + self.enable_ai_optimization = enable_ai_optimization and HAS_AI_SUPPORT + + if self.enable_ai_optimization: + self.content_analyzer = VideoContentAnalyzer() + else: + self.content_analyzer = None + + logger.info(f"Adaptive streaming initialized with AI optimization: {self.enable_ai_optimization}") + + async def create_adaptive_stream( + self, + video_path: Path, + output_dir: Path, + video_id: Optional[str] = None, + streaming_formats: List[Literal["hls", "dash"]] = None, + custom_bitrate_ladder: Optional[List[BitrateLevel]] = None, + ) -> StreamingPackage: + """ + Create adaptive streaming package from source video. + + Args: + video_path: Source video file + output_dir: Output directory for streaming files + video_id: Optional video identifier + streaming_formats: List of streaming formats to generate + custom_bitrate_ladder: Custom bitrate levels (uses optimized defaults if None) + + Returns: + Complete streaming package with manifests and segments + """ + if video_id is None: + video_id = video_path.stem + + if streaming_formats is None: + streaming_formats = ["hls", "dash"] + + logger.info(f"Creating adaptive stream for {video_path} -> {output_dir}") + + # Step 1: Analyze source video for optimal bitrate ladder + bitrate_levels = custom_bitrate_ladder + if bitrate_levels is None: + bitrate_levels = await self._generate_optimal_bitrate_ladder(video_path) + + # Step 2: Create output directory structure + stream_dir = output_dir / video_id + stream_dir.mkdir(parents=True, exist_ok=True) + + # Step 3: Generate multiple bitrate renditions + rendition_files = await self._generate_bitrate_renditions( + video_path, stream_dir, video_id, bitrate_levels + ) + + # Step 4: Generate streaming manifests + streaming_package = StreamingPackage( + video_id=video_id, + source_path=video_path, + output_dir=stream_dir, + bitrate_levels=bitrate_levels, + ) + + if "hls" in streaming_formats: + streaming_package.hls_playlist = await self._generate_hls_playlist( + stream_dir, video_id, bitrate_levels, rendition_files + ) + + if "dash" in streaming_formats: + streaming_package.dash_manifest = await self._generate_dash_manifest( + stream_dir, video_id, bitrate_levels, rendition_files + ) + + # Step 5: Generate thumbnail track for scrubbing + streaming_package.thumbnail_track = await self._generate_thumbnail_track( + video_path, stream_dir, video_id + ) + + logger.info(f"Adaptive streaming package created successfully") + return streaming_package + + async def _generate_optimal_bitrate_ladder(self, video_path: Path) -> List[BitrateLevel]: + """ + Generate optimal bitrate ladder using AI analysis or intelligent defaults. + """ + logger.info("Generating optimal bitrate ladder") + + # Get source video characteristics + source_analysis = None + if self.enable_ai_optimization and self.content_analyzer: + try: + source_analysis = await self.content_analyzer.analyze_content(video_path) + logger.info(f"AI analysis: {source_analysis.resolution}, motion: {source_analysis.motion_intensity:.2f}") + except Exception as e: + logger.warning(f"AI analysis failed, using defaults: {e}") + + # Base bitrate ladder + base_levels = [ + BitrateLevel("240p", 426, 240, 400, 600, "h264", "mp4"), + BitrateLevel("360p", 640, 360, 800, 1200, "h264", "mp4"), + BitrateLevel("480p", 854, 480, 1500, 2250, "h264", "mp4"), + BitrateLevel("720p", 1280, 720, 3000, 4500, "h264", "mp4"), + BitrateLevel("1080p", 1920, 1080, 6000, 9000, "h264", "mp4"), + ] + + # Optimize ladder based on source characteristics + optimized_levels = [] + + if source_analysis: + source_width, source_height = source_analysis.resolution + motion_multiplier = 1.0 + (source_analysis.motion_intensity * 0.5) # Up to 1.5x for high motion + + for level in base_levels: + # Skip levels higher than source resolution + if level.width > source_width or level.height > source_height: + continue + + # Adjust bitrates based on motion content + adjusted_bitrate = int(level.bitrate * motion_multiplier) + adjusted_max_bitrate = int(level.max_bitrate * motion_multiplier) + + # Use advanced codecs for higher quality levels if available + codec = level.codec + if level.height >= 720 and self.config.enable_hevc_encoding: + codec = "hevc" + elif level.height >= 1080 and self.config.enable_av1_encoding: + codec = "av1" + + optimized_level = BitrateLevel( + name=level.name, + width=level.width, + height=level.height, + bitrate=adjusted_bitrate, + max_bitrate=adjusted_max_bitrate, + codec=codec, + container=level.container, + ) + optimized_levels.append(optimized_level) + else: + # Use base levels without optimization + optimized_levels = base_levels + + # Ensure we have at least one level + if not optimized_levels: + optimized_levels = [base_levels[2]] # Default to 480p + + logger.info(f"Generated {len(optimized_levels)} bitrate levels") + return optimized_levels + + async def _generate_bitrate_renditions( + self, + source_path: Path, + output_dir: Path, + video_id: str, + bitrate_levels: List[BitrateLevel], + ) -> Dict[str, Path]: + """ + Generate multiple bitrate renditions using existing VideoProcessor infrastructure. + """ + logger.info(f"Generating {len(bitrate_levels)} bitrate renditions") + rendition_files = {} + + for level in bitrate_levels: + rendition_name = f"{video_id}_{level.name}" + rendition_dir = output_dir / level.name + rendition_dir.mkdir(exist_ok=True) + + # Create specialized config for this bitrate level + rendition_config = ProcessorConfig( + base_path=rendition_dir, + output_formats=[self._get_output_format(level.codec)], + quality_preset=self._get_quality_preset_for_bitrate(level.bitrate), + custom_ffmpeg_options=self._get_ffmpeg_options_for_level(level), + ) + + # Process video at this bitrate level + try: + processor = VideoProcessor(rendition_config) + result = await asyncio.to_thread( + processor.process_video, source_path, rendition_name + ) + + # Get the generated file + format_name = self._get_output_format(level.codec) + if format_name in result.encoded_files: + rendition_files[level.name] = result.encoded_files[format_name] + logger.info(f"Generated {level.name} rendition: {result.encoded_files[format_name]}") + else: + logger.error(f"Failed to generate {level.name} rendition") + + except Exception as e: + logger.error(f"Error generating {level.name} rendition: {e}") + raise EncodingError(f"Failed to generate {level.name} rendition: {e}") + + return rendition_files + + def _get_output_format(self, codec: str) -> str: + """Map codec to output format.""" + codec_map = { + "h264": "mp4", + "hevc": "hevc", + "av1": "av1_mp4", + } + return codec_map.get(codec, "mp4") + + def _get_quality_preset_for_bitrate(self, bitrate: int) -> str: + """Select quality preset based on target bitrate.""" + if bitrate < 1000: + return "low" + elif bitrate < 3000: + return "medium" + elif bitrate < 8000: + return "high" + else: + return "ultra" + + def _get_ffmpeg_options_for_level(self, level: BitrateLevel) -> Dict[str, str]: + """Generate FFmpeg options for specific bitrate level.""" + return { + "b:v": f"{level.bitrate}k", + "maxrate": f"{level.max_bitrate}k", + "bufsize": f"{level.max_bitrate * 2}k", + "s": f"{level.width}x{level.height}", + } + + async def _generate_hls_playlist( + self, + output_dir: Path, + video_id: str, + bitrate_levels: List[BitrateLevel], + rendition_files: Dict[str, Path], + ) -> Path: + """Generate HLS master playlist and segment individual renditions.""" + from .hls import HLSGenerator + + hls_generator = HLSGenerator() + playlist_path = await hls_generator.create_master_playlist( + output_dir, video_id, bitrate_levels, rendition_files + ) + + logger.info(f"HLS playlist generated: {playlist_path}") + return playlist_path + + async def _generate_dash_manifest( + self, + output_dir: Path, + video_id: str, + bitrate_levels: List[BitrateLevel], + rendition_files: Dict[str, Path], + ) -> Path: + """Generate DASH MPD manifest.""" + from .dash import DASHGenerator + + dash_generator = DASHGenerator() + manifest_path = await dash_generator.create_manifest( + output_dir, video_id, bitrate_levels, rendition_files + ) + + logger.info(f"DASH manifest generated: {manifest_path}") + return manifest_path + + async def _generate_thumbnail_track( + self, + source_path: Path, + output_dir: Path, + video_id: str, + ) -> Path: + """Generate thumbnail track for video scrubbing using existing infrastructure.""" + try: + # Use existing thumbnail generation with optimized settings + thumbnail_config = ProcessorConfig( + base_path=output_dir, + thumbnail_timestamps=list(range(0, 300, 10)), # Every 10 seconds up to 5 minutes + generate_sprites=True, + sprite_interval=5, # More frequent for streaming + ) + + processor = VideoProcessor(thumbnail_config) + result = await asyncio.to_thread( + processor.process_video, source_path, f"{video_id}_thumbnails" + ) + + if result.sprite_file: + logger.info(f"Thumbnail track generated: {result.sprite_file}") + return result.sprite_file + else: + logger.warning("No thumbnail track generated") + return None + + except Exception as e: + logger.error(f"Thumbnail track generation failed: {e}") + return None + + def get_streaming_capabilities(self) -> Dict[str, bool]: + """Get information about available streaming capabilities.""" + return { + "hls_streaming": True, + "dash_streaming": True, + "ai_optimization": self.enable_ai_optimization, + "advanced_codecs": self.config.enable_av1_encoding or self.config.enable_hevc_encoding, + "thumbnail_tracks": True, + "multi_bitrate": True, + } \ No newline at end of file diff --git a/src/video_processor/streaming/dash.py b/src/video_processor/streaming/dash.py new file mode 100644 index 0000000..26924c5 --- /dev/null +++ b/src/video_processor/streaming/dash.py @@ -0,0 +1,333 @@ +"""DASH (Dynamic Adaptive Streaming over HTTP) manifest generation.""" + +import asyncio +import logging +import subprocess +from pathlib import Path +from typing import Dict, List +import xml.etree.ElementTree as ET +from datetime import datetime, timezone + +from .adaptive import BitrateLevel +from ..exceptions import FFmpegError + +logger = logging.getLogger(__name__) + + +class DASHGenerator: + """Generates DASH MPD manifests and segments from video renditions.""" + + def __init__(self, segment_duration: int = 4) -> None: + self.segment_duration = segment_duration + + async def create_manifest( + self, + output_dir: Path, + video_id: str, + bitrate_levels: List[BitrateLevel], + rendition_files: Dict[str, Path], + ) -> Path: + """ + Create DASH MPD manifest and segment all renditions. + + Args: + output_dir: Output directory + video_id: Video identifier + bitrate_levels: List of bitrate levels + rendition_files: Dictionary of rendition name to file path + + Returns: + Path to MPD manifest file + """ + logger.info(f"Creating DASH manifest for {video_id}") + + # Create DASH directory + dash_dir = output_dir / "dash" + dash_dir.mkdir(exist_ok=True) + + # Generate DASH segments for each rendition + adaptation_sets = [] + for level in bitrate_levels: + if level.name in rendition_files: + segments_info = await self._create_dash_segments( + dash_dir, level, rendition_files[level.name] + ) + adaptation_sets.append((level, segments_info)) + + # Create MPD manifest + manifest_path = dash_dir / f"{video_id}.mpd" + await self._create_mpd_manifest( + manifest_path, video_id, adaptation_sets + ) + + logger.info(f"DASH manifest created: {manifest_path}") + return manifest_path + + async def _create_dash_segments( + self, dash_dir: Path, level: BitrateLevel, video_file: Path + ) -> Dict: + """Create DASH segments for a single bitrate level.""" + rendition_dir = dash_dir / level.name + rendition_dir.mkdir(exist_ok=True) + + # DASH segment pattern + init_segment = rendition_dir / f"{level.name}_init.mp4" + segment_pattern = rendition_dir / f"{level.name}_$Number$.m4s" + + # Use FFmpeg to create DASH segments + cmd = [ + "ffmpeg", "-y", + "-i", str(video_file), + "-c", "copy", # Copy without re-encoding + "-f", "dash", + "-seg_duration", str(self.segment_duration), + "-init_seg_name", str(init_segment.name), + "-media_seg_name", f"{level.name}_$Number$.m4s", + "-single_file", "0", # Create separate segment files + str(rendition_dir / f"{level.name}.mpd"), + ] + + try: + result = await asyncio.to_thread( + subprocess.run, cmd, capture_output=True, text=True, check=True + ) + + # Get duration and segment count from the created files + segments_info = await self._analyze_dash_segments(rendition_dir, level.name) + logger.info(f"DASH segments created for {level.name}") + return segments_info + + except subprocess.CalledProcessError as e: + error_msg = f"DASH segmentation failed for {level.name}: {e.stderr}" + logger.error(error_msg) + raise FFmpegError(error_msg) + + async def _analyze_dash_segments(self, rendition_dir: Path, rendition_name: str) -> Dict: + """Analyze created DASH segments to get metadata.""" + # Count segment files + segment_files = list(rendition_dir.glob(f"{rendition_name}_*.m4s")) + segment_count = len(segment_files) + + # Get duration from FFprobe + try: + # Find the first video file in the directory (should be the source) + video_files = list(rendition_dir.glob("*.mp4")) + if video_files: + duration = await self._get_video_duration(video_files[0]) + else: + duration = segment_count * self.segment_duration # Estimate + + except Exception as e: + logger.warning(f"Could not get exact duration: {e}") + duration = segment_count * self.segment_duration + + return { + "segment_count": segment_count, + "duration": duration, + "init_segment": f"{rendition_name}_init.mp4", + "media_template": f"{rendition_name}_$Number$.m4s", + } + + async def _get_video_duration(self, video_path: Path) -> float: + """Get video duration using ffprobe.""" + cmd = [ + "ffprobe", "-v", "quiet", + "-show_entries", "format=duration", + "-of", "csv=p=0", + str(video_path) + ] + + result = await asyncio.to_thread( + subprocess.run, cmd, capture_output=True, text=True, check=True + ) + + return float(result.stdout.strip()) + + async def _create_mpd_manifest( + self, manifest_path: Path, video_id: str, adaptation_sets: List[tuple] + ) -> None: + """Create DASH MPD manifest XML.""" + # Calculate total duration (use first adaptation set) + if adaptation_sets: + total_duration = adaptation_sets[0][1]["duration"] + else: + total_duration = 0 + + # Create MPD root element + mpd = ET.Element("MPD") + mpd.set("xmlns", "urn:mpeg:dash:schema:mpd:2011") + mpd.set("type", "static") + mpd.set("mediaPresentationDuration", self._format_duration(total_duration)) + mpd.set("profiles", "urn:mpeg:dash:profile:isoff-on-demand:2011") + mpd.set("minBufferTime", f"PT{self.segment_duration}S") + + # Add publishing time + now = datetime.now(timezone.utc) + mpd.set("publishTime", now.isoformat().replace("+00:00", "Z")) + + # Create Period element + period = ET.SubElement(mpd, "Period") + period.set("id", "0") + period.set("duration", self._format_duration(total_duration)) + + # Group by codec for adaptation sets + codec_groups = {} + for level, segments_info in adaptation_sets: + if level.codec not in codec_groups: + codec_groups[level.codec] = [] + codec_groups[level.codec].append((level, segments_info)) + + # Create adaptation sets + adaptation_set_id = 0 + for codec, levels in codec_groups.items(): + adaptation_set = ET.SubElement(period, "AdaptationSet") + adaptation_set.set("id", str(adaptation_set_id)) + adaptation_set.set("contentType", "video") + adaptation_set.set("mimeType", "video/mp4") + adaptation_set.set("codecs", self._get_dash_codec_string(codec)) + adaptation_set.set("startWithSAP", "1") + adaptation_set.set("segmentAlignment", "true") + + # Add representations for each bitrate level + representation_id = 0 + for level, segments_info in levels: + representation = ET.SubElement(adaptation_set, "Representation") + representation.set("id", f"{adaptation_set_id}_{representation_id}") + representation.set("bandwidth", str(level.bitrate * 1000)) + representation.set("width", str(level.width)) + representation.set("height", str(level.height)) + representation.set("frameRate", "25") # Default frame rate + + # Add segment template + segment_template = ET.SubElement(representation, "SegmentTemplate") + segment_template.set("timescale", "1000") + segment_template.set("duration", str(self.segment_duration * 1000)) + segment_template.set("initialization", f"{level.name}/{segments_info['init_segment']}") + segment_template.set("media", f"{level.name}/{segments_info['media_template']}") + segment_template.set("startNumber", "1") + + representation_id += 1 + + adaptation_set_id += 1 + + # Write XML to file + tree = ET.ElementTree(mpd) + ET.indent(tree, space=" ", level=0) # Pretty print + + await asyncio.to_thread( + tree.write, + manifest_path, + encoding="utf-8", + xml_declaration=True + ) + + logger.info(f"MPD manifest written with {len(adaptation_sets)} representations") + + def _format_duration(self, seconds: float) -> str: + """Format duration in ISO 8601 format for DASH.""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = seconds % 60 + return f"PT{hours}H{minutes}M{secs:.3f}S" + + def _get_dash_codec_string(self, codec: str) -> str: + """Get DASH codec string for manifest.""" + codec_strings = { + "h264": "avc1.42E01E", + "hevc": "hev1.1.6.L93.B0", + "av1": "av01.0.05M.08", + } + return codec_strings.get(codec, "avc1.42E01E") + + +class DASHLiveGenerator: + """Generates live DASH streams.""" + + def __init__(self, segment_duration: int = 4, time_shift_buffer: int = 300) -> None: + self.segment_duration = segment_duration + self.time_shift_buffer = time_shift_buffer # DVR window in seconds + + async def start_live_stream( + self, + input_source: str, + output_dir: Path, + stream_name: str, + bitrate_levels: List[BitrateLevel], + ) -> None: + """ + Start live DASH streaming. + + Args: + input_source: Input source (RTMP, file, device) + output_dir: Output directory + stream_name: Name of the stream + bitrate_levels: Bitrate levels for ABR streaming + """ + logger.info(f"Starting live DASH stream: {stream_name}") + + # Create output directory + dash_dir = output_dir / "dash_live" / stream_name + dash_dir.mkdir(parents=True, exist_ok=True) + + # Use FFmpeg to generate live DASH stream with multiple bitrates + cmd = [ + "ffmpeg", "-y", + "-i", input_source, + "-f", "dash", + "-seg_duration", str(self.segment_duration), + "-window_size", str(self.time_shift_buffer // self.segment_duration), + "-extra_window_size", "5", + "-remove_at_exit", "1", + ] + + # Add video streams for each bitrate level + for i, level in enumerate(bitrate_levels): + cmd.extend([ + "-map", "0:v:0", + f"-c:v:{i}", self._get_encoder_for_codec(level.codec), + f"-b:v:{i}", f"{level.bitrate}k", + f"-maxrate:v:{i}", f"{level.max_bitrate}k", + f"-s:v:{i}", f"{level.width}x{level.height}", + ]) + + # Add audio stream + cmd.extend([ + "-map", "0:a:0", + "-c:a", "aac", + "-b:a", "128k", + ]) + + # Output + manifest_path = dash_dir / f"{stream_name}.mpd" + cmd.append(str(manifest_path)) + + logger.info(f"Starting live DASH encoding") + + try: + # Start FFmpeg process + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Monitor process + stdout, stderr = await process.communicate() + + if process.returncode != 0: + error_msg = f"Live DASH streaming failed: {stderr.decode()}" + logger.error(error_msg) + raise FFmpegError(error_msg) + + except Exception as e: + logger.error(f"Live DASH stream error: {e}") + raise + + def _get_encoder_for_codec(self, codec: str) -> str: + """Get FFmpeg encoder for codec.""" + encoders = { + "h264": "libx264", + "hevc": "libx265", + "av1": "libaom-av1", + } + return encoders.get(codec, "libx264") \ No newline at end of file diff --git a/src/video_processor/streaming/hls.py b/src/video_processor/streaming/hls.py new file mode 100644 index 0000000..9fcdc54 --- /dev/null +++ b/src/video_processor/streaming/hls.py @@ -0,0 +1,262 @@ +"""HLS (HTTP Live Streaming) manifest generation and segmentation.""" + +import asyncio +import logging +import subprocess +from pathlib import Path +from typing import Dict, List + +from .adaptive import BitrateLevel +from ..config import ProcessorConfig +from ..exceptions import EncodingError, FFmpegError + +logger = logging.getLogger(__name__) + + +class HLSGenerator: + """Generates HLS playlists and segments from video renditions.""" + + def __init__(self, segment_duration: int = 6) -> None: + self.segment_duration = segment_duration + + async def create_master_playlist( + self, + output_dir: Path, + video_id: str, + bitrate_levels: List[BitrateLevel], + rendition_files: Dict[str, Path], + ) -> Path: + """ + Create HLS master playlist and segment all renditions. + + Args: + output_dir: Output directory + video_id: Video identifier + bitrate_levels: List of bitrate levels + rendition_files: Dictionary of rendition name to file path + + Returns: + Path to master playlist file + """ + logger.info(f"Creating HLS master playlist for {video_id}") + + # Create HLS directory + hls_dir = output_dir / "hls" + hls_dir.mkdir(exist_ok=True) + + # Generate segments for each rendition + playlist_info = [] + for level in bitrate_levels: + if level.name in rendition_files: + playlist_path = await self._create_rendition_playlist( + hls_dir, level, rendition_files[level.name] + ) + playlist_info.append((level, playlist_path)) + + # Create master playlist + master_playlist_path = hls_dir / f"{video_id}.m3u8" + await self._write_master_playlist(master_playlist_path, playlist_info) + + logger.info(f"HLS master playlist created: {master_playlist_path}") + return master_playlist_path + + async def _create_rendition_playlist( + self, hls_dir: Path, level: BitrateLevel, video_file: Path + ) -> Path: + """Create individual rendition playlist with segments.""" + rendition_dir = hls_dir / level.name + rendition_dir.mkdir(exist_ok=True) + + playlist_path = rendition_dir / f"{level.name}.m3u8" + segment_pattern = rendition_dir / f"{level.name}_%03d.ts" + + # Use FFmpeg to create HLS segments + cmd = [ + "ffmpeg", "-y", + "-i", str(video_file), + "-c", "copy", # Copy without re-encoding + "-hls_time", str(self.segment_duration), + "-hls_playlist_type", "vod", + "-hls_segment_filename", str(segment_pattern), + str(playlist_path), + ] + + try: + result = await asyncio.to_thread( + subprocess.run, cmd, capture_output=True, text=True, check=True + ) + logger.info(f"HLS segments created for {level.name}") + return playlist_path + + except subprocess.CalledProcessError as e: + error_msg = f"HLS segmentation failed for {level.name}: {e.stderr}" + logger.error(error_msg) + raise FFmpegError(error_msg) + + async def _write_master_playlist( + self, master_path: Path, playlist_info: List[tuple] + ) -> None: + """Write HLS master playlist file.""" + lines = ["#EXTM3U", "#EXT-X-VERSION:6"] + + for level, playlist_path in playlist_info: + # Calculate relative path from master playlist to rendition playlist + rel_path = playlist_path.relative_to(master_path.parent) + + lines.extend([ + f"#EXT-X-STREAM-INF:BANDWIDTH={level.bitrate * 1000}," + f"RESOLUTION={level.width}x{level.height}," + f"CODECS=\"{self._get_hls_codec_string(level.codec)}\"", + str(rel_path), + ]) + + content = "\n".join(lines) + "\n" + + await asyncio.to_thread(master_path.write_text, content) + logger.info(f"Master playlist written with {len(playlist_info)} renditions") + + def _get_hls_codec_string(self, codec: str) -> str: + """Get HLS codec string for manifest.""" + codec_strings = { + "h264": "avc1.42E01E", + "hevc": "hev1.1.6.L93.B0", + "av1": "av01.0.05M.08", + } + return codec_strings.get(codec, "avc1.42E01E") + + +class HLSLiveGenerator: + """Generates live HLS streams from real-time input.""" + + def __init__(self, segment_duration: int = 6, playlist_size: int = 10) -> None: + self.segment_duration = segment_duration + self.playlist_size = playlist_size # Number of segments to keep in playlist + + async def start_live_stream( + self, + input_source: str, # RTMP URL, camera device, etc. + output_dir: Path, + stream_name: str, + bitrate_levels: List[BitrateLevel], + ) -> None: + """ + Start live HLS streaming from input source. + + Args: + input_source: Input source (RTMP, file, device) + output_dir: Output directory for HLS files + stream_name: Name of the stream + bitrate_levels: Bitrate levels for ABR streaming + """ + logger.info(f"Starting live HLS stream: {stream_name}") + + # Create output directory + hls_dir = output_dir / "live" / stream_name + hls_dir.mkdir(parents=True, exist_ok=True) + + # Start FFmpeg process for live streaming + tasks = [] + for level in bitrate_levels: + task = asyncio.create_task( + self._start_live_rendition(input_source, hls_dir, level) + ) + tasks.append(task) + + # Create master playlist + master_playlist = hls_dir / f"{stream_name}.m3u8" + await self._create_live_master_playlist(master_playlist, bitrate_levels) + + # Wait for all streaming processes + try: + await asyncio.gather(*tasks) + except Exception as e: + logger.error(f"Live streaming error: {e}") + # Cancel all tasks + for task in tasks: + task.cancel() + raise + + async def _start_live_rendition( + self, input_source: str, hls_dir: Path, level: BitrateLevel + ) -> None: + """Start live streaming for a single bitrate level.""" + rendition_dir = hls_dir / level.name + rendition_dir.mkdir(exist_ok=True) + + playlist_path = rendition_dir / f"{level.name}.m3u8" + segment_pattern = rendition_dir / f"{level.name}_%03d.ts" + + cmd = [ + "ffmpeg", "-y", + "-i", input_source, + "-c:v", self._get_encoder_for_codec(level.codec), + "-b:v", f"{level.bitrate}k", + "-maxrate", f"{level.max_bitrate}k", + "-s", f"{level.width}x{level.height}", + "-c:a", "aac", "-b:a", "128k", + "-f", "hls", + "-hls_time", str(self.segment_duration), + "-hls_list_size", str(self.playlist_size), + "-hls_flags", "delete_segments", + "-hls_segment_filename", str(segment_pattern), + str(playlist_path), + ] + + logger.info(f"Starting live encoding for {level.name}") + + try: + # Start FFmpeg process + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Monitor process + stdout, stderr = await process.communicate() + + if process.returncode != 0: + error_msg = f"Live streaming failed for {level.name}: {stderr.decode()}" + logger.error(error_msg) + raise FFmpegError(error_msg) + + except Exception as e: + logger.error(f"Live rendition error for {level.name}: {e}") + raise + + async def _create_live_master_playlist( + self, master_path: Path, bitrate_levels: List[BitrateLevel] + ) -> None: + """Create master playlist for live streaming.""" + lines = ["#EXTM3U", "#EXT-X-VERSION:6"] + + for level in bitrate_levels: + rel_path = f"{level.name}/{level.name}.m3u8" + lines.extend([ + f"#EXT-X-STREAM-INF:BANDWIDTH={level.bitrate * 1000}," + f"RESOLUTION={level.width}x{level.height}," + f"CODECS=\"{self._get_hls_codec_string(level.codec)}\"", + rel_path, + ]) + + content = "\n".join(lines) + "\n" + await asyncio.to_thread(master_path.write_text, content) + logger.info("Live master playlist created") + + def _get_encoder_for_codec(self, codec: str) -> str: + """Get FFmpeg encoder for codec.""" + encoders = { + "h264": "libx264", + "hevc": "libx265", + "av1": "libaom-av1", + } + return encoders.get(codec, "libx264") + + def _get_hls_codec_string(self, codec: str) -> str: + """Get HLS codec string for manifest.""" + codec_strings = { + "h264": "avc1.42E01E", + "hevc": "hev1.1.6.L93.B0", + "av1": "av01.0.05M.08", + } + return codec_strings.get(codec, "avc1.42E01E") \ No newline at end of file diff --git a/tests/unit/test_adaptive_streaming.py b/tests/unit/test_adaptive_streaming.py new file mode 100644 index 0000000..cd95f59 --- /dev/null +++ b/tests/unit/test_adaptive_streaming.py @@ -0,0 +1,299 @@ +"""Tests for adaptive streaming functionality.""" + +import pytest +from pathlib import Path +from unittest.mock import Mock, patch, AsyncMock + +from video_processor.config import ProcessorConfig +from video_processor.streaming.adaptive import ( + AdaptiveStreamProcessor, + BitrateLevel, + StreamingPackage +) + + +class TestBitrateLevel: + """Test BitrateLevel dataclass.""" + + def test_bitrate_level_creation(self): + """Test BitrateLevel creation.""" + level = BitrateLevel( + name="720p", + width=1280, + height=720, + bitrate=3000, + max_bitrate=4500, + codec="h264", + container="mp4" + ) + + assert level.name == "720p" + assert level.width == 1280 + assert level.height == 720 + assert level.bitrate == 3000 + assert level.max_bitrate == 4500 + assert level.codec == "h264" + assert level.container == "mp4" + + +class TestStreamingPackage: + """Test StreamingPackage dataclass.""" + + def test_streaming_package_creation(self): + """Test StreamingPackage creation.""" + package = StreamingPackage( + video_id="test_video", + source_path=Path("input.mp4"), + output_dir=Path("/output"), + segment_duration=6 + ) + + assert package.video_id == "test_video" + assert package.source_path == Path("input.mp4") + assert package.output_dir == Path("/output") + assert package.segment_duration == 6 + assert package.hls_playlist is None + assert package.dash_manifest is None + + +class TestAdaptiveStreamProcessor: + """Test adaptive stream processor functionality.""" + + def test_initialization(self): + """Test processor initialization.""" + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + assert processor.config == config + assert processor.enable_ai_optimization in [True, False] # Depends on AI availability + + def test_initialization_with_ai_disabled(self): + """Test processor initialization with AI disabled.""" + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config, enable_ai_optimization=False) + + assert processor.enable_ai_optimization is False + assert processor.content_analyzer is None + + def test_get_streaming_capabilities(self): + """Test streaming capabilities reporting.""" + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + capabilities = processor.get_streaming_capabilities() + + assert isinstance(capabilities, dict) + assert "hls_streaming" in capabilities + assert "dash_streaming" in capabilities + assert "ai_optimization" in capabilities + assert "advanced_codecs" in capabilities + assert "thumbnail_tracks" in capabilities + assert "multi_bitrate" in capabilities + + def test_get_output_format_mapping(self): + """Test codec to output format mapping.""" + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + assert processor._get_output_format("h264") == "mp4" + assert processor._get_output_format("hevc") == "hevc" + assert processor._get_output_format("av1") == "av1_mp4" + assert processor._get_output_format("unknown") == "mp4" + + def test_get_quality_preset_for_bitrate(self): + """Test quality preset selection based on bitrate.""" + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + assert processor._get_quality_preset_for_bitrate(500) == "low" + assert processor._get_quality_preset_for_bitrate(2000) == "medium" + assert processor._get_quality_preset_for_bitrate(5000) == "high" + assert processor._get_quality_preset_for_bitrate(10000) == "ultra" + + def test_get_ffmpeg_options_for_level(self): + """Test FFmpeg options generation for bitrate levels.""" + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + level = BitrateLevel( + name="720p", width=1280, height=720, + bitrate=3000, max_bitrate=4500, + codec="h264", container="mp4" + ) + + options = processor._get_ffmpeg_options_for_level(level) + + assert options["b:v"] == "3000k" + assert options["maxrate"] == "4500k" + assert options["bufsize"] == "9000k" + assert options["s"] == "1280x720" + + @pytest.mark.asyncio + async def test_generate_optimal_bitrate_ladder_without_ai(self): + """Test bitrate ladder generation without AI analysis.""" + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config, enable_ai_optimization=False) + + levels = await processor._generate_optimal_bitrate_ladder(Path("test.mp4")) + + assert isinstance(levels, list) + assert len(levels) >= 1 + assert all(isinstance(level, BitrateLevel) for level in levels) + + @pytest.mark.asyncio + @patch('video_processor.streaming.adaptive.VideoContentAnalyzer') + async def test_generate_optimal_bitrate_ladder_with_ai(self, mock_analyzer_class): + """Test bitrate ladder generation with AI analysis.""" + # Mock AI analyzer + mock_analyzer = Mock() + mock_analysis = Mock() + mock_analysis.resolution = (1920, 1080) + mock_analysis.motion_intensity = 0.8 + mock_analyzer.analyze_content = AsyncMock(return_value=mock_analysis) + mock_analyzer_class.return_value = mock_analyzer + + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config, enable_ai_optimization=True) + processor.content_analyzer = mock_analyzer + + levels = await processor._generate_optimal_bitrate_ladder(Path("test.mp4")) + + assert isinstance(levels, list) + assert len(levels) >= 1 + + # Check that bitrates were adjusted for high motion + for level in levels: + assert level.bitrate > 0 + assert level.max_bitrate > level.bitrate + + @pytest.mark.asyncio + @patch('video_processor.streaming.adaptive.VideoProcessor') + @patch('video_processor.streaming.adaptive.asyncio.to_thread') + async def test_generate_bitrate_renditions(self, mock_to_thread, mock_processor_class): + """Test bitrate rendition generation.""" + # Mock VideoProcessor + mock_result = Mock() + mock_result.encoded_files = {"mp4": Path("/output/test.mp4")} + mock_processor_instance = Mock() + mock_processor_instance.process_video.return_value = mock_result + mock_processor_class.return_value = mock_processor_instance + mock_to_thread.return_value = mock_result + + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + bitrate_levels = [ + BitrateLevel("480p", 854, 480, 1500, 2250, "h264", "mp4"), + BitrateLevel("720p", 1280, 720, 3000, 4500, "h264", "mp4"), + ] + + with patch('pathlib.Path.mkdir'): + rendition_files = await processor._generate_bitrate_renditions( + Path("input.mp4"), Path("/output"), "test_video", bitrate_levels + ) + + assert isinstance(rendition_files, dict) + assert len(rendition_files) == 2 + assert "480p" in rendition_files + assert "720p" in rendition_files + + @pytest.mark.asyncio + @patch('video_processor.streaming.adaptive.asyncio.to_thread') + async def test_generate_thumbnail_track(self, mock_to_thread): + """Test thumbnail track generation.""" + # Mock VideoProcessor result + mock_result = Mock() + mock_result.sprite_file = Path("/output/sprite.jpg") + mock_to_thread.return_value = mock_result + + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + with patch('video_processor.streaming.adaptive.VideoProcessor'): + thumbnail_track = await processor._generate_thumbnail_track( + Path("input.mp4"), Path("/output"), "test_video" + ) + + assert thumbnail_track == Path("/output/sprite.jpg") + + @pytest.mark.asyncio + @patch('video_processor.streaming.adaptive.asyncio.to_thread') + async def test_generate_thumbnail_track_failure(self, mock_to_thread): + """Test thumbnail track generation failure.""" + mock_to_thread.side_effect = Exception("Thumbnail generation failed") + + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + with patch('video_processor.streaming.adaptive.VideoProcessor'): + thumbnail_track = await processor._generate_thumbnail_track( + Path("input.mp4"), Path("/output"), "test_video" + ) + + assert thumbnail_track is None + + @pytest.mark.asyncio + @patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_hls_playlist') + @patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_dash_manifest') + @patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_thumbnail_track') + @patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_bitrate_renditions') + @patch('video_processor.streaming.adaptive.AdaptiveStreamProcessor._generate_optimal_bitrate_ladder') + async def test_create_adaptive_stream( + self, mock_ladder, mock_renditions, mock_thumbnail, mock_dash, mock_hls + ): + """Test complete adaptive stream creation.""" + # Setup mocks + mock_bitrate_levels = [ + BitrateLevel("720p", 1280, 720, 3000, 4500, "h264", "mp4") + ] + mock_rendition_files = {"720p": Path("/output/720p.mp4")} + + mock_ladder.return_value = mock_bitrate_levels + mock_renditions.return_value = mock_rendition_files + mock_thumbnail.return_value = Path("/output/sprite.jpg") + mock_hls.return_value = Path("/output/playlist.m3u8") + mock_dash.return_value = Path("/output/manifest.mpd") + + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + with patch('pathlib.Path.mkdir'): + result = await processor.create_adaptive_stream( + Path("input.mp4"), + Path("/output"), + "test_video", + ["hls", "dash"] + ) + + assert isinstance(result, StreamingPackage) + assert result.video_id == "test_video" + assert result.hls_playlist == Path("/output/playlist.m3u8") + assert result.dash_manifest == Path("/output/manifest.mpd") + assert result.thumbnail_track == Path("/output/sprite.jpg") + assert result.bitrate_levels == mock_bitrate_levels + + @pytest.mark.asyncio + async def test_create_adaptive_stream_with_custom_ladder(self): + """Test adaptive stream creation with custom bitrate ladder.""" + custom_levels = [ + BitrateLevel("480p", 854, 480, 1500, 2250, "h264", "mp4"), + ] + + config = ProcessorConfig() + processor = AdaptiveStreamProcessor(config) + + with patch.multiple( + processor, + _generate_bitrate_renditions=AsyncMock(return_value={"480p": Path("test.mp4")}), + _generate_hls_playlist=AsyncMock(return_value=Path("playlist.m3u8")), + _generate_dash_manifest=AsyncMock(return_value=Path("manifest.mpd")), + _generate_thumbnail_track=AsyncMock(return_value=Path("sprite.jpg")), + ), patch('pathlib.Path.mkdir'): + result = await processor.create_adaptive_stream( + Path("input.mp4"), + Path("/output"), + "test_video", + custom_bitrate_ladder=custom_levels + ) + + assert result.bitrate_levels == custom_levels \ No newline at end of file