enhanced-mcp-tools/enhanced_mcp/git_integration.py
Ryan Malloy 1d199a943d 🛡️ SACRED TRUST: Complete safety framework implementation & validation
 COMPREHENSIVE SAFETY FRAMEWORK:
• Package-level safety notices with SACRED TRUST language
• Server-level LLM safety protocols with specific refusal scenarios
• Class-level safety reminders for AI assistants
• Tool-level destructive operation warnings (🔴 DESTRUCTIVE markers)
• Visual safety system: 🔴🛡️🚨 markers throughout codebase
• Emergency logging infrastructure with proper escalation
• Default-safe operations (dry_run=True for destructive tools)

🔒 DESTRUCTIVE OPERATION PROTECTIONS:
• bulk_rename: LLM safety instructions + dry_run default
• search_and_replace_batch: Comprehensive safety warnings
• All destructive tools require preview before execution
• Clear REFUSE scenarios for AI assistants

📚 COMPREHENSIVE DOCUMENTATION:
• SACRED_TRUST_SAFETY.md: Complete safety philosophy & implementation guide
• IMPLEMENTATION_COMPLETE.md: Project completion status
• EMERGENCY_LOGGING_COMPLETE.md: Logging infrastructure details
• UV_BUILD_GUIDE.md: Modern Python project setup
• Multiple implementation guides and status docs

🔧 PROJECT MODERNIZATION:
• Migrated from setup.py/requirements.txt to pyproject.toml + uv
• Updated dependency management with uv.lock
• Enhanced test suite with comprehensive coverage
• Added examples and demo scripts

 VALIDATION COMPLETE: All SACRED_TRUST_SAFETY.md requirements implemented
🎯 Sacred Trust Status: PROTECTED
🚨 User Safety: PARAMOUNT
🔐 System Integrity: PRESERVED

The human trusts AI assistants to be guardians of their system and data.
This framework ensures that trust is honored through comprehensive safety measures.
2025-06-23 11:58:48 -06:00

813 lines
29 KiB
Python

