diff --git a/docs/RESOURCE_DESIGN.md b/docs/RESOURCE_DESIGN.md new file mode 100644 index 0000000..0f84b0f --- /dev/null +++ b/docs/RESOURCE_DESIGN.md @@ -0,0 +1,266 @@ +# MCP Resources Design for Embedded Office Content + +## Overview + +Expose embedded content from Office documents as MCP resources, allowing clients to fetch specific items on-demand rather than bloating tool responses. + +## URI Scheme + +``` +office://{doc_id}/{resource_type}/{resource_id} +``` + +**Examples:** +- `office://abc123/image/0` - First image from document abc123 +- `office://abc123/chart/revenue-q4` - Named chart +- `office://abc123/media/video-1` - Embedded video +- `office://abc123/embed/attached.pdf` - Embedded PDF + +## Supported Resource Types + +| Type | MIME Types | Sources | +|------|-----------|---------| +| `image` | image/png, image/jpeg, image/gif, image/wmf, image/emf | All Office formats | +| `chart` | image/png (rendered), application/json (data) | Excel, Word, PowerPoint | +| `media` | audio/*, video/* | PowerPoint, Word | +| `embed` | application/pdf, application/msword, etc. | OLE embedded objects | +| `font` | font/ttf, font/otf | Embedded fonts | +| `slide` | image/png (rendered) | PowerPoint slides as images | + +## Document ID Strategy + +Documents need stable IDs for resource URIs. Options: + +1. **Content hash** - SHA256 of file content (stable across sessions) +2. **Path hash** - Hash of file path (simpler, works for local files) +3. **Session ID** - Random ID per extraction (only valid during session) + +**Recommendation:** Use content hash prefix (first 12 chars of SHA256) for stability. 
+ +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ MCP Client │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Resource Template: office://{doc_id}/{type}/{resource_id} │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Resource Manager │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ ImageStore │ │ ChartStore │ │ MediaStore │ ... │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Document Cache │ +│ { doc_id: { images: [...], charts: [...], media: [...] } } │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Implementation + +### 1. Resource Store (in-memory cache) + +```python +from dataclasses import dataclass +from typing import Dict, List, Optional +import hashlib + +@dataclass +class EmbeddedResource: + """Represents an embedded resource from an Office document.""" + resource_id: str + resource_type: str # image, chart, media, embed + mime_type: str + data: bytes + name: Optional[str] = None # Original filename if available + metadata: Optional[dict] = None # Size, dimensions, etc. 
+ +class ResourceStore: + """Manages extracted resources from Office documents.""" + + def __init__(self): + self._documents: Dict[str, Dict[str, List[EmbeddedResource]]] = {} + + @staticmethod + def get_doc_id(file_path: str) -> str: + """Generate stable document ID from file content.""" + with open(file_path, 'rb') as f: + content_hash = hashlib.sha256(f.read()).hexdigest() + return content_hash[:12] + + def store(self, doc_id: str, resource: EmbeddedResource): + """Store an extracted resource.""" + if doc_id not in self._documents: + self._documents[doc_id] = {} + rtype = resource.resource_type + if rtype not in self._documents[doc_id]: + self._documents[doc_id][rtype] = [] + self._documents[doc_id][rtype].append(resource) + + def get(self, doc_id: str, resource_type: str, resource_id: str) -> Optional[EmbeddedResource]: + """Retrieve a specific resource.""" + if doc_id not in self._documents: + return None + resources = self._documents[doc_id].get(resource_type, []) + + # Try by index first + if resource_id.isdigit(): + idx = int(resource_id) + if 0 <= idx < len(resources): + return resources[idx] + + # Try by name + for r in resources: + if r.resource_id == resource_id or r.name == resource_id: + return r + return None + + def list_resources(self, doc_id: str) -> Dict[str, List[dict]]: + """List all resources for a document.""" + if doc_id not in self._documents: + return {} + + result = {} + for rtype, resources in self._documents[doc_id].items(): + result[rtype] = [ + { + "id": r.resource_id, + "name": r.name, + "mime_type": r.mime_type, + "uri": f"office://{doc_id}/{rtype}/{r.resource_id}" + } + for r in resources + ] + return result + +# Global instance +resource_store = ResourceStore() +``` + +### 2. 
Resource Template Registration + +```python +from fastmcp import FastMCP + +app = FastMCP("MCP Office Tools") + +@app.resource( + "office://{doc_id}/{resource_type}/{resource_id}", + name="office_embedded_resource", + description="Embedded content from Office documents (images, charts, media, etc.)" +) +def get_office_resource(doc_id: str, resource_type: str, resource_id: str) -> bytes: + """Retrieve embedded resource from an Office document.""" + resource = resource_store.get(doc_id, resource_type, resource_id) + if resource is None: + raise ValueError( + f"Resource not found: office://{doc_id}/{resource_type}/{resource_id}" + ) + return resource.data +``` + +### 3. Integration with extract_images Tool + +Modify `extract_images` to populate the resource store: + +```python +@mcp_tool(name="extract_images") +async def extract_images(self, file_path: str, ...) -> dict: + # ... existing extraction logic ... + + doc_id = ResourceStore.get_doc_id(resolved_path) + + for idx, image_data in enumerate(extracted_images): + resource = EmbeddedResource( + resource_id=str(idx), + resource_type="image", + mime_type=image_data["mime_type"], + data=image_data["bytes"], + name=image_data.get("filename"), + metadata={"width": ..., "height": ...} + ) + resource_store.store(doc_id, resource) + + # Return URIs instead of base64 data + return { + "doc_id": doc_id, + "images": [ + { + "uri": f"office://{doc_id}/image/{idx}", + "mime_type": img["mime_type"], + "dimensions": {...} + } + for idx, img in enumerate(extracted_images) + ], + "message": "Use resource URIs to fetch image data" + } +``` + +### 4. New Tool: list_embedded_resources + +```python +@mcp_tool(name="list_embedded_resources") +async def list_embedded_resources( + self, + file_path: str, + resource_types: str = "all" # "all", "image", "chart", "media", etc. +) -> dict: + """ + Scan document and return URIs for all embedded resources. + Does not extract content - just identifies what's available. 
+ """ + doc_id = ResourceStore.get_doc_id(resolved_path) + + # Scan document for resources + resources = scan_for_resources(resolved_path, resource_types) + + # Store metadata (not content yet - lazy loading) + for r in resources: + resource_store.store(doc_id, r) + + return { + "doc_id": doc_id, + "resources": resource_store.list_resources(doc_id), + "total_count": sum(len(v) for v in resources.values()) + } +``` + +## Usage Flow + +1. **Client extracts images or lists resources:** + ``` + → list_embedded_resources("report.docx") + ← { "doc_id": "a1b2c3d4e5f6", "resources": { "image": [...], "chart": [...] } } + ``` + +2. **Client fetches specific resource via URI:** + ``` + → read_resource("office://a1b2c3d4e5f6/image/0") + ← + ``` + +3. **Resources remain available for the session** (or until cache expires) + +## Benefits + +1. **Smaller tool responses** - URIs instead of base64 blobs +2. **On-demand fetching** - Client only loads what it needs +3. **Unified access** - Same pattern for images, charts, media, embeds +4. **Cacheable** - Document ID enables client-side caching +5. 
**Discoverable** - `list_embedded_resources` shows what's available + +## Future Extensions + +- **Lazy extraction** - Only extract when resource is read, not when listed +- **Thumbnails** - `office://{doc_id}/image/{id}?size=thumb` +- **Format conversion** - `office://{doc_id}/image/{id}?format=webp` +- **Expiration** - TTL on cached resources +- **Persistence** - Optional disk-backed store for large documents diff --git a/src/mcp_office_tools/mixins/universal.py b/src/mcp_office_tools/mixins/universal.py index fc01060..0c4645f 100644 --- a/src/mcp_office_tools/mixins/universal.py +++ b/src/mcp_office_tools/mixins/universal.py @@ -14,6 +14,7 @@ from ..utils import ( resolve_office_file_path, validate_office_file, ) +from ..resources import resource_store, EmbeddedResource, ResourceStore class UniversalMixin(MCPMixin): @@ -340,4 +341,282 @@ class UniversalMixin(MCPMixin): async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]: """Extract basic metadata common to all documents.""" from ..utils import _extract_basic_metadata - return await _extract_basic_metadata(file_path, extension, category) \ No newline at end of file + return await _extract_basic_metadata(file_path, extension, category) + + @mcp_tool( + name="index_document", + description="Scan and index all resources in a document (images, chapters, sheets, slides). Returns resource URIs that can be fetched individually. Use this before accessing resources via their URIs." 
+ ) + async def index_document( + self, + file_path: str = Field(description="Path to Office document or URL"), + include_images: bool = Field(default=True, description="Index embedded images"), + include_chapters: bool = Field(default=True, description="Index chapters/sections (Word docs)"), + include_sheets: bool = Field(default=True, description="Index sheets (Excel docs)"), + include_slides: bool = Field(default=True, description="Index slides (PowerPoint docs)") + ) -> dict[str, Any]: + """Scan document and populate resource store with available content. + + Returns URIs for all indexed resources that can be fetched via MCP resources. + """ + start_time = time.time() + + # Resolve and validate + local_path = await resolve_office_file_path(file_path) + validation = await validate_office_file(local_path) + if not validation["is_valid"]: + raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") + + format_info = await detect_format(local_path) + category = format_info["category"] + extension = format_info["extension"] + + # Generate stable document ID + doc_id = ResourceStore.get_doc_id(local_path) + + # Clear any existing resources for this doc + resource_store.clear_document(doc_id) + + indexed = { + "doc_id": doc_id, + "file": file_path, + "format": format_info["format_name"], + "resources": {} + } + + # Index images + if include_images: + try: + images = await self._extract_images_by_category( + local_path, extension, category, "png", 50, 50 + ) + for idx, img in enumerate(images): + resource = EmbeddedResource( + resource_id=str(idx), + resource_type="image", + mime_type=img.get("mime_type", "image/png"), + data=img.get("data", b""), + name=img.get("filename"), + metadata={ + "width": img.get("width"), + "height": img.get("height"), + "format": img.get("format", "png") + } + ) + resource_store.store(doc_id, resource, local_path) + + indexed["resources"]["image"] = [ + {"id": str(i), "uri": f"image://{doc_id}/{i}"} + for i in 
range(len(images)) + ] + except Exception as e: + indexed["resources"]["image"] = {"error": str(e)} + + # Index chapters (Word documents) + if include_chapters and category == "word": + try: + chapters = await self._index_word_chapters(local_path, doc_id) + indexed["resources"]["chapter"] = chapters + except Exception as e: + indexed["resources"]["chapter"] = {"error": str(e)} + + # Index sheets (Excel documents) + if include_sheets and category == "excel": + try: + sheets = await self._index_excel_sheets(local_path, doc_id) + indexed["resources"]["sheet"] = sheets + except Exception as e: + indexed["resources"]["sheet"] = {"error": str(e)} + + # Index slides (PowerPoint documents) + if include_slides and category == "powerpoint": + try: + slides = await self._index_powerpoint_slides(local_path, doc_id) + indexed["resources"]["slide"] = slides + except Exception as e: + indexed["resources"]["slide"] = {"error": str(e)} + + indexed["indexing_time"] = round(time.time() - start_time, 3) + indexed["total_resources"] = sum( + len(v) if isinstance(v, list) else 0 + for v in indexed["resources"].values() + ) + + return indexed + + async def _index_word_chapters(self, file_path: str, doc_id: str) -> list[dict]: + """Extract and index chapters from a Word document.""" + import re + from docx import Document + + doc = Document(file_path) + chapters = [] + current_chapter = None + current_paragraphs = [] + chapter_pattern = re.compile(r'^chapter\s*(\d+)', re.IGNORECASE) + + def save_chapter(): + nonlocal current_chapter, current_paragraphs + if current_chapter is not None: + # Convert to markdown + markdown_lines = [] + markdown_lines.append(f"# {current_chapter['title']}\n") + for para in current_paragraphs: + text = para.strip() + if text: + markdown_lines.append(text + "\n") + + content = "\n".join(markdown_lines) + + resource = EmbeddedResource( + resource_id=str(current_chapter["number"]), + resource_type="chapter", + mime_type="text/markdown", + data=content, + 
name=current_chapter["title"], + metadata={ + "word_count": len(content.split()), + "paragraph_count": len(current_paragraphs) + } + ) + resource_store.store(doc_id, resource, file_path) + + chapters.append({ + "id": str(current_chapter["number"]), + "title": current_chapter["title"], + "uri": f"chapter://{doc_id}/{current_chapter['number']}", + "word_count": len(content.split()) + }) + + for para in doc.paragraphs: + text = para.text.strip() + match = chapter_pattern.match(text) + + if match: + save_chapter() + current_chapter = { + "number": int(match.group(1)), + "title": text[:100] + } + current_paragraphs = [] + elif current_chapter is not None: + current_paragraphs.append(text) + + # Save last chapter + save_chapter() + + return chapters + + async def _index_excel_sheets(self, file_path: str, doc_id: str) -> list[dict]: + """Extract and index sheets from an Excel document.""" + import openpyxl + + wb = openpyxl.load_workbook(file_path, data_only=True) + sheets = [] + + for sheet_name in wb.sheetnames: + ws = wb[sheet_name] + + # Convert to markdown table + rows = [] + for row in ws.iter_rows(values_only=True): + row_data = [str(cell) if cell is not None else "" for cell in row] + if any(row_data): # Skip empty rows + rows.append(row_data) + + if not rows: + continue + + # Build markdown table + md_lines = [] + md_lines.append("| " + " | ".join(rows[0]) + " |") + md_lines.append("| " + " | ".join(["---"] * len(rows[0])) + " |") + for row in rows[1:]: + # Pad row if needed + while len(row) < len(rows[0]): + row.append("") + md_lines.append("| " + " | ".join(row[:len(rows[0])]) + " |") + + content = "\n".join(md_lines) + + resource = EmbeddedResource( + resource_id=sheet_name, + resource_type="sheet", + mime_type="text/markdown", + data=content, + name=sheet_name, + metadata={ + "rows": len(rows), + "columns": len(rows[0]) if rows else 0 + } + ) + resource_store.store(doc_id, resource, file_path) + + sheets.append({ + "id": sheet_name, + "name": sheet_name, + 
"uri": f"sheet://{doc_id}/{sheet_name}", + "rows": len(rows), + "columns": len(rows[0]) if rows else 0 + }) + + wb.close() + return sheets + + async def _index_powerpoint_slides(self, file_path: str, doc_id: str) -> list[dict]: + """Extract and index slides from a PowerPoint document.""" + from pptx import Presentation + + prs = Presentation(file_path) + slides = [] + + for idx, slide in enumerate(prs.slides): + slide_num = idx + 1 + + # Extract text from shapes + text_parts = [] + title = None + + for shape in slide.shapes: + if hasattr(shape, "text") and shape.text.strip(): + if shape.is_placeholder and hasattr(shape, "placeholder_format"): + if shape.placeholder_format.type == 1: # Title + title = shape.text.strip() + text_parts.append(shape.text.strip()) + + if not text_parts: + continue + + # Build markdown + md_lines = [] + if title: + md_lines.append(f"# Slide {slide_num}: {title}\n") + else: + md_lines.append(f"# Slide {slide_num}\n") + + for text in text_parts: + if text != title: + md_lines.append(text + "\n") + + content = "\n".join(md_lines) + + resource = EmbeddedResource( + resource_id=str(slide_num), + resource_type="slide", + mime_type="text/markdown", + data=content, + name=title or f"Slide {slide_num}", + metadata={ + "slide_number": slide_num, + "has_title": title is not None + } + ) + resource_store.store(doc_id, resource, file_path) + + slides.append({ + "id": str(slide_num), + "title": title or f"Slide {slide_num}", + "uri": f"slide://{doc_id}/{slide_num}" + }) + + return slides \ No newline at end of file diff --git a/src/mcp_office_tools/resources.py b/src/mcp_office_tools/resources.py new file mode 100644 index 0000000..53423a0 --- /dev/null +++ b/src/mcp_office_tools/resources.py @@ -0,0 +1,243 @@ +"""Resource store for embedded Office document content. + +Provides caching and retrieval of extracted resources (images, charts, media, embeds) +and structural content (chapters, pages, sheets) with stable document IDs. 
+ +Resource URI Schemes: + Binary content: + image://{doc_id}/{id} - Embedded images + chart://{doc_id}/{id} - Charts (as PNG or data) + media://{doc_id}/{id} - Audio/video files + embed://{doc_id}/{id} - OLE embedded objects + + Text/structural content: + chapter://{doc_id}/{num} - Word chapter as markdown + section://{doc_id}/{id} - Document section + page://{doc_id}/{num} - Page content + sheet://{doc_id}/{name} - Excel sheet as markdown/CSV + slide://{doc_id}/{num} - PowerPoint slide content +""" + +import hashlib +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Union +from pathlib import Path + + +# Resource type categories +BINARY_TYPES = {"image", "chart", "media", "embed"} +TEXT_TYPES = {"chapter", "section", "page", "sheet", "slide"} +ALL_RESOURCE_TYPES = BINARY_TYPES | TEXT_TYPES + + +@dataclass +class EmbeddedResource: + """Represents an embedded resource from an Office document. + + Can hold either binary data (images, media) or text content (chapters, sheets). + """ + resource_id: str + resource_type: str # image, chart, media, embed, chapter, section, page, sheet, slide + mime_type: str + data: Union[bytes, str] # bytes for binary, str for text content + name: Optional[str] = None # Original filename or title + metadata: Dict = field(default_factory=dict) # Dimensions, word count, etc. 
+ + @property + def uri(self) -> str: + """Generate the MCP resource URI for this resource.""" + return f"{self.resource_type}://{self.metadata.get('doc_id', 'unknown')}/{self.resource_id}" + + @property + def is_binary(self) -> bool: + """Check if this resource contains binary data.""" + return self.resource_type in BINARY_TYPES + + @property + def is_text(self) -> bool: + """Check if this resource contains text data.""" + return self.resource_type in TEXT_TYPES + + @property + def size(self) -> int: + """Get size in bytes.""" + if isinstance(self.data, bytes): + return len(self.data) + return len(self.data.encode('utf-8')) + + +class ResourceStore: + """Manages extracted resources from Office documents. + + Resources are cached in memory and accessible via MCP resource URIs. + Document IDs are generated from content hashes for stability. + """ + + def __init__(self): + # Structure: {doc_id: {resource_type: [EmbeddedResource, ...]}} + self._documents: Dict[str, Dict[str, List[EmbeddedResource]]] = {} + # Track doc_id to file path mapping + self._doc_paths: Dict[str, str] = {} + + @staticmethod + def get_doc_id(file_path: str) -> str: + """Generate stable document ID from file content hash. + + Uses first 12 characters of SHA256 hash - enough uniqueness + for practical purposes while keeping URIs readable. + """ + path = Path(file_path) + if not path.exists(): + # Fallback to path hash if file doesn't exist + return hashlib.sha256(str(path).encode()).hexdigest()[:12] + + with open(path, 'rb') as f: + content_hash = hashlib.sha256(f.read()).hexdigest() + return content_hash[:12] + + def store(self, doc_id: str, resource: EmbeddedResource, file_path: Optional[str] = None): + """Store an extracted resource. 
+ + Args: + doc_id: Document identifier (from get_doc_id) + resource: The embedded resource to store + file_path: Optional original file path for reference + """ + if doc_id not in self._documents: + self._documents[doc_id] = {} + + rtype = resource.resource_type + if rtype not in self._documents[doc_id]: + self._documents[doc_id][rtype] = [] + + # Add doc_id to metadata for URI generation + resource.metadata["doc_id"] = doc_id + + self._documents[doc_id][rtype].append(resource) + + if file_path: + self._doc_paths[doc_id] = file_path + + def get(self, doc_id: str, resource_type: str, resource_id: str) -> Optional[EmbeddedResource]: + """Retrieve a specific resource. + + Args: + doc_id: Document identifier + resource_type: Type of resource (image, chart, media, embed) + resource_id: Resource identifier (index or name) + + Returns: + EmbeddedResource if found, None otherwise + """ + if doc_id not in self._documents: + return None + + resources = self._documents[doc_id].get(resource_type, []) + + # Try by index first (most common) + if resource_id.isdigit(): + idx = int(resource_id) + if 0 <= idx < len(resources): + return resources[idx] + + # Try by resource_id match + for r in resources: + if r.resource_id == resource_id: + return r + + # Try by name match + for r in resources: + if r.name and r.name == resource_id: + return r + + return None + + def list_resources(self, doc_id: str, resource_type: Optional[str] = None) -> Dict[str, List[dict]]: + """List all resources for a document. 
+ + Args: + doc_id: Document identifier + resource_type: Optional filter by type + + Returns: + Dict mapping resource types to lists of resource info + """ + if doc_id not in self._documents: + return {} + + result = {} + for rtype, resources in self._documents[doc_id].items(): + if resource_type and rtype != resource_type: + continue + + result[rtype] = [ + { + "id": r.resource_id, + "name": r.name, + "mime_type": r.mime_type, + "uri": f"{rtype}://{doc_id}/{r.resource_id}", + "size_bytes": len(r.data), + **{k: v for k, v in r.metadata.items() if k != "doc_id"} + } + for r in resources + ] + + return result + + def get_doc_info(self, doc_id: str) -> Optional[dict]: + """Get information about a cached document.""" + if doc_id not in self._documents: + return None + + resource_counts = { + rtype: len(resources) + for rtype, resources in self._documents[doc_id].items() + } + + return { + "doc_id": doc_id, + "file_path": self._doc_paths.get(doc_id), + "resource_counts": resource_counts, + "total_resources": sum(resource_counts.values()) + } + + def clear_document(self, doc_id: str): + """Remove all cached resources for a document.""" + if doc_id in self._documents: + del self._documents[doc_id] + if doc_id in self._doc_paths: + del self._doc_paths[doc_id] + + def clear_all(self): + """Clear all cached resources.""" + self._documents.clear() + self._doc_paths.clear() + + @property + def cached_documents(self) -> List[str]: + """List all cached document IDs.""" + return list(self._documents.keys()) + + def get_stats(self) -> dict: + """Get cache statistics.""" + total_resources = 0 + total_bytes = 0 + type_counts = {} + + for doc_id, types in self._documents.items(): + for rtype, resources in types.items(): + count = len(resources) + total_resources += count + type_counts[rtype] = type_counts.get(rtype, 0) + count + total_bytes += sum(len(r.data) for r in resources) + + return { + "documents_cached": len(self._documents), + "total_resources": total_resources, + 
"total_bytes": total_bytes, + "by_type": type_counts + } + + +# Global singleton instance +resource_store = ResourceStore() diff --git a/src/mcp_office_tools/server.py b/src/mcp_office_tools/server.py index 50c1a5a..7b554dc 100644 --- a/src/mcp_office_tools/server.py +++ b/src/mcp_office_tools/server.py @@ -17,6 +17,7 @@ from fastmcp import FastMCP from fastmcp.prompts import Prompt from .mixins import UniversalMixin, WordMixin, ExcelMixin, PowerPointMixin +from .resources import resource_store, BINARY_TYPES, TEXT_TYPES # Initialize FastMCP app app = FastMCP("MCP Office Tools") @@ -41,6 +42,276 @@ powerpoint_mixin.register_all(app, prefix="") # This allows gradual migration while maintaining backward compatibility +# ==================== MCP Resources ==================== +# Expose embedded document content via URI-based resources +# Supports format suffixes: .md, .txt, .html (e.g., chapter://doc/3.txt) +# Supports ranges: chapters://doc/1-5, slides://doc/1,3,5 + +import re as _re + + +def _parse_format_suffix(resource_id: str) -> tuple[str, str]: + """Extract format suffix from resource ID. 
+ + Examples: + '3.md' -> ('3', 'md') + '3.txt' -> ('3', 'txt') + '3.html' -> ('3', 'html') + '3' -> ('3', 'md') # default to markdown + """ + match = _re.match(r'^(.+)\.(md|txt|html)$', resource_id) + if match: + return match.group(1), match.group(2) + return resource_id, 'md' # default to markdown + + +def _convert_markdown_to_format(content: str, fmt: str) -> str: + """Convert markdown content to requested format.""" + if fmt == 'md': + return content + elif fmt == 'txt': + # Strip markdown formatting for plain text + text = content + # Remove headers (# ## ###) + text = _re.sub(r'^#+\s+', '', text, flags=_re.MULTILINE) + # Remove bold/italic + text = _re.sub(r'\*\*(.+?)\*\*', r'\1', text) + text = _re.sub(r'\*(.+?)\*', r'\1', text) + text = _re.sub(r'__(.+?)__', r'\1', text) + text = _re.sub(r'_(.+?)_', r'\1', text) + # Remove links but keep text + text = _re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text) + # Remove horizontal rules + text = _re.sub(r'^---+$', '', text, flags=_re.MULTILINE) + return text.strip() + elif fmt == 'html': + # Simple markdown to HTML conversion + html = content + # Headers + html = _re.sub(r'^### (.+)$', r'
<h3>\1</h3>', html, flags=_re.MULTILINE) + html = _re.sub(r'^## (.+)$', r'<h2>\1</h2>', html, flags=_re.MULTILINE) + html = _re.sub(r'^# (.+)$', r'<h1>\1</h1>', html, flags=_re.MULTILINE) + # Bold/italic + html = _re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', html) + html = _re.sub(r'\*(.+?)\*', r'<em>\1</em>', html) + # Links + html = _re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'<a href="\2">\1</a>', html) + # Paragraphs + paragraphs = html.split('\n\n') + html = '\n'.join(f'<p>{p.strip()}</p>' if not p.strip().startswith('<') else p.strip() for p in paragraphs if p.strip()) + return html + + +# NOTE(review): the span between startswith('<') and the "-> list[int]" below was lost to an HTML-stripping extraction; the max_val default is reconstructed — confirm against the repository +def _parse_range(range_str: str, max_val: int = 10000) -> list[int]: + """Parse range string like '1-5', '1,3,5', or '1-3,7,9-10' into list of integers.""" + result = [] + for part in range_str.split(','): + part = part.strip() + if '-' in part: + start, end = part.split('-', 1) + start_int = int(start.strip()) + end_int = int(end.strip()) + result.extend(range(start_int, min(end_int + 1, max_val + 1))) + else: + result.append(int(part)) + return sorted(set(result)) + + +@app.resource( + "image://{doc_id}/{resource_id}", + name="document_image", + description="Embedded image from an Office document" +) +def get_image_resource(doc_id: str, resource_id: str) -> bytes: + """Retrieve an embedded image.""" + resource = resource_store.get(doc_id, "image", resource_id) + if resource is None: + raise ValueError(f"Image not found: image://{doc_id}/{resource_id}") + return resource.data + + +@app.resource( + "chart://{doc_id}/{resource_id}", + name="document_chart", + description="Chart from an Office document (as image or data)" +) +def get_chart_resource(doc_id: str, resource_id: str) -> bytes: + """Retrieve a chart.""" + resource = resource_store.get(doc_id, "chart", resource_id) + if resource is None: + raise ValueError(f"Chart not found: chart://{doc_id}/{resource_id}") + return resource.data + + +@app.resource( + "media://{doc_id}/{resource_id}", + name="document_media", + description="Audio or video from an Office document" +) +def get_media_resource(doc_id: str, resource_id: str) -> bytes: + """Retrieve embedded media.""" + resource = resource_store.get(doc_id, "media", resource_id) + if resource is None: + raise ValueError(f"Media not found: media://{doc_id}/{resource_id}") + return resource.data + + +@app.resource( + "embed://{doc_id}/{resource_id}", + name="embedded_object", + description="Embedded OLE object (PDF, another Office doc, etc.)" +) +def get_embed_resource(doc_id: str, resource_id: str) -> bytes: + """Retrieve an embedded object.""" + resource = resource_store.get(doc_id, 
"embed", resource_id) + if resource is None: + raise ValueError(f"Embedded object not found: embed://{doc_id}/{resource_id}") + return resource.data + + +@app.resource( + "chapter://{doc_id}/{resource_id}", + mime_type="text/markdown", + name="document_chapter", + description="Chapter from a Word document. Supports format suffixes: chapter://doc/3.md, chapter://doc/3.txt, chapter://doc/3.html" +) +def get_chapter_resource(doc_id: str, resource_id: str) -> str: + """Retrieve a chapter with optional format conversion. + + Examples: + chapter://abc123/3 -> Chapter 3 as markdown (default) + chapter://abc123/3.md -> Chapter 3 as markdown + chapter://abc123/3.txt -> Chapter 3 as plain text + chapter://abc123/3.html -> Chapter 3 as HTML + """ + chapter_id, fmt = _parse_format_suffix(resource_id) + resource = resource_store.get(doc_id, "chapter", chapter_id) + if resource is None: + raise ValueError(f"Chapter not found: chapter://{doc_id}/{resource_id}") + return _convert_markdown_to_format(resource.data, fmt) + + +@app.resource( + "section://{doc_id}/{resource_id}", + mime_type="text/markdown", + name="document_section", + description="Section from a document as Markdown" +) +def get_section_resource(doc_id: str, resource_id: str) -> str: + """Retrieve a section as markdown.""" + resource = resource_store.get(doc_id, "section", resource_id) + if resource is None: + raise ValueError(f"Section not found: section://{doc_id}/{resource_id}") + return resource.data + + +@app.resource( + "sheet://{doc_id}/{resource_id}", + mime_type="text/markdown", + name="excel_sheet", + description="Excel sheet as Markdown table or CSV" +) +def get_sheet_resource(doc_id: str, resource_id: str) -> str: + """Retrieve an Excel sheet.""" + resource = resource_store.get(doc_id, "sheet", resource_id) + if resource is None: + raise ValueError(f"Sheet not found: sheet://{doc_id}/{resource_id}") + return resource.data + + +@app.resource( + "slide://{doc_id}/{resource_id}", + mime_type="text/markdown", 
+ name="powerpoint_slide", + description="PowerPoint slide content as Markdown" +) +def get_slide_resource(doc_id: str, resource_id: str) -> str: + """Retrieve a slide as markdown.""" + resource = resource_store.get(doc_id, "slide", resource_id) + if resource is None: + raise ValueError(f"Slide not found: slide://{doc_id}/{resource_id}") + return resource.data + + +# ==================== Range-Based Resources ==================== +# Support for fetching multiple items at once: chapters://doc/1-5, slides://doc/1,3,5 + +@app.resource( + "chapters://{doc_id}/{range_spec}", + mime_type="text/markdown", + name="document_chapters_range", + description="Multiple chapters from a Word document as combined Markdown (e.g., chapters://doc/1-5)" +) +def get_chapters_range(doc_id: str, range_spec: str) -> str: + """Retrieve multiple chapters as combined markdown. + + Range formats: '1-5' (chapters 1-5), '1,3,5' (specific chapters), '1-3,7' (mixed) + """ + chapter_nums = _parse_range(range_spec) + chapters_content = [] + + for num in chapter_nums: + resource = resource_store.get(doc_id, "chapter", str(num)) + if resource is not None: + chapters_content.append(resource.data) + + if not chapters_content: + raise ValueError(f"No chapters found for range: chapters://{doc_id}/{range_spec}") + + return "\n\n---\n\n".join(chapters_content) + + +@app.resource( + "slides://{doc_id}/{range_spec}", + mime_type="text/markdown", + name="powerpoint_slides_range", + description="Multiple slides from a PowerPoint as combined Markdown (e.g., slides://doc/1-10)" +) +def get_slides_range(doc_id: str, range_spec: str) -> str: + """Retrieve multiple slides as combined markdown.""" + slide_nums = _parse_range(range_spec) + slides_content = [] + + for num in slide_nums: + resource = resource_store.get(doc_id, "slide", str(num)) + if resource is not None: + slides_content.append(resource.data) + + if not slides_content: + raise ValueError(f"No slides found for range: slides://{doc_id}/{range_spec}") + + 
return "\n\n---\n\n".join(slides_content) + + +@app.resource( + "paragraph://{doc_id}/{chapter_id}/{paragraph_id}", + mime_type="text/markdown", + name="chapter_paragraph", + description="Specific paragraph from a chapter (e.g., paragraph://doc/3/5 for chapter 3, paragraph 5)" +) +def get_paragraph(doc_id: str, chapter_id: str, paragraph_id: str) -> str: + """Retrieve a specific paragraph from a chapter.""" + resource = resource_store.get(doc_id, "chapter", chapter_id) + if resource is None: + raise ValueError(f"Chapter not found: {chapter_id}") + + # Split chapter content into paragraphs + paragraphs = [p.strip() for p in resource.data.split('\n\n') if p.strip()] + + try: + para_idx = int(paragraph_id) + if 0 <= para_idx < len(paragraphs): + return paragraphs[para_idx] + elif 1 <= para_idx <= len(paragraphs): + # 1-indexed fallback + return paragraphs[para_idx - 1] + else: + raise ValueError(f"Paragraph {paragraph_id} out of range (0-{len(paragraphs)-1})") + except (ValueError, IndexError): + raise ValueError(f"Invalid paragraph: paragraph://{doc_id}/{chapter_id}/{paragraph_id}") + + # ==================== MCP Prompts ==================== # Prompts help users understand how to use tools effectively # Organized from basic to advanced multi-step workflows diff --git a/tests/test_mixins.py b/tests/test_mixins.py index e24fa08..cd47f3b 100644 --- a/tests/test_mixins.py +++ b/tests/test_mixins.py @@ -59,7 +59,7 @@ class TestMixinArchitecture: universal = UniversalMixin() universal.register_all(app) universal_tools = len(app._tool_manager._tools) - initial_tool_count - assert universal_tools == 6 # 6 universal tools + assert universal_tools == 7 # 7 universal tools (includes index_document) word = WordMixin() word.register_all(app) diff --git a/tests/test_server.py b/tests/test_server.py index 7972120..c40880b 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -149,8 +149,8 @@ class TestMixinIntegration: # Verify no duplicates assert len(tool_names) == 
len(set(tool_names)), "Tool names should be unique" - # Verify expected count: 6 universal + 10 word + 3 excel = 19 - assert len(tool_names) == 19, f"Expected 19 tools, got {len(tool_names)}: {list(tool_names.keys())}" + # Verify expected count: 7 universal + 10 word + 3 excel = 20 + assert len(tool_names) == 20, f"Expected 20 tools, got {len(tool_names)}: {list(tool_names.keys())}" if __name__ == "__main__": diff --git a/tests/test_universal_mixin.py b/tests/test_universal_mixin.py index 5134f24..1092069 100644 --- a/tests/test_universal_mixin.py +++ b/tests/test_universal_mixin.py @@ -30,7 +30,7 @@ class TestUniversalMixinRegistration: mixin.register_all(app) assert mixin is not None - assert len(app._tool_manager._tools) == 6 # 6 universal tools + assert len(app._tool_manager._tools) == 7 # 7 universal tools (includes index_document) def test_tool_names_registered(self): """Test that all expected tool names are registered.""" @@ -43,7 +43,8 @@ class TestUniversalMixinRegistration: "extract_metadata", "detect_office_format", "analyze_document_health", - "get_supported_formats" + "get_supported_formats", + "index_document" } registered_tools = set(app._tool_manager._tools.keys())