""" Bulk Operations Workflow Templates Pre-built workflow templates for common development tasks using BulkToolCaller. These templates demonstrate best practices for secure batch operations. """ import asyncio import sys from pathlib import Path # Add src to path for imports sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from enhanced_mcp.bulk_operations import BulkToolCaller, BulkOperationMode from enhanced_mcp.base import SecurityLevel class WorkflowTemplates: """Collection of pre-built workflow templates""" @staticmethod def comprehensive_code_review_workflow(project_path: str, exclude_dirs: list = None) -> dict: """Create a comprehensive code review workflow""" exclude_dirs = exclude_dirs or ["node_modules", ".git", "__pycache__", "dist", "build"] return { "name": "Comprehensive Code Review", "description": f"Multi-stage code review for project: {project_path}", "operations": [ { "id": "git_status_check", "tool_name": "git_status", "arguments": {"path": project_path}, "description": "Check Git repository status and uncommitted changes", "security_level": SecurityLevel.SAFE }, { "id": "dependency_scan", "tool_name": "dev_workflow_analyze_dependencies", "arguments": { "path": project_path, "check_vulnerabilities": True, "check_outdated": True }, "description": "Analyze project dependencies for security and updates", "security_level": SecurityLevel.SAFE, "depends_on": ["git_status_check"] }, { "id": "security_scan", "tool_name": "search_analysis_security_pattern_scan", "arguments": { "path": project_path, "scan_types": ["secrets", "sql_injection", "xss", "hardcoded_passwords"], "exclude_patterns": exclude_dirs }, "description": "Comprehensive security vulnerability scan", "security_level": SecurityLevel.SAFE, "depends_on": ["dependency_scan"] }, { "id": "code_quality_analysis", "tool_name": "file_ops_analyze_file_complexity", "arguments": { "path": project_path, "include_patterns": ["*.py", "*.js", "*.ts", "*.go", "*.rs", "*.java"], "exclude_patterns": exclude_dirs }, "description": "Analyze code complexity and quality metrics", "security_level": SecurityLevel.SAFE, "depends_on": ["security_scan"] }, { "id": "todo_fixme_scan", "tool_name": "search_analysis_advanced_search", "arguments": { "path": project_path, "patterns": ["TODO", "FIXME", "HACK", "XXX", "DEPRECATED"], "file_patterns": ["*.py", "*.js", "*.ts", "*.go", "*.rs", "*.java", "*.cpp", "*.h"], "exclude_patterns": exclude_dirs }, "description": "Find all TODO, FIXME, and technical debt markers", "security_level": SecurityLevel.SAFE, "depends_on": ["code_quality_analysis"] } ], "mode": "staged" } @staticmethod def automated_fix_workflow(file_paths: list, backup_enabled: bool = True) -> dict: """Create an automated fix workflow with proper safety measures""" operations = [] if backup_enabled: operations.append({ "id": "create_backup", "tool_name": "archive_create_backup", "arguments": { "source_paths": file_paths, "backup_name": f"auto_fix_backup_{int(asyncio.get_event_loop().time())}" }, "description": "Create backup before applying automated fixes", "security_level": SecurityLevel.CAUTION }) # Add individual file fixing operations for i, file_path in enumerate(file_paths): operations.append({ "id": f"fix_file_{i}", "tool_name": "file_ops_auto_fix_issues", "arguments": { "file_path": file_path, "fix_types": ["formatting", "imports", "basic_linting"], "dry_run": True # Always start with dry run }, "description": f"Auto-fix common issues in {Path(file_path).name}", "security_level": SecurityLevel.CAUTION, "depends_on": ["create_backup"] if backup_enabled else [] }) # Add validation step operations.append({ "id": "validate_fixes", "tool_name": "dev_workflow_lint_code", "arguments": { "paths": file_paths, "fix_issues": False # Just validate, don't fix }, "description": "Validate that fixes don't introduce new issues", "security_level": SecurityLevel.SAFE, "depends_on": [f"fix_file_{i}" for i in range(len(file_paths))] }) return { "name": "Automated Fix Workflow", "description": f"Safe automated fixing for {len(file_paths)} files", "operations": operations, "mode": "staged" } @staticmethod def ci_cd_preparation_workflow(project_path: str) -> dict: """Create a CI/CD preparation workflow""" return { "name": "CI/CD Preparation", "description": f"Prepare project for CI/CD pipeline: {project_path}", "operations": [ { "id": "git_status", "tool_name": "git_status", "arguments": {"path": project_path}, "description": "Check Git repository status", "security_level": SecurityLevel.SAFE }, { "id": "run_unit_tests", "tool_name": "dev_workflow_run_tests", "arguments": { "test_type": "unit", "coverage": True, "path": project_path }, "description": "Run unit tests with coverage reporting", "security_level": SecurityLevel.SAFE, "depends_on": ["git_status"] }, { "id": "run_integration_tests", "tool_name": "dev_workflow_run_tests", "arguments": { "test_type": "integration", "path": project_path }, "description": "Run integration tests", "security_level": SecurityLevel.SAFE, "depends_on": ["run_unit_tests"] }, { "id": "security_tests", "tool_name": "dev_workflow_run_tests", "arguments": { "test_type": "security", "path": project_path }, "description": "Run security-focused tests", "security_level": SecurityLevel.SAFE, "depends_on": ["run_integration_tests"] }, { "id": "lint_check", "tool_name": "dev_workflow_lint_code", "arguments": { "paths": [project_path], "fix_issues": False, "strict": True }, "description": "Strict linting check for CI/CD standards", "security_level": SecurityLevel.SAFE, "depends_on": ["security_tests"] } ], "mode": "sequential" } @staticmethod def data_migration_workflow(source_path: str, destination_path: str, validation_enabled: bool = True) -> dict: """Create a safe data migration workflow""" operations = [ { "id": "validate_source", "tool_name": "file_ops_analyze_file_complexity", "arguments": {"path": source_path}, "description": "Validate source data integrity", "security_level": SecurityLevel.SAFE }, { "id": "create_backup", "tool_name": "archive_create_backup", "arguments": { "source_paths": [source_path], "backup_name": f"migration_backup_{int(asyncio.get_event_loop().time())}" }, "description": "Create backup of source data", "security_level": SecurityLevel.CAUTION, "depends_on": ["validate_source"] }, { "id": "prepare_destination", "tool_name": "file_ops_create_backup", "arguments": { "source_paths": [destination_path], "backup_name": f"destination_backup_{int(asyncio.get_event_loop().time())}" }, "description": "Backup destination before migration", "security_level": SecurityLevel.CAUTION, "depends_on": ["create_backup"] } ] if validation_enabled: operations.append({ "id": "validate_migration", "tool_name": "file_ops_analyze_file_complexity", "arguments": {"path": destination_path}, "description": "Validate migrated data integrity", "security_level": SecurityLevel.SAFE, "depends_on": ["prepare_destination"] }) return { "name": "Data Migration Workflow", "description": f"Safe data migration from {source_path} to {destination_path}", "operations": operations, "mode": "sequential" } @staticmethod def security_hardening_workflow(project_path: str) -> dict: """Create a comprehensive security hardening workflow""" return { "name": "Security Hardening", "description": f"Comprehensive security hardening for: {project_path}", "operations": [ { "id": "secrets_scan", "tool_name": "search_analysis_security_pattern_scan", "arguments": { "path": project_path, "scan_types": ["secrets", "api_keys", "passwords"], "deep_scan": True }, "description": "Deep scan for exposed secrets and credentials", "security_level": SecurityLevel.SAFE }, { "id": "vulnerability_scan", "tool_name": "search_analysis_security_pattern_scan", "arguments": { "path": project_path, "scan_types": ["sql_injection", "xss", "csrf", "path_traversal"], "check_dependencies": True }, "description": "Comprehensive vulnerability scanning", "security_level": SecurityLevel.SAFE, "depends_on": ["secrets_scan"] }, { "id": "dependency_audit", "tool_name": "dev_workflow_analyze_dependencies", "arguments": { "path": project_path, "security_focus": True, "check_licenses": True }, "description": "Audit dependencies for security issues", "security_level": SecurityLevel.SAFE, "depends_on": ["vulnerability_scan"] }, { "id": "security_tests", "tool_name": "dev_workflow_run_tests", "arguments": { "test_type": "security", "path": project_path, "coverage": True }, "description": "Run comprehensive security test suite", "security_level": SecurityLevel.SAFE, "depends_on": ["dependency_audit"] } ], "mode": "staged" } async def demonstrate_workflow_templates(): """Demonstrate various workflow templates""" print("šŸŽÆ Workflow Templates Demonstration") print("=" * 50) templates = WorkflowTemplates() # 1. Code Review Workflow print("\n1. šŸ“‹ Comprehensive Code Review Workflow") code_review = templates.comprehensive_code_review_workflow( project_path="/path/to/project", exclude_dirs=["node_modules", ".git", "dist"] ) print(f" Operations: {len(code_review['operations'])}") print(f" Mode: {code_review['mode']}") print(" Stages:") for i, op in enumerate(code_review['operations'], 1): print(f" {i}. {op['description']}") # 2. Automated Fix Workflow print("\n2. šŸ”§ Automated Fix Workflow") fix_workflow = templates.automated_fix_workflow( file_paths=["/path/to/file1.py", "/path/to/file2.js"], backup_enabled=True ) print(f" Operations: {len(fix_workflow['operations'])}") print(" Safety features:") print(" • Automatic backup creation") print(" • Dry-run by default") print(" • Validation step") # 3. CI/CD Preparation print("\n3. šŸš€ CI/CD Preparation Workflow") cicd_workflow = templates.ci_cd_preparation_workflow("/path/to/project") print(f" Operations: {len(cicd_workflow['operations'])}") print(" Test coverage:") print(" • Unit tests with coverage") print(" • Integration tests") print(" • Security tests") print(" • Linting validation") # 4. Security Hardening print("\n4. šŸ›”ļø Security Hardening Workflow") security_workflow = templates.security_hardening_workflow("/path/to/project") print(f" Operations: {len(security_workflow['operations'])}") print(" Security checks:") for op in security_workflow['operations']: print(f" • {op['description']}") # 5. Data Migration print("\n5. šŸ”„ Data Migration Workflow") migration_workflow = templates.data_migration_workflow( source_path="/data/source", destination_path="/data/destination", validation_enabled=True ) print(f" Operations: {len(migration_workflow['operations'])}") print(" Safety measures:") print(" • Source validation") print(" • Multiple backup points") print(" • Post-migration validation") async def demonstrate_workflow_execution(): """Demonstrate executing workflows with proper safety controls""" print("\nšŸš€ Workflow Execution Demonstration") print("=" * 50) # Create BulkToolCaller instance bulk_operations = BulkToolCaller() # Create a simple workflow templates = WorkflowTemplates() workflow_data = templates.comprehensive_code_review_workflow("/example/project") print("\n1. Creating workflow...") create_result = await bulk_operations.create_bulk_workflow( name=workflow_data["name"], description=workflow_data["description"], operations=workflow_data["operations"], mode=workflow_data["mode"] ) if create_result.get("success"): workflow_id = create_result["workflow_id"] print(f" āœ… Workflow created: {workflow_id}") print("\n2. Running dry run validation...") dry_run_result = await bulk_operations.dry_run_bulk_workflow(workflow_id) if dry_run_result.get("ready_for_execution"): print(" āœ… Workflow passed dry run validation") print(f" šŸ“Š {dry_run_result['safe_operations']}/{dry_run_result['total_operations']} operations are safe") print("\n3. Executing workflow (dry run mode)...") execution_result = await bulk_operations.execute_bulk_workflow( workflow_id=workflow_id, dry_run=True, # Start with dry run confirm_destructive=False ) if execution_result.get("success"): print(" āœ… Dry run execution completed successfully") print(f" šŸ“ˆ Progress: {execution_result['completed_operations']}/{execution_result['total_operations']}") else: print(f" āŒ Dry run failed: {execution_result.get('error', 'Unknown error')}") else: print(" āŒ Workflow failed dry run validation") if dry_run_result.get("safety_issues"): print(" Safety issues:") for issue in dry_run_result["safety_issues"]: print(f" • {issue}") else: print(f" āŒ Failed to create workflow: {create_result.get('error', 'Unknown error')}") def main(): """Main demonstration function""" print("šŸŽÆ Bulk Operations Workflow Templates") print("=" * 60) async def run_demonstrations(): await demonstrate_workflow_templates() await demonstrate_workflow_execution() print("\nāœ… All demonstrations completed!") print("\nšŸ’” Key Benefits of Workflow Templates:") print(" • Pre-configured best practices") print(" • Built-in safety measures") print(" • Dependency management") print(" • Comprehensive error handling") print(" • Integration with SecurityManager") print("\nšŸ›”ļø Safety Features:") print(" • Automatic backup creation") print(" • Dry-run validation") print(" • Progressive tool disclosure") print(" • Rollback capabilities (where possible)") print(" • Security level enforcement") # Run the demonstrations asyncio.run(run_demonstrations()) if __name__ == "__main__": main()