"""
Git Integration Module
Provides advanced git operations, code search, and repository analysis.
"""
from .base import *
class GitIntegration(MCPMixin):
"""Git integration tools"""
@mcp_tool(name="git_status", description="Get comprehensive git repository status")
async def git_status(
self, repository_path: str, include_untracked: Optional[bool] = True, ctx: Context = None
) -> Dict[str, Any]:
"""Get git repository status with modified, staged, and untracked files"""
try:
result = subprocess.run(
["git", "status", "--porcelain"],
cwd=repository_path,
capture_output=True,
text=True,
)
if result.returncode != 0:
return {"error": f"Git command failed: {result.stderr}"}
status = {"modified": [], "staged": [], "untracked": [], "deleted": []}
for line in result.stdout.strip().split("\n"):
if not line:
continue
status_code = line[:2]
filename = line[3:]
if status_code[0] in ["M", "A", "D", "R", "C"]:
status["staged"].append(filename)
if status_code[1] in ["M", "D"]:
status["modified"].append(filename)
elif status_code[1] == "?":
status["untracked"].append(filename)
if ctx:
await ctx.info(f"Git status retrieved for {repository_path}")
return status
except Exception as e:
if ctx:
await ctx.error(f"git status failed: {str(e)}")
return {"error": str(e)}
@mcp_tool(name="git_diff", description="Show git diffs with intelligent formatting")
async def git_diff(
self,
repository_path: str,
staged: Optional[bool] = False,
file_path: Optional[str] = None,
commit_range: Optional[str] = None,
ctx: Context = None,
) -> str:
"""Show git diffs with syntax highlighting and statistics"""
try:
cmd = ["git", "diff"]
if staged:
cmd.append("--cached")
if commit_range:
cmd.append(commit_range)
if file_path:
cmd.append("--")
cmd.append(file_path)
result = subprocess.run(cmd, cwd=repository_path, capture_output=True, text=True)
if result.returncode != 0:
return f"Git diff failed: {result.stderr}"
if ctx:
await ctx.info(f"Git diff generated for {repository_path}")
return result.stdout
except Exception as e:
if ctx:
await ctx.error(f"git diff failed: {str(e)}")
return f"Error: {str(e)}"
@mcp_tool(
    name="git_grep",
    description="🔍 Advanced git grep with annotations, context, and intelligent filtering",
)
async def git_grep(
    self,
    repository_path: str,
    pattern: str,
    search_type: Optional[
        Literal["basic", "regex", "fixed-string", "extended-regex"]
    ] = "basic",
    file_patterns: Optional[List[str]] = None,
    exclude_patterns: Optional[List[str]] = None,
    context_lines: Optional[int] = 0,
    show_line_numbers: Optional[bool] = True,
    case_sensitive: Optional[bool] = True,
    whole_words: Optional[bool] = False,
    invert_match: Optional[bool] = False,
    max_results: Optional[int] = 1000,
    git_ref: Optional[str] = None,
    include_untracked: Optional[bool] = False,
    annotations: Optional[bool] = True,
    ctx: Context = None,
) -> Dict[str, Any]:
    """Advanced git grep with comprehensive search capabilities and intelligent annotations.

    🔍 SEARCH MODES:
    - basic: Simple text search (default)
    - regex: Perl-compatible regular expressions
    - fixed-string: Literal string search (fastest)
    - extended-regex: Extended regex with more features

    📝 ANNOTATIONS:
    - File metadata (size, last modified, git status)
    - Match context with syntax highlighting hints
    - Pattern analysis and suggestions
    - Performance statistics and optimization hints

    Args:
        repository_path: Path to git repository
        pattern: Search pattern (text, regex, or fixed string)
        search_type: Type of search to perform
        file_patterns: Glob patterns for files to include (e.g., ['*.py', '*.js'])
        exclude_patterns: Glob patterns for files to exclude (e.g., ['*.pyc', '__pycache__'])
        context_lines: Number of context lines before/after matches
        show_line_numbers: Include line numbers in results
        case_sensitive: Perform case-sensitive search
        whole_words: Match whole words only
        invert_match: Show lines that DON'T match the pattern
        max_results: Maximum number of matches to return
        git_ref: Search in specific git ref (branch/commit/tag)
        include_untracked: Include untracked files in search
        annotations: Include intelligent annotations and metadata

    Returns:
        Comprehensive search results with annotations and metadata
    """
    try:
        repo_path = Path(repository_path)
        if not repo_path.exists():
            return {"error": f"Repository path not found: {repository_path}"}
        if not (repo_path / ".git").exists():
            return {"error": f"Not a git repository: {repository_path}"}

        if ctx:
            await ctx.info(f"Starting git grep search for pattern: '{pattern}'")

        cmd = ["git", "grep"]
        if search_type == "regex":
            cmd.append("--perl-regexp")
        elif search_type == "fixed-string":
            cmd.append("--fixed-strings")
        elif search_type == "extended-regex":
            cmd.append("--extended-regexp")
        if not case_sensitive:
            cmd.append("--ignore-case")
        if whole_words:
            cmd.append("--word-regexp")
        if invert_match:
            cmd.append("--invert-match")
        if show_line_numbers:
            cmd.append("--line-number")
        if context_lines > 0:
            cmd.extend([f"--context={context_lines}"])

        cmd.append(pattern)
        if git_ref:
            cmd.append(git_ref)

        # Pathspec section. Fix: exclude_patterns was accepted (and echoed in
        # search_parameters) but never applied; translate it to git's
        # ":(exclude)" pathspec magic alongside the include patterns.
        if file_patterns or exclude_patterns:
            cmd.append("--")
            if file_patterns:
                for file_pattern in file_patterns:
                    cmd.append(file_pattern)
            if exclude_patterns:
                for exclude_pattern in exclude_patterns:
                    cmd.append(f":(exclude){exclude_pattern}")

        search_start = time.time()
        if ctx:
            await ctx.info(f"Executing: {' '.join(cmd)}")
        result = subprocess.run(
            cmd,
            cwd=repository_path,
            capture_output=True,
            text=True,
            timeout=30,  # 30 second timeout
        )
        search_duration = time.time() - search_start

        # git grep exits 0 on matches, 1 on no matches, >1 on real errors.
        if result.returncode > 1:
            return {"error": f"Git grep failed: {result.stderr}"}

        matches = []
        files_searched = set()
        total_matches = 0
        if result.stdout:
            lines = result.stdout.strip().split("\n")
            for line in lines[:max_results]:  # Limit results
                if ":" in line:
                    # Match lines are "file:line:content".
                    parts = line.split(":", 2)
                    if len(parts) >= 3:
                        filename = parts[0]
                        line_number = parts[1]
                        content = parts[2]
                        try:
                            line_num = int(line_number)
                        except ValueError:
                            continue  # Skip malformed lines
                        files_searched.add(filename)
                        total_matches += 1
                        match_info = {
                            "file": filename,
                            "line_number": line_num,
                            "content": content,
                            "match_type": "exact",
                        }
                        if annotations:
                            match_info["annotations"] = await self._annotate_git_grep_match(
                                repo_path,
                                filename,
                                line_num,
                                content,
                                pattern,
                                search_type,
                                ctx,
                            )
                        matches.append(match_info)
                elif "-" in line and context_lines > 0:
                    # Context lines use "file-line-content" separators.
                    parts = line.split("-", 2)
                    if len(parts) >= 3:
                        filename = parts[0]
                        line_number = parts[1]
                        content = parts[2]
                        try:
                            line_num = int(line_number)
                        except ValueError:
                            continue
                        files_searched.add(filename)
                        match_info = {
                            "file": filename,
                            "line_number": line_num,
                            "content": content,
                            "match_type": "context",
                        }
                        matches.append(match_info)

        untracked_matches = []
        if include_untracked and result.returncode == 1:  # No matches in tracked files
            untracked_matches = await self._search_untracked_files(
                repo_path,
                pattern,
                search_type,
                file_patterns,
                exclude_patterns,
                context_lines,
                show_line_numbers,
                case_sensitive,
                whole_words,
                invert_match,
                max_results,
                annotations,
                ctx,
            )

        search_result = {
            "repository_path": str(repo_path),
            "pattern": pattern,
            "search_type": search_type,
            "total_matches": total_matches,
            "files_with_matches": len(files_searched),
            "files_searched": list(files_searched),
            "matches": matches,
            "untracked_matches": untracked_matches,
            "performance": {
                "search_duration_seconds": round(search_duration, 3),
                "matches_per_second": (
                    round(total_matches / search_duration, 2) if search_duration > 0 else 0
                ),
                "git_grep_exit_code": result.returncode,
            },
            "search_parameters": {
                "context_lines": context_lines,
                "case_sensitive": case_sensitive,
                "whole_words": whole_words,
                "invert_match": invert_match,
                "max_results": max_results,
                "git_ref": git_ref,
                "include_untracked": include_untracked,
                "file_patterns": file_patterns,
                "exclude_patterns": exclude_patterns,
            },
        }

        if annotations:
            search_result["annotations"] = await self._generate_search_annotations(
                search_result, pattern, search_type, repo_path, ctx
            )

        if ctx:
            await ctx.info(
                f"Git grep completed: {total_matches} matches in {len(files_searched)} files "
                f"in {search_duration:.2f}s"
            )
        return search_result
    except subprocess.TimeoutExpired:
        error_msg = "Git grep search timed out (>30s)"
        if ctx:
            await ctx.error(error_msg)
        return {"error": error_msg}
    except Exception as e:
        error_msg = f"Git grep failed: {str(e)}"
        if ctx:
            await ctx.error(f"CRITICAL: {error_msg} | Exception: {type(e).__name__}")
        return {"error": error_msg}
