Add SmartPot tidal intelligence tools

4 new tools (tidal_phase, deployment_briefing, catch_tidal_context,
water_level_anomaly) and 2 prompts (smartpot_deployment, crab_pot_analysis)
for autonomous crab pot deployment planning and catch correlation.

Pure tidal phase classification in tidal.py with no MCP dependencies.
65 tests passing, lint clean.
This commit is contained in:
Ryan Malloy 2026-02-22 18:31:03 -07:00
parent 9f6d7bb4ac
commit c7320e599b
9 changed files with 1225 additions and 6 deletions

View File

@ -75,4 +75,86 @@ GO / NO-GO criteria:
Provide a clear GO / CAUTION / NO-GO recommendation with reasoning.
If any single factor is NO-GO, the overall assessment should be NO-GO.
"""
@mcp.prompt()
def smartpot_deployment(
    latitude: str,
    longitude: str,
    soak_hours: str = "48",
) -> str:
    """Full SmartPot deployment planning workflow.

    Orchestrates station discovery, tidal phase, deployment briefing,
    and anomaly detection into a single deployment recommendation.

    Parameters
    ----------
    latitude, longitude : str
        GPS coordinates of the proposed pot location, interpolated
        verbatim into the prompt text.
    soak_hours : str
        Planned soak duration in hours (string, matching the other
        prompt signatures in this module).

    Returns the instruction script for the client LLM; the tool names it
    references (find_nearest_stations, tidal_phase, deployment_briefing,
    water_level_anomaly) must stay in sync with the registered tools.
    """
    return f"""Plan a SmartPot crab pot deployment at ({latitude}, {longitude})
with a {soak_hours}-hour soak window.
Workflow:
1. Use find_nearest_stations with latitude={latitude}, longitude={longitude}
to identify the reference tide station. Note the distance data
accuracy decreases beyond ~10 nm from the station.
2. Use tidal_phase with latitude={latitude}, longitude={longitude}
to determine the current tidal phase. Deploying during slack or
early flood maximizes initial soak productivity.
3. Use deployment_briefing with latitude={latitude}, longitude={longitude},
soak_hours={soak_hours} to get a GO/CAUTION/NO-GO assessment.
Review the tide schedule, wind, temperature, and pressure advisories.
4. Use water_level_anomaly on the nearest station to check for storm
surge or unusual water level deviations that could affect pot
stability or recovery.
5. Synthesize a deployment recommendation:
- Overall GO/CAUTION/NO-GO with reasoning
- Optimal deployment window within the next 6 hours
- Expected tidal cycles during soak
- Any safety or recovery concerns
- Recommended recovery time based on tide schedule
"""
@mcp.prompt()
def crab_pot_analysis(
    events_description: str = "",
) -> str:
    """Guided catch-data analysis with tidal correlation.

    Walks the client through enriching historical catch events with
    tidal phase data (via catch_tidal_context) and mining the result
    for productive deployment patterns. An optional free-text
    description of the available catch data is appended to the prompt.
    """
    if events_description:
        events_note = f"\nCatch data context: {events_description}"
    else:
        events_note = ""
    return f"""Analyze crab pot catch data for tidal phase correlation.{events_note}
