"""Async NOAA CO-OPS API client with station caching and proximity search.""" import math import re import sys import time import httpx from noaa_tides.models import Station DATA_URL = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter" META_URL = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi" CACHE_TTL = 86400 # 24 hours MAX_RANGE_HOURS = 720 # NOAA API cap ~30 days _STATION_ID_RE = re.compile(r"^\d{7}$") def _validate_station_id(station_id: str) -> str: """NOAA station IDs are 7-digit numbers (e.g. '8454000').""" if not _STATION_ID_RE.match(station_id): raise ValueError( f"Invalid station ID '{station_id}': expected a 7-digit number (e.g. '8454000')" ) return station_id def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Distance in nautical miles between two coordinates.""" R = 3440.065 # Earth radius in nautical miles lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 return 2 * R * math.asin(math.sqrt(a)) class NOAAClient: """Async client wrapping NOAA CO-OPS data and metadata APIs. Caches the station catalog in memory (~301 entries) and refreshes every 24 hours. The httpx.AsyncClient is created once and reused for connection pooling. """ def __init__(self) -> None: self._http: httpx.AsyncClient | None = None self._stations: list[Station] = [] self._cache_time: float = 0 async def initialize(self) -> None: self._http = httpx.AsyncClient(timeout=30) await self._refresh_stations() async def close(self) -> None: if self._http: await self._http.aclose() # -- Station cache -- async def _refresh_stations(self) -> None: resp = await self._http.get(f"{META_URL}/stations.json") resp.raise_for_status() data = resp.json() self._stations = [Station(**s) for s in data.get("stations", [])] self._cache_time = time.monotonic() async def get_stations(self) -> list[Station]: if time.monotonic() - self._cache_time > CACHE_TTL: try: await self._refresh_stations() except Exception: # Serve stale data rather than failing the request. # If cache was never populated, re-raise. if not self._stations: raise print( "Warning: station cache refresh failed, serving stale data", file=sys.stderr, ) return list(self._stations) # -- Metadata API -- async def get_station_metadata(self, station_id: str) -> dict: _validate_station_id(station_id) try: resp = await self._http.get( f"{META_URL}/stations/{station_id}.json", params={"expand": "details,sensors,datums,products,disclaimers"}, ) resp.raise_for_status() except httpx.HTTPStatusError as exc: if exc.response.status_code == 404: raise ValueError( f"Station '{station_id}' not found. " "Verify the ID using search_stations." ) from exc raise RuntimeError( f"NOAA metadata API error ({exc.response.status_code}). " "The service may be temporarily unavailable." ) from exc data = resp.json() # The metadata API wraps the station in a "stations" list stations = data.get("stations", []) if stations: return stations[0] return data # -- Data API -- async def get_data( self, station_id: str, product: str, begin_date: str = "", end_date: str = "", hours: int = 0, datum: str = "MLLW", interval: str = "", units: str = "english", time_zone: str = "lst_ldt", ) -> dict: """Fetch data from the NOAA CO-OPS data API. Date format: yyyyMMdd or yyyyMMdd HH:mm If no date range or hours specified, defaults to last 24 hours. """ _validate_station_id(station_id) if hours and (hours < 0 or hours > MAX_RANGE_HOURS): raise ValueError(f"hours must be between 1 and {MAX_RANGE_HOURS}, got {hours}") params: dict[str, str] = { "station": station_id, "product": product, "datum": datum, "units": units, "time_zone": time_zone, "format": "json", "application": "noaa-tides-mcp", } if begin_date: params["begin_date"] = begin_date if end_date: params["end_date"] = end_date if hours: params["range"] = str(hours) if interval: params["interval"] = interval # Default to last 24h if no date range specified if not begin_date and not end_date and not hours: params["range"] = "24" try: resp = await self._http.get(DATA_URL, params=params) resp.raise_for_status() except httpx.HTTPStatusError as exc: if exc.response.status_code == 404: raise ValueError( f"No data for station '{station_id}' product '{product}'. " "Use get_station_info to check available products." ) from exc raise RuntimeError( f"NOAA data API error ({exc.response.status_code}). " "The service may be temporarily unavailable." ) from exc result = resp.json() if "error" in result: raise ValueError(result["error"].get("message", "Unknown NOAA API error")) return result # -- In-memory search -- async def search( self, query: str = "", state: str = "", is_tidal: bool | None = None, ) -> list[Station]: """Filter cached stations. Triggers cache refresh if TTL expired.""" stations = await self.get_stations() matches = stations if query: q = query.lower() matches = [s for s in matches if q in s.name.lower() or q in s.id] if state: st = state.upper() matches = [s for s in matches if s.state and s.state.upper() == st] if is_tidal is not None: matches = [s for s in matches if s.tidal == is_tidal] return matches async def find_nearest( self, lat: float, lon: float, limit: int = 5, max_distance: float = 100, ) -> list[tuple[Station, float]]: """Return stations within max_distance nautical miles, sorted by proximity.""" stations = await self.get_stations() results: list[tuple[Station, float]] = [] for station in stations: dist = haversine(lat, lon, station.lat, station.lng) if dist <= max_distance: results.append((station, dist)) results.sort(key=lambda x: x[1]) return results[:limit]