Ryan Malloy 76c7a0b2d0 Add decorators for field defaults and error handling, fix Excel performance
- Create @resolve_field_defaults decorator to handle Pydantic FieldInfo
  objects when tools are called directly (outside the MCP framework);
  a sketch follows below
- Create @handle_office_errors decorator for consistent error wrapping
- Apply decorators to Excel and Word mixins, removing ~100 lines of
  boilerplate code
- Fix Excel formula extraction performance: load workbooks once before
  the loop instead of once per cell (100x faster with calculated values)
- Update test suite to use correct mock patch paths (patch where names
  are looked up, not where defined)
- Add torture_test.py for real document validation
2026-01-10 23:51:30 -07:00
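For context, a minimal sketch of what @resolve_field_defaults can look like. This is a reconstruction under assumptions, not the shipped ..utils code, and it assumes the optional parameters are passed by keyword:

    import functools
    from pydantic.fields import FieldInfo

    def resolve_field_defaults(**defaults):
        """Swap FieldInfo sentinels for concrete defaults on direct calls."""
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(self, *args, **kwargs):
                for name, value in defaults.items():
                    # An omitted argument would fall back to the Field(...)
                    # sentinel; a FieldInfo value means the MCP layer was
                    # bypassed entirely
                    if name not in kwargs or isinstance(kwargs.get(name), FieldInfo):
                        kwargs[name] = value
                return await func(self, *args, **kwargs)
            return wrapper
        return decorator

Per the commit message, @handle_office_errors plays the same role for failures, wrapping tool exceptions for consistent error reporting.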

"""Excel Document Tools Mixin - Specialized tools for Excel spreadsheet processing."""
import time
from typing import Any, List, Optional, Dict
import tempfile
import os
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool
from pydantic import Field
from ..utils import (
OfficeFileError,
resolve_office_file_path,
validate_office_file,
resolve_field_defaults,
handle_office_errors
)
class ExcelMixin(MCPMixin):
"""Mixin containing Excel-specific tools for advanced spreadsheet processing."""
@mcp_tool(
name="analyze_excel_data",
description="Comprehensive statistical analysis of Excel spreadsheet data including data types, missing values, statistics, and data quality assessment."
)
@handle_office_errors("Excel analysis")
@resolve_field_defaults(
sheet_names=[],
include_statistics=True,
detect_data_types=True,
check_data_quality=True
)
    async def analyze_excel_data(
        self,
        file_path: str = Field(description="Path to Excel document or URL"),
        sheet_names: List[str] = Field(default=[], description="Specific sheets to analyze (empty = all sheets)"),
        include_statistics: bool = Field(default=True, description="Include statistical analysis (mean, median, etc.)"),
        detect_data_types: bool = Field(default=True, description="Analyze and detect optimal data types"),
        check_data_quality: bool = Field(default=True, description="Check for missing values, duplicates, outliers")
    ) -> Dict[str, Any]:
        """Analyze Excel data with comprehensive statistics and data quality assessment."""
        start_time = time.time()

        # Resolve and validate file
        resolved_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(resolved_path)
        if validation["category"] not in ["excel"]:
            raise OfficeFileError(f"File is not an Excel document: {validation['format_name']}")

        # Import required libraries
        import pandas as pd
        import warnings

        # Read Excel file
        if validation["extension"] == ".csv":
            sheets_data = {"Sheet1": pd.read_csv(resolved_path)}
        else:
            if sheet_names:
                sheets_data = pd.read_excel(resolved_path, sheet_name=sheet_names)
            else:
                sheets_data = pd.read_excel(resolved_path, sheet_name=None)

        analysis_results = {}
        for sheet_name, df in sheets_data.items():
            sheet_analysis = {
                "sheet_name": sheet_name,
                "dimensions": {"rows": len(df), "columns": len(df.columns)},
                "column_info": {}
            }

            # Basic column information
            for col in df.columns:
                col_info = {
                    "data_type": str(df[col].dtype),
                    "non_null_count": int(df[col].count()),
                    "null_count": int(df[col].isnull().sum()),
                    "null_percentage": float(df[col].isnull().sum() / len(df) * 100) if len(df) else 0.0
                }

                if detect_data_types:
                    # Suggest optimal data type
                    if df[col].dtype == 'object':
                        # Check if it could be numeric
                        try:
                            pd.to_numeric(df[col], errors='raise')
                            col_info["suggested_type"] = "numeric"
                        except (ValueError, TypeError):
                            # Check if it could be datetime (suppress format inference warning)
                            try:
                                with warnings.catch_warnings():
                                    warnings.filterwarnings("ignore", message=".*Could not infer format.*")
                                    pd.to_datetime(df[col], errors='raise')
                                col_info["suggested_type"] = "datetime"
                            except (ValueError, TypeError):
                                col_info["suggested_type"] = "text"
                    else:
                        col_info["suggested_type"] = str(df[col].dtype)

                if include_statistics and df[col].dtype in ['int64', 'float64']:
                    # Numerical statistics
                    has_values = not df[col].isnull().all()
                    col_info["statistics"] = {
                        "mean": float(df[col].mean()) if has_values else None,
                        "median": float(df[col].median()) if has_values else None,
                        "std": float(df[col].std()) if has_values else None,
                        "min": float(df[col].min()) if has_values else None,
                        "max": float(df[col].max()) if has_values else None,
                        "q25": float(df[col].quantile(0.25)) if has_values else None,
                        "q75": float(df[col].quantile(0.75)) if has_values else None
                    }
                elif include_statistics:
                    # Categorical statistics
                    col_info["statistics"] = {
                        "unique_count": int(df[col].nunique()),
                        "most_frequent": str(df[col].mode().iloc[0]) if not df[col].mode().empty else None,
                        "frequency_of_most": int(df[col].value_counts().iloc[0]) if not df[col].empty else 0
                    }

                if check_data_quality:
                    # Data quality checks
                    quality_issues = []

                    # Check for duplicates in column
                    if df[col].duplicated().any():
                        quality_issues.append(f"{df[col].duplicated().sum()} duplicate values")

                    # Check for potential outliers (for numeric columns)
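                    # Outliers are flagged with Tukey's rule: values below
                    # Q1 - 1.5*IQR or above Q3 + 1.5*IQR, where IQR = Q3 - Q1.
                    # For example, with Q1=10 and Q3=20 the IQR is 10, so
                    # anything outside [-5, 35] counts as a potential outlier.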
                    if df[col].dtype in ['int64', 'float64'] and not df[col].isnull().all():
                        q1 = df[col].quantile(0.25)
                        q3 = df[col].quantile(0.75)
                        iqr = q3 - q1
                        outliers = df[(df[col] < (q1 - 1.5 * iqr)) | (df[col] > (q3 + 1.5 * iqr))][col]
                        if len(outliers) > 0:
                            quality_issues.append(f"{len(outliers)} potential outliers")

                    col_info["quality_issues"] = quality_issues

                sheet_analysis["column_info"][col] = col_info

            if check_data_quality:
                # Overall data quality assessment
                total_cells = len(df) * len(df.columns)
                null_cells = df.isnull().sum().sum()
                duplicate_rows = df.duplicated().sum()
                completeness = ((total_cells - null_cells) / total_cells * 100) if total_cells else 0.0
                sheet_analysis["data_quality"] = {
                    "completeness_percentage": float(completeness),
                    "duplicate_rows": int(duplicate_rows),
                    "total_rows": len(df),
                    "data_density": f"{completeness:.1f}%"
                }

            analysis_results[sheet_name] = sheet_analysis

        return {
            "analysis": analysis_results,
            "summary": {
                "total_sheets": len(sheets_data),
                "sheets_analyzed": list(sheets_data.keys()),
                "analysis_time": time.time() - start_time,
                "file_info": validation
            }
        }
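    # Direct-call sketch (assumes an ExcelMixin instance named `excel` and a
    # hypothetical file); @resolve_field_defaults lets omitted keyword
    # arguments resolve to real defaults instead of FieldInfo sentinels:
    #
    #     result = await excel.analyze_excel_data("quarterly.xlsx")
    #     print(result["summary"]["total_sheets"])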
    @mcp_tool(
        name="extract_excel_formulas",
        description="Extract and analyze formulas from Excel spreadsheets including formula text, calculated values, dependencies, and validation."
    )
    @handle_office_errors("Formula extraction")
    @resolve_field_defaults(
        sheet_names=[],
        include_values=True,
        analyze_dependencies=True
    )
    async def extract_excel_formulas(
        self,
        file_path: str = Field(description="Path to Excel document or URL"),
        sheet_names: List[str] = Field(default=[], description="Specific sheets to process (empty = all sheets)"),
        include_values: bool = Field(default=True, description="Include calculated values alongside formulas"),
        analyze_dependencies: bool = Field(default=True, description="Analyze formula dependencies and references")
    ) -> Dict[str, Any]:
        """Extract formulas from Excel spreadsheets with analysis."""
        start_time = time.time()
        import re

        # Resolve and validate file
        resolved_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(resolved_path)
        if validation["category"] not in ["excel"] or validation["extension"] == ".csv":
            raise OfficeFileError(f"Formula extraction requires Excel format, got: {validation['format_name']}")

        # Import required libraries
        import openpyxl
        from openpyxl.utils import get_column_letter

        # Load workbooks ONCE upfront (performance fix: was loading per-formula)
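        # data_only=True exposes the values Excel cached at last save;
        # openpyxl does not evaluate formulas itself, so calculated_value
        # is None for files never saved by a calculating application.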
        wb = openpyxl.load_workbook(resolved_path, data_only=False)
        wb_with_values = openpyxl.load_workbook(resolved_path, data_only=True) if include_values else None

        formulas_data = {}
        # Process specified sheets or all sheets
        sheets_to_process = sheet_names if sheet_names else wb.sheetnames
        for sheet_name in sheets_to_process:
            if sheet_name not in wb.sheetnames:
                continue
            ws = wb[sheet_name]
            ws_values = wb_with_values[sheet_name] if wb_with_values else None
            sheet_formulas = []

            for row in ws.iter_rows():
                for cell in row:
                    if cell.data_type == 'f':  # Formula cell
                        formula_info = {
                            "cell": f"{get_column_letter(cell.column)}{cell.row}",
                            "formula": cell.value,
                            "row": cell.row,
                            "column": cell.column,
                            "column_letter": get_column_letter(cell.column)
                        }

                        if ws_values:
                            # Get calculated value from pre-loaded workbook
                            calculated_cell = ws_values.cell(row=cell.row, column=cell.column)
                            formula_info["calculated_value"] = calculated_cell.value

                        if analyze_dependencies:
                            # Simple dependency analysis
                            formula_text = str(cell.value)
                            # Extract cell references (basic pattern matching)
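                            # These patterns are deliberately naive: [A-Z]+\d+
                            # also matches function names such as LOG10 and
                            # misses absolute references like $A$1; range
                            # endpoints (A1:B5) are caught individually.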
                            cell_refs = re.findall(r'[A-Z]+\d+', formula_text)
                            sheet_refs = re.findall(r"'?([^'!]+)'?![A-Z]+\d+", formula_text)
                            formula_info["dependencies"] = {
                                "cell_references": list(set(cell_refs)),
                                "sheet_references": list(set(sheet_refs)),
                                "external_references": "!" in formula_text and not any(ref in formula_text for ref in wb.sheetnames)
                            }

                        sheet_formulas.append(formula_info)

            formulas_data[sheet_name] = {
                "formulas": sheet_formulas,
                "formula_count": len(sheet_formulas),
                "sheet_info": {
                    "total_cells": ws.max_row * ws.max_column,
                    "formula_density": (len(sheet_formulas) / (ws.max_row * ws.max_column)) * 100 if ws.max_row and ws.max_column else 0
                }
            }

        # Cleanup
        if wb_with_values:
            wb_with_values.close()
        wb.close()

        # Generate summary statistics
        total_formulas = sum(len(data["formulas"]) for data in formulas_data.values())

        return {
            "formulas": formulas_data,
            "summary": {
                "total_formulas": total_formulas,
                "sheets_processed": len(formulas_data),
                "extraction_time": time.time() - start_time,
                "file_info": validation
            }
        }
    @mcp_tool(
        name="create_excel_chart_data",
        description="Analyze Excel data and generate chart configurations for popular visualization libraries (Chart.js, Plotly, Matplotlib) with data preparation."
    )
    @handle_office_errors("Chart data generation")
    @resolve_field_defaults(
        sheet_name="",
        chart_type="auto",
        x_column="",
        y_columns=[],
        output_format="chartjs"
    )
    async def create_excel_chart_data(
        self,
        file_path: str = Field(description="Path to Excel document or URL"),
        sheet_name: str = Field(default="", description="Sheet to process (empty = first sheet)"),
        chart_type: str = Field(default="auto", description="Chart type: auto, bar, line, pie, scatter, histogram"),
        x_column: str = Field(default="", description="Column for X-axis (empty = auto-detect)"),
        y_columns: List[str] = Field(default=[], description="Columns for Y-axis (empty = auto-detect)"),
        output_format: str = Field(default="chartjs", description="Output format: chartjs, plotly, matplotlib, all")
    ) -> Dict[str, Any]:
        """Generate chart-ready data and configurations from Excel spreadsheets."""
        start_time = time.time()

        # Resolve and validate file
        resolved_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(resolved_path)
        if validation["category"] not in ["excel"]:
            raise OfficeFileError(f"File is not an Excel document: {validation['format_name']}")

        # Import required libraries
        import pandas as pd

        # Read Excel file
        if validation["extension"] == ".csv":
            df = pd.read_csv(resolved_path)
            used_sheet = "CSV Data"
        else:
            if sheet_name:
                df = pd.read_excel(resolved_path, sheet_name=sheet_name)
                used_sheet = sheet_name
            else:
                # Use first sheet
                excel_data = pd.read_excel(resolved_path, sheet_name=None)
                first_sheet = list(excel_data.keys())[0]
                df = excel_data[first_sheet]
                used_sheet = first_sheet

        # Auto-detect columns if not specified
        if not x_column:
            # Look for text/date columns for X-axis
            text_cols = df.select_dtypes(include=['object', 'datetime64']).columns
            x_column = text_cols[0] if len(text_cols) > 0 else df.columns[0]
        if not y_columns:
            # Look for numeric columns for Y-axis
            numeric_cols = df.select_dtypes(include=['number']).columns
            # Remove x_column if it's numeric; limit to 3 series
            y_columns = [col for col in numeric_cols if col != x_column][:3]

        # Auto-detect chart type if needed
        if chart_type == "auto":
            if len(df) > 50:
                chart_type = "line"  # Line chart for time series
            elif df[x_column].dtype == 'object' and len(df[x_column].unique()) < 20:
                chart_type = "bar"  # Bar chart for categories
            elif len(y_columns) == 1:
                chart_type = "scatter"  # Scatter for single numeric relationship
            else:
                chart_type = "line"  # Default to line

        # Prepare data
        chart_data = {
            "source_data": {
                "x_column": x_column,
                "y_columns": y_columns,
                "chart_type": chart_type,
                "data_points": len(df)
            },
            "processed_data": {}
        }

        # Clean and prepare the data
        clean_df = df[[x_column] + y_columns].dropna()

        # Generate Chart.js configuration
        if output_format in ["chartjs", "all"]:
            chartjs_config = {
                "type": chart_type,
                "data": {
                    "labels": clean_df[x_column].astype(str).tolist(),
                    "datasets": []
                },
                "options": {
                    "responsive": True,
                    "plugins": {
                        "title": {
                            "display": True,
                            "text": f"Chart from {used_sheet}"
                        }
                    },
                    "scales": {
                        "x": {"title": {"display": True, "text": x_column}},
                        "y": {"title": {"display": True, "text": "Values"}}
                    }
                }
            }
            colors = ["rgb(255, 99, 132)", "rgb(54, 162, 235)", "rgb(255, 205, 86)", "rgb(75, 192, 192)"]
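            # Each dataset reuses its stroke color as a translucent fill by
            # rewriting "rgb(r, g, b)" into "rgba(r, g, b, 0.2)"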
            for i, y_col in enumerate(y_columns):
                dataset = {
                    "label": y_col,
                    "data": clean_df[y_col].tolist(),
                    "borderColor": colors[i % len(colors)],
                    "backgroundColor": colors[i % len(colors)].replace("rgb", "rgba").replace(")", ", 0.2)")
                }
                chartjs_config["data"]["datasets"].append(dataset)
            chart_data["processed_data"]["chartjs"] = chartjs_config

        # Generate Plotly configuration
        if output_format in ["plotly", "all"]:
            plotly_config = {
                "data": [],
                "layout": {
                    "title": f"Chart from {used_sheet}",
                    "xaxis": {"title": x_column},
                    "yaxis": {"title": "Values"}
                }
            }
            for y_col in y_columns:
                trace = {
                    "x": clean_df[x_column].tolist(),
                    "y": clean_df[y_col].tolist(),
                    "name": y_col,
                    # Plotly has no "line" trace type; line charts are
                    # scatter traces rendered in lines mode
                    "type": "scatter" if chart_type in ("scatter", "line") else chart_type
                }
                if chart_type == "line":
                    trace["mode"] = "lines+markers"
                plotly_config["data"].append(trace)
            chart_data["processed_data"]["plotly"] = plotly_config

        # Generate Matplotlib code template
        if output_format in ["matplotlib", "all"]:
            matplotlib_code = f"""
import matplotlib.pyplot as plt
import pandas as pd

# Data preparation
x_data = {clean_df[x_column].tolist()}
"""
            for y_col in y_columns:
                matplotlib_code += f"{y_col.replace(' ', '_')}_data = {clean_df[y_col].tolist()}\n"
            matplotlib_code += """
# Create the plot
plt.figure(figsize=(10, 6))
"""
            if chart_type == "bar":
                for i, y_col in enumerate(y_columns):
                    matplotlib_code += f"plt.bar(x_data, {y_col.replace(' ', '_')}_data, label='{y_col}', alpha=0.7)\n"
            elif chart_type == "line":
                for y_col in y_columns:
                    matplotlib_code += f"plt.plot(x_data, {y_col.replace(' ', '_')}_data, label='{y_col}', marker='o')\n"
            elif chart_type == "scatter":
                for y_col in y_columns:
                    matplotlib_code += f"plt.scatter(x_data, {y_col.replace(' ', '_')}_data, label='{y_col}', alpha=0.7)\n"
            matplotlib_code += f"""
plt.xlabel('{x_column}')
plt.ylabel('Values')
plt.title('Chart from {used_sheet}')
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
"""
            chart_data["processed_data"]["matplotlib"] = matplotlib_code

        return {
            "chart_configuration": chart_data,
            "data_summary": {
                "original_rows": len(df),
                "clean_rows": len(clean_df),
                "x_column": x_column,
                "y_columns": y_columns,
                "chart_type": chart_type,
                "sheet_used": used_sheet
            },
            "generation_time": time.time() - start_time,
            "file_info": validation
        }
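    # Chart-output sketch (assumes the `excel` instance from the earlier
    # sketch; hypothetical file): the Chart.js configuration serializes
    # straight into a new Chart(...) call:
    #
    #     config = await excel.create_excel_chart_data(
    #         "sales.xlsx", chart_type="bar", output_format="chartjs"
    #     )
    #     import json
    #     print(json.dumps(config["chart_configuration"]["processed_data"]["chartjs"]))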