claude-code-tracker/app/api/activities.py
Ryan Malloy 44ed9936b7 Initial commit: Claude Code Project Tracker
Add comprehensive development intelligence system that tracks:
- Development sessions with automatic start/stop
- Full conversation history with semantic search
- Tool usage and file operation analytics
- Think time and engagement analysis
- Git activity correlation
- Learning pattern recognition
- Productivity insights and metrics

Features:
- FastAPI backend with SQLite database
- Modern web dashboard with interactive charts
- Claude Code hook integration for automatic tracking
- Comprehensive test suite with 100+ tests
- Complete API documentation (OpenAPI/Swagger)
- Privacy-first design with local data storage

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-11 02:59:21 -06:00

304 lines
11 KiB
Python

"""
Activity tracking API endpoints.
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from app.database.connection import get_db
from app.models.activity import Activity
from app.models.session import Session
from app.api.schemas import ActivityRequest, ActivityResponse
# Router instance that the endpoint decorators in this module register against.
router = APIRouter()
@router.post("/activity", response_model=ActivityResponse, status_code=status.HTTP_201_CREATED)
async def record_activity(
    request: ActivityRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Record a development activity (tool usage, file operation, etc.).

    This endpoint is called by Claude Code PostToolUse hooks.

    Args:
        request: Activity payload. ``request.session_id`` must reference an
            existing session row.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        ActivityResponse: id, session id, tool name, action, timestamp and
        success flag of the newly stored activity.

    Raises:
        HTTPException: 404 if the referenced session does not exist; 500 for
            any other failure (pending changes are rolled back first).
    """
    try:
        # Verify session exists
        result = await db.execute(
            select(Session).where(Session.id == request.session_id)
        )
        session = result.scalars().first()

        if not session:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {request.session_id} not found"
            )

        # Create activity record
        activity = Activity(
            session_id=request.session_id,
            conversation_id=request.conversation_id,
            timestamp=request.timestamp,
            tool_name=request.tool_name,
            action=request.action,
            file_path=request.file_path,
            metadata=request.metadata,
            success=request.success,
            error_message=request.error_message,
            lines_added=request.lines_added,
            lines_removed=request.lines_removed
        )
        db.add(activity)

        # Update session activity count
        session.add_activity()

        # Add file to session's touched files if applicable
        if request.file_path:
            session.add_file_touched(request.file_path)

        await db.commit()
        # Reload the row so database-generated values (e.g. the id) are populated.
        await db.refresh(activity)

        return ActivityResponse(
            id=activity.id,
            session_id=activity.session_id,
            tool_name=activity.tool_name,
            action=activity.action,
            timestamp=activity.timestamp,
            success=activity.success
        )
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) pass through untouched.
        raise
    except Exception as e:
        # Discard the half-applied unit of work so the session is not left
        # holding a failed/pending transaction when it is handed back.
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to record activity: {str(e)}"
        ) from e
@router.get("/activities")
async def get_activities(
    session_id: Optional[int] = Query(None, description="Filter by session ID"),
    tool_name: Optional[str] = Query(None, description="Filter by tool name"),
    limit: int = Query(50, description="Maximum number of results"),
    offset: int = Query(0, description="Number of results to skip"),
    db: AsyncSession = Depends(get_db)
):
    """Get activities with optional filtering.

    Args:
        session_id: When given (including ``0``), only activities of that
            session are returned.
        tool_name: When non-empty, only activities of that tool are returned.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip before the first returned row.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        A list of dicts, newest activity first, one per matching activity.

    Raises:
        HTTPException: 500 if the query or serialisation fails.
    """
    try:
        query = select(Activity).options(
            selectinload(Activity.session).selectinload(Session.project)
        )

        # Apply filters.
        # Compare against None explicitly: a truthiness test would treat an
        # explicit session_id=0 as "no filter" and return every activity.
        if session_id is not None:
            query = query.where(Activity.session_id == session_id)

        # Truthiness is intentional here so an empty string means "any tool".
        if tool_name:
            query = query.where(Activity.tool_name == tool_name)

        # Order by timestamp descending
        query = query.order_by(Activity.timestamp.desc()).offset(offset).limit(limit)

        result = await db.execute(query)
        activities = result.scalars().all()

        # NOTE(review): assumes every activity has a session with a project;
        # a missing link raises here and turns the whole listing into a 500.
        return [
            {
                "id": activity.id,
                "session_id": activity.session_id,
                "project_name": activity.session.project.name,
                "timestamp": activity.timestamp,
                "tool_name": activity.tool_name,
                "action": activity.action,
                "file_path": activity.file_path,
                "success": activity.success,
                "programming_language": activity.get_programming_language(),
                "lines_changed": activity.total_lines_changed,
                "metadata": activity.metadata
            }
            for activity in activities
        ]
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get activities: {str(e)}"
        ) from e