async def _annotate_git_grep_match(
    self,
    repo_path: Path,
    filename: str,
    line_number: int,
    content: str,
    pattern: str,
    search_type: str,
    ctx: Context,
) -> Dict[str, Any]:
    """Generate intelligent annotations for a single git grep match.

    Collects file metadata (size, mtime, git status), a lightweight analysis
    of the matched line, language hints for common source extensions, and
    follow-up search suggestions.

    Args:
        repo_path: Repository root the match came from.
        filename: Matched file path, relative to the repository root.
        line_number: 1-based line number of the match.
        content: Text of the matched line.
        pattern: The pattern that produced the match.
        search_type: Search mode used ("basic", "regex", ...).
        ctx: Optional MCP context (unused here; kept for call symmetry).

    Returns:
        Annotation dict, or {"error": ...} if annotation generation failed.
    """
    try:
        file_path = repo_path / filename
        annotations = {
            "file_info": {},
            "match_analysis": {},
            "context_hints": {},
            "suggestions": [],
        }

        if file_path.exists():
            stat_info = file_path.stat()
            annotations["file_info"] = {
                "size_bytes": stat_info.st_size,
                "modified_timestamp": stat_info.st_mtime,
                "modified_iso": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
                "extension": file_path.suffix,
                "is_binary": self._is_likely_binary(file_path),
                "estimated_lines": await self._estimate_file_lines(file_path),
            }
            try:
                git_status = subprocess.run(
                    ["git", "status", "--porcelain", filename],
                    cwd=repo_path,
                    capture_output=True,
                    text=True,
                    timeout=5,
                )
                if git_status.returncode == 0:
                    status_line = git_status.stdout.strip()
                    if status_line:
                        annotations["file_info"]["git_status"] = status_line[:2]
                    else:
                        annotations["file_info"]["git_status"] = "clean"
            except Exception:
                annotations["file_info"]["git_status"] = "unknown"

        annotations["match_analysis"] = {
            "line_length": len(content),
            "leading_whitespace": len(content) - len(content.lstrip()),
            # Occurrence counting is only meaningful for literal searches.
            "pattern_occurrences": (
                content.count(pattern) if search_type == "fixed-string" else 1
            ),
            "context_type": self._detect_code_context(content, file_path.suffix),
        }

        if file_path.suffix in [".py", ".js", ".ts", ".java", ".cpp", ".c"]:
            annotations["context_hints"]["language"] = self._detect_language(file_path.suffix)
            annotations["context_hints"]["likely_element"] = self._analyze_code_element(content)

        suggestions = []
        if search_type == "basic" and any(char in pattern for char in r".*+?[]{}()|\^$"):
            suggestions.append(
                {
                    "type": "optimization",
                    "message": "Pattern contains regex characters. Consider using --perl-regexp for regex search.",
                    "command_hint": f"git grep --perl-regexp '{pattern}'",
                }
            )
        # Fix: the original guard ANDed in `any(... for pattern in [])`, which
        # iterates an empty list (always False, negated to always True) and
        # shadowed `pattern`; the condition reduces to the suffix check alone.
        if file_path.suffix:
            suggestions.append(
                {
                    "type": "scope",
                    "message": f"Consider limiting search to {file_path.suffix} files for better performance.",
                    "command_hint": f"git grep '{pattern}' -- '*.{file_path.suffix[1:]}'",
                }
            )
        annotations["suggestions"] = suggestions
        return annotations
    except Exception as e:
        return {"error": f"Failed to generate annotations: {str(e)}"}
