diff --git a/CLAUDE.md b/CLAUDE.md
index e7778fd..1fbf304 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -82,6 +82,8 @@ uv publish
 5. **Format Conversion**: `pdf_to_markdown` - Clean markdown with MCP resource URIs for images
 6. **Image Processing**: `extract_images` - Extract images with custom output paths and clean summary output
 7. **PDF Forms**: `extract_form_data`, `create_form_pdf`, `fill_form_pdf`, `add_form_fields` - Complete form lifecycle management
+8. **Document Assembly**: `merge_pdfs_advanced`, `split_pdf_by_pages`, `split_pdf_by_bookmarks`, `reorder_pdf_pages` - PDF manipulation and organization
+9. **Annotations & Markup**: `add_sticky_notes`, `add_highlights`, `add_stamps`, `extract_all_annotations` - Collaboration and review tools
 
 ### MCP Client-Friendly Design
 
@@ -158,8 +160,98 @@ The server provides comprehensive PDF form capabilities:
 - Identify field types and constraints
 - Form validation and structure analysis
 
+### PDF Document Assembly
+
+The server provides comprehensive document organization capabilities:
+
+**PDF Merging (`merge_pdfs_advanced`)**:
+- Combine multiple PDFs into a single document
+- Preserve bookmarks with automatic page number adjustment
+- Generate a table of contents from source filenames
+- Optional page numbering for merged documents
+- Per-file error capture, so one unreadable input does not abort the merge
+
+**Page Range Splitting (`split_pdf_by_pages`)**:
+- Split PDFs by custom page ranges (1-5, 6-10, 11-end)
+- Flexible naming patterns with placeholders
+- Preserve relevant bookmarks in each split
+- Support for single pages and the "end" keyword
+
+**Bookmark-Based Splitting (`split_pdf_by_bookmarks`)**:
+- Automatically split at bookmark boundaries
+- Configurable bookmark levels (chapters vs. sections)
+- Clean filename generation from bookmark titles
+- Preserve document structure in splits
+
+**Page Reordering (`reorder_pdf_pages`)**:
+- Rearrange pages in any custom sequence
+- Support for page duplication and omission
+- Automatic bookmark reference adjustment
+- Detailed tracking of page transformations
+
 ### Docker Support
 
 The project includes Docker support with all system dependencies pre-installed, useful for consistent cross-platform development and deployment.
 
 ### MCP Integration
-Tools are registered using FastMCP decorators and follow MCP protocol standards for tool descriptions and parameter validation.
\ No newline at end of file
+Tools are registered using FastMCP decorators and follow MCP protocol standards for tool descriptions and parameter validation.
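+
+A minimal registration sketch (the import path and the example tool are assumptions for illustration, not part of the shipped toolset):
+
+```python
+from fastmcp import FastMCP  # assumed import; match the one used in server.py
+
+mcp = FastMCP("mcp-pdf-tools")
+
+@mcp.tool(name="get_page_count", description="Return the number of pages in a PDF")
+async def get_page_count(pdf_path: str) -> dict:
+    import fitz  # PyMuPDF
+
+    # Open the document, read its page count, and return a small JSON-safe dict
+    with fitz.open(pdf_path) as doc:
+        return {"pages": doc.page_count}
+```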
+ +## Future Enhancement Ideas + +Based on comprehensive PDF usage patterns, here are potential high-impact features for future development: + +### 🎯 Priority 1: Document Assembly & Merging +- `merge_pdfs` - Combine multiple PDFs with bookmarks preservation +- `split_pdf_by_pages` - Extract specific page ranges +- `split_pdf_by_bookmarks` - Auto-split by chapters/sections +- `insert_pdf_pages` - Insert pages at specific positions +- `reorder_pdf_pages` - Drag-and-drop style page reordering + +### 🔒 Priority 2: Digital Signatures & Security +- `add_digital_signature` - Sign with digital certificates +- `verify_pdf_signatures` - Validate signature authenticity +- `add_password_protection` - Encrypt with user/owner passwords +- `remove_pdf_passwords` - Decrypt protected PDFs +- `set_pdf_permissions` - Control print/copy/edit rights +- `redact_sensitive_data` - Black out confidential information + +### ✏️ Priority 3: Advanced Annotations & Markup +- `add_sticky_notes` - Comments and reviews +- `add_highlights` - Text highlighting with colors +- `add_stamps` - Approved/Draft/Confidential stamps +- `add_drawings` - Freehand annotations and shapes +- `extract_all_annotations` - Export comments to JSON/CSV + +### 🔍 Priority 4: Document Comparison & Analysis +- `compare_pdf_versions` - Visual diff between document versions +- `detect_pdf_changes` - Highlight additions/deletions +- `analyze_reading_order` - Accessibility compliance checking +- `extract_pdf_statistics` - Word count, reading time, complexity +- `detect_pdf_quality_issues` - Scan for structural problems + +### 📄 Priority 5: Advanced Content Extraction +- `extract_pdf_links` - All URLs and internal links +- `extract_pdf_fonts` - Font usage analysis +- `extract_pdf_colors` - Color palette extraction +- `extract_pdf_layers` - CAD/design layer information +- `convert_pdf_to_formats` - Word, Excel, PowerPoint, HTML conversion + +### ⚡ Priority 6: Batch Operations & Automation +- `batch_process_pdfs` - Apply operations to multiple files +- `create_pdf_portfolio` - Combine different file types +- `auto_ocr_detection` - Smart OCR for scanned pages only +- `optimize_pdf_size` - Intelligent compression algorithms +- `standardize_pdf_metadata` - Bulk metadata updates + +### 🚀 Innovative Features +- `ai_summarize_pdf` - Generate executive summaries +- `translate_pdf_text` - Multi-language document translation +- `create_pdf_quiz` - Auto-generate questions from content +- `extract_pdf_timeline` - Parse dates and create chronologies +- `analyze_pdf_accessibility` - WCAG compliance checking + +### Implementation Notes +- **Document Assembly** features are universally needed and should be prioritized +- **Digital Signatures** provide high enterprise value +- **Batch Operations** essential for automation workflows +- All features should maintain MCP protocol standards and clean output formatting +- Consider user experience and context window optimization for each tool \ No newline at end of file diff --git a/src/mcp_pdf_tools/server.py b/src/mcp_pdf_tools/server.py index 411dfe5..755fd95 100644 --- a/src/mcp_pdf_tools/server.py +++ b/src/mcp_pdf_tools/server.py @@ -4121,6 +4121,1179 @@ async def add_field_validation( except Exception as e: return {"error": f"Adding field validation failed: {str(e)}", "addition_time": round(time.time() - start_time, 2)} +@mcp.tool(name="merge_pdfs_advanced", description="Advanced PDF merging with bookmark preservation and options") +async def merge_pdfs_advanced( + input_paths: str, # JSON array of PDF file paths + 
output_path: str, + preserve_bookmarks: bool = True, + add_page_numbers: bool = False, + include_toc: bool = False +) -> Dict[str, Any]: + """ + Merge multiple PDF files into a single document + + Args: + input_paths: JSON array of PDF file paths to merge + output_path: Path where merged PDF should be saved + preserve_bookmarks: Whether to preserve existing bookmarks + add_page_numbers: Whether to add page numbers to merged document + include_toc: Whether to generate table of contents with source filenames + + Returns: + Dictionary containing merge results + """ + import json + import time + start_time = time.time() + + try: + # Parse input paths + try: + pdf_paths = json.loads(input_paths) if input_paths else [] + except json.JSONDecodeError as e: + return {"error": f"Invalid input paths JSON: {str(e)}", "merge_time": 0} + + if len(pdf_paths) < 2: + return {"error": "At least 2 PDF files are required for merging", "merge_time": 0} + + # Validate all input paths + validated_paths = [] + for pdf_path in pdf_paths: + try: + validated_path = await validate_pdf_path(pdf_path) + validated_paths.append(validated_path) + except Exception as e: + return {"error": f"Invalid PDF path '{pdf_path}': {str(e)}", "merge_time": 0} + + # Create output document + merged_doc = fitz.open() + merge_info = { + "files_merged": [], + "total_pages": 0, + "bookmarks_preserved": 0, + "merge_errors": [] + } + + current_page_offset = 0 + + # Process each PDF + for i, pdf_path in enumerate(validated_paths): + try: + doc = fitz.open(str(pdf_path)) + filename = Path(pdf_path).name + + # Insert pages + merged_doc.insert_pdf(doc, from_page=0, to_page=doc.page_count - 1) + + # Handle bookmarks + if preserve_bookmarks and doc.get_toc(): + toc = doc.get_toc() + # Adjust bookmark page numbers for merged document + adjusted_toc = [] + for level, title, page_num in toc: + adjusted_toc.append([level, title, page_num + current_page_offset]) + + # Add adjusted bookmarks to merged document + existing_toc = merged_doc.get_toc() + existing_toc.extend(adjusted_toc) + merged_doc.set_toc(existing_toc) + merge_info["bookmarks_preserved"] += len(toc) + + # Add table of contents entry for source file + if include_toc: + toc_entry = [1, f"Document {i+1}: {filename}", current_page_offset + 1] + existing_toc = merged_doc.get_toc() + existing_toc.append(toc_entry) + merged_doc.set_toc(existing_toc) + + merge_info["files_merged"].append({ + "filename": filename, + "pages": doc.page_count, + "page_range": f"{current_page_offset + 1}-{current_page_offset + doc.page_count}" + }) + + current_page_offset += doc.page_count + doc.close() + + except Exception as e: + merge_info["merge_errors"].append({ + "filename": Path(pdf_path).name, + "error": str(e) + }) + + # Add page numbers if requested + if add_page_numbers: + for page_num in range(merged_doc.page_count): + page = merged_doc[page_num] + page_rect = page.rect + + # Add page number at bottom center + page_text = f"Page {page_num + 1}" + text_pos = (page_rect.width / 2 - 20, page_rect.height - 20) + page.insert_text(text_pos, page_text, fontname="helv", fontsize=10, color=(0.5, 0.5, 0.5)) + + merge_info["total_pages"] = merged_doc.page_count + + # Ensure output directory exists + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Save merged PDF + merged_doc.save(str(output_file), garbage=4, deflate=True, clean=True) + merged_doc.close() + + file_size = output_file.stat().st_size + + return { + "output_path": str(output_file), + "files_processed": 
len(pdf_paths), + "files_successfully_merged": len(merge_info["files_merged"]), + "merge_details": merge_info, + "total_pages": merge_info["total_pages"], + "bookmarks_preserved": merge_info["bookmarks_preserved"], + "page_numbers_added": add_page_numbers, + "toc_generated": include_toc, + "file_size": format_file_size(file_size), + "merge_time": round(time.time() - start_time, 2) + } + + except Exception as e: + return {"error": f"PDF merge failed: {str(e)}", "merge_time": round(time.time() - start_time, 2)} + +@mcp.tool(name="split_pdf_by_pages", description="Split PDF into separate files by page ranges") +async def split_pdf_by_pages( + input_path: str, + output_directory: str, + page_ranges: str, # JSON array of ranges like ["1-5", "6-10", "11-end"] + naming_pattern: str = "page_{start}-{end}.pdf" +) -> Dict[str, Any]: + """ + Split PDF into separate files by specified page ranges + + Args: + input_path: Path to the PDF file to split + output_directory: Directory where split files should be saved + page_ranges: JSON array of page ranges (1-indexed) + naming_pattern: Pattern for output filenames with {start}, {end}, {index} placeholders + + Returns: + Dictionary containing split results + """ + import json + import time + start_time = time.time() + + try: + # Parse page ranges + try: + ranges = json.loads(page_ranges) if page_ranges else [] + except json.JSONDecodeError as e: + return {"error": f"Invalid page ranges JSON: {str(e)}", "split_time": 0} + + if not ranges: + return {"error": "At least one page range is required", "split_time": 0} + + # Validate input path + input_file = await validate_pdf_path(input_path) + doc = fitz.open(str(input_file)) + total_pages = doc.page_count + + # Create output directory + output_dir = Path(output_directory) + output_dir.mkdir(parents=True, exist_ok=True) + + split_info = { + "files_created": [], + "split_errors": [], + "total_pages_processed": 0 + } + + # Process each range + for i, range_str in enumerate(ranges): + try: + # Parse range string + if range_str.lower() == "all": + start_page = 1 + end_page = total_pages + elif "-" in range_str: + parts = range_str.split("-", 1) + start_page = int(parts[0]) + if parts[1].lower() == "end": + end_page = total_pages + else: + end_page = int(parts[1]) + else: + # Single page + start_page = end_page = int(range_str) + + # Validate page numbers (convert to 0-indexed for PyMuPDF) + if start_page < 1 or start_page > total_pages: + split_info["split_errors"].append({ + "range": range_str, + "error": f"Start page {start_page} out of range (1-{total_pages})" + }) + continue + + if end_page < 1 or end_page > total_pages: + split_info["split_errors"].append({ + "range": range_str, + "error": f"End page {end_page} out of range (1-{total_pages})" + }) + continue + + if start_page > end_page: + split_info["split_errors"].append({ + "range": range_str, + "error": f"Start page {start_page} greater than end page {end_page}" + }) + continue + + # Create output filename + output_filename = naming_pattern.format( + start=start_page, + end=end_page, + index=i+1, + original=Path(input_file).stem + ) + output_path = output_dir / output_filename + + # Create new document with specified pages + new_doc = fitz.open() + new_doc.insert_pdf(doc, from_page=start_page-1, to_page=end_page-1) + + # Copy relevant bookmarks + original_toc = doc.get_toc() + if original_toc: + filtered_toc = [] + for level, title, page_num in original_toc: + # Adjust page numbers and include only relevant bookmarks + if start_page <= page_num <= 
end_page: + adjusted_page = page_num - start_page + 1 + filtered_toc.append([level, title, adjusted_page]) + + if filtered_toc: + new_doc.set_toc(filtered_toc) + + # Save split document + new_doc.save(str(output_path), garbage=4, deflate=True, clean=True) + new_doc.close() + + file_size = output_path.stat().st_size + pages_in_range = end_page - start_page + 1 + + split_info["files_created"].append({ + "filename": output_filename, + "page_range": f"{start_page}-{end_page}", + "pages": pages_in_range, + "file_size": format_file_size(file_size), + "output_path": str(output_path) + }) + + split_info["total_pages_processed"] += pages_in_range + + except ValueError as e: + split_info["split_errors"].append({ + "range": range_str, + "error": f"Invalid range format: {str(e)}" + }) + except Exception as e: + split_info["split_errors"].append({ + "range": range_str, + "error": f"Split failed: {str(e)}" + }) + + doc.close() + + return { + "input_path": str(input_file), + "output_directory": str(output_dir), + "total_input_pages": total_pages, + "files_created": len(split_info["files_created"]), + "files_failed": len(split_info["split_errors"]), + "split_details": split_info, + "naming_pattern": naming_pattern, + "split_time": round(time.time() - start_time, 2) + } + + except Exception as e: + return {"error": f"PDF split failed: {str(e)}", "split_time": round(time.time() - start_time, 2)} + +@mcp.tool(name="reorder_pdf_pages", description="Reorder pages in a PDF document") +async def reorder_pdf_pages( + input_path: str, + output_path: str, + page_order: str # JSON array of page numbers in desired order +) -> Dict[str, Any]: + """ + Reorder pages in a PDF document according to specified sequence + + Args: + input_path: Path to the PDF file to reorder + output_path: Path where reordered PDF should be saved + page_order: JSON array of page numbers in desired order (1-indexed) + + Returns: + Dictionary containing reorder results + """ + import json + import time + start_time = time.time() + + try: + # Parse page order + try: + order = json.loads(page_order) if page_order else [] + except json.JSONDecodeError as e: + return {"error": f"Invalid page order JSON: {str(e)}", "reorder_time": 0} + + if not order: + return {"error": "Page order array is required", "reorder_time": 0} + + # Validate input path + input_file = await validate_pdf_path(input_path) + doc = fitz.open(str(input_file)) + total_pages = doc.page_count + + # Validate page numbers + invalid_pages = [] + for page_num in order: + if not isinstance(page_num, int) or page_num < 1 or page_num > total_pages: + invalid_pages.append(page_num) + + if invalid_pages: + doc.close() + return {"error": f"Invalid page numbers: {invalid_pages}. 
Pages must be 1-{total_pages}", "reorder_time": 0} + + # Create new document with reordered pages + new_doc = fitz.open() + + reorder_info = { + "pages_processed": 0, + "original_order": list(range(1, total_pages + 1)), + "new_order": order, + "pages_duplicated": [], + "pages_omitted": [] + } + + # Track which pages are used + pages_used = set() + + # Insert pages in specified order + for new_position, original_page in enumerate(order, 1): + # Convert to 0-indexed for PyMuPDF + page_index = original_page - 1 + + # Insert the page + new_doc.insert_pdf(doc, from_page=page_index, to_page=page_index) + + # Track usage + if original_page in pages_used: + reorder_info["pages_duplicated"].append(original_page) + else: + pages_used.add(original_page) + + reorder_info["pages_processed"] += 1 + + # Find omitted pages + all_pages = set(range(1, total_pages + 1)) + reorder_info["pages_omitted"] = list(all_pages - pages_used) + + # Handle bookmarks - adjust page references + original_toc = doc.get_toc() + if original_toc: + new_toc = [] + for level, title, original_page_ref in original_toc: + # Find new position of the referenced page + try: + new_page_ref = order.index(original_page_ref) + 1 + new_toc.append([level, title, new_page_ref]) + except ValueError: + # Page was omitted, skip this bookmark + pass + + if new_toc: + new_doc.set_toc(new_toc) + + # Ensure output directory exists + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Save reordered PDF + new_doc.save(str(output_file), garbage=4, deflate=True, clean=True) + + doc.close() + new_doc.close() + + file_size = output_file.stat().st_size + + return { + "input_path": str(input_file), + "output_path": str(output_file), + "original_pages": total_pages, + "reordered_pages": len(order), + "reorder_details": reorder_info, + "pages_duplicated": len(reorder_info["pages_duplicated"]), + "pages_omitted": len(reorder_info["pages_omitted"]), + "file_size": format_file_size(file_size), + "reorder_time": round(time.time() - start_time, 2) + } + + except Exception as e: + return {"error": f"PDF page reorder failed: {str(e)}", "reorder_time": round(time.time() - start_time, 2)} + +@mcp.tool(name="split_pdf_by_bookmarks", description="Split PDF into separate files using bookmarks as breakpoints") +async def split_pdf_by_bookmarks( + input_path: str, + output_directory: str, + bookmark_level: int = 1, + naming_pattern: str = "{title}.pdf" +) -> Dict[str, Any]: + """ + Split PDF into separate files using bookmarks as natural breakpoints + + Args: + input_path: Path to the PDF file to split + output_directory: Directory where split files should be saved + bookmark_level: Which bookmark level to use for splitting (1=chapters, 2=sections) + naming_pattern: Pattern for output filenames with {title}, {index} placeholders + + Returns: + Dictionary containing split results + """ + import time + import re + start_time = time.time() + + try: + # Validate input path + input_file = await validate_pdf_path(input_path) + doc = fitz.open(str(input_file)) + + # Get table of contents + toc = doc.get_toc() + if not toc: + doc.close() + return {"error": "PDF has no bookmarks for splitting", "split_time": 0} + + # Filter bookmarks by level + split_points = [] + for level, title, page_num in toc: + if level == bookmark_level: + split_points.append((title, page_num)) + + if len(split_points) < 2: + doc.close() + return {"error": f"Not enough level-{bookmark_level} bookmarks for splitting (found {len(split_points)})", "split_time": 0} + + 
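        # Worked example of the split-point logic above: with bookmark_level=1
+        # and a TOC of [[1, "Chapter 1", 1], [2, "Background", 3], [1, "Chapter 2", 10]],
+        # split_points becomes [("Chapter 1", 1), ("Chapter 2", 10)], so the first
+        # output file covers pages 1-9 and the second runs from page 10 to the end.
+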
# Create output directory + output_dir = Path(output_directory) + output_dir.mkdir(parents=True, exist_ok=True) + + split_info = { + "files_created": [], + "split_errors": [], + "total_pages_processed": 0 + } + + total_pages = doc.page_count + + # Process each bookmark section + for i, (title, start_page) in enumerate(split_points): + try: + # Determine end page + if i + 1 < len(split_points): + end_page = split_points[i + 1][1] - 1 + else: + end_page = total_pages + + # Clean title for filename + clean_title = re.sub(r'[^\w\s-]', '', title).strip() + clean_title = re.sub(r'\s+', '_', clean_title) + if not clean_title: + clean_title = f"section_{i+1}" + + # Create output filename + output_filename = naming_pattern.format( + title=clean_title, + index=i+1, + original=Path(input_file).stem + ) + + # Ensure .pdf extension + if not output_filename.lower().endswith('.pdf'): + output_filename += '.pdf' + + output_path = output_dir / output_filename + + # Create new document with bookmark section + new_doc = fitz.open() + new_doc.insert_pdf(doc, from_page=start_page-1, to_page=end_page-1) + + # Add relevant bookmarks to new document + section_toc = [] + for level, bookmark_title, page_num in toc: + if start_page <= page_num <= end_page: + adjusted_page = page_num - start_page + 1 + section_toc.append([level, bookmark_title, adjusted_page]) + + if section_toc: + new_doc.set_toc(section_toc) + + # Save split document + new_doc.save(str(output_path), garbage=4, deflate=True, clean=True) + new_doc.close() + + file_size = output_path.stat().st_size + pages_in_section = end_page - start_page + 1 + + split_info["files_created"].append({ + "filename": output_filename, + "bookmark_title": title, + "page_range": f"{start_page}-{end_page}", + "pages": pages_in_section, + "file_size": format_file_size(file_size), + "output_path": str(output_path) + }) + + split_info["total_pages_processed"] += pages_in_section + + except Exception as e: + split_info["split_errors"].append({ + "bookmark_title": title, + "error": f"Split failed: {str(e)}" + }) + + doc.close() + + return { + "input_path": str(input_file), + "output_directory": str(output_dir), + "bookmark_level_used": bookmark_level, + "bookmarks_found": len(split_points), + "files_created": len(split_info["files_created"]), + "files_failed": len(split_info["split_errors"]), + "split_details": split_info, + "naming_pattern": naming_pattern, + "split_time": round(time.time() - start_time, 2) + } + + except Exception as e: + return {"error": f"Bookmark-based PDF split failed: {str(e)}", "split_time": round(time.time() - start_time, 2)} + +@mcp.tool(name="add_sticky_notes", description="Add sticky note comments to specific locations in PDF") +async def add_sticky_notes( + input_path: str, + output_path: str, + notes: str # JSON array of note definitions +) -> Dict[str, Any]: + """ + Add sticky note annotations to PDF at specified locations + + Args: + input_path: Path to the existing PDF + output_path: Path where PDF with notes should be saved + notes: JSON array of note definitions + + Note format: + [ + { + "page": 1, + "x": 100, "y": 200, + "content": "This is a note", + "author": "John Doe", + "subject": "Review Comment", + "color": "yellow" + } + ] + + Returns: + Dictionary containing annotation results + """ + import json + import time + start_time = time.time() + + try: + # Parse notes + try: + note_definitions = json.loads(notes) if notes else [] + except json.JSONDecodeError as e: + return {"error": f"Invalid notes JSON: {str(e)}", "annotation_time": 0} + 
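+        # Coordinate note: PyMuPDF's page coordinate system has its origin at
+        # the top-left corner with y increasing downward (1 unit = 1 point =
+        # 1/72 inch), so {"x": 100, "y": 200} lies 200 points below the top edge.
+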
+        if not note_definitions:
+            return {"error": "At least one note is required", "annotation_time": 0}
+
+        # Validate input path
+        input_file = await validate_pdf_path(input_path)
+        doc = fitz.open(str(input_file))
+
+        annotation_info = {
+            "notes_added": [],
+            "annotation_errors": []
+        }
+
+        # Color mapping
+        color_map = {
+            "yellow": (1, 1, 0),
+            "red": (1, 0, 0),
+            "green": (0, 1, 0),
+            "blue": (0, 0, 1),
+            "orange": (1, 0.5, 0),
+            "purple": (0.5, 0, 1),
+            "pink": (1, 0.75, 0.8),
+            "gray": (0.5, 0.5, 0.5)
+        }
+
+        # Process each note
+        for i, note_def in enumerate(note_definitions):
+            try:
+                page_num = note_def.get("page", 1) - 1  # Convert to 0-indexed
+                x = note_def.get("x", 100)
+                y = note_def.get("y", 100)
+                content = note_def.get("content", "")
+                author = note_def.get("author", "Anonymous")
+                subject = note_def.get("subject", "Note")
+                color_name = note_def.get("color", "yellow").lower()
+
+                # Validate page number
+                if page_num >= len(doc) or page_num < 0:
+                    annotation_info["annotation_errors"].append({
+                        "note_index": i,
+                        "error": f"Page {page_num + 1} does not exist"
+                    })
+                    continue
+
+                page = doc[page_num]
+
+                # Get color
+                color = color_map.get(color_name, (1, 1, 0))  # Default to yellow
+
+                # Create text annotation (sticky note); by PDF convention the
+                # title field carries the author name
+                annot = page.add_text_annot(fitz.Point(x, y), content)
+                annot.set_info(content=content, title=author, subject=subject)
+                annot.set_colors(stroke=color)
+                annot.set_flags(fitz.PDF_ANNOT_IS_PRINT)  # Make it printable
+                annot.update()
+
+                annotation_info["notes_added"].append({
+                    "page": page_num + 1,
+                    "position": {"x": x, "y": y},
+                    "content": content[:50] + "..." if len(content) > 50 else content,
+                    "author": author,
+                    "subject": subject,
+                    "color": color_name
+                })
+
+            except Exception as e:
+                annotation_info["annotation_errors"].append({
+                    "note_index": i,
+                    "error": f"Failed to add note: {str(e)}"
+                })
+
+        # Ensure output directory exists
+        output_file = Path(output_path)
+        output_file.parent.mkdir(parents=True, exist_ok=True)
+
+        # Save PDF with annotations
+        doc.save(str(output_file), garbage=4, deflate=True, clean=True)
+        doc.close()
+
+        file_size = output_file.stat().st_size
+
+        return {
+            "input_path": str(input_file),
+            "output_path": str(output_file),
+            "notes_requested": len(note_definitions),
+            "notes_added": len(annotation_info["notes_added"]),
+            "notes_failed": len(annotation_info["annotation_errors"]),
+            "annotation_details": annotation_info,
+            "file_size": format_file_size(file_size),
+            "annotation_time": round(time.time() - start_time, 2)
+        }
+
+    except Exception as e:
+        return {"error": f"Adding sticky notes failed: {str(e)}", "annotation_time": round(time.time() - start_time, 2)}
+
+@mcp.tool(name="add_highlights", description="Add text highlights to specific text or areas in PDF")
+async def add_highlights(
+    input_path: str,
+    output_path: str,
+    highlights: str  # JSON array of highlight definitions
+) -> Dict[str, Any]:
+    """
+    Add highlight annotations to PDF text or specific areas
+
+    Args:
+        input_path: Path to the existing PDF
+        output_path: Path where PDF with highlights should be saved
+        highlights: JSON array of highlight definitions
+
+    Highlight format:
+    [
+        {
+            "page": 1,
+            "text": "text to highlight",   // Optional: search for this text
+            "rect": [x0, y0, x1, y1],      // Optional: specific rectangle
+            "color": "yellow",
+            "author": "John Doe",
+            "note": "Important point"
+        }
+    ]
+
+    Returns:
+        Dictionary containing highlight results
+    """
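+    # Hypothetical example payload for this tool:
+    #   [{"page": 1, "text": "net revenue", "color": "yellow",
+    #     "author": "Reviewer", "note": "Check against the Q3 table"}]
+    # Note that page.search_for() returns one rectangle per visual occurrence,
+    # so a single "text" entry may yield several highlight annotations.
+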
import json + import time + start_time = time.time() + + try: + # Parse highlights + try: + highlight_definitions = json.loads(highlights) if highlights else [] + except json.JSONDecodeError as e: + return {"error": f"Invalid highlights JSON: {str(e)}", "highlight_time": 0} + + if not highlight_definitions: + return {"error": "At least one highlight is required", "highlight_time": 0} + + # Validate input path + input_file = await validate_pdf_path(input_path) + doc = fitz.open(str(input_file)) + + highlight_info = { + "highlights_added": [], + "highlight_errors": [] + } + + # Color mapping + color_map = { + "yellow": (1, 1, 0), + "red": (1, 0, 0), + "green": (0, 1, 0), + "blue": (0, 0, 1), + "orange": (1, 0.5, 0), + "purple": (0.5, 0, 1), + "pink": (1, 0.75, 0.8) + } + + # Process each highlight + for i, highlight_def in enumerate(highlight_definitions): + try: + page_num = highlight_def.get("page", 1) - 1 # Convert to 0-indexed + text_to_find = highlight_def.get("text", "") + rect_coords = highlight_def.get("rect", None) + color_name = highlight_def.get("color", "yellow").lower() + author = highlight_def.get("author", "Anonymous") + note = highlight_def.get("note", "") + + # Validate page number + if page_num >= len(doc) or page_num < 0: + highlight_info["highlight_errors"].append({ + "highlight_index": i, + "error": f"Page {page_num + 1} does not exist" + }) + continue + + page = doc[page_num] + color = color_map.get(color_name, (1, 1, 0)) + + highlights_added_this_item = 0 + + # Method 1: Search for text and highlight + if text_to_find: + text_instances = page.search_for(text_to_find) + for rect in text_instances: + # Create highlight annotation + annot = page.add_highlight_annot(rect) + annot.set_colors(stroke=color) + annot.set_info(content=note) + annot.update() + highlights_added_this_item += 1 + + # Method 2: Highlight specific rectangle + elif rect_coords and len(rect_coords) == 4: + highlight_rect = fitz.Rect(rect_coords[0], rect_coords[1], + rect_coords[2], rect_coords[3]) + annot = page.add_highlight_annot(highlight_rect) + annot.set_colors(stroke=color) + annot.set_info(content=note) + annot.update() + highlights_added_this_item += 1 + + else: + highlight_info["highlight_errors"].append({ + "highlight_index": i, + "error": "Must specify either 'text' to search for or 'rect' coordinates" + }) + continue + + if highlights_added_this_item > 0: + highlight_info["highlights_added"].append({ + "page": page_num + 1, + "text_searched": text_to_find, + "rect_used": rect_coords, + "instances_highlighted": highlights_added_this_item, + "color": color_name, + "author": author, + "note": note[:50] + "..." 
if len(note) > 50 else note + }) + else: + highlight_info["highlight_errors"].append({ + "highlight_index": i, + "error": f"No text found to highlight: '{text_to_find}'" + }) + + except Exception as e: + highlight_info["highlight_errors"].append({ + "highlight_index": i, + "error": f"Failed to add highlight: {str(e)}" + }) + + # Ensure output directory exists + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Save PDF with highlights + doc.save(str(output_file), garbage=4, deflate=True, clean=True) + doc.close() + + file_size = output_file.stat().st_size + + return { + "input_path": str(input_file), + "output_path": str(output_file), + "highlights_requested": len(highlight_definitions), + "highlights_added": len(highlight_info["highlights_added"]), + "highlights_failed": len(highlight_info["highlight_errors"]), + "highlight_details": highlight_info, + "file_size": format_file_size(file_size), + "highlight_time": round(time.time() - start_time, 2) + } + + except Exception as e: + return {"error": f"Adding highlights failed: {str(e)}", "highlight_time": round(time.time() - start_time, 2)} + +@mcp.tool(name="add_stamps", description="Add approval stamps (Approved, Draft, Confidential, etc) to PDF") +async def add_stamps( + input_path: str, + output_path: str, + stamps: str # JSON array of stamp definitions +) -> Dict[str, Any]: + """ + Add stamp annotations to PDF (Approved, Draft, Confidential, etc) + + Args: + input_path: Path to the existing PDF + output_path: Path where PDF with stamps should be saved + stamps: JSON array of stamp definitions + + Stamp format: + [ + { + "page": 1, + "x": 400, "y": 700, + "stamp_type": "APPROVED", // APPROVED, DRAFT, CONFIDENTIAL, REVIEWED, etc + "size": "large", // small, medium, large + "rotation": 0, // degrees + "opacity": 0.7 + } + ] + + Returns: + Dictionary containing stamp results + """ + import json + import time + start_time = time.time() + + try: + # Parse stamps + try: + stamp_definitions = json.loads(stamps) if stamps else [] + except json.JSONDecodeError as e: + return {"error": f"Invalid stamps JSON: {str(e)}", "stamp_time": 0} + + if not stamp_definitions: + return {"error": "At least one stamp is required", "stamp_time": 0} + + # Validate input path + input_file = await validate_pdf_path(input_path) + doc = fitz.open(str(input_file)) + + stamp_info = { + "stamps_added": [], + "stamp_errors": [] + } + + # Predefined stamp types with colors and text + stamp_types = { + "APPROVED": {"text": "APPROVED", "color": (0, 0.7, 0), "border_color": (0, 0.5, 0)}, + "REJECTED": {"text": "REJECTED", "color": (0.8, 0, 0), "border_color": (0.6, 0, 0)}, + "DRAFT": {"text": "DRAFT", "color": (0.8, 0.4, 0), "border_color": (0.6, 0.3, 0)}, + "CONFIDENTIAL": {"text": "CONFIDENTIAL", "color": (0.8, 0, 0), "border_color": (0.6, 0, 0)}, + "REVIEWED": {"text": "REVIEWED", "color": (0, 0, 0.8), "border_color": (0, 0, 0.6)}, + "FINAL": {"text": "FINAL", "color": (0.5, 0, 0.5), "border_color": (0.3, 0, 0.3)}, + "URGENT": {"text": "URGENT", "color": (0.9, 0, 0), "border_color": (0.7, 0, 0)}, + "COMPLETED": {"text": "COMPLETED", "color": (0, 0.6, 0), "border_color": (0, 0.4, 0)} + } + + # Size mapping + size_map = { + "small": {"width": 80, "height": 25, "font_size": 10}, + "medium": {"width": 120, "height": 35, "font_size": 12}, + "large": {"width": 160, "height": 45, "font_size": 14} + } + + # Process each stamp + for i, stamp_def in enumerate(stamp_definitions): + try: + page_num = stamp_def.get("page", 1) - 1 # Convert to 
0-indexed
+                x = stamp_def.get("x", 400)
+                y = stamp_def.get("y", 700)
+                stamp_type = stamp_def.get("stamp_type", "APPROVED").upper()
+                size_name = stamp_def.get("size", "medium").lower()
+                rotation = stamp_def.get("rotation", 0)
+                opacity = stamp_def.get("opacity", 0.7)
+
+                # Validate page number
+                if page_num >= len(doc) or page_num < 0:
+                    stamp_info["stamp_errors"].append({
+                        "stamp_index": i,
+                        "error": f"Page {page_num + 1} does not exist"
+                    })
+                    continue
+
+                page = doc[page_num]
+
+                # Get stamp properties
+                if stamp_type not in stamp_types:
+                    stamp_info["stamp_errors"].append({
+                        "stamp_index": i,
+                        "error": f"Unknown stamp type: {stamp_type}. Available: {list(stamp_types.keys())}"
+                    })
+                    continue
+
+                stamp_props = stamp_types[stamp_type]
+                size_props = size_map.get(size_name, size_map["medium"])
+
+                # Calculate stamp rectangle
+                stamp_width = size_props["width"]
+                stamp_height = size_props["height"]
+                stamp_rect = fitz.Rect(x, y, x + stamp_width, y + stamp_height)
+
+                # Draw the stamp as a bordered, semi-transparent filled rectangle.
+                # Opacity must be passed via fill_opacity: appending it to the RGB
+                # tuple would make PyMuPDF interpret the 4-tuple as a CMYK color.
+                page.draw_rect(stamp_rect, color=stamp_props["border_color"],
+                               fill=stamp_props["color"], width=2, fill_opacity=opacity)
+
+                # Calculate text position for (approximate) centering
+                font_size = size_props["font_size"]
+                text = stamp_props["text"]
+                text_point = (
+                    x + stamp_width / 2 - len(text) * font_size / 4,
+                    y + stamp_height / 2 + font_size / 3
+                )
+
+                # insert_text only accepts rotations of 0/90/180/270 degrees,
+                # so snap the requested angle to the nearest quarter turn
+                snapped_rotation = int(round(rotation / 90) * 90) % 360
+
+                page.insert_text(
+                    text_point,
+                    text,
+                    fontname="hebo",  # Helvetica-Bold (built-in font code)
+                    fontsize=font_size,
+                    color=(1, 1, 1),  # White text
+                    rotate=snapped_rotation
+                )
+
+                stamp_info["stamps_added"].append({
+                    "page": page_num + 1,
+                    "position": {"x": x, "y": y},
+                    "stamp_type": stamp_type,
+                    "size": size_name,
+                    "dimensions": {"width": stamp_width, "height": stamp_height},
+                    "rotation": snapped_rotation,
+                    "opacity": opacity
+                })
+
+            except Exception as e:
+                stamp_info["stamp_errors"].append({
+                    "stamp_index": i,
+                    "error": f"Failed to add stamp: {str(e)}"
+                })
+
+        # Ensure output directory exists
+        output_file = Path(output_path)
+        output_file.parent.mkdir(parents=True, exist_ok=True)
+
+        # Save PDF with stamps
+        doc.save(str(output_file), garbage=4, deflate=True, clean=True)
+        doc.close()
+
+        file_size = output_file.stat().st_size
+
+        return {
+            "input_path": str(input_file),
+            "output_path": str(output_file),
+            "stamps_requested": len(stamp_definitions),
+            "stamps_added": len(stamp_info["stamps_added"]),
+            "stamps_failed": len(stamp_info["stamp_errors"]),
+            "available_stamp_types": list(stamp_types.keys()),
+            "stamp_details": stamp_info,
+            "file_size": format_file_size(file_size),
+            "stamp_time": round(time.time() - start_time, 2)
+        }
+
+    except Exception as e:
+        return {"error": f"Adding stamps failed: {str(e)}", "stamp_time": round(time.time() - start_time, 2)}
+
+@mcp.tool(name="extract_all_annotations", description="Extract all annotations (notes, highlights, stamps) from PDF")
+async def extract_all_annotations(
+    pdf_path: str,
+    export_format: str = "json"  # json, csv
+) -> Dict[str, Any]:
+    """
+    Extract all annotations from PDF and export to JSON or CSV format
+
+    Args:
+        pdf_path: Path to the PDF file to analyze
+        export_format: Output format (json or csv)
+
+    Returns:
+        Dictionary containing all extracted annotations
+    """
+    import time
+    start_time =
time.time() + + try: + # Validate input path + input_file = await validate_pdf_path(pdf_path) + doc = fitz.open(str(input_file)) + + all_annotations = [] + annotation_summary = { + "total_annotations": 0, + "by_type": {}, + "by_page": {}, + "authors": set() + } + + # Process each page + for page_num in range(len(doc)): + page = doc[page_num] + page_annotations = [] + + # Get all annotations on this page + for annot in page.annots(): + try: + annot_info = { + "page": page_num + 1, + "type": annot.type[1], # Get annotation type name + "content": annot.info.get("content", ""), + "author": annot.info.get("title", "") or annot.info.get("author", ""), + "subject": annot.info.get("subject", ""), + "creation_date": str(annot.info.get("creationDate", "")), + "modification_date": str(annot.info.get("modDate", "")), + "rect": { + "x0": round(annot.rect.x0, 2), + "y0": round(annot.rect.y0, 2), + "x1": round(annot.rect.x1, 2), + "y1": round(annot.rect.y1, 2) + } + } + + # Get colors if available + try: + stroke_color = annot.colors.get("stroke") + fill_color = annot.colors.get("fill") + if stroke_color: + annot_info["stroke_color"] = stroke_color + if fill_color: + annot_info["fill_color"] = fill_color + except: + pass + + # For highlight annotations, try to get highlighted text + if annot.type[1] == "Highlight": + try: + highlighted_text = page.get_textbox(annot.rect) + if highlighted_text.strip(): + annot_info["highlighted_text"] = highlighted_text.strip() + except: + pass + + all_annotations.append(annot_info) + page_annotations.append(annot_info) + + # Update summary + annotation_type = annot_info["type"] + annotation_summary["by_type"][annotation_type] = annotation_summary["by_type"].get(annotation_type, 0) + 1 + + if annot_info["author"]: + annotation_summary["authors"].add(annot_info["author"]) + + except Exception as e: + # Skip problematic annotations + continue + + # Update page summary + if page_annotations: + annotation_summary["by_page"][page_num + 1] = len(page_annotations) + + doc.close() + + annotation_summary["total_annotations"] = len(all_annotations) + annotation_summary["authors"] = list(annotation_summary["authors"]) + + # Format output based on requested format + if export_format.lower() == "csv": + # Convert to CSV-friendly format + csv_data = [] + for annot in all_annotations: + csv_row = { + "Page": annot["page"], + "Type": annot["type"], + "Content": annot["content"], + "Author": annot["author"], + "Subject": annot["subject"], + "X0": annot["rect"]["x0"], + "Y0": annot["rect"]["y0"], + "X1": annot["rect"]["x1"], + "Y1": annot["rect"]["y1"], + "Creation_Date": annot["creation_date"], + "Highlighted_Text": annot.get("highlighted_text", "") + } + csv_data.append(csv_row) + + return { + "input_path": str(input_file), + "export_format": "csv", + "annotations": csv_data, + "summary": annotation_summary, + "extraction_time": round(time.time() - start_time, 2) + } + + else: # JSON format (default) + return { + "input_path": str(input_file), + "export_format": "json", + "annotations": all_annotations, + "summary": annotation_summary, + "extraction_time": round(time.time() - start_time, 2) + } + + except Exception as e: + return {"error": f"Annotation extraction failed: {str(e)}", "extraction_time": round(time.time() - start_time, 2)} + # Main entry point def create_server(): """Create and return the MCP server instance"""
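    # Sketch of client-side handling for extract_all_annotations in CSV mode
    # (hypothetical helper; the tool returns CSV-ready rows, not CSV text, and
    # the rows serialize directly with the standard csv module):
    #   import csv
    #   rows = result["annotations"]
    #   if rows:
    #       with open("annotations.csv", "w", newline="") as fh:
    #           writer = csv.DictWriter(fh, fieldnames=rows[0].keys())
    #           writer.writeheader()
    #           writer.writerows(rows)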