Ryan Malloy fb574d26b8 Add MCP progress reporting to all multi-fetch tools
ctx.report_progress() gives clients real-time visibility into
long-running tool calls. Per-fetch counters for parallel gather
calls, stage-based milestones for linear pipelines. No-op when
client doesn't send a progressToken.
2026-02-23 13:28:56 -07:00

195 lines
7.2 KiB
Python

"""Visualization tools — tide charts and conditions dashboards."""
import asyncio
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
from fastmcp import Context, FastMCP
from fastmcp.utilities.types import Image
from mcnoaa_tides.charts import check_deps
from mcnoaa_tides.client import NOAAClient
def register(mcp: FastMCP) -> None:
@mcp.tool(tags={"visualization"})
async def visualize_tides(
ctx: Context,
station_id: str,
hours: int = 48,
include_observed: bool = True,
format: Literal["png", "html"] = "png",
) -> Image | str:
"""Generate a tide prediction chart with high/low markers.
Creates a visual chart of tide predictions showing the water level curve
with high (H) and low (L) tide markers. Optionally overlays observed
water levels as a dashed line for comparison.
PNG format returns an inline image. HTML format saves an interactive
chart to artifacts/charts/ and returns the file path.
Requires mcnoaa-tides[viz] to be installed.
"""
check_deps(format)
noaa: NOAAClient = ctx.lifespan_context["noaa_client"]
today = datetime.now(timezone.utc).strftime("%Y%m%d")
# Fetch predictions (6-minute interval for smooth curve) + hilo for markers
await ctx.report_progress(1, 4, "Fetching predictions")
predictions_raw, hilo_raw = await asyncio.gather(
noaa.get_data(
station_id, product="predictions", begin_date=today,
hours=hours, interval="6",
),
noaa.get_data(
station_id, product="predictions", begin_date=today,
hours=hours, interval="hilo",
),
)
# Merge hilo type markers into the 6-minute data points
predictions = predictions_raw.get("predictions", [])
hilo_map = {}
for h in hilo_raw.get("predictions", []):
hilo_map[h["t"]] = h.get("type")
for p in predictions:
if p["t"] in hilo_map:
p["type"] = hilo_map[p["t"]]
# Fetch observed water levels if requested
observed = None
if include_observed:
await ctx.report_progress(2, 4, "Fetching observed data")
try:
obs_raw = await noaa.get_data(
station_id, product="water_level", hours=hours,
)
observed = obs_raw.get("data", [])
except Exception:
pass # observed overlay is optional — skip on failure
# Look up station name
await ctx.report_progress(3, 4, "Looking up station")
station_name = ""
try:
stations = await noaa.get_stations()
match = [s for s in stations if s.id == station_id]
if match:
station_name = match[0].name
except Exception:
pass
await ctx.report_progress(4, 4, "Rendering chart")
if format == "png":
from mcnoaa_tides.charts.tides import render_tide_chart_png
png_bytes = render_tide_chart_png(predictions, observed, station_name)
return Image(data=png_bytes, format="image/png")
else:
from mcnoaa_tides.charts.tides import render_tide_chart_html
html = render_tide_chart_html(predictions, observed, station_name)
path = _save_html(html, station_id, "tides")
return f"Interactive tide chart saved to: {path}"
@mcp.tool(tags={"visualization"})
async def visualize_conditions(
ctx: Context,
station_id: str,
hours: int = 24,
format: Literal["png", "html"] = "png",
) -> Image | str:
"""Generate a multi-panel marine conditions dashboard.
Creates a dashboard with up to 4 panels:
- Tide predictions with observed water level overlay
- Wind speed and gust
- Air and water temperature
- Barometric pressure with trend indicator
Products unavailable at a station are simply omitted from the dashboard.
PNG format returns an inline image. HTML format saves an interactive
chart to artifacts/charts/ and returns the file path.
Requires mcnoaa-tides[viz] to be installed.
"""
check_deps(format)
noaa: NOAAClient = ctx.lifespan_context["noaa_client"]
today = datetime.now(timezone.utc).strftime("%Y%m%d")
# Parallel fetch — same products as marine_conditions_snapshot
requests = {
"predictions": {
"product": "predictions", "interval": "hilo",
"begin_date": today, "hours": hours,
},
"water_level": {"product": "water_level", "hours": hours},
"water_temperature": {"product": "water_temperature", "hours": hours},
"air_temperature": {"product": "air_temperature", "hours": hours},
"wind": {"product": "wind", "hours": hours},
"air_pressure": {"product": "air_pressure", "hours": hours},
}
completed = 0
total_steps = len(requests) + 2 # 6 fetches + station lookup + render
async def fetch(name: str, params: dict) -> tuple[str, dict | None]:
nonlocal completed
try:
data = await noaa.get_data(station_id, **params)
result = name, data
except Exception:
result = name, None
completed += 1
await ctx.report_progress(completed, total_steps, f"Fetched {name}")
return result
results = await asyncio.gather(
*[fetch(name, params) for name, params in requests.items()]
)
snapshot: dict = {"station_id": station_id}
for name, data in results:
if data is not None:
snapshot[name] = data
# Look up station name
await ctx.report_progress(total_steps - 1, total_steps, "Looking up station")
station_name = ""
try:
stations = await noaa.get_stations()
match = [s for s in stations if s.id == station_id]
if match:
station_name = match[0].name
except Exception:
pass
await ctx.report_progress(total_steps, total_steps, "Rendering dashboard")
if format == "png":
from mcnoaa_tides.charts.conditions import render_conditions_png
png_bytes = render_conditions_png(snapshot, station_name)
return Image(data=png_bytes, format="image/png")
else:
from mcnoaa_tides.charts.conditions import render_conditions_html
html = render_conditions_html(snapshot, station_name)
path = _save_html(html, station_id, "conditions")
return f"Interactive conditions dashboard saved to: {path}"
def _save_html(html: str, station_id: str, chart_type: str) -> Path:
"""Save HTML chart to artifacts/charts/ and return the path."""
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
out_dir = Path("artifacts/charts")
out_dir.mkdir(parents=True, exist_ok=True)
path = out_dir / f"{station_id}_{chart_type}_{timestamp}.html"
path.write_text(html, encoding="utf-8")
return path