async def _search_untracked_files(
    self,
    repo_path: Path,
    pattern: str,
    search_type: str,
    file_patterns: Optional[List[str]],
    exclude_patterns: Optional[List[str]],
    context_lines: int,
    show_line_numbers: bool,
    case_sensitive: bool,
    whole_words: bool,
    invert_match: bool,
    max_results: int,
    annotations: bool,
    ctx: Context,
) -> List[Dict[str, Any]]:
    """Search untracked files using traditional grep.

    git grep only sees tracked content, so untracked files (from
    `git ls-files --others --exclude-standard`) are searched with plain
    grep using equivalent flags. Returns a list of match dicts tagged
    with match_type/file_status "untracked"; empty list on any failure.
    """
    import fnmatch  # stdlib; local import keeps the module surface unchanged

    try:
        untracked_result = subprocess.run(
            ["git", "ls-files", "--others", "--exclude-standard"],
            cwd=repo_path,
            capture_output=True,
            text=True,
            timeout=10,
        )
        if untracked_result.returncode != 0:
            return []
        untracked_files = [f for f in untracked_result.stdout.strip().split("\n") if f]

        # Fix: file_patterns / exclude_patterns were accepted but never
        # applied here; filter the candidate list with glob matching so the
        # untracked search honors the same scoping as the tracked search.
        if file_patterns:
            untracked_files = [
                f for f in untracked_files
                if any(fnmatch.fnmatch(f, pat) for pat in file_patterns)
            ]
        if exclude_patterns:
            untracked_files = [
                f for f in untracked_files
                if not any(fnmatch.fnmatch(f, pat) for pat in exclude_patterns)
            ]
        if not untracked_files:
            return []

        cmd = ["grep"]
        if search_type == "regex":
            cmd.append("--perl-regexp")
        elif search_type == "fixed-string":
            cmd.append("--fixed-strings")
        elif search_type == "extended-regex":
            cmd.append("--extended-regexp")
        if not case_sensitive:
            cmd.append("--ignore-case")
        if whole_words:
            cmd.append("--word-regexp")
        if invert_match:
            cmd.append("--invert-match")
        if show_line_numbers:
            cmd.append("--line-number")
        if context_lines > 0:
            cmd.extend([f"--context={context_lines}"])
        cmd.extend(["--with-filename", pattern])
        cmd.extend(untracked_files)

        grep_result = subprocess.run(
            cmd, cwd=repo_path, capture_output=True, text=True, timeout=15
        )

        matches = []
        if grep_result.stdout:
            lines = grep_result.stdout.strip().split("\n")
            for line in lines[:max_results]:
                if ":" in line:
                    # grep --with-filename output: "file:line:content".
                    parts = line.split(":", 2)
                    if len(parts) >= 3:
                        filename = parts[0]
                        line_number = parts[1]
                        content = parts[2]
                        try:
                            line_num = int(line_number)
                        except ValueError:
                            continue
                        match_info = {
                            "file": filename,
                            "line_number": line_num,
                            "content": content,
                            "match_type": "untracked",
                            "file_status": "untracked",
                        }
                        if annotations:
                            match_info["annotations"] = await self._annotate_git_grep_match(
                                repo_path,
                                filename,
                                line_num,
                                content,
                                pattern,
                                search_type,
                                ctx,
                            )
                        matches.append(match_info)
        return matches
    except Exception as e:
        if ctx:
            await ctx.warning(f"Failed to search untracked files: {str(e)}")
        return []
