"""
Activity tracking API endpoints.
"""

from datetime import datetime, timedelta, timezone
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload

from app.database.connection import get_db
from app.models.activity import Activity
from app.models.session import Session
from app.api.schemas import ActivityRequest, ActivityResponse

router = APIRouter()


def _internal_error(action: str, exc: Exception) -> HTTPException:
    """Build the 500 response used by every endpoint in this module.

    The detail text is ``"Failed to <action>: <exception message>"``.
    """
    return HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=f"Failed to {action}: {exc}"
    )


def _utc_cutoff(days: int) -> datetime:
    """Return the naive UTC datetime that lies ``days`` days before now.

    Produces the same value ``datetime.utcnow() - timedelta(days=days)`` did,
    without calling the deprecated ``utcnow()``.
    """
    # NOTE(review): the result is naive on purpose, matching the original
    # comparison against Activity.timestamp - confirm the column stores naive UTC.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now - timedelta(days=days)


def _project_name(activity: Activity) -> Optional[str]:
    """Return the name of the activity's project, or None if none is loaded.

    Guards the ``activity.session.project.name`` chain so that one row without
    a session or project does not fail the whole response.
    """
    session = activity.session
    project = session.project if session is not None else None
    return project.name if project is not None else None


@router.post("/activity", response_model=ActivityResponse, status_code=status.HTTP_201_CREATED)
async def record_activity(
    request: ActivityRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Record a development activity (tool usage, file operation, etc.).

    This endpoint is called by Claude Code PostToolUse hooks.

    Raises:
        HTTPException: 404 if the referenced session does not exist,
            500 if the activity could not be stored.
    """
    try:
        # Verify session exists
        result = await db.execute(
            select(Session).where(Session.id == request.session_id)
        )
        session = result.scalars().first()

        if not session:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {request.session_id} not found"
            )

        # Create activity record
        activity = Activity(
            session_id=request.session_id,
            conversation_id=request.conversation_id,
            timestamp=request.timestamp,
            tool_name=request.tool_name,
            action=request.action,
            file_path=request.file_path,
            metadata=request.metadata,
            success=request.success,
            error_message=request.error_message,
            lines_added=request.lines_added,
            lines_removed=request.lines_removed
        )

        db.add(activity)

        # Update session activity count
        session.add_activity()

        # Add file to session's touched files if applicable
        if request.file_path:
            session.add_file_touched(request.file_path)

        await db.commit()
        # Reload the row after the commit before reading its attributes back.
        await db.refresh(activity)

        return ActivityResponse(
            id=activity.id,
            session_id=activity.session_id,
            tool_name=activity.tool_name,
            action=activity.action,
            timestamp=activity.timestamp,
            success=activity.success
        )

    except HTTPException:
        # The 404 above is raised before anything is written; pass it through.
        raise
    except Exception as e:
        # Discard the partially applied unit of work so the session is not
        # left holding a failed transaction.
        await db.rollback()
        raise _internal_error("record activity", e) from e


@router.get("/activities")
async def get_activities(
    session_id: Optional[int] = Query(None, description="Filter by session ID"),
    tool_name: Optional[str] = Query(None, description="Filter by tool name"),
    limit: int = Query(50, description="Maximum number of results"),
    offset: int = Query(0, description="Number of results to skip"),
    db: AsyncSession = Depends(get_db)
):
    """Get activities with optional filtering.

    Results are ordered by timestamp, newest first, then paginated with
    ``offset`` and ``limit``.

    Raises:
        HTTPException: 500 if the query fails.
    """
    try:
        # Eager-load session -> project because project_name is read per row.
        query = select(Activity).options(
            selectinload(Activity.session).selectinload(Session.project)
        )

        # Apply filters ("is not None" so that an id of 0 is still a filter)
        if session_id is not None:
            query = query.where(Activity.session_id == session_id)

        if tool_name:
            query = query.where(Activity.tool_name == tool_name)

        # Order by timestamp descending
        query = query.order_by(Activity.timestamp.desc()).offset(offset).limit(limit)

        result = await db.execute(query)
        activities = result.scalars().all()

        return [
            {
                "id": activity.id,
                "session_id": activity.session_id,
                "project_name": _project_name(activity),
                "timestamp": activity.timestamp,
                "tool_name": activity.tool_name,
                "action": activity.action,
                "file_path": activity.file_path,
                "success": activity.success,
                "programming_language": activity.get_programming_language(),
                "lines_changed": activity.total_lines_changed,
                "metadata": activity.metadata
            }
            for activity in activities
        ]

    except Exception as e:
        raise _internal_error("get activities", e) from e


@router.get("/activities/{activity_id}")
async def get_activity(
    activity_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Get detailed information about a specific activity.

    Raises:
        HTTPException: 404 if no activity has the given id,
            500 if the lookup fails.
    """
    try:
        result = await db.execute(
            select(Activity)
            .options(
                selectinload(Activity.session).selectinload(Session.project),
                selectinload(Activity.conversation)
            )
            .where(Activity.id == activity_id)
        )
        activity = result.scalars().first()

        if not activity:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Activity {activity_id} not found"
            )

        return {
            "id": activity.id,
            "session_id": activity.session_id,
            "conversation_id": activity.conversation_id,
            "project_name": _project_name(activity),
            "timestamp": activity.timestamp,
            "tool_name": activity.tool_name,
            "action": activity.action,
            "file_path": activity.file_path,
            "metadata": activity.metadata,
            "success": activity.success,
            "error_message": activity.error_message,
            "lines_added": activity.lines_added,
            "lines_removed": activity.lines_removed,
            "total_lines_changed": activity.total_lines_changed,
            "net_lines_changed": activity.net_lines_changed,
            "file_extension": activity.get_file_extension(),
            "programming_language": activity.get_programming_language(),
            "is_file_operation": activity.is_file_operation,
            "is_code_execution": activity.is_code_execution,
            "is_search_operation": activity.is_search_operation,
            "command_executed": activity.get_command_executed(),
            "search_pattern": activity.get_search_pattern(),
            "task_type": activity.get_task_type()
        }

    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error("get activity", e) from e