Workflow:
1. Gather catch event data. Each event needs at minimum:
- timestamp (UTC)
- latitude, longitude (pot GPS position)
- catch_count or weight
Any additional fields (species, pot_id, bait_type) will be preserved.
2. Use catch_tidal_context to enrich all events with tidal phase data.
This will classify each event as ebb, flood, slack_high, or slack_low
and note progress through the phase.
3. Group enriched events by tidal phase and compute:
- Average catch per phase
- Best-performing phase
- Catch distribution across phases
4. Analyze patterns:
- Do certain phases consistently outperform?
- Is there a sweet spot in phase progress (e.g., early flood vs late flood)?
- Any correlation with tide range (spring vs neap)?
5. Produce a summary with:
- Top-performing tidal conditions
- Recommended deployment timing relative to tide
- Confidence level based on sample size
- Suggestions for future data collection
"""

View File

@ -7,7 +7,7 @@ from fastmcp import FastMCP
from mcnoaa_tides import __version__, prompts, resources
from mcnoaa_tides.client import NOAAClient
from mcnoaa_tides.tools import charts, conditions, meteorological, stations, tides
from mcnoaa_tides.tools import charts, conditions, meteorological, smartpot, stations, tides
@asynccontextmanager
@ -49,6 +49,7 @@ stations.register(mcp)
tides.register(mcp)
meteorological.register(mcp)
conditions.register(mcp)
smartpot.register(mcp)
charts.register(mcp)
# Register resources and prompts

192
src/mcnoaa_tides/tidal.py Normal file
View File

@ -0,0 +1,192 @@
"""Pure tidal phase classification and interpolation.
No I/O, no FastMCP dependencies just datetime math. The NOAA hilo
predictions give us discrete high/low events; this module classifies
an arbitrary timestamp into a tidal phase (ebb, flood, slack_high,
slack_low) by finding the bracketing events and computing progress.
"""
import bisect
from datetime import datetime
# Minutes around a high/low event that count as "slack"
SLACK_WINDOW_MIN = 30
def parse_hilo_predictions(predictions: list[dict]) -> list[dict]:
    """Convert raw NOAA hilo dicts into typed records.

    Input: [{"t": "2026-02-21 04:30", "v": "4.521", "type": "H"}, ...]
    Output: [{"dt": datetime, "v": 4.521, "type": "H"}, ...]

    Records lacking a "type" field (non-hilo entries that sometimes
    appear in mixed-interval responses) are dropped; the result is
    sorted chronologically.
    """
    typed = [
        {
            "dt": datetime.strptime(rec["t"], "%Y-%m-%d %H:%M"),
            "v": float(rec["v"]),
            "type": rec["type"],
        }
        for rec in predictions
        if rec.get("type")
    ]
    return sorted(typed, key=lambda rec: rec["dt"])
def classify_tidal_phase(now: datetime, hilo_events: list[dict]) -> dict:
    """Classify the tidal phase at a given timestamp.

    Parameters
    ----------
    now : datetime
        Naive datetime (matching the timezone of hilo_events).
    hilo_events : list[dict]
        Output of parse_hilo_predictions sorted by "dt".

    Returns
    -------
    dict with keys:
        phase                  "ebb", "flood", "slack_high", "slack_low"
        description            human-readable explanation
        previous               {"type", "time", "level_ft"} or None
        next                   {"type", "time", "level_ft"} or None
        minutes_since_previous int or None
        minutes_to_next        int or None
        progress_pct           0-100 progress between previous and next event
    """
    if not hilo_events:
        return {
            "phase": "unknown",
            "description": "No hilo data available for classification",
            "previous": None,
            "next": None,
            "minutes_since_previous": None,
            "minutes_to_next": None,
            "progress_pct": None,
        }

    # Events are sorted, so the first event after `now` ends the scan.
    before = None
    after = None
    for ev in hilo_events:
        if ev["dt"] <= now:
            before = ev
        else:
            after = ev
            break

    def _public(ev):
        # Shape an internal event record for the response payload.
        if ev is None:
            return None
        return {
            "type": "high" if ev["type"] == "H" else "low",
            "time": ev["dt"].strftime("%Y-%m-%d %H:%M"),
            "level_ft": ev["v"],
        }

    mins_since = None
    if before is not None:
        mins_since = int((now - before["dt"]).total_seconds() / 60)
    mins_to = None
    if after is not None:
        mins_to = int((after["dt"] - now).total_seconds() / 60)

    # Progress between the bracketing events (None unless both exist).
    progress = None
    if before is not None and after is not None:
        span = (after["dt"] - before["dt"]).total_seconds()
        if span > 0:
            progress = round((now - before["dt"]).total_seconds() / span * 100, 1)
        else:
            progress = 0.0

    phase, desc = _determine_phase(before, after, mins_since, mins_to)
    return {
        "phase": phase,
        "description": desc,
        "previous": _public(before),
        "next": _public(after),
        "minutes_since_previous": mins_since,
        "minutes_to_next": mins_to,
        "progress_pct": progress,
    }
def _determine_phase(prev_event, next_event, mins_since, mins_to):
"""Return (phase, description) based on bracketing events."""
# Near a high → slack_high
if (
prev_event and prev_event["type"] == "H"
and mins_since is not None and mins_since <= SLACK_WINDOW_MIN
):
return "slack_high", "Near high tide — current is slack"
if (
next_event and next_event["type"] == "H"
and mins_to is not None and mins_to <= SLACK_WINDOW_MIN
):
return "slack_high", "Approaching high tide — current is slack"
# Near a low → slack_low
if (
prev_event and prev_event["type"] == "L"
and mins_since is not None and mins_since <= SLACK_WINDOW_MIN
):
return "slack_low", "Near low tide — current is slack"
if (
next_event and next_event["type"] == "L"
and mins_to is not None and mins_to <= SLACK_WINDOW_MIN
):
return "slack_low", "Approaching low tide — current is slack"
# Between events: H→L = ebb, L→H = flood
if prev_event and next_event:
if prev_event["type"] == "H" and next_event["type"] == "L":
return "ebb", "Tide is falling (ebb) — water moving seaward"
if prev_event["type"] == "L" and next_event["type"] == "H":
return "flood", "Tide is rising (flood) — water moving shoreward"
# Only one bracketing event available
if prev_event and not next_event:
if prev_event["type"] == "H":
return "ebb", "Past high tide — likely ebbing"
return "flood", "Past low tide — likely flooding"
if next_event and not prev_event:
if next_event["type"] == "H":
return "flood", "Before next high — likely flooding"
return "ebb", "Before next low — likely ebbing"
return "unknown", "Unable to determine tidal phase"
def interpolate_predictions(
obs_dt: datetime,
pred_times: list[datetime],
pred_values: list[float],
) -> float | None:
"""Linear interpolation of predicted water level at an arbitrary time.
Parameters
----------
obs_dt : datetime
The timestamp to interpolate at.
pred_times : list[datetime]
Sorted prediction timestamps.
pred_values : list[float]
Corresponding water levels (same length as pred_times).
Returns None if obs_dt is outside the prediction window.
"""
if len(pred_times) < 2 or len(pred_times) != len(pred_values):
return None
if obs_dt < pred_times[0] or obs_dt > pred_times[-1]:
return None
# Find bracketing indices
for i in range(len(pred_times) - 1):
t0, t1 = pred_times[i], pred_times[i + 1]
if t0 <= obs_dt <= t1:
span = (t1 - t0).total_seconds()
if span == 0:
return pred_values[i]
frac = (obs_dt - t0).total_seconds() / span
return pred_values[i] + frac * (pred_values[i + 1] - pred_values[i])
return None

View File

@ -0,0 +1,469 @@
"""SmartPot tidal intelligence tools.
Bridges NOAA CO-OPS tide data with autonomous crab pot deployments:
tidal phase awareness, deployment assessment, catch enrichment, and
anomaly detection.
"""
import asyncio
from datetime import datetime, timedelta, timezone
from fastmcp import Context, FastMCP
from mcnoaa_tides.client import NOAAClient
from mcnoaa_tides.tidal import (
classify_tidal_phase,
interpolate_predictions,
parse_hilo_predictions,
)
async def _resolve_station(
    noaa: NOAAClient,
    station_id: str = "",
    latitude: float | None = None,
    longitude: float | None = None,
) -> tuple[dict, float | None]:
    """Resolve a station from either a 7-digit ID or GPS coordinates.

    Returns (station_dict, distance_nm). distance_nm is None when looked
    up by ID, or the haversine distance when found via GPS. Raises
    ValueError when neither lookup method is usable or nothing matches.
    """
    if station_id:
        matches = await noaa.search(query=station_id)
        if not matches:
            raise ValueError(
                f"Station '{station_id}' not found. "
                "Use search_stations to find valid IDs."
            )
        hit = matches[0]
        return {"id": hit.id, "name": hit.name, "lat": hit.lat, "lng": hit.lng}, None

    # No ID given — both coordinates are required for a GPS lookup.
    if latitude is None or longitude is None:
        raise ValueError(
            "Provide either station_id or both latitude and longitude."
        )

    nearby = await noaa.find_nearest(latitude, longitude, limit=1)
    if not nearby:
        raise ValueError(
            f"No NOAA station found within 100 nm of "
            f"({latitude}, {longitude}). Try a coastal location."
        )
    hit, dist = nearby[0]
    return {"id": hit.id, "name": hit.name, "lat": hit.lat, "lng": hit.lng}, round(dist, 2)
def register(mcp: FastMCP) -> None:
    """Register the SmartPot tool suite on the given FastMCP server.

    Adds four tools tagged "smartpot": tidal_phase, deployment_briefing,
    catch_tidal_context, and water_level_anomaly. Each obtains the shared
    NOAAClient from the server's lifespan context.
    """
    @mcp.tool(tags={"smartpot"})
    async def tidal_phase(
        ctx: Context,
        station_id: str = "",
        latitude: float | None = None,
        longitude: float | None = None,
    ) -> dict:
        """Classify the current tidal phase at a station or GPS location.

        Returns the phase (ebb, flood, slack_high, slack_low), timing
        relative to the previous and next high/low events, progress
        percentage through the current phase, and latest observed level.

        Provide either station_id (7-digit NOAA ID) or latitude+longitude.
        GPS coordinates find the nearest tidal station automatically.
        """
        noaa: NOAAClient = ctx.lifespan_context["noaa_client"]
        station, distance_nm = await _resolve_station(
            noaa, station_id, latitude, longitude,
        )
        now_utc = datetime.now(timezone.utc)
        # Fetch hilo predictions ±12h around now — wide enough to bracket
        # "now" with at least one event on each side in normal conditions.
        begin = (now_utc - timedelta(hours=12)).strftime("%Y%m%d %H:%M")
        end = (now_utc + timedelta(hours=12)).strftime("%Y%m%d %H:%M")
        # return_exceptions=True: a failed water-level read must not sink
        # the phase classification — each result is type-checked below.
        hilo_data, obs_data = await asyncio.gather(
            noaa.get_data(
                station["id"], "predictions",
                begin_date=begin, end_date=end,
                interval="hilo", time_zone="gmt",
            ),
            noaa.get_data(
                station["id"], "water_level",
                hours=1, time_zone="gmt",
            ),
            return_exceptions=True,
        )
        # Parse hilo predictions (an exception result fails isinstance).
        hilo_preds = []
        if isinstance(hilo_data, dict):
            hilo_preds = parse_hilo_predictions(hilo_data.get("predictions", []))
        # Classify using naive UTC datetime (hilo_preds are naive from NOAA gmt)
        now_naive = now_utc.replace(tzinfo=None)
        phase_info = classify_tidal_phase(now_naive, hilo_preds)
        # Latest observed level (best-effort; None when the fetch failed).
        latest_obs = None
        if isinstance(obs_data, dict):
            readings = obs_data.get("data", [])
            if readings:
                last = readings[-1]
                latest_obs = {"time": last["t"], "level_ft": float(last["v"])}
        result = {
            "station": station,
            "timestamp_utc": now_utc.isoformat(),
            **phase_info,
            "latest_observed": latest_obs,
        }
        # Distance is only meaningful for GPS lookups.
        if distance_nm is not None:
            result["station_distance_nm"] = distance_nm
        return result

    @mcp.tool(tags={"smartpot"})
    async def deployment_briefing(
        ctx: Context,
        latitude: float,
        longitude: float,
        soak_hours: int = 48,
    ) -> dict:
        """GO/CAUTION/NO-GO deployment assessment for crab pot placement.

        Finds the nearest NOAA station, fetches tide predictions covering
        the soak window, and checks current wind, temperature, and pressure.

        Assessment logic:
        Wind >20 kn sustained   NO-GO
        Wind 15-20 kn           CAUTION
        Water temp <40°F        cold-water advisory
        Falling pressure        weather advisory

        Returns station info, tide schedule, conditions, and assessment.
        """
        noaa: NOAAClient = ctx.lifespan_context["noaa_client"]
        station, distance_nm = await _resolve_station(
            noaa, latitude=latitude, longitude=longitude,
        )
        now_utc = datetime.now(timezone.utc)
        begin = now_utc.strftime("%Y%m%d %H:%M")
        end = (now_utc + timedelta(hours=soak_hours)).strftime("%Y%m%d %H:%M")
        # Parallel fetch: hilo predictions for soak window + current conditions
        hilo_fut = noaa.get_data(
            station["id"], "predictions",
            begin_date=begin, end_date=end,
            interval="hilo", time_zone="gmt",
        )
        async def _safe_fetch(product, **kwargs):
            # Meteorological sensors are optional at many stations; degrade
            # a missing product to None instead of failing the briefing.
            try:
                return await noaa.get_data(station["id"], product, **kwargs)
            except Exception:
                return None
        # hilo_fut is deliberately NOT wrapped in _safe_fetch — without a
        # tide schedule the briefing is meaningless, so that error propagates.
        hilo_data, wind_data, temp_data, pressure_data = await asyncio.gather(
            hilo_fut,
            _safe_fetch("wind", hours=3, time_zone="gmt"),
            _safe_fetch("water_temperature", hours=3, time_zone="gmt"),
            _safe_fetch("air_pressure", hours=3, time_zone="gmt"),
        )
        # Build tide schedule
        hilo_events = parse_hilo_predictions(hilo_data.get("predictions", []))
        tide_schedule = [
            {
                "type": "high" if e["type"] == "H" else "low",
                "time_utc": e["dt"].strftime("%Y-%m-%d %H:%M"),
                "level_ft": e["v"],
            }
            for e in hilo_events
        ]
        # One full tidal cycle ≈ one high + one low event.
        tidal_cycles = max(0, len(hilo_events) // 2)
        # Current conditions
        conditions = {}
        advisories = []
        assessment = "GO"
        # Wind assessment
        if wind_data and wind_data.get("data"):
            latest_wind = wind_data["data"][-1]
            # NOTE(review): assumes the NOAA wind payload fields "s" (speed)
            # and "g" (gust) are always present and numeric — confirm
            # against the client's handling of missing-gust records.
            wind_speed = float(latest_wind["s"])
            wind_gust = float(latest_wind["g"])
            conditions["wind"] = {
                "speed_kn": wind_speed,
                "gust_kn": wind_gust,
                "direction": latest_wind.get("dr", ""),
            }
            if wind_speed > 20 or wind_gust > 30:
                assessment = "NO-GO"
                advisories.append(
                    f"Wind {wind_speed} kn (gusts {wind_gust} kn) exceeds safe limits"
                )
            elif wind_speed > 15 or wind_gust > 25:
                # Never downgrade an existing NO-GO to CAUTION.
                if assessment != "NO-GO":
                    assessment = "CAUTION"
                advisories.append(
                    f"Wind {wind_speed} kn (gusts {wind_gust} kn) — marginal conditions"
                )
        # Water temperature (advisory only — does not change assessment)
        if temp_data and temp_data.get("data"):
            latest_temp = temp_data["data"][-1]
            water_temp = float(latest_temp["v"])
            conditions["water_temperature_f"] = water_temp
            if water_temp < 40:
                advisories.append(
                    f"Water temperature {water_temp}°F — cold-water safety protocols advised"
                )
        # Pressure trend (advisory only)
        if pressure_data and pressure_data.get("data"):
            readings = pressure_data["data"]
            latest_pressure = float(readings[-1]["v"])
            conditions["air_pressure_mb"] = latest_pressure
            if len(readings) >= 2:
                # Drop measured across the fetched window; >3 mb suggests an
                # approaching system.
                first_pressure = float(readings[0]["v"])
                drop = first_pressure - latest_pressure
                if drop > 3:
                    advisories.append(
                        f"Pressure dropping {drop:.1f} mb — possible approaching weather system"
                    )
        return {
            "station": station,
            "station_distance_nm": distance_nm,
            "timestamp_utc": now_utc.isoformat(),
            "soak_window": {
                "begin_utc": begin,
                "end_utc": end,
                "hours": soak_hours,
                "tidal_cycles": tidal_cycles,
            },
            "tide_schedule": tide_schedule,
            "conditions": conditions,
            "assessment": assessment,
            "advisories": advisories,
        }

    @mcp.tool(tags={"smartpot"})
    async def catch_tidal_context(
        ctx: Context,
        events: list[dict],
    ) -> list[dict]:
        """Batch-enrich catch events with tidal phase at each event's time+location.

        Each event dict should contain:
        - timestamp: ISO-8601 or "YYYY-MM-DD HH:MM" (UTC)
        - latitude, longitude: GPS coordinates of the pot

        Extra fields (catch_count, species, weight, etc.) are passed through.
        Returns events in the same order, each enriched with a "tidal" key.

        Limit: 100 events per call to avoid excessive API requests.
        """
        if len(events) > 100:
            raise ValueError(
                f"Too many events ({len(events)}). Maximum is 100 per call."
            )
        noaa: NOAAClient = ctx.lifespan_context["noaa_client"]
        # Group events by nearest station to batch hilo fetches
        station_groups: dict[str, list[tuple[int, dict, float | None]]] = {}
        for idx, event in enumerate(events):
            lat = event.get("latitude")
            lon = event.get("longitude")
            if lat is None or lon is None:
                continue
            # One lookup per event, sequentially — presumably the client
            # caches station metadata so this is cheap; TODO confirm.
            nearest = await noaa.find_nearest(float(lat), float(lon), limit=1)
            if not nearest:
                continue
            station, dist = nearest[0]
            key = station.id
            if key not in station_groups:
                station_groups[key] = []
            station_groups[key].append((idx, event, round(dist, 2)))
        # For each station group, fetch hilo once for the full time window
        enriched = [None] * len(events)
        for station_id, group in station_groups.items():
            timestamps = []
            for _, event, _ in group:
                ts_str = event.get("timestamp", "")
                try:
                    if "T" in ts_str:
                        # ISO-8601 → naive UTC to match NOAA "gmt" timestamps.
                        dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                        dt = dt.replace(tzinfo=None)
                    else:
                        dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M")
                    timestamps.append(dt)
                except (ValueError, TypeError):
                    # None keeps positional alignment with `group`.
                    timestamps.append(None)
            valid_times = [t for t in timestamps if t is not None]
            if not valid_times:
                continue
            # ±6h padding ensures bracketing hilo events at the window edges.
            earliest = min(valid_times) - timedelta(hours=6)
            latest = max(valid_times) + timedelta(hours=6)
            try:
                hilo_data = await noaa.get_data(
                    station_id, "predictions",
                    begin_date=earliest.strftime("%Y%m%d %H:%M"),
                    end_date=latest.strftime("%Y%m%d %H:%M"),
                    interval="hilo", time_zone="gmt",
                )
                hilo_events = parse_hilo_predictions(hilo_data.get("predictions", []))
            except Exception:
                # A failed fetch downgrades the whole group to error entries.
                hilo_events = []
            for (idx, event, dist), ts in zip(group, timestamps):
                result = dict(event)  # passthrough all original fields
                if ts is not None and hilo_events:
                    phase_info = classify_tidal_phase(ts, hilo_events)
                    result["tidal"] = {
                        "station_id": station_id,
                        "station_distance_nm": dist,
                        **phase_info,
                    }
                else:
                    result["tidal"] = {
                        "error": "Could not classify — missing timestamp or predictions"
                    }
                enriched[idx] = result
        # Fill any events that weren't enriched (missing lat/lon)
        for idx, event in enumerate(events):
            if enriched[idx] is None:
                result = dict(event)
                result["tidal"] = {"error": "Missing latitude or longitude"}
                enriched[idx] = result
        return enriched

    @mcp.tool(tags={"smartpot"})
    async def water_level_anomaly(
        ctx: Context,
        station_id: str,
        window_hours: int = 6,
        threshold_ft: float = 0.5,
    ) -> dict:
        """Compare observed vs predicted water levels to detect anomalies.

        Storm surge, seiche events, or sensor drift push observed levels
        away from predictions. This tool quantifies the deviation and
        assigns a risk level.

        Risk levels (based on threshold_ft, default 0.5 ft):
        normal     max deviation < threshold
        elevated   threshold <= deviation < 2×threshold
        high       deviation >= 2×threshold

        Returns deviation metrics, risk level, and a plain-language summary.
        """
        noaa: NOAAClient = ctx.lifespan_context["noaa_client"]
        # Parallel fetch: observed levels + 6-minute predictions
        obs_data, pred_data = await asyncio.gather(
            noaa.get_data(
                station_id, "water_level",
                hours=window_hours, time_zone="gmt",
            ),
            noaa.get_data(
                station_id, "predictions",
                hours=window_hours, interval="6", time_zone="gmt",
            ),
        )
        obs_records = obs_data.get("data", [])
        pred_records = pred_data.get("predictions", [])
        if not obs_records or not pred_records:
            return {
                "station_id": station_id,
                "window_hours": window_hours,
                "error": "Insufficient data — need both observations and predictions",
            }
        # Parse predictions into parallel lists for interpolation
        pred_times = []
        pred_values = []
        for p in pred_records:
            try:
                pred_times.append(datetime.strptime(p["t"], "%Y-%m-%d %H:%M"))
                pred_values.append(float(p["v"]))
            except (ValueError, KeyError):
                # Skip malformed records rather than abort the analysis.
                continue
        # Compare each observation against interpolated prediction
        deviations = []
        for obs in obs_records:
            try:
                obs_dt = datetime.strptime(obs["t"], "%Y-%m-%d %H:%M")
                obs_val = float(obs["v"])
            except (ValueError, KeyError):
                continue
            predicted = interpolate_predictions(obs_dt, pred_times, pred_values)
            # None means the observation falls outside the prediction window.
            if predicted is not None:
                dev = obs_val - predicted
                deviations.append({
                    "time": obs["t"],
                    "observed_ft": obs_val,
                    "predicted_ft": round(predicted, 3),
                    "deviation_ft": round(dev, 3),
                })
        if not deviations:
            return {
                "station_id": station_id,
                "window_hours": window_hours,
                "error": "Could not compute deviations — observation/prediction time mismatch",
            }
        abs_devs = [abs(d["deviation_ft"]) for d in deviations]
        max_dev = max(abs_devs)
        mean_dev = sum(abs_devs) / len(abs_devs)
        # Direction: are observations mostly above or below predictions?
        signed_devs = [d["deviation_ft"] for d in deviations]
        mean_signed = sum(signed_devs) / len(signed_devs)
        direction = "above" if mean_signed > 0 else "below"
        # Risk classification driven by the worst single deviation.
        if max_dev >= threshold_ft * 2:
            risk = "high"
            explanation = (
                f"Water levels are {max_dev:.2f} ft {direction} predictions — "
                "possible storm surge, seiche, or significant weather influence"
            )
        elif max_dev >= threshold_ft:
            risk = "elevated"
            explanation = (
                f"Water levels are {max_dev:.2f} ft {direction} predictions — "
                "moderate deviation, monitor conditions"
            )
        else:
            risk = "normal"
            explanation = (
                f"Water levels within {threshold_ft} ft of predictions — "
                "conditions as expected"
            )
        return {
            "station_id": station_id,
            "window_hours": window_hours,
            "threshold_ft": threshold_ft,
            "risk_level": risk,
            "explanation": explanation,
            "max_deviation_ft": round(max_dev, 3),
            "mean_deviation_ft": round(mean_dev, 3),
            "direction": direction,
            "sample_count": len(deviations),
        }

View File

@ -10,7 +10,7 @@ from fastmcp.utilities.tests import run_server_async
from mcnoaa_tides import prompts, resources
from mcnoaa_tides.client import NOAAClient
from mcnoaa_tides.tools import charts, conditions, meteorological, stations, tides
from mcnoaa_tides.tools import charts, conditions, meteorological, smartpot, stations, tides
# Realistic station fixtures
MOCK_STATIONS_RAW = [
@ -79,6 +79,15 @@ MOCK_AIR_TEMP = {"data": [{"t": "2026-02-21 00:00", "v": "42.3", "f": "0,0,0"}]}
MOCK_WATER_TEMP = {"data": [{"t": "2026-02-21 00:00", "v": "38.7", "f": "0,0,0"}]}
MOCK_PRESSURE = {"data": [{"t": "2026-02-21 00:00", "v": "1013.2", "f": "0,0,0"}]}
# 6-minute interval predictions (no "type" field) that overlap with MOCK_WATER_LEVEL
MOCK_PREDICTIONS_6MIN = {
"predictions": [
{"t": "2026-02-21 00:00", "v": "2.50"},
{"t": "2026-02-21 00:06", "v": "2.55"},
{"t": "2026-02-21 00:12", "v": "2.60"},
]
}
MOCK_METADATA = {
"stations": [
{
@ -106,8 +115,12 @@ def _build_mock_client() -> NOAAClient:
client._http = AsyncMock()
async def mock_get_data(station_id, product, **kwargs):
if product == "predictions":
# Return 6-min interval data when interval="6", hilo otherwise
if kwargs.get("interval") == "6":
return MOCK_PREDICTIONS_6MIN
return MOCK_PREDICTIONS
responses = {
"predictions": MOCK_PREDICTIONS,
"water_level": MOCK_WATER_LEVEL,
"wind": MOCK_WIND,
"air_temperature": MOCK_AIR_TEMP,
@ -140,6 +153,7 @@ def _build_test_server() -> FastMCP:
tides.register(mcp)
meteorological.register(mcp)
conditions.register(mcp)
smartpot.register(mcp)
charts.register(mcp)
resources.register(mcp)
prompts.register(mcp)

View File

@ -170,6 +170,6 @@ async def test_visualization_tools_registered(mcp_client: Client):
async def test_total_tool_count(mcp_client: Client):
"""Verify total tool count after adding visualization tools (7 + 2 = 9)."""
"""Verify total tool count (9 base + 4 SmartPot = 13)."""
tools = await mcp_client.list_tools()
assert len(tools) == 9
assert len(tools) == 13

253
tests/test_smartpot.py Normal file
View File

@ -0,0 +1,253 @@
"""Integration tests for SmartPot tidal intelligence tools and prompts."""
import json
from datetime import datetime, timezone
from unittest.mock import patch
import pytest
from fastmcp import Client
from fastmcp.exceptions import ToolError
# Fixed "now" in the middle of the ebb phase:
# Between H at 04:30 and L at 10:42 on 2026-02-21
MOCK_NOW = datetime(2026, 2, 21, 7, 30, 0, tzinfo=timezone.utc)
@pytest.fixture(autouse=True)
def freeze_time():
    """Patch datetime.now() in the smartpot module to return a fixed time."""
    with patch("mcnoaa_tides.tools.smartpot.datetime") as frozen:
        frozen.now.return_value = MOCK_NOW
        # The real parsing helpers must keep working on the mocked class.
        frozen.strptime = datetime.strptime
        frozen.fromisoformat = datetime.fromisoformat
        frozen.side_effect = lambda *args, **kwargs: datetime(*args, **kwargs)
        yield frozen
# --- Tool registration ---
async def test_smartpot_tools_registered(mcp_client: Client):
    """All 4 SmartPot tools should appear in the tool list."""
    registered = {tool.name for tool in await mcp_client.list_tools()}
    assert {
        "tidal_phase",
        "deployment_briefing",
        "catch_tidal_context",
        "water_level_anomaly",
    } <= registered
async def test_total_tool_count(mcp_client: Client):
    """Should have 13 tools total (9 original + 4 SmartPot)."""
    listed = await mcp_client.list_tools()
    assert len(listed) == 13
# --- tidal_phase ---
async def test_tidal_phase_by_station(mcp_client: Client):
    """Station-ID lookup returns a full phase classification."""
    response = await mcp_client.call_tool(
        "tidal_phase", {"station_id": "8454000"}
    )
    payload = json.loads(response.content[0].text)
    assert payload["station"]["id"] == "8454000"
    assert payload["phase"] in ("ebb", "flood", "slack_high", "slack_low")
    assert payload["previous"] is not None
    assert payload["next"] is not None
    assert "timestamp_utc" in payload
async def test_tidal_phase_by_gps(mcp_client: Client):
    """GPS lookup resolves the nearest station and reports distance."""
    response = await mcp_client.call_tool(
        "tidal_phase", {"latitude": 41.8, "longitude": -71.4}
    )
    payload = json.loads(response.content[0].text)
    # Should resolve to Providence (nearest to 41.8, -71.4)
    assert payload["station"]["id"] == "8454000"
    assert "station_distance_nm" in payload
    assert payload["phase"] in ("ebb", "flood", "slack_high", "slack_low")
async def test_tidal_phase_missing_params(mcp_client: Client):
    """Should error when neither station_id nor lat/lon provided."""
    # _resolve_station raises ValueError, surfaced to the client as ToolError.
    with pytest.raises(ToolError):
        await mcp_client.call_tool("tidal_phase", {})
# --- deployment_briefing ---
async def test_deployment_briefing(mcp_client: Client):
    """Happy-path briefing includes station, schedule, and assessment."""
    response = await mcp_client.call_tool(
        "deployment_briefing",
        {"latitude": 41.8, "longitude": -71.4, "soak_hours": 48},
    )
    briefing = json.loads(response.content[0].text)
    assert briefing["station"]["id"] == "8454000"
    assert briefing["station_distance_nm"] is not None
    assert briefing["assessment"] in ("GO", "CAUTION", "NO-GO")
    assert "soak_window" in briefing
    assert briefing["soak_window"]["hours"] == 48
    assert "tide_schedule" in briefing
    assert isinstance(briefing["advisories"], list)
async def test_deployment_briefing_cold_water_advisory(mcp_client: Client):
    """Mock water temp is 38.7°F — should trigger cold-water advisory."""
    response = await mcp_client.call_tool(
        "deployment_briefing",
        {"latitude": 41.8, "longitude": -71.4},
    )
    briefing = json.loads(response.content[0].text)
    assert any(
        "cold-water" in advisory.lower() for advisory in briefing["advisories"]
    ), "Expected cold-water advisory for 38.7°F"
async def test_deployment_briefing_conditions(mcp_client: Client):
    """Should include conditions from meteorological data."""
    response = await mcp_client.call_tool(
        "deployment_briefing",
        {"latitude": 41.8, "longitude": -71.4},
    )
    observed = json.loads(response.content[0].text)["conditions"]
    # Wind data available in mock
    if "wind" in observed:
        assert "speed_kn" in observed["wind"]
    # Water temp available in mock
    if "water_temperature_f" in observed:
        assert observed["water_temperature_f"] == 38.7
# --- catch_tidal_context ---
async def test_catch_tidal_context_enrichment(mcp_client: Client):
    """Events come back in order, enriched, with original fields intact."""
    catch_events = [
        {
            "timestamp": "2026-02-21 07:30",
            "latitude": 41.8,
            "longitude": -71.4,
            "catch_count": 12,
            "species": "blue_crab",
        },
        {
            "timestamp": "2026-02-21 13:00",
            "latitude": 41.8,
            "longitude": -71.4,
            "catch_count": 5,
        },
    ]
    response = await mcp_client.call_tool(
        "catch_tidal_context", {"events": catch_events}
    )
    enriched = json.loads(response.content[0].text)
    assert len(enriched) == 2
    first, second = enriched
    # First event should be enriched with tidal info
    assert "tidal" in first and "phase" in first["tidal"]
    # Passthrough fields preserved
    assert (first["catch_count"], first["species"]) == (12, "blue_crab")
    assert second["catch_count"] == 5
async def test_catch_tidal_context_iso_timestamp(mcp_client: Client):
    """ISO-8601 timestamps with 'T' separator should also work."""
    iso_event = {
        "timestamp": "2026-02-21T07:30:00Z",
        "latitude": 41.8,
        "longitude": -71.4,
    }
    response = await mcp_client.call_tool(
        "catch_tidal_context", {"events": [iso_event]}
    )
    enriched = json.loads(response.content[0].text)
    assert "tidal" in enriched[0]
    assert "phase" in enriched[0]["tidal"]
async def test_catch_tidal_context_missing_location(mcp_client: Client):
    """Events without lat/lon should get an error tidal entry."""
    response = await mcp_client.call_tool(
        "catch_tidal_context", {"events": [{"timestamp": "2026-02-21 07:30"}]}
    )
    enriched = json.loads(response.content[0].text)
    assert "error" in enriched[0]["tidal"]
async def test_catch_tidal_context_limit(mcp_client: Client):
    """Should reject >100 events."""
    overflow = [
        {"timestamp": "2026-02-21 07:30", "latitude": 41.8, "longitude": -71.4}
    ] * 101
    with pytest.raises(ToolError):
        await mcp_client.call_tool("catch_tidal_context", {"events": overflow})
# --- water_level_anomaly ---
async def test_water_level_anomaly(mcp_client: Client):
    """Default-parameter run returns all deviation metrics."""
    response = await mcp_client.call_tool(
        "water_level_anomaly", {"station_id": "8454000"}
    )
    report = json.loads(response.content[0].text)
    assert report["station_id"] == "8454000"
    assert report["risk_level"] in ("normal", "elevated", "high")
    for key in ("max_deviation_ft", "mean_deviation_ft", "direction", "explanation"):
        assert key in report
async def test_water_level_anomaly_custom_threshold(mcp_client: Client):
    """The supplied threshold is echoed back in the report."""
    response = await mcp_client.call_tool(
        "water_level_anomaly",
        {"station_id": "8454000", "threshold_ft": 0.1},
    )
    report = json.loads(response.content[0].text)
    assert report["threshold_ft"] == 0.1
# --- Prompt registration ---
async def test_smartpot_prompts_registered(mcp_client: Client):
    """Both SmartPot prompts should appear in the prompt list."""
    names = {p.name for p in await mcp_client.list_prompts()}
    assert {"smartpot_deployment", "crab_pot_analysis"} <= names
async def test_total_prompt_count(mcp_client: Client):
    """Should have 4 prompts total (2 original + 2 SmartPot)."""
    listed = await mcp_client.list_prompts()
    assert len(listed) == 4
async def test_smartpot_deployment_prompt(mcp_client: Client):
    """Rendered prompt interpolates arguments and references the tools."""
    rendered = await mcp_client.get_prompt(
        "smartpot_deployment",
        {"latitude": "41.8", "longitude": "-71.4", "soak_hours": "48"},
    )
    assert len(rendered.messages) >= 1
    body = str(rendered.messages[0].content)
    for needle in ("41.8", "-71.4", "deployment_briefing"):
        assert needle in body
async def test_crab_pot_analysis_prompt(mcp_client: Client):
    """Rendered analysis prompt references the enrichment tool."""
    rendered = await mcp_client.get_prompt("crab_pot_analysis", {})
    assert len(rendered.messages) >= 1
    assert "catch_tidal_context" in str(rendered.messages[0].content)

204
tests/test_tidal.py Normal file
View File

@ -0,0 +1,204 @@
"""Unit tests for pure tidal phase classification — no MCP, no I/O."""
from datetime import datetime
from mcnoaa_tides.tidal import (
classify_tidal_phase,
interpolate_predictions,
parse_hilo_predictions,
)
# Synthetic hilo data: a full day with 2 highs and 2 lows.
# Each record carries "t" (event time), "v" (water level as a string,
# parsed to float downstream) and "type" ("H" for high, "L" for low) —
# mirroring the raw prediction payload parse_hilo_predictions consumes.
RAW_HILO: list[dict[str, str]] = [
    {"t": "2026-02-21 04:30", "v": "4.521", "type": "H"},
    {"t": "2026-02-21 10:42", "v": "-0.123", "type": "L"},
    {"t": "2026-02-21 16:55", "v": "5.012", "type": "H"},
    {"t": "2026-02-21 23:08", "v": "0.234", "type": "L"},
]
# --- parse_hilo_predictions ---
def test_parse_hilo_basic():
    """Parsing the synthetic day yields 4 typed, datetime-stamped events."""
    parsed = parse_hilo_predictions(RAW_HILO)
    assert len(parsed) == 4
    first = parsed[0]
    assert first["type"] == "H"
    assert first["v"] == 4.521
    assert isinstance(first["dt"], datetime)
def test_parse_hilo_filters_non_hilo():
    """Records without a 'type' field (6-min interval data) should be dropped."""
    mixed = [
        {"t": "2026-02-21 04:00", "v": "3.200"},  # untyped interval record
        {"t": "2026-02-21 04:30", "v": "4.521", "type": "H"},
    ]
    kept = parse_hilo_predictions(mixed)
    assert len(kept) == 1
    assert kept[0]["type"] == "H"
def test_parse_hilo_sorts_by_time():
    """Events come back chronologically even when the input is reversed."""
    events = parse_hilo_predictions(RAW_HILO[::-1])
    timestamps = [event["dt"] for event in events]
    assert timestamps == sorted(timestamps)
def test_parse_hilo_empty():
    """An empty payload parses to an empty event list."""
    assert parse_hilo_predictions([]) == []
# --- classify_tidal_phase ---
def _events():
    """Return the RAW_HILO fixture parsed into ordered hilo events."""
    parsed = parse_hilo_predictions(RAW_HILO)
    return parsed
def test_classify_ebb():
    """Between high (04:30) and low (10:42) → ebb."""
    result = classify_tidal_phase(datetime(2026, 2, 21, 7, 30), _events())
    assert result["phase"] == "ebb"
    # The bracketing events are the preceding high and the upcoming low.
    assert (result["previous"]["type"], result["next"]["type"]) == ("high", "low")
    progress = result["progress_pct"]
    assert progress is not None
    assert 0 < progress < 100
def test_classify_flood():
    """Between low (10:42) and high (16:55) → flood."""
    midafternoon = datetime(2026, 2, 21, 13, 0)
    outcome = classify_tidal_phase(midafternoon, _events())
    assert outcome["phase"] == "flood"
    assert outcome["previous"]["type"] == "low"
    assert outcome["next"]["type"] == "high"
def test_classify_slack_high_after():
    """Within 30 min after high (04:30) → slack_high."""
    fifteen_after_high = datetime(2026, 2, 21, 4, 45)
    phase = classify_tidal_phase(fifteen_after_high, _events())["phase"]
    assert phase == "slack_high"
def test_classify_slack_high_before():
    """Within 30 min before high (16:55) → slack_high."""
    twenty_five_before_high = datetime(2026, 2, 21, 16, 30)
    phase = classify_tidal_phase(twenty_five_before_high, _events())["phase"]
    assert phase == "slack_high"
def test_classify_slack_low_after():
    """Within 30 min after low (10:42) → slack_low."""
    eighteen_after_low = datetime(2026, 2, 21, 11, 0)
    phase = classify_tidal_phase(eighteen_after_low, _events())["phase"]
    assert phase == "slack_low"
def test_classify_slack_low_before():
    """Within 30 min before low (23:08) → slack_low."""
    eighteen_before_low = datetime(2026, 2, 21, 22, 50)
    phase = classify_tidal_phase(eighteen_before_low, _events())["phase"]
    assert phase == "slack_low"
def test_classify_progress_midpoint():
    """At the midpoint between two events, progress should be ~50%."""
    # 04:30 H → 10:42 L spans 372 minutes, so the midpoint lands at 07:36.
    midpoint = datetime(2026, 2, 21, 7, 36)
    result = classify_tidal_phase(midpoint, _events())
    progress = result["progress_pct"]
    assert progress is not None
    assert 45 < progress < 55
def test_classify_minutes_timing():
    """Check that minutes_since and minutes_to are reasonable."""
    three_hours_after_high = datetime(2026, 2, 21, 7, 30)  # H was at 04:30
    timing = classify_tidal_phase(three_hours_after_high, _events())
    assert timing["minutes_since_previous"] == 180
    remaining = timing["minutes_to_next"]
    assert remaining is not None
    assert remaining > 0
def test_classify_before_all_events():
    """Timestamp before all hilo events — no previous event."""
    early = datetime(2026, 2, 21, 1, 0)  # precedes the 04:30 high
    result = classify_tidal_phase(early, _events())
    assert result["previous"] is None
    assert result["next"] is not None
    assert result["phase"] in {"flood", "ebb"}
def test_classify_after_all_events():
    """Timestamp after all hilo events — no next event."""
    late = datetime(2026, 2, 22, 3, 0)  # past the 23:08 low
    result = classify_tidal_phase(late, _events())
    assert result["previous"] is not None
    assert result["next"] is None
    assert result["phase"] in {"flood", "ebb"}
def test_classify_empty_events():
    """No events at all → the phase cannot be determined."""
    outcome = classify_tidal_phase(datetime(2026, 2, 21, 12, 0), [])
    assert outcome["phase"] == "unknown"
# --- interpolate_predictions ---
def test_interpolate_midpoint():
    """Interpolation at midpoint should return average of bracketing values."""
    window = [datetime(2026, 2, 21, 0, 0), datetime(2026, 2, 21, 1, 0)]
    levels = [2.0, 4.0]
    midpoint = datetime(2026, 2, 21, 0, 30)
    estimate = interpolate_predictions(midpoint, window, levels)
    assert estimate is not None
    assert abs(estimate - 3.0) < 0.01
def test_interpolate_at_boundary():
    """Interpolation exactly at a prediction point should return that value."""
    window = [datetime(2026, 2, 21, 0, 0), datetime(2026, 2, 21, 1, 0)]
    levels = [2.0, 4.0]
    at_start = interpolate_predictions(window[0], window, levels)
    assert at_start is not None
    assert abs(at_start - 2.0) < 0.01
def test_interpolate_outside_window():
    """Timestamp outside prediction window should return None."""
    window = [datetime(2026, 2, 21, 0, 0), datetime(2026, 2, 21, 1, 0)]
    levels = [2.0, 4.0]
    before_window = datetime(2026, 2, 20, 23, 0)
    after_window = datetime(2026, 2, 21, 2, 0)
    assert interpolate_predictions(before_window, window, levels) is None
    assert interpolate_predictions(after_window, window, levels) is None
def test_interpolate_multiple_segments():
    """Interpolation across multiple 6-min segments."""
    window = [datetime(2026, 2, 21, 0, minute) for minute in (0, 6, 12)]
    levels = [2.0, 3.0, 5.0]
    # Midpoint of the first segment (0:03) → 2.5
    first = interpolate_predictions(datetime(2026, 2, 21, 0, 3), window, levels)
    assert first is not None
    assert abs(first - 2.5) < 0.01
    # Midpoint of the second segment (0:09) → 4.0
    second = interpolate_predictions(datetime(2026, 2, 21, 0, 9), window, levels)
    assert second is not None
    assert abs(second - 4.0) < 0.01
def test_interpolate_insufficient_data():
    """Less than 2 prediction points → None."""
    when = datetime(2026, 2, 21, 0, 0)
    assert interpolate_predictions(when, [], []) is None
    # A single point cannot bracket any timestamp either.
    assert interpolate_predictions(when, [when], [2.0]) is None

View File

@ -6,7 +6,7 @@ from fastmcp import Client
async def test_tool_registration(mcp_client: Client):
"""All 9 tools should be registered."""
"""All 13 tools should be registered (9 original + 4 SmartPot)."""
tools = await mcp_client.list_tools()
tool_names = {t.name for t in tools}
expected = {
@ -19,6 +19,10 @@ async def test_tool_registration(mcp_client: Client):
"marine_conditions_snapshot",
"visualize_tides",
"visualize_conditions",
"tidal_phase",
"deployment_briefing",
"catch_tidal_context",
"water_level_anomaly",
}
assert expected == tool_names