async def _generate_search_annotations(
    self,
    search_result: Dict[str, Any],
    pattern: str,
    search_type: str,
    repo_path: Path,
    ctx: Context,
) -> Dict[str, Any]:
    """Build repository-level insights for a completed git grep run.

    Derives match density, speed/efficiency ratings, coverage, pattern
    classification, and optimization suggestions from the aggregate
    search result. Returns {"error": ...} if anything goes wrong.
    """
    try:
        match_count = search_result["total_matches"]
        matched_files = search_result["files_with_matches"]
        duration = search_result["performance"]["search_duration_seconds"]

        # Speed rating thresholds: <1s fast, <5s acceptable, else slow.
        if duration < 1.0:
            efficiency = "high"
        elif duration < 5.0:
            efficiency = "medium"
        else:
            efficiency = "low"

        if duration > 5.0:
            potential = "high"
        elif duration > 2.0:
            potential = "medium"
        else:
            potential = "low"

        bottlenecks = []
        if duration > 2.0:
            bottlenecks.append("Large repository or complex pattern")

        tips = []
        if duration > 5.0:
            tips.append(
                {
                    "type": "performance",
                    "priority": "high",
                    "suggestion": "Consider adding file type filters to reduce search scope",
                    "example": f"git grep '{pattern}' -- '*.py' '*.js'",
                }
            )
        if match_count > 500:
            tips.append(
                {
                    "type": "refinement",
                    "priority": "medium",
                    "suggestion": "Large number of matches. Consider refining the pattern for more specific results",
                    "example": f"git grep '{pattern}' | head -20",
                }
            )
        if search_type == "basic" and len(pattern) < 3:
            tips.append(
                {
                    "type": "accuracy",
                    "priority": "medium",
                    "suggestion": "Short patterns may produce many false positives. Consider using word boundaries",
                    "example": f"git grep --word-regexp '{pattern}'",
                }
            )

        return {
            "search_insights": {
                "match_density": round(match_count / max(matched_files, 1), 2),
                "search_efficiency": efficiency,
                "coverage_assessment": await self._assess_search_coverage(
                    repo_path, search_result, ctx
                ),
            },
            "performance_analysis": {
                "is_fast_search": duration < 1.0,
                "bottlenecks": bottlenecks,
                "optimization_potential": potential,
            },
            "pattern_analysis": {
                "pattern_type": self._analyze_pattern_type(pattern, search_type),
                "complexity": self._assess_pattern_complexity(pattern, search_type),
                "suggested_improvements": [],
            },
            "optimization_suggestions": tips,
        }
    except Exception as e:
        return {"error": f"Failed to generate search annotations: {str(e)}"}
