Add MCP resource system for embedded document content

Implements URI-based access to document content with:

- ResourceStore for caching extracted images, chapters, sheets, slides
- Content-based document IDs (SHA256 hash) for stable URIs across sessions
- 11 resource templates with flexible URI patterns:
  - Binary: image://, chart://, media://, embed://
  - Text: chapter://, section://, sheet://, slide://
  - Ranges: chapters://doc/1-5, slides://doc/1,3,5
  - Hierarchical: paragraph://doc/3/5

- Format suffixes for output control:
  - chapter://doc/3.md (default markdown)
  - chapter://doc/3.txt (plain text)
  - chapter://doc/3.html (basic HTML)

- index_document tool scans and populates resources:
  - Word: chapters as markdown, embedded images
  - Excel: sheets as markdown tables
  - PowerPoint: slides as markdown

Tool responses return URIs instead of blobs - clients fetch only what they need.
Ryan Malloy 2026-01-11 09:04:29 -07:00
parent 11defb4eae
commit d569034fa3
7 changed files with 1066 additions and 6 deletions

docs/RESOURCE_DESIGN.md Normal file

@ -0,0 +1,266 @@
# MCP Resources Design for Embedded Office Content
## Overview
Expose embedded content from Office documents as MCP resources, allowing clients to fetch specific items on-demand rather than bloating tool responses.
## URI Scheme
```
office://{doc_id}/{resource_type}/{resource_id}
```
**Examples:**
- `office://abc123/image/0` - First image from document abc123
- `office://abc123/chart/revenue-q4` - Named chart
- `office://abc123/media/video-1` - Embedded video
- `office://abc123/embed/attached.pdf` - Embedded PDF
## Supported Resource Types
| Type | MIME Types | Sources |
|------|-----------|---------|
| `image` | image/png, image/jpeg, image/gif, image/wmf, image/emf | All Office formats |
| `chart` | image/png (rendered), application/json (data) | Excel, Word, PowerPoint |
| `media` | audio/*, video/* | PowerPoint, Word |
| `embed` | application/pdf, application/msword, etc. | OLE embedded objects |
| `font` | font/ttf, font/otf | Embedded fonts |
| `slide` | image/png (rendered) | PowerPoint slides as images |
## Document ID Strategy
Documents need stable IDs for resource URIs. Options:
1. **Content hash** - SHA256 of file content (stable across sessions)
2. **Path hash** - Hash of file path (simpler, works for local files)
3. **Session ID** - Random ID per extraction (only valid during session)
**Recommendation:** Use content hash prefix (first 12 chars of SHA256) for stability.
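A minimal sketch of option 1; `doc_id_for` is illustrative only (the chunked read is just to avoid loading large files into memory at once) and is not the implemented `ResourceStore.get_doc_id`:
```python
import hashlib

def doc_id_for(path: str, prefix_len: int = 12) -> str:
    """Illustrative sketch: hash file content in 1 MB chunks, keep a short stable prefix."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()[:prefix_len]
```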
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ MCP Client │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Resource Template: office://{doc_id}/{type}/{resource_id} │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Resource Manager │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ ImageStore │ │ ChartStore │ │ MediaStore │ ... │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Document Cache │
│ { doc_id: { images: [...], charts: [...], media: [...] } } │
└─────────────────────────────────────────────────────────────┘
```
## Implementation
### 1. Resource Store (in-memory cache)
```python
from dataclasses import dataclass
from typing import Dict, List, Optional
import hashlib


@dataclass
class EmbeddedResource:
    """Represents an embedded resource from an Office document."""
    resource_id: str
    resource_type: str  # image, chart, media, embed
    mime_type: str
    data: bytes
    name: Optional[str] = None      # Original filename if available
    metadata: Optional[dict] = None  # Size, dimensions, etc.


class ResourceStore:
    """Manages extracted resources from Office documents."""

    def __init__(self):
        self._documents: Dict[str, Dict[str, List[EmbeddedResource]]] = {}

    @staticmethod
    def get_doc_id(file_path: str) -> str:
        """Generate stable document ID from file content."""
        with open(file_path, 'rb') as f:
            content_hash = hashlib.sha256(f.read()).hexdigest()
        return content_hash[:12]

    def store(self, doc_id: str, resource: EmbeddedResource):
        """Store an extracted resource."""
        if doc_id not in self._documents:
            self._documents[doc_id] = {}
        rtype = resource.resource_type
        if rtype not in self._documents[doc_id]:
            self._documents[doc_id][rtype] = []
        self._documents[doc_id][rtype].append(resource)

    def get(self, doc_id: str, resource_type: str, resource_id: str) -> Optional[EmbeddedResource]:
        """Retrieve a specific resource."""
        if doc_id not in self._documents:
            return None
        resources = self._documents[doc_id].get(resource_type, [])
        # Try by index first
        if resource_id.isdigit():
            idx = int(resource_id)
            if 0 <= idx < len(resources):
                return resources[idx]
        # Try by name
        for r in resources:
            if r.resource_id == resource_id or r.name == resource_id:
                return r
        return None

    def list_resources(self, doc_id: str) -> Dict[str, List[dict]]:
        """List all resources for a document."""
        if doc_id not in self._documents:
            return {}
        result = {}
        for rtype, resources in self._documents[doc_id].items():
            result[rtype] = [
                {
                    "id": r.resource_id,
                    "name": r.name,
                    "mime_type": r.mime_type,
                    "uri": f"office://{doc_id}/{rtype}/{r.resource_id}"
                }
                for r in resources
            ]
        return result


# Global instance
resource_store = ResourceStore()
```
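A hypothetical round trip through the store above; the file name, image bytes, and resulting doc_id shown are placeholders:
```python
# Hypothetical usage of the ResourceStore sketched above
doc_id = ResourceStore.get_doc_id("report.docx")  # e.g. "a1b2c3d4e5f6"
resource_store.store(doc_id, EmbeddedResource(
    resource_id="0",
    resource_type="image",
    mime_type="image/png",
    data=b"\x89PNG...",       # placeholder bytes
    name="figure-1.png",
))
img = resource_store.get(doc_id, "image", "0")              # lookup by index
same = resource_store.get(doc_id, "image", "figure-1.png")  # or by name
print(resource_store.list_resources(doc_id)["image"][0]["uri"])
# -> office://a1b2c3d4e5f6/image/0
```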
### 2. Resource Template Registration
```python
from fastmcp import FastMCP

app = FastMCP("MCP Office Tools")


@app.resource(
    "office://{doc_id}/{resource_type}/{resource_id}",
    name="office_embedded_resource",
    description="Embedded content from Office documents (images, charts, media, etc.)"
)
def get_office_resource(doc_id: str, resource_type: str, resource_id: str) -> bytes:
    """Retrieve embedded resource from an Office document."""
    resource = resource_store.get(doc_id, resource_type, resource_id)
    if resource is None:
        raise ValueError(
            f"Resource not found: office://{doc_id}/{resource_type}/{resource_id}"
        )
    return resource.data
```
### 3. Integration with extract_images Tool
Modify `extract_images` to populate the resource store:
```python
@mcp_tool(name="extract_images")
async def extract_images(self, file_path: str, ...) -> dict:
# ... existing extraction logic ...
doc_id = ResourceStore.get_doc_id(resolved_path)
for idx, image_data in enumerate(extracted_images):
resource = EmbeddedResource(
resource_id=str(idx),
resource_type="image",
mime_type=image_data["mime_type"],
data=image_data["bytes"],
name=image_data.get("filename"),
metadata={"width": ..., "height": ...}
)
resource_store.store(doc_id, resource)
# Return URIs instead of base64 data
return {
"doc_id": doc_id,
"images": [
{
"uri": f"office://{doc_id}/image/{idx}",
"mime_type": img["mime_type"],
"dimensions": {...}
}
for idx, img in enumerate(extracted_images)
],
"message": "Use resource URIs to fetch image data"
}
```
### 4. New Tool: list_embedded_resources
```python
@mcp_tool(name="list_embedded_resources")
async def list_embedded_resources(
self,
file_path: str,
resource_types: str = "all" # "all", "image", "chart", "media", etc.
) -> dict:
"""
Scan document and return URIs for all embedded resources.
Does not extract content - just identifies what's available.
"""
doc_id = ResourceStore.get_doc_id(resolved_path)
# Scan document for resources
resources = scan_for_resources(resolved_path, resource_types)
# Store metadata (not content yet - lazy loading)
for r in resources:
resource_store.store(doc_id, r)
return {
"doc_id": doc_id,
"resources": resource_store.list_resources(doc_id),
"total_count": sum(len(v) for v in resources.values())
}
```
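`scan_for_resources` is referenced above but not specified; a rough sketch of one way it could work for OOXML files, walking the zip package for media parts (the helper name, the media-path prefixes, and the `mimetypes` guess are assumptions of this sketch, not part of the implementation):
```python
import mimetypes
import zipfile

def scan_for_resources(path: str, resource_types: str = "all") -> list[EmbeddedResource]:
    """Rough sketch: list embedded images in an OOXML package."""
    found: list[EmbeddedResource] = []
    if resource_types not in ("all", "image"):
        return found
    with zipfile.ZipFile(path) as zf:
        media = [n for n in zf.namelist()
                 if n.startswith(("word/media/", "ppt/media/", "xl/media/"))]
        for idx, name in enumerate(media):
            mime, _ = mimetypes.guess_type(name)
            found.append(EmbeddedResource(
                resource_id=str(idx),
                resource_type="image",
                mime_type=mime or "application/octet-stream",
                data=zf.read(name),  # eager read here; lazy loading would defer this
                name=name.rsplit("/", 1)[-1],
            ))
    return found
```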
## Usage Flow
1. **Client extracts images or lists resources:**
```
→ list_embedded_resources("report.docx")
← { "doc_id": "a1b2c3d4e5f6", "resources": { "image": [...], "chart": [...] } }
```
2. **Client fetches specific resource via URI:**
```
→ read_resource("office://a1b2c3d4e5f6/image/0")
<binary PNG data>
```
3. **Resources remain available for the session** (or until cache expires)
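From the client side, the same flow might look roughly like this with FastMCP's `Client`; the server URL, file name, and doc_id are examples only, and the exact shape of the returned content blocks depends on the client library version:
```python
import asyncio
from fastmcp import Client  # assumes the FastMCP client package is available

async def main() -> None:
    async with Client("http://localhost:8000/mcp") as client:  # example URL
        # 1. Index/list resources via a tool call
        await client.call_tool("list_embedded_resources", {"file_path": "report.docx"})
        # 2. Fetch one resource by URI; the doc_id is illustrative
        contents = await client.read_resource("office://a1b2c3d4e5f6/image/0")
        print(len(contents), "content block(s) returned")

asyncio.run(main())
```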
## Benefits
1. **Smaller tool responses** - URIs instead of base64 blobs
2. **On-demand fetching** - Client only loads what it needs
3. **Unified access** - Same pattern for images, charts, media, embeds
4. **Cacheable** - Document ID enables client-side caching
5. **Discoverable** - `list_embedded_resources` shows what's available
## Future Extensions
- **Lazy extraction** - Only extract when resource is read, not when listed
- **Thumbnails** - `office://{doc_id}/image/{id}?size=thumb`
- **Format conversion** - `office://{doc_id}/image/{id}?format=webp`
- **Expiration** - TTL on cached resources
- **Persistence** - Optional disk-backed store for large documents


@ -14,6 +14,7 @@ from ..utils import (
    resolve_office_file_path,
    validate_office_file,
)
from ..resources import resource_store, EmbeddedResource, ResourceStore
class UniversalMixin(MCPMixin):
@ -340,4 +341,282 @@ class UniversalMixin(MCPMixin):
    async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
        """Extract basic metadata common to all documents."""
        from ..utils import _extract_basic_metadata
        return await _extract_basic_metadata(file_path, extension, category)
    @mcp_tool(
        name="index_document",
        description="Scan and index all resources in a document (images, chapters, sheets, slides). Returns resource URIs that can be fetched individually. Use this before accessing resources via their URIs."
    )
    async def index_document(
        self,
        file_path: str = Field(description="Path to Office document or URL"),
        include_images: bool = Field(default=True, description="Index embedded images"),
        include_chapters: bool = Field(default=True, description="Index chapters/sections (Word docs)"),
        include_sheets: bool = Field(default=True, description="Index sheets (Excel docs)"),
        include_slides: bool = Field(default=True, description="Index slides (PowerPoint docs)")
    ) -> dict[str, Any]:
        """Scan document and populate resource store with available content.

        Returns URIs for all indexed resources that can be fetched via MCP resources.
        """
        start_time = time.time()

        # Resolve and validate
        local_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]

        # Generate stable document ID
        doc_id = ResourceStore.get_doc_id(local_path)

        # Clear any existing resources for this doc
        resource_store.clear_document(doc_id)

        indexed = {
            "doc_id": doc_id,
            "file": file_path,
            "format": format_info["format_name"],
            "resources": {}
        }

        # Index images
        if include_images:
            try:
                images = await self._extract_images_by_category(
                    local_path, extension, category, "png", 50, 50
                )
                for idx, img in enumerate(images):
                    resource = EmbeddedResource(
                        resource_id=str(idx),
                        resource_type="image",
                        mime_type=img.get("mime_type", "image/png"),
                        data=img.get("data", b""),
                        name=img.get("filename"),
                        metadata={
                            "width": img.get("width"),
                            "height": img.get("height"),
                            "format": img.get("format", "png")
                        }
                    )
                    resource_store.store(doc_id, resource, local_path)
                indexed["resources"]["image"] = [
                    {"id": str(i), "uri": f"image://{doc_id}/{i}"}
                    for i in range(len(images))
                ]
            except Exception as e:
                indexed["resources"]["image"] = {"error": str(e)}

        # Index chapters (Word documents)
        if include_chapters and category == "word":
            try:
                chapters = await self._index_word_chapters(local_path, doc_id)
                indexed["resources"]["chapter"] = chapters
            except Exception as e:
                indexed["resources"]["chapter"] = {"error": str(e)}

        # Index sheets (Excel documents)
        if include_sheets and category == "excel":
            try:
                sheets = await self._index_excel_sheets(local_path, doc_id)
                indexed["resources"]["sheet"] = sheets
            except Exception as e:
                indexed["resources"]["sheet"] = {"error": str(e)}

        # Index slides (PowerPoint documents)
        if include_slides and category == "powerpoint":
            try:
                slides = await self._index_powerpoint_slides(local_path, doc_id)
                indexed["resources"]["slide"] = slides
            except Exception as e:
                indexed["resources"]["slide"] = {"error": str(e)}

        indexed["indexing_time"] = round(time.time() - start_time, 3)
        indexed["total_resources"] = sum(
            len(v) if isinstance(v, list) else 0
            for v in indexed["resources"].values()
        )
        return indexed
    async def _index_word_chapters(self, file_path: str, doc_id: str) -> list[dict]:
        """Extract and index chapters from a Word document."""
        import re
        from docx import Document

        doc = Document(file_path)
        chapters = []
        current_chapter = None
        current_paragraphs = []
        chapter_pattern = re.compile(r'^chapter\s*(\d+)', re.IGNORECASE)

        def save_chapter():
            nonlocal current_chapter, current_paragraphs
            if current_chapter is not None:
                # Convert to markdown
                markdown_lines = []
                markdown_lines.append(f"# {current_chapter['title']}\n")
                for para in current_paragraphs:
                    text = para.strip()
                    if text:
                        markdown_lines.append(text + "\n")
                content = "\n".join(markdown_lines)
                resource = EmbeddedResource(
                    resource_id=str(current_chapter["number"]),
                    resource_type="chapter",
                    mime_type="text/markdown",
                    data=content,
                    name=current_chapter["title"],
                    metadata={
                        "word_count": len(content.split()),
                        "paragraph_count": len(current_paragraphs)
                    }
                )
                resource_store.store(doc_id, resource, file_path)
                chapters.append({
                    "id": str(current_chapter["number"]),
                    "title": current_chapter["title"],
                    "uri": f"chapter://{doc_id}/{current_chapter['number']}",
                    "word_count": len(content.split())
                })

        for para in doc.paragraphs:
            text = para.text.strip()
            match = chapter_pattern.match(text)
            if match:
                save_chapter()
                current_chapter = {
                    "number": int(match.group(1)),
                    "title": text[:100]
                }
                current_paragraphs = []
            elif current_chapter is not None:
                current_paragraphs.append(text)

        # Save last chapter
        save_chapter()
        return chapters
    async def _index_excel_sheets(self, file_path: str, doc_id: str) -> list[dict]:
        """Extract and index sheets from an Excel document."""
        import openpyxl

        wb = openpyxl.load_workbook(file_path, data_only=True)
        sheets = []
        for sheet_name in wb.sheetnames:
            ws = wb[sheet_name]

            # Convert to markdown table
            rows = []
            for row in ws.iter_rows(values_only=True):
                row_data = [str(cell) if cell is not None else "" for cell in row]
                if any(row_data):  # Skip empty rows
                    rows.append(row_data)
            if not rows:
                continue

            # Build markdown table
            md_lines = []
            md_lines.append("| " + " | ".join(rows[0]) + " |")
            md_lines.append("| " + " | ".join(["---"] * len(rows[0])) + " |")
            for row in rows[1:]:
                # Pad row if needed
                while len(row) < len(rows[0]):
                    row.append("")
                md_lines.append("| " + " | ".join(row[:len(rows[0])]) + " |")
            content = "\n".join(md_lines)

            resource = EmbeddedResource(
                resource_id=sheet_name,
                resource_type="sheet",
                mime_type="text/markdown",
                data=content,
                name=sheet_name,
                metadata={
                    "rows": len(rows),
                    "columns": len(rows[0]) if rows else 0
                }
            )
            resource_store.store(doc_id, resource, file_path)
            sheets.append({
                "id": sheet_name,
                "name": sheet_name,
                "uri": f"sheet://{doc_id}/{sheet_name}",
                "rows": len(rows),
                "columns": len(rows[0]) if rows else 0
            })
        wb.close()
        return sheets
    async def _index_powerpoint_slides(self, file_path: str, doc_id: str) -> list[dict]:
        """Extract and index slides from a PowerPoint document."""
        from pptx import Presentation

        prs = Presentation(file_path)
        slides = []
        for idx, slide in enumerate(prs.slides):
            slide_num = idx + 1

            # Extract text from shapes
            text_parts = []
            title = None
            for shape in slide.shapes:
                if hasattr(shape, "text") and shape.text.strip():
                    if shape.is_placeholder and hasattr(shape, "placeholder_format"):
                        if shape.placeholder_format.type == 1:  # Title
                            title = shape.text.strip()
                    text_parts.append(shape.text.strip())
            if not text_parts:
                continue

            # Build markdown
            md_lines = []
            if title:
                md_lines.append(f"# Slide {slide_num}: {title}\n")
            else:
                md_lines.append(f"# Slide {slide_num}\n")
            for text in text_parts:
                if text != title:
                    md_lines.append(text + "\n")
            content = "\n".join(md_lines)

            resource = EmbeddedResource(
                resource_id=str(slide_num),
                resource_type="slide",
                mime_type="text/markdown",
                data=content,
                name=title or f"Slide {slide_num}",
                metadata={
                    "slide_number": slide_num,
                    "has_title": title is not None
                }
            )
            resource_store.store(doc_id, resource, file_path)
            slides.append({
                "id": str(slide_num),
                "title": title or f"Slide {slide_num}",
                "uri": f"slide://{doc_id}/{slide_num}"
            })
        return slides


@ -0,0 +1,243 @@
"""Resource store for embedded Office document content.
Provides caching and retrieval of extracted resources (images, charts, media, embeds)
and structural content (chapters, pages, sheets) with stable document IDs.
Resource URI Schemes:
Binary content:
image://{doc_id}/{id} - Embedded images
chart://{doc_id}/{id} - Charts (as PNG or data)
media://{doc_id}/{id} - Audio/video files
embed://{doc_id}/{id} - OLE embedded objects
Text/structural content:
chapter://{doc_id}/{num} - Word chapter as markdown
section://{doc_id}/{id} - Document section
page://{doc_id}/{num} - Page content
sheet://{doc_id}/{name} - Excel sheet as markdown/CSV
slide://{doc_id}/{num} - PowerPoint slide content
"""
import hashlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union
from pathlib import Path
# Resource type categories
BINARY_TYPES = {"image", "chart", "media", "embed"}
TEXT_TYPES = {"chapter", "section", "page", "sheet", "slide"}
ALL_RESOURCE_TYPES = BINARY_TYPES | TEXT_TYPES
@dataclass
class EmbeddedResource:
    """Represents an embedded resource from an Office document.

    Can hold either binary data (images, media) or text content (chapters, sheets).
    """
    resource_id: str
    resource_type: str  # image, chart, media, embed, chapter, section, page, sheet, slide
    mime_type: str
    data: Union[bytes, str]  # bytes for binary, str for text content
    name: Optional[str] = None  # Original filename or title
    metadata: Dict = field(default_factory=dict)  # Dimensions, word count, etc.

    @property
    def uri(self) -> str:
        """Generate the MCP resource URI for this resource."""
        return f"{self.resource_type}://{self.metadata.get('doc_id', 'unknown')}/{self.resource_id}"

    @property
    def is_binary(self) -> bool:
        """Check if this resource contains binary data."""
        return self.resource_type in BINARY_TYPES

    @property
    def is_text(self) -> bool:
        """Check if this resource contains text data."""
        return self.resource_type in TEXT_TYPES

    @property
    def size(self) -> int:
        """Get size in bytes."""
        if isinstance(self.data, bytes):
            return len(self.data)
        return len(self.data.encode('utf-8'))
class ResourceStore:
    """Manages extracted resources from Office documents.

    Resources are cached in memory and accessible via MCP resource URIs.
    Document IDs are generated from content hashes for stability.
    """

    def __init__(self):
        # Structure: {doc_id: {resource_type: [EmbeddedResource, ...]}}
        self._documents: Dict[str, Dict[str, List[EmbeddedResource]]] = {}
        # Track doc_id to file path mapping
        self._doc_paths: Dict[str, str] = {}

    @staticmethod
    def get_doc_id(file_path: str) -> str:
        """Generate stable document ID from file content hash.

        Uses first 12 characters of SHA256 hash - enough uniqueness
        for practical purposes while keeping URIs readable.
        """
        path = Path(file_path)
        if not path.exists():
            # Fallback to path hash if file doesn't exist
            return hashlib.sha256(str(path).encode()).hexdigest()[:12]
        with open(path, 'rb') as f:
            content_hash = hashlib.sha256(f.read()).hexdigest()
        return content_hash[:12]

    def store(self, doc_id: str, resource: EmbeddedResource, file_path: Optional[str] = None):
        """Store an extracted resource.

        Args:
            doc_id: Document identifier (from get_doc_id)
            resource: The embedded resource to store
            file_path: Optional original file path for reference
        """
        if doc_id not in self._documents:
            self._documents[doc_id] = {}
        rtype = resource.resource_type
        if rtype not in self._documents[doc_id]:
            self._documents[doc_id][rtype] = []
        # Add doc_id to metadata for URI generation
        resource.metadata["doc_id"] = doc_id
        self._documents[doc_id][rtype].append(resource)
        if file_path:
            self._doc_paths[doc_id] = file_path

    def get(self, doc_id: str, resource_type: str, resource_id: str) -> Optional[EmbeddedResource]:
        """Retrieve a specific resource.

        Args:
            doc_id: Document identifier
            resource_type: Type of resource (image, chart, media, embed)
            resource_id: Resource identifier (index or name)

        Returns:
            EmbeddedResource if found, None otherwise
        """
        if doc_id not in self._documents:
            return None
        resources = self._documents[doc_id].get(resource_type, [])
        # Try by index first (most common)
        if resource_id.isdigit():
            idx = int(resource_id)
            if 0 <= idx < len(resources):
                return resources[idx]
        # Try by resource_id match
        for r in resources:
            if r.resource_id == resource_id:
                return r
        # Try by name match
        for r in resources:
            if r.name and r.name == resource_id:
                return r
        return None
    def list_resources(self, doc_id: str, resource_type: Optional[str] = None) -> Dict[str, List[dict]]:
        """List all resources for a document.

        Args:
            doc_id: Document identifier
            resource_type: Optional filter by type

        Returns:
            Dict mapping resource types to lists of resource info
        """
        if doc_id not in self._documents:
            return {}
        result = {}
        for rtype, resources in self._documents[doc_id].items():
            if resource_type and rtype != resource_type:
                continue
            result[rtype] = [
                {
                    "id": r.resource_id,
                    "name": r.name,
                    "mime_type": r.mime_type,
                    "uri": f"{rtype}://{doc_id}/{r.resource_id}",
                    "size_bytes": len(r.data),
                    **{k: v for k, v in r.metadata.items() if k != "doc_id"}
                }
                for r in resources
            ]
        return result

    def get_doc_info(self, doc_id: str) -> Optional[dict]:
        """Get information about a cached document."""
        if doc_id not in self._documents:
            return None
        resource_counts = {
            rtype: len(resources)
            for rtype, resources in self._documents[doc_id].items()
        }
        return {
            "doc_id": doc_id,
            "file_path": self._doc_paths.get(doc_id),
            "resource_counts": resource_counts,
            "total_resources": sum(resource_counts.values())
        }

    def clear_document(self, doc_id: str):
        """Remove all cached resources for a document."""
        if doc_id in self._documents:
            del self._documents[doc_id]
        if doc_id in self._doc_paths:
            del self._doc_paths[doc_id]

    def clear_all(self):
        """Clear all cached resources."""
        self._documents.clear()
        self._doc_paths.clear()

    @property
    def cached_documents(self) -> List[str]:
        """List all cached document IDs."""
        return list(self._documents.keys())

    def get_stats(self) -> dict:
        """Get cache statistics."""
        total_resources = 0
        total_bytes = 0
        type_counts = {}
        for doc_id, types in self._documents.items():
            for rtype, resources in types.items():
                count = len(resources)
                total_resources += count
                type_counts[rtype] = type_counts.get(rtype, 0) + count
                total_bytes += sum(len(r.data) for r in resources)
        return {
            "documents_cached": len(self._documents),
            "total_resources": total_resources,
            "total_bytes": total_bytes,
            "by_type": type_counts
        }
# Global singleton instance
resource_store = ResourceStore()


@ -17,6 +17,7 @@ from fastmcp import FastMCP
from fastmcp.prompts import Prompt
from .mixins import UniversalMixin, WordMixin, ExcelMixin, PowerPointMixin
from .resources import resource_store, BINARY_TYPES, TEXT_TYPES
# Initialize FastMCP app
app = FastMCP("MCP Office Tools")
@ -41,6 +42,276 @@ powerpoint_mixin.register_all(app, prefix="")
# This allows gradual migration while maintaining backward compatibility
# ==================== MCP Resources ====================
# Expose embedded document content via URI-based resources
# Supports format suffixes: .md, .txt, .html (e.g., chapter://doc/3.txt)
# Supports ranges: chapters://doc/1-5, slides://doc/1,3,5
import re as _re
def _parse_format_suffix(resource_id: str) -> tuple[str, str]:
    """Extract format suffix from resource ID.

    Examples:
        '3.md'   -> ('3', 'md')
        '3.txt'  -> ('3', 'txt')
        '3.html' -> ('3', 'html')
        '3'      -> ('3', 'md')  # default to markdown
    """
    match = _re.match(r'^(.+)\.(md|txt|html)$', resource_id)
    if match:
        return match.group(1), match.group(2)
    return resource_id, 'md'  # default to markdown
def _convert_markdown_to_format(content: str, fmt: str) -> str:
    """Convert markdown content to requested format."""
    if fmt == 'md':
        return content
    elif fmt == 'txt':
        # Strip markdown formatting for plain text
        text = content
        # Remove headers (# ## ###)
        text = _re.sub(r'^#+\s+', '', text, flags=_re.MULTILINE)
        # Remove bold/italic
        text = _re.sub(r'\*\*(.+?)\*\*', r'\1', text)
        text = _re.sub(r'\*(.+?)\*', r'\1', text)
        text = _re.sub(r'__(.+?)__', r'\1', text)
        text = _re.sub(r'_(.+?)_', r'\1', text)
        # Remove links but keep text
        text = _re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
        # Remove horizontal rules
        text = _re.sub(r'^---+$', '', text, flags=_re.MULTILINE)
        return text.strip()
    elif fmt == 'html':
        # Simple markdown to HTML conversion
        html = content
        # Headers
        html = _re.sub(r'^### (.+)$', r'<h3>\1</h3>', html, flags=_re.MULTILINE)
        html = _re.sub(r'^## (.+)$', r'<h2>\1</h2>', html, flags=_re.MULTILINE)
        html = _re.sub(r'^# (.+)$', r'<h1>\1</h1>', html, flags=_re.MULTILINE)
        # Bold/italic
        html = _re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', html)
        html = _re.sub(r'\*(.+?)\*', r'<em>\1</em>', html)
        # Links
        html = _re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'<a href="\2">\1</a>', html)
        # Paragraphs
        paragraphs = html.split('\n\n')
        html = '\n'.join(f'<p>{p.strip()}</p>' if not p.strip().startswith('<h') else p for p in paragraphs if p.strip())
        return html
    return content
def _parse_range(range_str: str, max_val: int = 1000) -> list[int]:
    """Parse range string like '1-5', '1,3,5', or '1-3,7,9-10' into list of integers."""
    result = []
    for part in range_str.split(','):
        part = part.strip()
        if '-' in part:
            start, end = part.split('-', 1)
            start_int = int(start.strip())
            end_int = int(end.strip())
            result.extend(range(start_int, min(end_int + 1, max_val + 1)))
        else:
            result.append(int(part))
    return sorted(set(result))
@app.resource(
    "image://{doc_id}/{resource_id}",
    name="document_image",
    description="Embedded image from an Office document"
)
def get_image_resource(doc_id: str, resource_id: str) -> bytes:
    """Retrieve an embedded image."""
    resource = resource_store.get(doc_id, "image", resource_id)
    if resource is None:
        raise ValueError(f"Image not found: image://{doc_id}/{resource_id}")
    return resource.data


@app.resource(
    "chart://{doc_id}/{resource_id}",
    name="document_chart",
    description="Chart from an Office document (as image or data)"
)
def get_chart_resource(doc_id: str, resource_id: str) -> bytes:
    """Retrieve a chart."""
    resource = resource_store.get(doc_id, "chart", resource_id)
    if resource is None:
        raise ValueError(f"Chart not found: chart://{doc_id}/{resource_id}")
    return resource.data


@app.resource(
    "media://{doc_id}/{resource_id}",
    name="document_media",
    description="Audio or video from an Office document"
)
def get_media_resource(doc_id: str, resource_id: str) -> bytes:
    """Retrieve embedded media."""
    resource = resource_store.get(doc_id, "media", resource_id)
    if resource is None:
        raise ValueError(f"Media not found: media://{doc_id}/{resource_id}")
    return resource.data


@app.resource(
    "embed://{doc_id}/{resource_id}",
    name="embedded_object",
    description="Embedded OLE object (PDF, another Office doc, etc.)"
)
def get_embed_resource(doc_id: str, resource_id: str) -> bytes:
    """Retrieve an embedded object."""
    resource = resource_store.get(doc_id, "embed", resource_id)
    if resource is None:
        raise ValueError(f"Embedded object not found: embed://{doc_id}/{resource_id}")
    return resource.data
@app.resource(
    "chapter://{doc_id}/{resource_id}",
    mime_type="text/markdown",
    name="document_chapter",
    description="Chapter from a Word document. Supports format suffixes: chapter://doc/3.md, chapter://doc/3.txt, chapter://doc/3.html"
)
def get_chapter_resource(doc_id: str, resource_id: str) -> str:
    """Retrieve a chapter with optional format conversion.

    Examples:
        chapter://abc123/3      -> Chapter 3 as markdown (default)
        chapter://abc123/3.md   -> Chapter 3 as markdown
        chapter://abc123/3.txt  -> Chapter 3 as plain text
        chapter://abc123/3.html -> Chapter 3 as HTML
    """
    chapter_id, fmt = _parse_format_suffix(resource_id)
    resource = resource_store.get(doc_id, "chapter", chapter_id)
    if resource is None:
        raise ValueError(f"Chapter not found: chapter://{doc_id}/{resource_id}")
    return _convert_markdown_to_format(resource.data, fmt)


@app.resource(
    "section://{doc_id}/{resource_id}",
    mime_type="text/markdown",
    name="document_section",
    description="Section from a document as Markdown"
)
def get_section_resource(doc_id: str, resource_id: str) -> str:
    """Retrieve a section as markdown."""
    resource = resource_store.get(doc_id, "section", resource_id)
    if resource is None:
        raise ValueError(f"Section not found: section://{doc_id}/{resource_id}")
    return resource.data


@app.resource(
    "sheet://{doc_id}/{resource_id}",
    mime_type="text/markdown",
    name="excel_sheet",
    description="Excel sheet as Markdown table or CSV"
)
def get_sheet_resource(doc_id: str, resource_id: str) -> str:
    """Retrieve an Excel sheet."""
    resource = resource_store.get(doc_id, "sheet", resource_id)
    if resource is None:
        raise ValueError(f"Sheet not found: sheet://{doc_id}/{resource_id}")
    return resource.data


@app.resource(
    "slide://{doc_id}/{resource_id}",
    mime_type="text/markdown",
    name="powerpoint_slide",
    description="PowerPoint slide content as Markdown"
)
def get_slide_resource(doc_id: str, resource_id: str) -> str:
    """Retrieve a slide as markdown."""
    resource = resource_store.get(doc_id, "slide", resource_id)
    if resource is None:
        raise ValueError(f"Slide not found: slide://{doc_id}/{resource_id}")
    return resource.data
# ==================== Range-Based Resources ====================
# Support for fetching multiple items at once: chapters://doc/1-5, slides://doc/1,3,5

@app.resource(
    "chapters://{doc_id}/{range_spec}",
    mime_type="text/markdown",
    name="document_chapters_range",
    description="Multiple chapters from a Word document as combined Markdown (e.g., chapters://doc/1-5)"
)
def get_chapters_range(doc_id: str, range_spec: str) -> str:
    """Retrieve multiple chapters as combined markdown.

    Range formats: '1-5' (chapters 1-5), '1,3,5' (specific chapters), '1-3,7' (mixed)
    """
    chapter_nums = _parse_range(range_spec)
    chapters_content = []
    for num in chapter_nums:
        resource = resource_store.get(doc_id, "chapter", str(num))
        if resource is not None:
            chapters_content.append(resource.data)
    if not chapters_content:
        raise ValueError(f"No chapters found for range: chapters://{doc_id}/{range_spec}")
    return "\n\n---\n\n".join(chapters_content)


@app.resource(
    "slides://{doc_id}/{range_spec}",
    mime_type="text/markdown",
    name="powerpoint_slides_range",
    description="Multiple slides from a PowerPoint as combined Markdown (e.g., slides://doc/1-10)"
)
def get_slides_range(doc_id: str, range_spec: str) -> str:
    """Retrieve multiple slides as combined markdown."""
    slide_nums = _parse_range(range_spec)
    slides_content = []
    for num in slide_nums:
        resource = resource_store.get(doc_id, "slide", str(num))
        if resource is not None:
            slides_content.append(resource.data)
    if not slides_content:
        raise ValueError(f"No slides found for range: slides://{doc_id}/{range_spec}")
    return "\n\n---\n\n".join(slides_content)
@app.resource(
    "paragraph://{doc_id}/{chapter_id}/{paragraph_id}",
    mime_type="text/markdown",
    name="chapter_paragraph",
    description="Specific paragraph from a chapter (e.g., paragraph://doc/3/5 for chapter 3, paragraph 5)"
)
def get_paragraph(doc_id: str, chapter_id: str, paragraph_id: str) -> str:
    """Retrieve a specific paragraph from a chapter."""
    resource = resource_store.get(doc_id, "chapter", chapter_id)
    if resource is None:
        raise ValueError(f"Chapter not found: {chapter_id}")
    # Split chapter content into paragraphs
    paragraphs = [p.strip() for p in resource.data.split('\n\n') if p.strip()]
    try:
        para_idx = int(paragraph_id)
        if 0 <= para_idx < len(paragraphs):
            return paragraphs[para_idx]
        elif 1 <= para_idx <= len(paragraphs):
            # 1-indexed fallback
            return paragraphs[para_idx - 1]
        else:
            raise ValueError(f"Paragraph {paragraph_id} out of range (0-{len(paragraphs)-1})")
    except (ValueError, IndexError):
        raise ValueError(f"Invalid paragraph: paragraph://{doc_id}/{chapter_id}/{paragraph_id}")
# ==================== MCP Prompts ====================
# Prompts help users understand how to use tools effectively
# Organized from basic to advanced multi-step workflows


@ -59,7 +59,7 @@ class TestMixinArchitecture:
        universal = UniversalMixin()
        universal.register_all(app)
        universal_tools = len(app._tool_manager._tools) - initial_tool_count
-       assert universal_tools == 6  # 6 universal tools
+       assert universal_tools == 7  # 7 universal tools (includes index_document)
        word = WordMixin()
        word.register_all(app)


@ -149,8 +149,8 @@ class TestMixinIntegration:
        # Verify no duplicates
        assert len(tool_names) == len(set(tool_names)), "Tool names should be unique"
-       # Verify expected count: 6 universal + 10 word + 3 excel = 19
-       assert len(tool_names) == 19, f"Expected 19 tools, got {len(tool_names)}: {list(tool_names.keys())}"
+       # Verify expected count: 7 universal + 10 word + 3 excel = 20
+       assert len(tool_names) == 20, f"Expected 20 tools, got {len(tool_names)}: {list(tool_names.keys())}"
if __name__ == "__main__":


@ -30,7 +30,7 @@ class TestUniversalMixinRegistration:
        mixin.register_all(app)
        assert mixin is not None
-       assert len(app._tool_manager._tools) == 6  # 6 universal tools
+       assert len(app._tool_manager._tools) == 7  # 7 universal tools (includes index_document)
def test_tool_names_registered(self):
"""Test that all expected tool names are registered."""
@ -43,7 +43,8 @@ class TestUniversalMixinRegistration:
"extract_metadata",
"detect_office_format",
"analyze_document_health",
"get_supported_formats"
"get_supported_formats",
"index_document"
}
registered_tools = set(app._tool_manager._tools.keys())