Ryan Malloy 44ed9936b7 Initial commit: Claude Code Project Tracker
Add comprehensive development intelligence system that tracks:
- Development sessions with automatic start/stop
- Full conversation history with semantic search
- Tool usage and file operation analytics
- Think time and engagement analysis
- Git activity correlation
- Learning pattern recognition
- Productivity insights and metrics

Features:
- FastAPI backend with SQLite database
- Modern web dashboard with interactive charts
- Claude Code hook integration for automatic tracking
- Comprehensive test suite with 100+ tests
- Complete API documentation (OpenAPI/Swagger)
- Privacy-first design with local data storage

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-11 02:59:21 -06:00

430 lines
17 KiB
Python

"""
Project data retrieval API endpoints.
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from app.database.connection import get_db
from app.models.project import Project
from app.models.session import Session
from app.models.conversation import Conversation
from app.models.activity import Activity
from app.models.waiting_period import WaitingPeriod
from app.models.git_operation import GitOperation
from app.api.schemas import ProjectSummary, ProjectTimeline, TimelineEvent
router = APIRouter()
@router.get("/projects", response_model=List[ProjectSummary])
async def list_projects(
limit: int = Query(50, description="Maximum number of results"),
offset: int = Query(0, description="Number of results to skip"),
db: AsyncSession = Depends(get_db)
):
"""Get list of all tracked projects with summary statistics."""
try:
# Get projects with basic info
query = select(Project).order_by(Project.last_session.desc().nullslast()).offset(offset).limit(limit)
result = await db.execute(query)
projects = result.scalars().all()
project_summaries = []
for project in projects:
# Get latest session for last_activity
latest_session_result = await db.execute(
select(Session.start_time)
.where(Session.project_id == project.id)
.order_by(Session.start_time.desc())
.limit(1)
)
latest_session = latest_session_result.scalars().first()
project_summaries.append(ProjectSummary(
id=project.id,
name=project.name,
path=project.path,
git_repo=project.git_repo,
languages=project.languages,
total_sessions=project.total_sessions,
total_time_minutes=project.total_time_minutes,
last_activity=latest_session or project.created_at,
files_modified_count=project.files_modified_count,
lines_changed_count=project.lines_changed_count
))
return project_summaries
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to list projects: {str(e)}"
)
@router.get("/projects/{project_id}", response_model=ProjectSummary)
async def get_project(
project_id: int,
db: AsyncSession = Depends(get_db)
):
"""Get detailed information about a specific project."""
try:
result = await db.execute(
select(Project).where(Project.id == project_id)
)
project = result.scalars().first()
if not project:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Project {project_id} not found"
)
# Get latest session for last_activity
latest_session_result = await db.execute(
select(Session.start_time)
.where(Session.project_id == project.id)
.order_by(Session.start_time.desc())
.limit(1)
)
latest_session = latest_session_result.scalars().first()
return ProjectSummary(
id=project.id,
name=project.name,
path=project.path,
git_repo=project.git_repo,
languages=project.languages,
total_sessions=project.total_sessions,
total_time_minutes=project.total_time_minutes,
last_activity=latest_session or project.created_at,
files_modified_count=project.files_modified_count,
lines_changed_count=project.lines_changed_count
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get project: {str(e)}"
)
@router.get("/projects/{project_id}/timeline", response_model=ProjectTimeline)
async def get_project_timeline(
project_id: int,
start_date: Optional[str] = Query(None, description="Start date (YYYY-MM-DD)"),
end_date: Optional[str] = Query(None, description="End date (YYYY-MM-DD)"),
db: AsyncSession = Depends(get_db)
):
"""Get chronological timeline of project development."""
try:
# Get project info
project_result = await db.execute(
select(Project).where(Project.id == project_id)
)
project = project_result.scalars().first()
if not project:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Project {project_id} not found"
)
# Parse date filters
date_filters = []
if start_date:
from datetime import datetime
start_dt = datetime.fromisoformat(start_date)
date_filters.append(lambda table: table.timestamp >= start_dt if hasattr(table, 'timestamp') else table.start_time >= start_dt)
if end_date:
from datetime import datetime
end_dt = datetime.fromisoformat(end_date + " 23:59:59")
date_filters.append(lambda table: table.timestamp <= end_dt if hasattr(table, 'timestamp') else table.start_time <= end_dt)
timeline_events = []
# Get sessions for this project
session_query = select(Session).where(Session.project_id == project_id)
session_result = await db.execute(session_query)
sessions = session_result.scalars().all()
session_ids = [s.id for s in sessions]
# Session start/end events
for session in sessions:
# Session start
timeline_events.append(TimelineEvent(
timestamp=session.start_time,
type="session_start",
data={
"session_id": session.id,
"session_type": session.session_type,
"git_branch": session.git_branch,
"working_directory": session.working_directory
}
))
# Session end (if ended)
if session.end_time:
timeline_events.append(TimelineEvent(
timestamp=session.end_time,
type="session_end",
data={
"session_id": session.id,
"duration_minutes": session.duration_minutes,
"activity_count": session.activity_count,
"conversation_count": session.conversation_count
}
))
if session_ids:
# Get conversations
conv_query = select(Conversation).where(Conversation.session_id.in_(session_ids))
if date_filters:
for date_filter in date_filters:
conv_query = conv_query.where(date_filter(Conversation))
conv_result = await db.execute(conv_query)
conversations = conv_result.scalars().all()
for conv in conversations:
timeline_events.append(TimelineEvent(
timestamp=conv.timestamp,
type="conversation",
data={
"id": conv.id,
"session_id": conv.session_id,
"exchange_type": conv.exchange_type,
"user_prompt": conv.user_prompt[:100] + "..." if conv.user_prompt and len(conv.user_prompt) > 100 else conv.user_prompt,
"tools_used": conv.tools_used,
"files_affected": conv.files_affected
}
))
# Get activities
activity_query = select(Activity).where(Activity.session_id.in_(session_ids))
if date_filters:
for date_filter in date_filters:
activity_query = activity_query.where(date_filter(Activity))
activity_result = await db.execute(activity_query)
activities = activity_result.scalars().all()
for activity in activities:
timeline_events.append(TimelineEvent(
timestamp=activity.timestamp,
type="activity",
data={
"id": activity.id,
"session_id": activity.session_id,
"tool_name": activity.tool_name,
"action": activity.action,
"file_path": activity.file_path,
"success": activity.success,
"lines_changed": activity.total_lines_changed
}
))
# Get git operations
git_query = select(GitOperation).where(GitOperation.session_id.in_(session_ids))
if date_filters:
for date_filter in date_filters:
git_query = git_query.where(date_filter(GitOperation))
git_result = await db.execute(git_query)
git_operations = git_result.scalars().all()
for git_op in git_operations:
timeline_events.append(TimelineEvent(
timestamp=git_op.timestamp,
type="git_operation",
data={
"id": git_op.id,
"session_id": git_op.session_id,
"operation": git_op.operation,
"commit_hash": git_op.commit_hash,
"commit_message": git_op.get_commit_message(),
"files_changed": git_op.files_count,
"lines_changed": git_op.total_lines_changed
}
))
# Sort timeline by timestamp
timeline_events.sort(key=lambda x: x.timestamp)
# Create project summary for response
latest_session_result = await db.execute(
select(Session.start_time)
.where(Session.project_id == project.id)
.order_by(Session.start_time.desc())
.limit(1)
)
latest_session = latest_session_result.scalars().first()
project_summary = ProjectSummary(
id=project.id,
name=project.name,
path=project.path,
git_repo=project.git_repo,
languages=project.languages,
total_sessions=project.total_sessions,
total_time_minutes=project.total_time_minutes,
last_activity=latest_session or project.created_at,
files_modified_count=project.files_modified_count,
lines_changed_count=project.lines_changed_count
)
return ProjectTimeline(
project=project_summary,
timeline=timeline_events
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get project timeline: {str(e)}"
)
@router.get("/projects/{project_id}/stats")
async def get_project_stats(
project_id: int,
days: int = Query(30, description="Number of days to include in statistics"),
db: AsyncSession = Depends(get_db)
):
"""Get comprehensive statistics for a project."""
try:
# Verify project exists
project_result = await db.execute(
select(Project).where(Project.id == project_id)
)
project = project_result.scalars().first()
if not project:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Project {project_id} not found"
)
# Date filter for recent activity
from datetime import datetime, timedelta
start_date = datetime.utcnow() - timedelta(days=days) if days > 0 else None
# Get sessions
session_query = select(Session).where(Session.project_id == project_id)
if start_date:
session_query = session_query.where(Session.start_time >= start_date)
session_result = await db.execute(session_query)
sessions = session_result.scalars().all()
session_ids = [s.id for s in sessions] if sessions else []
# Calculate session statistics
total_sessions = len(sessions)
total_time = sum(s.calculated_duration_minutes or 0 for s in sessions)
avg_session_length = total_time / total_sessions if total_sessions > 0 else 0
# Activity statistics
activity_stats = {"total": 0, "by_tool": {}, "by_language": {}}
if session_ids:
activity_query = select(Activity).where(Activity.session_id.in_(session_ids))
activity_result = await db.execute(activity_query)
activities = activity_result.scalars().all()
activity_stats["total"] = len(activities)
for activity in activities:
# Tool usage
tool = activity.tool_name
activity_stats["by_tool"][tool] = activity_stats["by_tool"].get(tool, 0) + 1
# Language usage
lang = activity.get_programming_language()
if lang:
activity_stats["by_language"][lang] = activity_stats["by_language"].get(lang, 0) + 1
# Conversation statistics
conversation_stats = {"total": 0, "by_type": {}}
if session_ids:
conv_query = select(Conversation).where(Conversation.session_id.in_(session_ids))
conv_result = await db.execute(conv_query)
conversations = conv_result.scalars().all()
conversation_stats["total"] = len(conversations)
for conv in conversations:
conv_type = conv.exchange_type
conversation_stats["by_type"][conv_type] = conversation_stats["by_type"].get(conv_type, 0) + 1
# Git statistics
git_stats = {"total": 0, "commits": 0, "by_operation": {}}
if session_ids:
git_query = select(GitOperation).where(GitOperation.session_id.in_(session_ids))
git_result = await db.execute(git_query)
git_operations = git_result.scalars().all()
git_stats["total"] = len(git_operations)
for git_op in git_operations:
if git_op.is_commit:
git_stats["commits"] += 1
op_type = git_op.operation
git_stats["by_operation"][op_type] = git_stats["by_operation"].get(op_type, 0) + 1
# Productivity trends (daily aggregation)
daily_stats = {}
for session in sessions:
date_key = session.start_time.date().isoformat()
if date_key not in daily_stats:
daily_stats[date_key] = {
"date": date_key,
"sessions": 0,
"time_minutes": 0,
"activities": 0
}
daily_stats[date_key]["sessions"] += 1
daily_stats[date_key]["time_minutes"] += session.calculated_duration_minutes or 0
daily_stats[date_key]["activities"] += session.activity_count
productivity_trends = list(daily_stats.values())
productivity_trends.sort(key=lambda x: x["date"])
return {
"project_id": project_id,
"project_name": project.name,
"time_period_days": days,
"session_statistics": {
"total_sessions": total_sessions,
"total_time_minutes": total_time,
"average_session_length_minutes": round(avg_session_length, 1)
},
"activity_statistics": activity_stats,
"conversation_statistics": conversation_stats,
"git_statistics": git_stats,
"productivity_trends": productivity_trends,
"summary": {
"most_used_tool": max(activity_stats["by_tool"].items(), key=lambda x: x[1])[0] if activity_stats["by_tool"] else None,
"primary_language": max(activity_stats["by_language"].items(), key=lambda x: x[1])[0] if activity_stats["by_language"] else None,
"daily_average_time": round(total_time / days, 1) if days > 0 else 0,
"daily_average_sessions": round(total_sessions / days, 1) if days > 0 else 0
}
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get project stats: {str(e)}"
)