async def _assess_search_coverage(
    self, repo_path: Path, search_result: Dict[str, Any], ctx: Context
) -> str:
    """Classify how much of the tracked file set the search touched.

    Returns "comprehensive" (>50% of tracked files matched), "moderate"
    (>10%), "limited", "no_matches", or "unknown" when the tracked file
    list cannot be obtained.
    """
    try:
        proc = subprocess.run(
            ["git", "ls-files"], cwd=repo_path, capture_output=True, text=True, timeout=10
        )
        if proc.returncode != 0:
            return "unknown"

        tracked = [name for name in proc.stdout.strip().split("\n") if name]
        searched = len(search_result["files_searched"])
        if searched == 0:
            return "no_matches"

        # A division by zero (no tracked files) falls into the except below,
        # matching the original's "unknown" outcome.
        ratio = searched / len(tracked)
        if ratio > 0.5:
            return "comprehensive"
        if ratio > 0.1:
            return "moderate"
        return "limited"
    except Exception:
        return "unknown"
def _is_likely_binary(self, file_path: Path) -> bool:
"""Check if file is likely binary"""
try:
with open(file_path, "rb") as f:
chunk = f.read(8192)
return b"\0" in chunk
except Exception:
return False
async def _estimate_file_lines(self, file_path: Path) -> int:
"""Estimate number of lines in file"""
try:
with open(file_path, "rb") as f:
return sum(1 for _ in f)
except Exception:
return 0
def _detect_code_context(self, content: str, file_extension: str) -> str:
"""Detect what type of code context this match represents"""
content_lower = content.strip().lower()
if file_extension in [".py"]:
if content_lower.startswith("def "):
return "function_definition"
elif content_lower.startswith("class "):
return "class_definition"
elif content_lower.startswith("import ") or content_lower.startswith("from "):
return "import_statement"
elif "#" in content:
return "comment"
elif file_extension in [".js", ".ts"]:
if "function" in content_lower:
return "function_definition"
elif "class" in content_lower:
return "class_definition"
elif "//" in content or "/*" in content:
return "comment"
elif file_extension in [".md"]:
if content.strip().startswith("#"):
return "markdown_heading"
return "code_line"
def _detect_language(self, file_extension: str) -> str:
"""Detect programming language from file extension"""
lang_map = {
".py": "Python",
".js": "JavaScript",
".ts": "TypeScript",
".java": "Java",
".cpp": "C++",
".c": "C",
".go": "Go",
".rs": "Rust",
".rb": "Ruby",
".php": "PHP",
".sh": "Shell",
".md": "Markdown",
".yml": "YAML",
".yaml": "YAML",
".json": "JSON",
".xml": "XML",
".html": "HTML",
".css": "CSS",
}
return lang_map.get(file_extension, "Unknown")
def _analyze_code_element(self, content: str) -> str:
"""Analyze what code element this line likely represents"""
stripped = content.strip()
if not stripped:
return "empty_line"
elif stripped.startswith("#") or stripped.startswith("//"):
return "comment"
elif any(keyword in stripped for keyword in ["def ", "function ", "class ", "interface "]):
return "definition"
elif any(keyword in stripped for keyword in ["import ", "from ", "#include", "require("]):
return "import"
elif "=" in stripped and not any(op in stripped for op in ["==", "!=", "<=", ">="]):
return "assignment"
elif any(keyword in stripped for keyword in ["if ", "else", "elif", "while ", "for "]):
return "control_flow"
elif stripped.endswith(":") or stripped.endswith("{"):
return "block_start"
else:
return "statement"
def _analyze_pattern_type(self, pattern: str, search_type: str) -> str:
"""Analyze the type of search pattern"""
if search_type == "fixed-string":
return "literal_string"
elif search_type in ["regex", "extended-regex"]:
if any(char in pattern for char in r".*+?[]{}()|\^$"):
return "complex_regex"
else:
return "simple_regex"
else: # basic
if any(char in pattern for char in r".*+?[]{}()|\^$"):
return "basic_with_special_chars"
else:
return "simple_text"
def _assess_pattern_complexity(self, pattern: str, search_type: str) -> str:
"""Assess the complexity of the search pattern"""
if search_type == "fixed-string":
return "low"
complexity_indicators = [
r"\w",
r"\d",
r"\s", # Character classes
r".*",
r".+",
r".?", # Quantifiers
r"[",
r"]", # Character sets
r"(",
r")", # Groups
r"|", # Alternation
r"^",
r"$", # Anchors
r"\\", # Escapes
]
complexity_score = sum(1 for indicator in complexity_indicators if indicator in pattern)
if complexity_score == 0:
return "low"
elif complexity_score <= 3:
return "medium"
else:
return "high"
@mcp_tool(
    name="git_commit_prepare",
    description="Intelligent commit preparation with AI-suggested messages",
)
def git_commit_prepare(
    self, repository_path: str, files: List[str], suggest_message: Optional[bool] = True
) -> Dict[str, Any]:
    """Stage the given files and prepare (but do not create) a commit.

    Runs ``git add -- <files>``, summarizes the staged changes via
    ``git diff --cached --name-status``, and optionally derives a simple
    suggested commit message from what was staged. Nothing is committed;
    the caller reviews the suggestion first.

    Args:
        repository_path: Path to the git repository.
        files: Paths (relative to the repository root) to stage.
        suggest_message: When True, include a "suggested_message" key.

    Returns:
        Dict with "staged_files", "diff_summary" (added/modified/deleted
        lists), and optionally "suggested_message"; {"error": ...} on failure.
    """
    try:
        add_result = subprocess.run(
            ["git", "add", "--"] + list(files),
            cwd=repository_path,
            capture_output=True,
            text=True,
        )
        if add_result.returncode != 0:
            return {"error": f"git add failed: {add_result.stderr}"}

        staged = subprocess.run(
            ["git", "diff", "--cached", "--name-status"],
            cwd=repository_path,
            capture_output=True,
            text=True,
        )
        if staged.returncode != 0:
            return {"error": f"git diff failed: {staged.stderr}"}

        added, modified, deleted = [], [], []
        for line in staged.stdout.strip().split("\n"):
            if not line:
                continue
            # --name-status lines: "<status>\t<path>" (renames carry extra
            # fields; the first path is kept, which is enough for a summary).
            code, _, path = line.partition("\t")
            if code.startswith("A"):
                added.append(path)
            elif code.startswith("D"):
                deleted.append(path)
            else:
                modified.append(path)

        result: Dict[str, Any] = {
            "staged_files": added + modified + deleted,
            "diff_summary": {
                "added": added,
                "modified": modified,
                "deleted": deleted,
            },
        }

        if suggest_message:
            parts = []
            if added:
                parts.append(f"add {', '.join(added[:3])}")
            if modified:
                parts.append(f"update {', '.join(modified[:3])}")
            if deleted:
                parts.append(f"remove {', '.join(deleted[:3])}")
            result["suggested_message"] = (
                "; ".join(parts) if parts else "chore: no staged changes"
            )

        return result
    except Exception as e:
        return {"error": str(e)}