claude-code-tracker/app/api/analytics.py
"""
Analytics API endpoints for productivity insights and metrics.
"""
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import selectinload

from app.database.connection import get_db
from app.models.project import Project
from app.models.session import Session
from app.models.conversation import Conversation
from app.models.activity import Activity
from app.models.waiting_period import WaitingPeriod
from app.models.git_operation import GitOperation
from app.api.schemas import ProductivityMetrics

router = APIRouter()
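# How this router is typically wired into the application (a sketch; the
# actual app factory in this project may differ, and the "/api" prefix is an
# assumption):
#
#   from fastapi import FastAPI
#   from app.api.analytics import router as analytics_router
#
#   app = FastAPI()
#   app.include_router(analytics_router, prefix="/api")
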
async def calculate_engagement_score(waiting_periods: List[WaitingPeriod]) -> float:
    """Calculate the overall engagement score from a list of waiting periods."""
    if not waiting_periods:
        return 75.0  # Default neutral score when there is no data
    scores = [wp.engagement_score for wp in waiting_periods]
    avg_score = sum(scores) / len(scores)
    return round(avg_score * 100, 1)  # Convert to a 0-100 scale
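# Worked example (hypothetical values, assuming engagement_score is stored on
# a 0-1 scale, as the conversion above implies): periods scored 0.6, 0.8, and
# 0.7 average to 0.7, so the function returns 70.0.
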
async def get_productivity_trends(db: AsyncSession, sessions: List[Session], days: int) -> List[Dict[str, Any]]:
    """Calculate daily productivity trends."""
    # Group sessions by date
    daily_data = {}
    for session in sessions:
        date_key = session.start_time.date().isoformat()
        if date_key not in daily_data:
            daily_data[date_key] = {
                "sessions": 0,
                "total_time": 0,
                "activities": 0,
                "conversations": 0
            }
        daily_data[date_key]["sessions"] += 1
        daily_data[date_key]["total_time"] += session.calculated_duration_minutes or 0
        daily_data[date_key]["activities"] += session.activity_count
        daily_data[date_key]["conversations"] += session.conversation_count

    # Calculate productivity scores (0-100, relative to the busiest day in the window)
    if daily_data:
        max_activities = max(day["activities"] for day in daily_data.values()) or 1
        max_time = max(day["total_time"] for day in daily_data.values()) or 1
        trends = []
        for date, data in sorted(daily_data.items()):
            # Weighted score: 60% activities, 40% time
            activity_score = (data["activities"] / max_activities) * 60
            time_score = (data["total_time"] / max_time) * 40
            productivity_score = activity_score + time_score
            trends.append({
                "date": date,
                "score": round(productivity_score, 1)
            })
        return trends
    return []
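# Worked example of the 60/40 weighting (hypothetical numbers): if the busiest
# day in the window had 50 activities and 200 minutes, a day with 25 activities
# and 100 minutes scores (25/50)*60 + (100/200)*40 = 30 + 20 = 50.0.
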
@router.get("/analytics/productivity", response_model=ProductivityMetrics)
async def get_productivity_metrics(
project_id: Optional[int] = Query(None, description="Filter by project ID"),
days: int = Query(30, description="Number of days to analyze"),
db: AsyncSession = Depends(get_db)
):
"""
Get comprehensive productivity analytics and insights.
Analyzes engagement, tool usage, and productivity patterns.
"""
try:
from datetime import datetime, timedelta
# Date filter
start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None
# Base query for sessions
session_query = select(Session).options(
selectinload(Session.project),
selectinload(Session.activities),
selectinload(Session.conversations),
selectinload(Session.waiting_periods)
)
if project_id:
session_query = session_query.where(Session.project_id == project_id)
if start_date:
session_query = session_query.where(Session.start_time >= start_date)
session_result = await db.execute(session_query)
sessions = session_result.scalars().all()
if not sessions:
return ProductivityMetrics(
engagement_score=0.0,
average_session_length=0.0,
think_time_average=0.0,
files_per_session=0.0,
tools_most_used=[],
productivity_trends=[]
)
# Calculate basic metrics
total_sessions = len(sessions)
total_time = sum(s.calculated_duration_minutes or 0 for s in sessions)
average_session_length = total_time / total_sessions if total_sessions > 0 else 0
# Collect all waiting periods for engagement analysis
all_waiting_periods = []
for session in sessions:
all_waiting_periods.extend(session.waiting_periods)
# Calculate think time average
valid_wait_times = [wp.calculated_duration_seconds for wp in all_waiting_periods
if wp.calculated_duration_seconds is not None]
think_time_average = sum(valid_wait_times) / len(valid_wait_times) if valid_wait_times else 0
# Calculate engagement score
engagement_score = await calculate_engagement_score(all_waiting_periods)
# Calculate files per session
total_files = sum(len(s.files_touched or []) for s in sessions)
files_per_session = total_files / total_sessions if total_sessions > 0 else 0
# Tool usage analysis
tool_usage = {}
for session in sessions:
for activity in session.activities:
tool = activity.tool_name
if tool not in tool_usage:
tool_usage[tool] = 0
tool_usage[tool] += 1
tools_most_used = [
{"tool": tool, "count": count}
for tool, count in sorted(tool_usage.items(), key=lambda x: x[1], reverse=True)[:10]
]
# Get productivity trends
productivity_trends = await get_productivity_trends(db, sessions, days)
return ProductivityMetrics(
engagement_score=engagement_score,
average_session_length=round(average_session_length, 1),
think_time_average=round(think_time_average, 1),
files_per_session=round(files_per_session, 1),
tools_most_used=tools_most_used,
productivity_trends=productivity_trends
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get productivity metrics: {str(e)}"
)
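# Illustrative request (the path assumes this router is mounted without an
# extra prefix; adjust for your deployment, and note the values below are
# made up for illustration):
#
#   GET /analytics/productivity?project_id=1&days=7
#
# returns a ProductivityMetrics payload along the lines of
#   {"engagement_score": 72.5, "average_session_length": 48.3,
#    "think_time_average": 12.0, "files_per_session": 4.2,
#    "tools_most_used": [...], "productivity_trends": [...]}
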
@router.get("/analytics/patterns")
async def get_development_patterns(
project_id: Optional[int] = Query(None, description="Filter by project ID"),
days: int = Query(30, description="Number of days to analyze"),
db: AsyncSession = Depends(get_db)
):
"""Analyze development patterns and workflow insights."""
try:
from datetime import datetime, timedelta
start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None
# Get sessions with related data
session_query = select(Session).options(
selectinload(Session.activities),
selectinload(Session.conversations),
selectinload(Session.waiting_periods),
selectinload(Session.git_operations)
)
if project_id:
session_query = session_query.where(Session.project_id == project_id)
if start_date:
session_query = session_query.where(Session.start_time >= start_date)
session_result = await db.execute(session_query)
sessions = session_result.scalars().all()
if not sessions:
return {"message": "No data available for the specified period"}
# Working hours analysis
hour_distribution = {}
for session in sessions:
hour = session.start_time.hour
hour_distribution[hour] = hour_distribution.get(hour, 0) + 1
# Session type patterns
session_type_distribution = {}
for session in sessions:
session_type = session.session_type
session_type_distribution[session_type] = session_type_distribution.get(session_type, 0) + 1
# Git workflow patterns
git_patterns = {"commits_per_session": 0, "commit_frequency": {}}
total_commits = 0
commit_days = set()
for session in sessions:
session_commits = sum(1 for op in session.git_operations if op.is_commit)
total_commits += session_commits
for op in session.git_operations:
if op.is_commit:
commit_days.add(op.timestamp.date())
git_patterns["commits_per_session"] = round(total_commits / len(sessions), 2) if sessions else 0
git_patterns["commit_frequency"] = round(len(commit_days) / days, 2) if days > 0 else 0
# Problem-solving patterns
problem_solving = {"debug_sessions": 0, "learning_sessions": 0, "implementation_sessions": 0}
for session in sessions:
# Analyze conversation content to infer session type
debug_keywords = ["error", "debug", "bug", "fix", "problem", "issue"]
learn_keywords = ["how", "what", "explain", "understand", "learn", "tutorial"]
impl_keywords = ["implement", "create", "build", "add", "feature"]
session_content = " ".join([
conv.user_prompt or "" for conv in session.conversations
]).lower()
if any(keyword in session_content for keyword in debug_keywords):
problem_solving["debug_sessions"] += 1
elif any(keyword in session_content for keyword in learn_keywords):
problem_solving["learning_sessions"] += 1
elif any(keyword in session_content for keyword in impl_keywords):
problem_solving["implementation_sessions"] += 1
# Tool workflow patterns
common_sequences = {}
for session in sessions:
activities = sorted(session.activities, key=lambda a: a.timestamp)
if len(activities) >= 2:
for i in range(len(activities) - 1):
sequence = f"{activities[i].tool_name}{activities[i+1].tool_name}"
common_sequences[sequence] = common_sequences.get(sequence, 0) + 1
# Get top 5 tool sequences
top_sequences = sorted(common_sequences.items(), key=lambda x: x[1], reverse=True)[:5]
return {
"analysis_period_days": days,
"total_sessions_analyzed": len(sessions),
"working_hours": {
"distribution": hour_distribution,
"peak_hours": sorted(hour_distribution.items(), key=lambda x: x[1], reverse=True)[:3],
"most_active_hour": max(hour_distribution.items(), key=lambda x: x[1])[0] if hour_distribution else None
},
"session_patterns": {
"type_distribution": session_type_distribution,
"average_duration_minutes": round(sum(s.calculated_duration_minutes or 0 for s in sessions) / len(sessions), 1)
},
"git_workflow": git_patterns,
"problem_solving_patterns": problem_solving,
"tool_workflows": {
"common_sequences": [{"sequence": seq, "count": count} for seq, count in top_sequences],
"total_unique_sequences": len(common_sequences)
}
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to analyze development patterns: {str(e)}"
)
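# Example of the sequence bookkeeping above (hypothetical tool names): a
# session whose ordered activities used Read, Edit, then Bash yields the pairs
# "Read -> Edit" and "Edit -> Bash", each counted once; counts accumulate
# across all sessions in the window before the top five are reported.
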
@router.get("/analytics/learning")
async def get_learning_insights(
project_id: Optional[int] = Query(None, description="Filter by project ID"),
days: int = Query(30, description="Number of days to analyze"),
db: AsyncSession = Depends(get_db)
):
"""Analyze learning patterns and knowledge development."""
try:
from datetime import datetime, timedelta
start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None
# Get conversations for learning analysis
conv_query = select(Conversation).options(selectinload(Conversation.session))
if project_id:
conv_query = conv_query.join(Session).where(Session.project_id == project_id)
if start_date:
conv_query = conv_query.where(Conversation.timestamp >= start_date)
conv_result = await db.execute(conv_query)
conversations = conv_result.scalars().all()
if not conversations:
return {"message": "No conversation data available for learning analysis"}
# Topic frequency analysis
learning_keywords = {
"authentication": ["auth", "login", "password", "token", "session"],
"database": ["database", "sql", "query", "table", "migration"],
"api": ["api", "rest", "endpoint", "request", "response"],
"testing": ["test", "pytest", "unittest", "mock", "fixture"],
"deployment": ["deploy", "docker", "aws", "server", "production"],
"debugging": ["debug", "error", "exception", "traceback", "log"],
"optimization": ["optimize", "performance", "speed", "memory", "cache"],
"security": ["security", "vulnerability", "encrypt", "hash", "ssl"]
}
topic_frequency = {topic: 0 for topic in learning_keywords.keys()}
for conv in conversations:
if conv.user_prompt:
prompt_lower = conv.user_prompt.lower()
for topic, keywords in learning_keywords.items():
if any(keyword in prompt_lower for keyword in keywords):
topic_frequency[topic] += 1
# Question complexity analysis
complexity_indicators = {
"beginner": ["how to", "what is", "how do i", "basic", "simple"],
"intermediate": ["best practice", "optimize", "improve", "better way"],
"advanced": ["architecture", "pattern", "scalability", "design", "system"]
}
complexity_distribution = {level: 0 for level in complexity_indicators.keys()}
for conv in conversations:
if conv.user_prompt:
prompt_lower = conv.user_prompt.lower()
for level, indicators in complexity_indicators.items():
if any(indicator in prompt_lower for indicator in indicators):
complexity_distribution[level] += 1
break
# Learning progression analysis
weekly_topics = {}
for conv in conversations:
if conv.user_prompt:
week = conv.timestamp.strftime("%Y-W%U")
if week not in weekly_topics:
weekly_topics[week] = set()
prompt_lower = conv.user_prompt.lower()
for topic, keywords in learning_keywords.items():
if any(keyword in prompt_lower for keyword in keywords):
weekly_topics[week].add(topic)
# Calculate learning velocity (new topics per week)
learning_velocity = []
for week, topics in sorted(weekly_topics.items()):
learning_velocity.append({
"week": week,
"new_topics": len(topics),
"topics": list(topics)
})
# Repetition patterns (topics asked about multiple times)
repeated_topics = {topic: count for topic, count in topic_frequency.items() if count > 1}
return {
"analysis_period_days": days,
"total_conversations_analyzed": len(conversations),
"learning_topics": {
"frequency": topic_frequency,
"most_discussed": max(topic_frequency.items(), key=lambda x: x[1]) if topic_frequency else None,
"repeated_topics": repeated_topics
},
"question_complexity": {
"distribution": complexity_distribution,
"progression_indicator": "advancing" if complexity_distribution["advanced"] > complexity_distribution["beginner"] else "learning_basics"
},
"learning_velocity": learning_velocity,
"insights": {
"diverse_learning": len([t for t, c in topic_frequency.items() if c > 0]),
"deep_dives": len(repeated_topics),
"active_learning_weeks": len(weekly_topics),
"avg_topics_per_week": round(sum(len(topics) for topics in weekly_topics.values()) / len(weekly_topics), 1) if weekly_topics else 0
}
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to analyze learning insights: {str(e)}"
)
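# Note on the week buckets above: strftime("%Y-W%U") numbers weeks with Sunday
# as the first day, counting from the first Sunday of the year, so for example
# datetime(2025, 8, 11) falls in bucket "2025-W32".
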
@router.get("/analytics/summary")
async def get_analytics_summary(
project_id: Optional[int] = Query(None, description="Filter by project ID"),
db: AsyncSession = Depends(get_db)
):
"""Get a high-level analytics summary dashboard."""
try:
# Get overall statistics
base_query = select(Session)
if project_id:
base_query = base_query.where(Session.project_id == project_id)
session_result = await db.execute(base_query)
all_sessions = session_result.scalars().all()
if not all_sessions:
return {"message": "No data available"}
# Basic metrics
total_sessions = len(all_sessions)
total_time_hours = sum(s.calculated_duration_minutes or 0 for s in all_sessions) / 60
avg_session_minutes = (sum(s.calculated_duration_minutes or 0 for s in all_sessions) / total_sessions) if total_sessions else 0
# Date range
start_date = min(s.start_time for s in all_sessions)
end_date = max(s.start_time for s in all_sessions)
total_days = (end_date - start_date).days + 1
# Activity summary
total_activities = sum(s.activity_count for s in all_sessions)
total_conversations = sum(s.conversation_count for s in all_sessions)
# Recent activity (last 7 days)
from datetime import datetime, timedelta
week_ago = datetime.utcnow() - timedelta(days=7)
recent_sessions = [s for s in all_sessions if s.start_time >= week_ago]
# Project diversity (if not filtered by project)
project_count = 1 if project_id else len(set(s.project_id for s in all_sessions))
return {
"overview": {
"total_sessions": total_sessions,
"total_time_hours": round(total_time_hours, 1),
"average_session_minutes": round(avg_session_minutes, 1),
"total_activities": total_activities,
"total_conversations": total_conversations,
"projects_tracked": project_count,
"tracking_period_days": total_days
},
"recent_activity": {
"sessions_last_7_days": len(recent_sessions),
"time_last_7_days_hours": round(sum(s.calculated_duration_minutes or 0 for s in recent_sessions) / 60, 1),
"daily_average_last_week": round(len(recent_sessions) / 7, 1)
},
"productivity_indicators": {
"activities_per_session": round(total_activities / total_sessions, 1) if total_sessions else 0,
"conversations_per_session": round(total_conversations / total_sessions, 1) if total_sessions else 0,
"productivity_score": min(100, round((total_activities / total_sessions) * 5, 1)) if total_sessions else 0 # Rough scoring
},
"time_distribution": {
"daily_average_hours": round(total_time_hours / total_days, 1),
"longest_session_minutes": max(s.calculated_duration_minutes or 0 for s in all_sessions),
"shortest_session_minutes": min(s.calculated_duration_minutes or 0 for s in all_sessions if s.calculated_duration_minutes)
}
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get analytics summary: {str(e)}"
)
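# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module): fetching the
# summary endpoint with httpx. The base URL and the "/api" prefix depend on
# how the app is mounted in your deployment and are assumptions here.
#
#   import httpx
#
#   resp = httpx.get("http://localhost:8000/api/analytics/summary",
#                    params={"project_id": 1})
#   resp.raise_for_status()
#   print(resp.json()["overview"]["total_time_hours"])
# ---------------------------------------------------------------------------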