@router.get("/activities/stats/tools")
async def get_tool_usage_stats(
    session_id: Optional[int] = Query(None, description="Filter by session ID"),
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    days: int = Query(30, description="Number of days to include"),
    db: AsyncSession = Depends(get_db)
):
    """Get tool usage statistics.

    Returns one entry per tool name with its usage count, the number of
    distinct sessions it appeared in, and the average and total of
    ``lines_added + lines_removed``. A non-positive ``days`` disables the
    date filter.

    Raises:
        HTTPException: 500 if the query fails.
    """
    try:
        # Base query for tool usage counts
        query = select(
            Activity.tool_name,
            func.count(Activity.id).label('usage_count'),
            func.count(func.distinct(Activity.session_id)).label('sessions_used'),
            func.avg(Activity.lines_added + Activity.lines_removed).label('avg_lines_changed'),
            func.sum(Activity.lines_added + Activity.lines_removed).label('total_lines_changed')
        ).group_by(Activity.tool_name)

        # Apply filters; a session filter takes precedence over a project filter.
        if session_id is not None:
            query = query.where(Activity.session_id == session_id)
        elif project_id is not None:
            query = query.join(Session).where(Session.project_id == project_id)

        # Filter by date range
        if days > 0:
            query = query.where(Activity.timestamp >= _utc_cutoff(days))

        result = await db.execute(query)
        stats = result.all()

        return [
            {
                "tool_name": stat.tool_name,
                "usage_count": stat.usage_count,
                "sessions_used": stat.sessions_used,
                # The aggregates can come back as None; report those as zero.
                "avg_lines_changed": float(stat.avg_lines_changed or 0),
                "total_lines_changed": stat.total_lines_changed or 0
            }
            for stat in stats
        ]

    except Exception as e:
        raise _internal_error("get tool usage stats", e) from e


@router.get("/activities/stats/languages")
async def get_language_usage_stats(
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    days: int = Query(30, description="Number of days to include"),
    db: AsyncSession = Depends(get_db)
):
    """Get programming language usage statistics.

    Only Edit/Write/Read activities that carry a file path are considered.
    Each returned entry holds the language, the number of distinct files
    touched, the number of activities, and the summed lines added/removed.
    A non-positive ``days`` disables the date filter.

    Raises:
        HTTPException: 500 if the query fails.
    """
    try:
        # Get activities with file operations
        query = select(Activity).where(
            Activity.file_path.isnot(None),
            Activity.tool_name.in_(["Edit", "Write", "Read"])
        )

        # Apply filters
        if project_id is not None:
            query = query.join(Session).where(Session.project_id == project_id)

        if days > 0:
            query = query.where(Activity.timestamp >= _utc_cutoff(days))

        result = await db.execute(query)
        activities = result.scalars().all()

        # Aggregate per programming language in a single pass.
        language_stats = {}
        files_by_language = {}
        for activity in activities:
            lang = activity.get_programming_language()
            if not lang:
                continue

            if lang not in language_stats:
                language_stats[lang] = {
                    "language": lang,
                    "file_count": 0,
                    "activity_count": 0,
                    "lines_added": 0,
                    "lines_removed": 0
                }
                files_by_language[lang] = set()

            entry = language_stats[lang]
            entry["activity_count"] += 1
            entry["lines_added"] += activity.lines_added or 0
            entry["lines_removed"] += activity.lines_removed or 0
            # A set keeps each path once, so repeated edits of the same file
            # do not inflate file_count.
            files_by_language[lang].add(activity.file_path)

        for lang, paths in files_by_language.items():
            language_stats[lang]["file_count"] = len(paths)

        return list(language_stats.values())

    except Exception as e:
        raise _internal_error("get language usage stats", e) from e