""" Analytics API endpoints for productivity insights and metrics. """ from typing import List, Optional, Dict, Any from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_, or_ from sqlalchemy.orm import selectinload from app.database.connection import get_db from app.models.project import Project from app.models.session import Session from app.models.conversation import Conversation from app.models.activity import Activity from app.models.waiting_period import WaitingPeriod from app.models.git_operation import GitOperation from app.api.schemas import ProductivityMetrics router = APIRouter() async def calculate_engagement_score(waiting_periods: List[WaitingPeriod]) -> float: """Calculate overall engagement score from waiting periods.""" if not waiting_periods: return 75.0 # Default neutral score scores = [wp.engagement_score for wp in waiting_periods] avg_score = sum(scores) / len(scores) return round(avg_score * 100, 1) # Convert to 0-100 scale async def get_productivity_trends(db: AsyncSession, sessions: List[Session], days: int) -> List[Dict[str, Any]]: """Calculate daily productivity trends.""" from datetime import datetime, timedelta # Group sessions by date daily_data = {} for session in sessions: date_key = session.start_time.date().isoformat() if date_key not in daily_data: daily_data[date_key] = { "sessions": 0, "total_time": 0, "activities": 0, "conversations": 0 } daily_data[date_key]["sessions"] += 1 daily_data[date_key]["total_time"] += session.calculated_duration_minutes or 0 daily_data[date_key]["activities"] += session.activity_count daily_data[date_key]["conversations"] += session.conversation_count # Calculate productivity scores (0-100 based on relative activity) if daily_data: max_activities = max(day["activities"] for day in daily_data.values()) or 1 max_time = max(day["total_time"] for day in daily_data.values()) or 1 trends = [] for date, data in sorted(daily_data.items()): # Weighted score: 60% activities, 40% time activity_score = (data["activities"] / max_activities) * 60 time_score = (data["total_time"] / max_time) * 40 productivity_score = activity_score + time_score trends.append({ "date": date, "score": round(productivity_score, 1) }) return trends return [] @router.get("/analytics/productivity", response_model=ProductivityMetrics) async def get_productivity_metrics( project_id: Optional[int] = Query(None, description="Filter by project ID"), days: int = Query(30, description="Number of days to analyze"), db: AsyncSession = Depends(get_db) ): """ Get comprehensive productivity analytics and insights. Analyzes engagement, tool usage, and productivity patterns. 
""" try: from datetime import datetime, timedelta # Date filter start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None # Base query for sessions session_query = select(Session).options( selectinload(Session.project), selectinload(Session.activities), selectinload(Session.conversations), selectinload(Session.waiting_periods) ) if project_id: session_query = session_query.where(Session.project_id == project_id) if start_date: session_query = session_query.where(Session.start_time >= start_date) session_result = await db.execute(session_query) sessions = session_result.scalars().all() if not sessions: return ProductivityMetrics( engagement_score=0.0, average_session_length=0.0, think_time_average=0.0, files_per_session=0.0, tools_most_used=[], productivity_trends=[] ) # Calculate basic metrics total_sessions = len(sessions) total_time = sum(s.calculated_duration_minutes or 0 for s in sessions) average_session_length = total_time / total_sessions if total_sessions > 0 else 0 # Collect all waiting periods for engagement analysis all_waiting_periods = [] for session in sessions: all_waiting_periods.extend(session.waiting_periods) # Calculate think time average valid_wait_times = [wp.calculated_duration_seconds for wp in all_waiting_periods if wp.calculated_duration_seconds is not None] think_time_average = sum(valid_wait_times) / len(valid_wait_times) if valid_wait_times else 0 # Calculate engagement score engagement_score = await calculate_engagement_score(all_waiting_periods) # Calculate files per session total_files = sum(len(s.files_touched or []) for s in sessions) files_per_session = total_files / total_sessions if total_sessions > 0 else 0 # Tool usage analysis tool_usage = {} for session in sessions: for activity in session.activities: tool = activity.tool_name if tool not in tool_usage: tool_usage[tool] = 0 tool_usage[tool] += 1 tools_most_used = [ {"tool": tool, "count": count} for tool, count in sorted(tool_usage.items(), key=lambda x: x[1], reverse=True)[:10] ] # Get productivity trends productivity_trends = await get_productivity_trends(db, sessions, days) return ProductivityMetrics( engagement_score=engagement_score, average_session_length=round(average_session_length, 1), think_time_average=round(think_time_average, 1), files_per_session=round(files_per_session, 1), tools_most_used=tools_most_used, productivity_trends=productivity_trends ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get productivity metrics: {str(e)}" ) @router.get("/analytics/patterns") async def get_development_patterns( project_id: Optional[int] = Query(None, description="Filter by project ID"), days: int = Query(30, description="Number of days to analyze"), db: AsyncSession = Depends(get_db) ): """Analyze development patterns and workflow insights.""" try: from datetime import datetime, timedelta start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None # Get sessions with related data session_query = select(Session).options( selectinload(Session.activities), selectinload(Session.conversations), selectinload(Session.waiting_periods), selectinload(Session.git_operations) ) if project_id: session_query = session_query.where(Session.project_id == project_id) if start_date: session_query = session_query.where(Session.start_time >= start_date) session_result = await db.execute(session_query) sessions = session_result.scalars().all() if not sessions: return {"message": "No data available for the specified 
period"} # Working hours analysis hour_distribution = {} for session in sessions: hour = session.start_time.hour hour_distribution[hour] = hour_distribution.get(hour, 0) + 1 # Session type patterns session_type_distribution = {} for session in sessions: session_type = session.session_type session_type_distribution[session_type] = session_type_distribution.get(session_type, 0) + 1 # Git workflow patterns git_patterns = {"commits_per_session": 0, "commit_frequency": {}} total_commits = 0 commit_days = set() for session in sessions: session_commits = sum(1 for op in session.git_operations if op.is_commit) total_commits += session_commits for op in session.git_operations: if op.is_commit: commit_days.add(op.timestamp.date()) git_patterns["commits_per_session"] = round(total_commits / len(sessions), 2) if sessions else 0 git_patterns["commit_frequency"] = round(len(commit_days) / days, 2) if days > 0 else 0 # Problem-solving patterns problem_solving = {"debug_sessions": 0, "learning_sessions": 0, "implementation_sessions": 0} for session in sessions: # Analyze conversation content to infer session type debug_keywords = ["error", "debug", "bug", "fix", "problem", "issue"] learn_keywords = ["how", "what", "explain", "understand", "learn", "tutorial"] impl_keywords = ["implement", "create", "build", "add", "feature"] session_content = " ".join([ conv.user_prompt or "" for conv in session.conversations ]).lower() if any(keyword in session_content for keyword in debug_keywords): problem_solving["debug_sessions"] += 1 elif any(keyword in session_content for keyword in learn_keywords): problem_solving["learning_sessions"] += 1 elif any(keyword in session_content for keyword in impl_keywords): problem_solving["implementation_sessions"] += 1 # Tool workflow patterns common_sequences = {} for session in sessions: activities = sorted(session.activities, key=lambda a: a.timestamp) if len(activities) >= 2: for i in range(len(activities) - 1): sequence = f"{activities[i].tool_name} → {activities[i+1].tool_name}" common_sequences[sequence] = common_sequences.get(sequence, 0) + 1 # Get top 5 tool sequences top_sequences = sorted(common_sequences.items(), key=lambda x: x[1], reverse=True)[:5] return { "analysis_period_days": days, "total_sessions_analyzed": len(sessions), "working_hours": { "distribution": hour_distribution, "peak_hours": sorted(hour_distribution.items(), key=lambda x: x[1], reverse=True)[:3], "most_active_hour": max(hour_distribution.items(), key=lambda x: x[1])[0] if hour_distribution else None }, "session_patterns": { "type_distribution": session_type_distribution, "average_duration_minutes": round(sum(s.calculated_duration_minutes or 0 for s in sessions) / len(sessions), 1) }, "git_workflow": git_patterns, "problem_solving_patterns": problem_solving, "tool_workflows": { "common_sequences": [{"sequence": seq, "count": count} for seq, count in top_sequences], "total_unique_sequences": len(common_sequences) } } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to analyze development patterns: {str(e)}" ) @router.get("/analytics/learning") async def get_learning_insights( project_id: Optional[int] = Query(None, description="Filter by project ID"), days: int = Query(30, description="Number of days to analyze"), db: AsyncSession = Depends(get_db) ): """Analyze learning patterns and knowledge development.""" try: from datetime import datetime, timedelta start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None # Get 
@router.get("/analytics/learning")
async def get_learning_insights(
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    days: int = Query(30, description="Number of days to analyze"),
    db: AsyncSession = Depends(get_db)
):
    """Analyze learning patterns and knowledge development."""
    try:
        start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None

        # Get conversations for learning analysis
        conv_query = select(Conversation).options(selectinload(Conversation.session))
        if project_id:
            conv_query = conv_query.join(Session).where(Session.project_id == project_id)
        if start_date:
            conv_query = conv_query.where(Conversation.timestamp >= start_date)

        conv_result = await db.execute(conv_query)
        conversations = conv_result.scalars().all()

        if not conversations:
            return {"message": "No conversation data available for learning analysis"}

        # Topic frequency analysis
        learning_keywords = {
            "authentication": ["auth", "login", "password", "token", "session"],
            "database": ["database", "sql", "query", "table", "migration"],
            "api": ["api", "rest", "endpoint", "request", "response"],
            "testing": ["test", "pytest", "unittest", "mock", "fixture"],
            "deployment": ["deploy", "docker", "aws", "server", "production"],
            "debugging": ["debug", "error", "exception", "traceback", "log"],
            "optimization": ["optimize", "performance", "speed", "memory", "cache"],
            "security": ["security", "vulnerability", "encrypt", "hash", "ssl"]
        }

        topic_frequency = {topic: 0 for topic in learning_keywords.keys()}
        for conv in conversations:
            if conv.user_prompt:
                prompt_lower = conv.user_prompt.lower()
                for topic, keywords in learning_keywords.items():
                    if any(keyword in prompt_lower for keyword in keywords):
                        topic_frequency[topic] += 1

        # Question complexity analysis
        complexity_indicators = {
            "beginner": ["how to", "what is", "how do i", "basic", "simple"],
            "intermediate": ["best practice", "optimize", "improve", "better way"],
            "advanced": ["architecture", "pattern", "scalability", "design", "system"]
        }

        complexity_distribution = {level: 0 for level in complexity_indicators.keys()}
        for conv in conversations:
            if conv.user_prompt:
                prompt_lower = conv.user_prompt.lower()
                for level, indicators in complexity_indicators.items():
                    if any(indicator in prompt_lower for indicator in indicators):
                        complexity_distribution[level] += 1
                        break

        # Learning progression analysis
        weekly_topics = {}
        for conv in conversations:
            if conv.user_prompt:
                week = conv.timestamp.strftime("%Y-W%U")
                if week not in weekly_topics:
                    weekly_topics[week] = set()
                prompt_lower = conv.user_prompt.lower()
                for topic, keywords in learning_keywords.items():
                    if any(keyword in prompt_lower for keyword in keywords):
                        weekly_topics[week].add(topic)

        # Calculate learning velocity (new topics per week)
        learning_velocity = []
        for week, topics in sorted(weekly_topics.items()):
            learning_velocity.append({
                "week": week,
                "new_topics": len(topics),
                "topics": list(topics)
            })

        # Repetition patterns (topics asked about multiple times)
        repeated_topics = {topic: count for topic, count in topic_frequency.items() if count > 1}

        return {
            "analysis_period_days": days,
            "total_conversations_analyzed": len(conversations),
            "learning_topics": {
                "frequency": topic_frequency,
                "most_discussed": max(topic_frequency.items(), key=lambda x: x[1]) if topic_frequency else None,
                "repeated_topics": repeated_topics
            },
            "question_complexity": {
                "distribution": complexity_distribution,
                "progression_indicator": "advancing" if complexity_distribution["advanced"] > complexity_distribution["beginner"] else "learning_basics"
            },
            "learning_velocity": learning_velocity,
            "insights": {
                "diverse_learning": len([t for t, c in topic_frequency.items() if c > 0]),
                "deep_dives": len(repeated_topics),
                "active_learning_weeks": len(weekly_topics),
                "avg_topics_per_week": round(
                    sum(len(topics) for topics in weekly_topics.values()) / len(weekly_topics), 1
                ) if weekly_topics else 0
            }
        }

    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to analyze learning insights: {str(e)}"
        )
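# Illustrative shape of the "learning_velocity" entries returned above (values
# are placeholders; weeks use the "%Y-W%U" format produced in this endpoint):
#
#     "learning_velocity": [
#         {"week": "2024-W07", "new_topics": 2, "topics": ["api", "testing"]},
#         {"week": "2024-W08", "new_topics": 1, "topics": ["deployment"]}
#     ]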
@router.get("/analytics/summary")
async def get_analytics_summary(
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    db: AsyncSession = Depends(get_db)
):
    """Get a high-level analytics summary dashboard."""
    try:
        # Get overall statistics
        base_query = select(Session)
        if project_id:
            base_query = base_query.where(Session.project_id == project_id)

        session_result = await db.execute(base_query)
        all_sessions = session_result.scalars().all()

        if not all_sessions:
            return {"message": "No data available"}

        # Basic metrics
        total_sessions = len(all_sessions)
        total_time_hours = sum(s.calculated_duration_minutes or 0 for s in all_sessions) / 60
        avg_session_minutes = (sum(s.calculated_duration_minutes or 0 for s in all_sessions) / total_sessions) if total_sessions else 0

        # Date range
        start_date = min(s.start_time for s in all_sessions)
        end_date = max(s.start_time for s in all_sessions)
        total_days = (end_date - start_date).days + 1

        # Activity summary
        total_activities = sum(s.activity_count for s in all_sessions)
        total_conversations = sum(s.conversation_count for s in all_sessions)

        # Recent activity (last 7 days)
        week_ago = datetime.utcnow() - timedelta(days=7)
        recent_sessions = [s for s in all_sessions if s.start_time >= week_ago]

        # Project diversity (if not filtered by project)
        project_count = 1 if project_id else len(set(s.project_id for s in all_sessions))

        return {
            "overview": {
                "total_sessions": total_sessions,
                "total_time_hours": round(total_time_hours, 1),
                "average_session_minutes": round(avg_session_minutes, 1),
                "total_activities": total_activities,
                "total_conversations": total_conversations,
                "projects_tracked": project_count,
                "tracking_period_days": total_days
            },
            "recent_activity": {
                "sessions_last_7_days": len(recent_sessions),
                "time_last_7_days_hours": round(sum(s.calculated_duration_minutes or 0 for s in recent_sessions) / 60, 1),
                "daily_average_last_week": round(len(recent_sessions) / 7, 1)
            },
            "productivity_indicators": {
                "activities_per_session": round(total_activities / total_sessions, 1) if total_sessions else 0,
                "conversations_per_session": round(total_conversations / total_sessions, 1) if total_sessions else 0,
                "productivity_score": min(100, round((total_activities / total_sessions) * 5, 1)) if total_sessions else 0  # Rough scoring
            },
            "time_distribution": {
                "daily_average_hours": round(total_time_hours / total_days, 1),
                "longest_session_minutes": max(s.calculated_duration_minutes or 0 for s in all_sessions),
                # default=0 avoids a ValueError when no session has a recorded duration
                "shortest_session_minutes": min(
                    (s.calculated_duration_minutes for s in all_sessions if s.calculated_duration_minutes),
                    default=0
                )
            }
        }

    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get analytics summary: {str(e)}"
        )
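# Route summary for this module (paths as registered on `router`; any
# application-level prefix such as "/api" depends on how the router is included
# in the main app and is an assumption, not defined here):
#
#     GET /analytics/productivity?project_id=1&days=30   -> ProductivityMetrics
#     GET /analytics/patterns?project_id=1&days=30       -> workflow pattern dict
#     GET /analytics/learning?project_id=1&days=30       -> learning insight dict
#     GET /analytics/summary?project_id=1                 -> high-level summary dict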