@router.get("/activities/{activity_id}")
async def get_activity(
    activity_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Return the detail view of a single activity.

    The activity is looked up by id with its session, the session's project
    and its conversation eagerly loaded. Responds with 404 when no row
    matches and with 500 when anything else goes wrong.
    """
    try:
        eager_loads = (
            selectinload(Activity.session).selectinload(Session.project),
            selectinload(Activity.conversation),
        )
        stmt = select(Activity).options(*eager_loads).where(Activity.id == activity_id)

        rows = await db.execute(stmt)
        record = rows.scalars().first()

        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Activity {activity_id} not found"
            )

        # Stored columns first, then derived properties and helper lookups.
        payload = {
            "id": record.id,
            "session_id": record.session_id,
            "conversation_id": record.conversation_id,
            "project_name": record.session.project.name,
            "timestamp": record.timestamp,
            "tool_name": record.tool_name,
            "action": record.action,
            "file_path": record.file_path,
            "metadata": record.metadata,
            "success": record.success,
            "error_message": record.error_message,
            "lines_added": record.lines_added,
            "lines_removed": record.lines_removed,
            "total_lines_changed": record.total_lines_changed,
            "net_lines_changed": record.net_lines_changed,
            "file_extension": record.get_file_extension(),
            "programming_language": record.get_programming_language(),
            "is_file_operation": record.is_file_operation,
            "is_code_execution": record.is_code_execution,
            "is_search_operation": record.is_search_operation,
            "command_executed": record.get_command_executed(),
            "search_pattern": record.get_search_pattern(),
            "task_type": record.get_task_type()
        }
        return payload
    except HTTPException:
        # Let the 404 above reach the client unchanged.
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get activity: {str(exc)}"
        )
@router.get("/activities/stats/tools")
async def get_tool_usage_stats(
    session_id: Optional[int] = Query(None, description="Filter by session ID"),
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    days: int = Query(30, description="Number of days to include"),
    db: AsyncSession = Depends(get_db)
):
    """Get tool usage statistics.

    Args:
        session_id: When given, restrict the statistics to that session. Takes
            precedence over ``project_id``.
        project_id: When given (and ``session_id`` is not), restrict the
            statistics to sessions of that project.
        days: Only activities from the last ``days`` days are counted; a value
            of 0 or less disables the date filter.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        A list of dicts, one per tool, with ``tool_name``, ``usage_count``,
        ``sessions_used``, ``avg_lines_changed`` and ``total_lines_changed``.

    Raises:
        HTTPException: 500 if the query fails.
    """
    from datetime import datetime, timedelta, timezone

    try:
        # Base query for tool usage counts
        # NOTE(review): in SQL, NULL + n is NULL and AVG/SUM skip NULLs, so a
        # row with either line counter unset would not contribute — confirm
        # the columns are non-nullable.
        query = select(
            Activity.tool_name,
            func.count(Activity.id).label('usage_count'),
            func.count(func.distinct(Activity.session_id)).label('sessions_used'),
            func.avg(Activity.lines_added + Activity.lines_removed).label('avg_lines_changed'),
            func.sum(Activity.lines_added + Activity.lines_removed).label('total_lines_changed')
        ).group_by(Activity.tool_name)

        # Apply filters. Explicit None checks so an id of 0 is still honoured
        # as a filter instead of being silently ignored.
        if session_id is not None:
            query = query.where(Activity.session_id == session_id)
        elif project_id is not None:
            query = query.join(Session).where(Session.project_id == project_id)

        # Filter by date range
        if days > 0:
            # datetime.utcnow() is deprecated; this produces the same naive
            # UTC value the original comparison used.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            start_date = now_utc - timedelta(days=days)
            query = query.where(Activity.timestamp >= start_date)

        result = await db.execute(query)
        stats = result.all()

        return [
            {
                "tool_name": stat.tool_name,
                "usage_count": stat.usage_count,
                "sessions_used": stat.sessions_used,
                # Aggregates come back as None when there is nothing to aggregate.
                "avg_lines_changed": float(stat.avg_lines_changed or 0),
                "total_lines_changed": stat.total_lines_changed or 0
            }
            for stat in stats
        ]
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get tool usage stats: {str(e)}"
        ) from e
@router.get("/activities/stats/languages")
async def get_language_usage_stats(
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    days: int = Query(30, description="Number of days to include"),
    db: AsyncSession = Depends(get_db)
):
    """Get programming language usage statistics.

    Only activities that have a file path and whose tool is ``Edit``,
    ``Write`` or ``Read`` are considered. Activities for which
    ``get_programming_language()`` returns a falsy value are skipped.

    Args:
        project_id: When given, restrict the statistics to sessions of that
            project.
        days: Only activities from the last ``days`` days are counted; a value
            of 0 or less disables the date filter.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        A list of dicts, one per language, with ``language``, ``file_count``
        (number of distinct file paths), ``activity_count``, ``lines_added``
        and ``lines_removed``.

    Raises:
        HTTPException: 500 if the query or aggregation fails.
    """
    from datetime import datetime, timedelta, timezone

    try:
        # Get activities with file operations
        query = select(Activity).where(
            Activity.file_path.isnot(None),
            Activity.tool_name.in_(["Edit", "Write", "Read"])
        )

        # Apply filters. Explicit None check so an id of 0 is not ignored.
        if project_id is not None:
            query = query.join(Session).where(Session.project_id == project_id)

        if days > 0:
            # datetime.utcnow() is deprecated; this produces the same naive
            # UTC value the original comparison used.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            start_date = now_utc - timedelta(days=days)
            query = query.where(Activity.timestamp >= start_date)

        result = await db.execute(query)
        activities = result.scalars().all()

        # Aggregate per language in a single pass. Distinct file paths are
        # collected in a set so file_count reflects unique files rather than
        # repeating activity_count.
        language_stats = {}
        files_by_language = {}
        for activity in activities:
            lang = activity.get_programming_language()
            if not lang:
                continue

            if lang not in language_stats:
                language_stats[lang] = {
                    "language": lang,
                    "file_count": 0,
                    "activity_count": 0,
                    "lines_added": 0,
                    "lines_removed": 0
                }
                files_by_language[lang] = set()

            entry = language_stats[lang]
            entry["activity_count"] += 1
            entry["lines_added"] += activity.lines_added or 0
            entry["lines_removed"] += activity.lines_removed or 0
            files_by_language[lang].add(activity.file_path)

        # Count unique files per language
        for lang, paths in files_by_language.items():
            language_stats[lang]["file_count"] = len(paths)

        return list(language_stats.values())
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get language usage stats: {str(e)}"
        ) from e