""" Data importer for Claude Code .claude.json file. This module provides functionality to import historical data from the .claude.json configuration file into the project tracker. """ import json import os from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Any from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.database.connection import get_db from app.models.project import Project from app.models.session import Session from app.models.conversation import Conversation router = APIRouter() class ClaudeJsonImporter: """Importer for .claude.json data.""" def __init__(self, db: AsyncSession): self.db = db async def import_from_file(self, file_path: str) -> Dict[str, Any]: """Import data from .claude.json file.""" if not os.path.exists(file_path): raise FileNotFoundError(f"Claude configuration file not found: {file_path}") try: with open(file_path, 'r', encoding='utf-8') as f: claude_data = json.load(f) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in Claude configuration file: {e}") results = { "projects_imported": 0, "sessions_estimated": 0, "conversations_imported": 0, "errors": [] } # Import basic usage statistics await self._import_usage_stats(claude_data, results) # Import projects and their history if "projects" in claude_data: await self._import_projects(claude_data["projects"], results) return results async def _import_usage_stats(self, claude_data: Dict[str, Any], results: Dict[str, Any]): """Import basic usage statistics.""" # We could create a synthetic "Claude Code Usage" project to track overall stats if claude_data.get("numStartups") and claude_data.get("firstStartTime"): try: first_start = datetime.fromisoformat( claude_data["firstStartTime"].replace('Z', '+00:00') ) # Create a synthetic project for overall Claude Code usage usage_project = await self._get_or_create_project( name="Claude Code Usage Statistics", path="", description="Imported usage statistics from .claude.json" ) # Estimate session distribution over time num_startups = claude_data["numStartups"] days_since_first = (datetime.now() - first_start.replace(tzinfo=None)).days if days_since_first > 0: # Create estimated sessions spread over the usage period await self._create_estimated_sessions( usage_project, first_start.replace(tzinfo=None), num_startups, days_since_first ) results["sessions_estimated"] = num_startups except Exception as e: results["errors"].append(f"Failed to import usage stats: {e}") async def _import_projects(self, projects_data: Dict[str, Any], results: Dict[str, Any]): """Import project data from .claude.json.""" for project_path, project_info in projects_data.items(): try: # Skip system paths or non-meaningful paths if project_path in ["", "/", "/tmp"]: continue # Extract project name from path project_name = Path(project_path).name or "Unknown Project" # Create or get existing project project = await self._get_or_create_project( name=project_name, path=project_path ) results["projects_imported"] += 1 # Import conversation history if available if "history" in project_info and isinstance(project_info["history"], list): conversation_count = await self._import_project_history( project, project_info["history"] ) results["conversations_imported"] += conversation_count except Exception as e: results["errors"].append(f"Failed to import project {project_path}: {e}") async def _get_or_create_project( self, name: str, path: str, 
        description: Optional[str] = None
    ) -> Project:
        """Get existing project or create new one."""
        # Check if project already exists
        result = await self.db.execute(
            select(Project).where(Project.path == path)
        )
        existing_project = result.scalars().first()

        if existing_project:
            return existing_project

        # Try to detect languages from path
        languages = self._detect_languages(path)

        # Create new project
        project = Project(
            name=name,
            path=path,
            languages=languages
        )
        self.db.add(project)
        await self.db.commit()
        await self.db.refresh(project)

        return project

    def _detect_languages(self, project_path: str) -> Optional[List[str]]:
        """Attempt to detect programming languages from project directory."""
        languages = []

        try:
            if os.path.exists(project_path) and os.path.isdir(project_path):
                # Look for common files to infer languages
                files = os.listdir(project_path)

                # Python
                if any(f.endswith(('.py', '.pyx', '.pyi')) for f in files) or 'requirements.txt' in files:
                    languages.append('python')

                # JavaScript/TypeScript
                if any(f.endswith(('.js', '.jsx', '.ts', '.tsx')) for f in files) or 'package.json' in files:
                    if any(f.endswith(('.ts', '.tsx')) for f in files):
                        languages.append('typescript')
                    else:
                        languages.append('javascript')

                # Go
                if any(f.endswith('.go') for f in files) or 'go.mod' in files:
                    languages.append('go')

                # Rust
                if any(f.endswith('.rs') for f in files) or 'Cargo.toml' in files:
                    languages.append('rust')

                # Java
                if any(f.endswith('.java') for f in files) or 'pom.xml' in files:
                    languages.append('java')
        except (OSError, PermissionError):
            # If we can't read the directory, that's okay
            pass

        return languages if languages else None

    async def _create_estimated_sessions(
        self,
        project: Project,
        first_start: datetime,
        num_startups: int,
        days_since_first: int
    ):
        """Create estimated sessions based on startup count."""
        # Check if we already have sessions for this project
        existing_sessions = await self.db.execute(
            select(Session).where(
                Session.project_id == project.id,
                Session.session_type == "startup"
            )
        )
        if existing_sessions.scalars().first():
            return  # Sessions already exist, skip creation

        # Don't create too many sessions - limit to reasonable estimates
        max_sessions = min(num_startups, 50)  # Cap at 50 sessions

        # Distribute sessions over the time period
        if days_since_first > 0:
            sessions_per_day = max_sessions / days_since_first

            for i in range(max_sessions):
                # Spread sessions over the time period
                days_offset = int(i / sessions_per_day) if sessions_per_day > 0 else i
                session_time = first_start + timedelta(days=days_offset)

                # Estimate session duration (30-180 minutes)
                duration = random.randint(30, 180)

                session = Session(
                    project_id=project.id,
                    start_time=session_time,
                    end_time=session_time + timedelta(minutes=duration),
                    session_type="startup",
                    working_directory=project.path,
                    duration_minutes=duration,
                    activity_count=random.randint(5, 25),  # Estimated activity
                    conversation_count=random.randint(2, 8)  # Estimated conversations
                )
                self.db.add(session)

            await self.db.commit()

    async def _import_project_history(
        self,
        project: Project,
        history: List[Dict[str, Any]]
    ) -> int:
        """Import conversation history for a project."""
        # Check if we already have history conversations for this project
        existing_conversations = await self.db.execute(
            select(Conversation).where(
                Conversation.context.like('%"imported_from": ".claude.json"%'),
                Conversation.session.has(Session.project_id == project.id)
            )
        )
        if existing_conversations.scalars().first():
            return 0  # History already imported, skip

        conversation_count = 0
        # Create a synthetic session for imported history
        history_session = Session(
            project_id=project.id,
            start_time=datetime.now() - timedelta(days=30),  # Assume recent
            session_type="history_import",  # Different type to avoid conflicts
            working_directory=project.path,
            activity_count=len(history),
            conversation_count=len(history)
        )
        self.db.add(history_session)
        await self.db.commit()
        await self.db.refresh(history_session)

        # Import each history entry as a conversation
        for i, entry in enumerate(history[:20]):  # Limit to 20 entries
            try:
                display_text = entry.get("display", "")
                if display_text:
                    conversation = Conversation(
                        session_id=history_session.id,
                        timestamp=history_session.start_time + timedelta(minutes=i * 5),
                        user_prompt=display_text,
                        exchange_type="user_prompt",
                        context={"imported_from": ".claude.json"}
                    )
                    self.db.add(conversation)
                    conversation_count += 1
            except Exception:
                # Skip problematic entries
                continue

        if conversation_count > 0:
            await self.db.commit()

        return conversation_count


@router.post("/import/claude-json")
async def import_claude_json(
    file_path: Optional[str] = None,
    db: AsyncSession = Depends(get_db)
):
    """
    Import data from .claude.json file.

    If no file_path is provided, tries to find .claude.json in the user's
    home directory.
    """
    if not file_path:
        # Try default location
        home_path = Path.home() / ".claude.json"
        file_path = str(home_path)

    try:
        importer = ClaudeJsonImporter(db)
        results = await importer.import_from_file(file_path)

        return {
            "success": True,
            "message": "Import completed successfully",
            "results": results
        }
    except FileNotFoundError as e:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Claude configuration file not found: {e}"
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid file format: {e}"
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {e}"
        )


@router.get("/import/claude-json/preview")
async def preview_claude_json_import(
    file_path: Optional[str] = None
):
    """
    Preview what would be imported from .claude.json file without actually
    importing.
    """
    if not file_path:
        home_path = Path.home() / ".claude.json"
        file_path = str(home_path)

    if not os.path.exists(file_path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Claude configuration file not found"
        )

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            claude_data = json.load(f)
    except json.JSONDecodeError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON in Claude configuration file: {e}"
        )

    preview = {
        "file_path": file_path,
        "file_size_mb": round(os.path.getsize(file_path) / (1024 * 1024), 2),
        "claude_usage": {
            "num_startups": claude_data.get("numStartups", 0),
            "first_start_time": claude_data.get("firstStartTime"),
            "prompt_queue_use_count": claude_data.get("promptQueueUseCount", 0)
        },
        "projects": {
            "total_count": len(claude_data.get("projects", {})),
            "paths": list(claude_data.get("projects", {}).keys())[:10],  # Show first 10
            "has_more": len(claude_data.get("projects", {})) > 10
        },
        "history_entries": 0
    }

    # Count total history entries across all projects
    if "projects" in claude_data:
        total_history = sum(
            len(proj.get("history", []))
            for proj in claude_data["projects"].values()
        )
        preview["history_entries"] = total_history

    return preview
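

if __name__ == "__main__":  # pragma: no cover
    # Minimal standalone sketch for exercising the importer without the FastAPI
    # endpoints. Assumptions not guaranteed by this module: SQLAlchemy 2.x with
    # the aiosqlite driver installed, and a database whose schema already
    # contains the Project/Session/Conversation tables; the sqlite URL below is
    # illustrative only.
    import asyncio

    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    async def _demo() -> None:
        engine = create_async_engine("sqlite+aiosqlite:///./tracker.db")
        session_factory = async_sessionmaker(engine, expire_on_commit=False)
        async with session_factory() as session:
            importer = ClaudeJsonImporter(session)
            results = await importer.import_from_file(str(Path.home() / ".claude.json"))
            print(json.dumps(results, indent=2))

    asyncio.run(_demo())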