""" BulkToolCaller Integration Examples Demonstrates how to integrate BulkToolCaller with existing Enhanced MCP Tools architecture including SecurityManager integration and workflow orchestration patterns. """ import asyncio import sys from pathlib import Path # Add src to path for imports sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from enhanced_mcp.bulk_operations import BulkToolCaller, BulkOperationMode from enhanced_mcp.security_manager import SecurityManager from enhanced_mcp.file_operations import EnhancedFileOperations from enhanced_mcp.git_integration import GitIntegration from enhanced_mcp.workflow_tools import DevelopmentWorkflow, AdvancedSearchAnalysis from enhanced_mcp.base import FastMCP, SecurityLevel class IntegratedMCPServer: """Example of integrating BulkToolCaller with existing Enhanced MCP Tools""" def __init__(self): # Initialize core modules self.security_manager = SecurityManager() self.bulk_caller = BulkToolCaller() self.file_ops = EnhancedFileOperations() self.git_ops = GitIntegration() self.dev_workflow = DevelopmentWorkflow() self.search_analysis = AdvancedSearchAnalysis() # Store all modules for registration self.modules = { "security_manager": self.security_manager, "bulk_operations": self.bulk_caller, "file_ops": self.file_ops, "git": self.git_ops, "dev_workflow": self.dev_workflow, "search_analysis": self.search_analysis } # Setup integration self._setup_integration() def _setup_integration(self): """Setup integration between modules""" # Register tool modules with security manager for name, module in self.modules.items(): if name != "security_manager" and hasattr(module, '_tool_metadata'): self.security_manager.register_tool_module(name, module) # Setup BulkToolCaller with security manager self.bulk_caller.set_security_manager(self.security_manager) # Register tool executors with BulkToolCaller self._register_tool_executors() def _register_tool_executors(self): """Register tool executor functions for bulk operations""" # File operations self.bulk_caller.register_tool_executor( "file_ops_read_file", self.file_ops.read_file ) self.bulk_caller.register_tool_executor( "file_ops_write_file", self.file_ops.write_file ) self.bulk_caller.register_tool_executor( "file_ops_create_backup", self.file_ops.create_backup ) # Git operations self.bulk_caller.register_tool_executor( "git_status", self.git_ops.get_git_status ) self.bulk_caller.register_tool_executor( "git_commit", self.git_ops.commit_changes ) # Search and analysis self.bulk_caller.register_tool_executor( "search_analysis_advanced_search", self.search_analysis.advanced_search ) self.bulk_caller.register_tool_executor( "search_analysis_security_pattern_scan", self.search_analysis.security_pattern_scan ) # Development workflow self.bulk_caller.register_tool_executor( "dev_workflow_run_tests", self.dev_workflow.run_tests ) self.bulk_caller.register_tool_executor( "dev_workflow_analyze_dependencies", self.dev_workflow.analyze_dependencies ) def create_fastmcp_app(self) -> FastMCP: """Create FastMCP application with all modules registered""" app = FastMCP("Enhanced MCP Tools with Bulk Operations") # Register all modules with enhanced registration for name, module in self.modules.items(): if hasattr(module, 'safe_register_all'): module.safe_register_all(app, prefix=name) else: module.register_all(app, prefix=name) return app async def demo_code_analysis_workflow(): """Demonstrate creating and executing a code analysis workflow""" print("๐Ÿ” Demo: Code Analysis Workflow") print("=" * 50) # Create integrated server server = IntegratedMCPServer() # Create code analysis workflow workflow_result = await server.bulk_caller.create_code_analysis_workflow( name="Demo Code Analysis", target_path="/path/to/project", include_patterns=["*.py", "*.js", "*.ts"], exclude_patterns=["node_modules/*", "*.min.js"] ) print(f"โœ… Created workflow: {workflow_result}") if workflow_result.get("success"): workflow_id = workflow_result["workflow_id"] # Perform dry run validation print("\n๐Ÿงช Running dry run validation...") dry_run_result = await server.bulk_caller.dry_run_bulk_workflow(workflow_id) print(f"Dry run result: {dry_run_result}") # Check if ready for execution if dry_run_result.get("ready_for_execution"): print("\nโœ… Workflow is ready for execution!") # Execute in dry run mode first execution_result = await server.bulk_caller.execute_bulk_workflow( workflow_id=workflow_id, dry_run=True, # Always start with dry run confirm_destructive=False ) print(f"Execution result: {execution_result}") else: print("\nโŒ Workflow has safety issues and is not ready for execution") print(f"Issues: {dry_run_result.get('safety_issues', [])}") async def demo_fix_and_test_workflow(): """Demonstrate creating and executing a fix and test workflow""" print("\n๐Ÿ”ง Demo: Fix and Test Workflow") print("=" * 50) server = IntegratedMCPServer() # First enable destructive tools (required for file modifications) enable_result = await server.security_manager.enable_destructive_tools( enabled=True, confirm_destructive=True ) print(f"Enabled destructive tools: {enable_result}") # Create fix and test workflow workflow_result = await server.bulk_caller.create_fix_and_test_workflow( name="Demo Fix and Test", target_files=["/path/to/file1.py", "/path/to/file2.py"], backup_enabled=True, run_tests=True ) print(f"โœ… Created workflow: {workflow_result}") if workflow_result.get("success"): workflow_id = workflow_result["workflow_id"] # Dry run first print("\n๐Ÿงช Running comprehensive dry run...") dry_run_result = await server.bulk_caller.dry_run_bulk_workflow(workflow_id) if dry_run_result.get("ready_for_execution"): print("\nโœ… Ready for execution with safety confirmations") # Execute with confirmations execution_result = await server.bulk_caller.execute_bulk_workflow( workflow_id=workflow_id, dry_run=False, # Real execution confirm_destructive=True, # Required for destructive operations continue_on_error=False ) print(f"Execution result: {execution_result}") else: print(f"โŒ Safety issues found: {dry_run_result.get('safety_issues')}") async def demo_custom_workflow(): """Demonstrate creating a custom workflow with dependencies""" print("\n๐ŸŽฏ Demo: Custom Workflow with Dependencies") print("=" * 50) server = IntegratedMCPServer() # Create custom workflow with complex dependencies operations = [ { "id": "check_git", "tool_name": "git_status", "arguments": {"path": "/path/to/repo"}, "description": "Check Git repository status", "security_level": SecurityLevel.SAFE }, { "id": "backup_critical_files", "tool_name": "file_ops_create_backup", "arguments": { "source_paths": ["/path/to/critical/file.py"], "backup_name": "pre_analysis_backup" }, "description": "Backup critical files before analysis", "security_level": SecurityLevel.CAUTION, "depends_on": ["check_git"] }, { "id": "run_security_scan", "tool_name": "search_analysis_security_pattern_scan", "arguments": { "path": "/path/to/repo", "scan_types": ["secrets", "sql_injection"] }, "description": "Scan for security vulnerabilities", "security_level": SecurityLevel.SAFE, "depends_on": ["backup_critical_files"] }, { "id": "run_tests", "tool_name": "dev_workflow_run_tests", "arguments": {"test_type": "security"}, "description": "Run security tests", "security_level": SecurityLevel.SAFE, "depends_on": ["run_security_scan"] } ] workflow_result = await server.bulk_caller.create_bulk_workflow( name="Custom Security Analysis", description="Multi-stage security analysis with dependency management", operations=operations, mode="staged" # Execute in dependency-aware stages ) print(f"โœ… Created custom workflow: {workflow_result}") if workflow_result.get("success"): workflow_id = workflow_result["workflow_id"] # Get detailed workflow status status = await server.bulk_caller.get_workflow_status( workflow_id=workflow_id, include_operation_details=True ) print(f"\n๐Ÿ“Š Workflow status: {status}") # Show dependency resolution print("\n๐Ÿ”— Dependency stages will be:") print("Stage 1: check_git") print("Stage 2: backup_critical_files") print("Stage 3: run_security_scan") print("Stage 4: run_tests") async def demo_security_integration(): """Demonstrate security manager integration""" print("\n๐Ÿ›ก๏ธ Demo: Security Manager Integration") print("=" * 50) server = IntegratedMCPServer() # Check initial security status security_status = await server.security_manager.security_status() print(f"Initial security status: {security_status}") # List tools by security level tools_by_security = await server.security_manager.list_tools_by_security() print(f"\nTools by security level: {tools_by_security}") # Try to create a workflow with destructive operations (should warn about safety) operations = [ { "id": "dangerous_op", "tool_name": "file_ops_delete_file", "arguments": {"file_path": "/tmp/test.txt"}, "description": "Delete a test file", "security_level": SecurityLevel.DESTRUCTIVE } ] workflow_result = await server.bulk_caller.create_bulk_workflow( name="Dangerous Workflow", description="Contains destructive operations", operations=operations, mode="sequential" ) print(f"\nโš ๏ธ Created workflow with destructive operations: {workflow_result}") if workflow_result.get("success"): workflow_id = workflow_result["workflow_id"] # Try to execute without enabling destructive tools print("\nโŒ Attempting execution without enabling destructive tools...") execution_result = await server.bulk_caller.execute_bulk_workflow( workflow_id=workflow_id, dry_run=False, confirm_destructive=True ) print(f"Expected failure: {execution_result}") # Enable destructive tools and try again print("\n๐Ÿ”“ Enabling destructive tools...") enable_result = await server.security_manager.enable_destructive_tools( enabled=True, confirm_destructive=True ) print(f"Enable result: {enable_result}") # Now execution should be possible (but we'll still do dry run for safety) print("\n๐Ÿงช Now attempting dry run with destructive tools enabled...") dry_run_result = await server.bulk_caller.dry_run_bulk_workflow(workflow_id) print(f"Dry run with destructive tools enabled: {dry_run_result}") async def demo_error_handling_and_rollback(): """Demonstrate error handling and rollback capabilities""" print("\n๐Ÿ”„ Demo: Error Handling and Rollback") print("=" * 50) server = IntegratedMCPServer() # Create workflow with intentional failure scenario operations = [ { "id": "good_op", "tool_name": "git_status", "arguments": {"path": "/valid/path"}, "description": "This should succeed", "security_level": SecurityLevel.SAFE }, { "id": "bad_op", "tool_name": "nonexistent_tool", "arguments": {"some": "args"}, "description": "This should fail", "security_level": SecurityLevel.SAFE, "depends_on": ["good_op"] }, { "id": "never_run", "tool_name": "git_status", "arguments": {"path": "/another/path"}, "description": "This should never run due to failure", "security_level": SecurityLevel.SAFE, "depends_on": ["bad_op"] } ] workflow_result = await server.bulk_caller.create_bulk_workflow( name="Error Demonstration", description="Workflow designed to fail for demonstration", operations=operations, mode="staged" ) if workflow_result.get("success"): workflow_id = workflow_result["workflow_id"] # Execute with continue_on_error=False (default) print("\nโŒ Executing workflow that will fail...") execution_result = await server.bulk_caller.execute_bulk_workflow( workflow_id=workflow_id, dry_run=True, # Safe to run in dry mode continue_on_error=False ) print(f"Execution result with failure: {execution_result}") # Show final workflow status final_status = await server.bulk_caller.get_workflow_status( workflow_id=workflow_id, include_operation_details=True ) print(f"\nFinal workflow status: {final_status}") def main(): """Run all demonstrations""" print("๐Ÿš€ BulkToolCaller Integration Demonstrations") print("=" * 60) async def run_all_demos(): await demo_code_analysis_workflow() await demo_fix_and_test_workflow() await demo_custom_workflow() await demo_security_integration() await demo_error_handling_and_rollback() print("\nโœ… All demonstrations completed!") print("\n๐Ÿ’ก Key Takeaways:") print(" โ€ข Always run dry_run_bulk_workflow before live execution") print(" โ€ข Enable destructive tools only when necessary") print(" โ€ข Use dependency management for complex workflows") print(" โ€ข BulkToolCaller integrates seamlessly with SecurityManager") print(" โ€ข Error handling and rollback provide safety nets") # Run the demonstrations asyncio.run(run_all_demos()) if __name__ == "__main__": main()