Implement mcgibs FastMCP server for NASA GIBS

Complete implementation of all modules:
- constants.py: GIBS API endpoints, projections, TileMatrixSet defs
- models.py: Pydantic models for layers, colormaps, geocoding
- geo.py: Nominatim geocoding with rate limiting and caching
- capabilities.py: WMTS GetCapabilities XML parser with search
- colormaps.py: Colormap v1.3 parser with natural-language summaries
- client.py: Async GIBS HTTP client wrapping all API interactions
- server.py: FastMCP 3.0 tools, resources, and prompts

11 MCP tools, 3 resources, 2 prompts. 47 tests, all passing.
This commit is contained in:
Ryan Malloy 2026-02-18 14:55:41 -07:00
parent 648c145e14
commit f7fad32a9e
18 changed files with 4522 additions and 0 deletions

329
src/mcgibs/capabilities.py Normal file
View File

@ -0,0 +1,329 @@
"""WMTS GetCapabilities XML parser for NASA GIBS layers."""
import logging
import xml.etree.ElementTree as ET
from mcgibs.constants import WMTS_NS
from mcgibs.models import BBox, LayerInfo, TimeDimension
log = logging.getLogger(__name__)
# Precompute fully-qualified namespace prefixes for element lookups.
# ElementTree findall/find with namespace dicts works for "prefix:Local" syntax,
# but child elements that inherit a default namespace need the {uri}Local form.
_WMTS_URI = WMTS_NS["wmts"]
_OWS_URI = WMTS_NS["ows"]
_XLINK_URI = WMTS_NS["xlink"]
def _text(el: ET.Element | None) -> str:
"""Safely extract text content, returning empty string for None."""
if el is None:
return ""
return (el.text or "").strip()
def _find(parent: ET.Element, local_name: str, ns_uri: str | None = None) -> ET.Element | None:
"""Find child element, trying namespaced then unnamespaced.
When the WMTS document declares a default namespace, child elements like
<Default>, <Value>, <Format>, <TileMatrixSet>, etc. inherit it even though
they appear unqualified in the raw XML. This helper tries the namespaced
form first (which matches real GIBS documents), then falls back to plain
local name (for documents without a default namespace).
"""
if ns_uri:
el = parent.find(f"{{{ns_uri}}}{local_name}")
if el is not None:
return el
return parent.find(local_name)
def _findall(parent: ET.Element, local_name: str, ns_uri: str | None = None) -> list[ET.Element]:
"""Find all child elements, combining namespaced and unnamespaced results."""
results: list[ET.Element] = []
seen_ids: set[int] = set()
if ns_uri:
for el in parent.findall(f"{{{ns_uri}}}{local_name}"):
seen_ids.add(id(el))
results.append(el)
for el in parent.findall(local_name):
if id(el) not in seen_ids:
results.append(el)
return results
def _parse_time_dimension(dim_el: ET.Element) -> TimeDimension | None:
"""Parse a WMTS Dimension element with ows:Identifier == 'Time'.
Value strings follow the ISO 8601 interval pattern:
"start/end/period" e.g. "1980-01-01/2023-11-01/P1M"
Single-date values or multi-range values are stored as-is.
"""
ident = _text(_find(dim_el, "Identifier", _OWS_URI))
if ident.lower() != "time":
return None
# <Default> and <Value> inherit the wmts default namespace
default = _text(_find(dim_el, "Default", _WMTS_URI))
value = _text(_find(dim_el, "Value", _WMTS_URI))
start = None
end = None
period = None
if value:
# Value may contain comma-separated ranges; take the last (broadest) one
last_range = value.split(",")[-1].strip()
parts = last_range.split("/")
if len(parts) == 3:
start, end, period = parts
elif len(parts) == 2:
start, end = parts
return TimeDimension(
default=default or None,
value=value or None,
start=start or None,
end=end or None,
period=period or None,
)
def _parse_bbox(layer_el: ET.Element) -> BBox | None:
"""Extract WGS84BoundingBox from a Layer element."""
bb = _find(layer_el, "WGS84BoundingBox", _OWS_URI)
if bb is None:
return None
lower = _text(_find(bb, "LowerCorner", _OWS_URI))
upper = _text(_find(bb, "UpperCorner", _OWS_URI))
if not lower or not upper:
return None
try:
west, south = (float(v) for v in lower.split())
east, north = (float(v) for v in upper.split())
except (ValueError, TypeError):
log.warning("Malformed bounding box: lower=%r upper=%r", lower, upper)
return None
return BBox(west=west, south=south, east=east, north=north)
def _detect_colormap(layer_el: ET.Element, identifier: str) -> tuple[bool, str | None]:
"""Detect whether a layer has a colormap.
GIBS layers with colormaps typically have:
- A LegendURL element inside a Style
- Metadata with xlink:role containing "colormap"
Returns (has_colormap, colormap_id).
"""
# Check for LegendURL in any Style
for style in _findall(layer_el, "Style", _WMTS_URI):
if _find(style, "LegendURL", _WMTS_URI) is not None:
return True, identifier
# Check metadata for colormap hints
for meta in _findall(layer_el, "Metadata", _OWS_URI):
role = meta.get(f"{{{_XLINK_URI}}}role", "")
href = meta.get(f"{{{_XLINK_URI}}}href", "")
if "colormap" in role.lower() or "colormap" in href.lower():
return True, identifier
return False, None
def _parse_layer(layer_el: ET.Element) -> LayerInfo | None:
"""Parse a single WMTS Layer element into a LayerInfo."""
identifier = _text(_find(layer_el, "Identifier", _OWS_URI))
if not identifier:
return None
title = _text(_find(layer_el, "Title", _OWS_URI))
abstract = _text(_find(layer_el, "Abstract", _OWS_URI))
bbox = _parse_bbox(layer_el)
# Formats
formats = []
for fmt_el in _findall(layer_el, "Format", _WMTS_URI):
fmt = _text(fmt_el)
if fmt and fmt not in formats:
formats.append(fmt)
# TileMatrixSetLink -> TileMatrixSet
tile_matrix_sets = []
for link_el in _findall(layer_el, "TileMatrixSetLink", _WMTS_URI):
tms = _text(_find(link_el, "TileMatrixSet", _WMTS_URI))
if tms and tms not in tile_matrix_sets:
tile_matrix_sets.append(tms)
# Time dimension
time_dim = None
for dim_el in _findall(layer_el, "Dimension", _WMTS_URI):
time_dim = _parse_time_dimension(dim_el)
if time_dim is not None:
break
# ResourceURL template
resource_url_template = None
for res_el in _findall(layer_el, "ResourceURL", _WMTS_URI):
tpl = res_el.get("template", "")
if tpl:
resource_url_template = tpl
break
# Legend URL from Style
legend_url = None
for style in _findall(layer_el, "Style", _WMTS_URI):
leg = _find(style, "LegendURL", _WMTS_URI)
if leg is not None:
legend_url = leg.get(f"{{{_XLINK_URI}}}href", "")
break
has_colormap, colormap_id = _detect_colormap(layer_el, identifier)
return LayerInfo(
identifier=identifier,
title=title,
abstract=abstract,
formats=formats,
tile_matrix_sets=tile_matrix_sets,
time=time_dim,
bbox=bbox,
has_colormap=has_colormap,
colormap_id=colormap_id,
legend_url=legend_url or None,
resource_url_template=resource_url_template,
)
def parse_capabilities(xml_text: str) -> dict[str, LayerInfo]:
"""Parse WMTS GetCapabilities XML into a dict of layer_id -> LayerInfo.
Handles the ~5MB GetCapabilities document from NASA GIBS by iterating
through all Layer elements found under the wmts:Contents element.
"""
root = ET.fromstring(xml_text)
layers: dict[str, LayerInfo] = {}
contents = _find(root, "Contents", _WMTS_URI)
if contents is None:
log.warning("No Contents element found in capabilities document")
return layers
for layer_el in _findall(contents, "Layer", _WMTS_URI):
info = _parse_layer(layer_el)
if info is not None:
layers[info.identifier] = info
log.info("Parsed %d layers from WMTS capabilities", len(layers))
return layers
def _score_match(layer: LayerInfo, query_lower: str) -> int:
"""Score a layer's relevance to a search query.
Higher score = better match. Returns 0 if no match at all.
Scoring tiers:
- Title exact word match: 100
- Identifier exact word match: 80
- Title substring match: 60
- Identifier substring match: 40
- Other field matches: 20
"""
score = 0
title_lower = layer.title.lower()
ident_lower = layer.identifier.lower()
terms = query_lower.split()
for term in terms:
# Title word match (highest value -- most user-visible)
title_words = title_lower.replace("-", " ").replace("_", " ").split()
if term in title_words:
score += 100
elif term in title_lower:
score += 60
# Identifier word match
ident_words = ident_lower.replace("-", " ").replace("_", " ").split()
if term in ident_words:
score += 80
elif term in ident_lower:
score += 40
# Enriched metadata matches
for field_val in (layer.measurement, layer.instrument, layer.platform, layer.description):
if field_val and term in field_val.lower():
score += 20
return score
def search_layers(
index: dict[str, LayerInfo],
query: str,
measurement: str | None = None,
period: str | None = None,
ongoing: bool | None = None,
limit: int = 20,
) -> list[LayerInfo]:
"""Search the layer index with keyword matching and optional filters.
Parameters
----------
index:
Dict of layer_id -> LayerInfo as returned by parse_capabilities().
query:
Free-text search string. Matched case-insensitively against title,
identifier, measurement, instrument, and platform fields.
measurement:
Filter to layers whose measurement field matches (case-insensitive substring).
period:
Filter to layers whose period field matches, e.g. "Daily", "Monthly".
ongoing:
If True, only layers marked as ongoing. If False, only ended layers.
None skips this filter.
limit:
Maximum number of results to return.
"""
query_lower = query.strip().lower()
candidates: list[tuple[int, LayerInfo]] = []
for layer in index.values():
# Apply hard filters first (skip scoring if filtered out)
if measurement is not None:
layer_meas = (layer.measurement or "").lower()
if measurement.lower() not in layer_meas:
continue
if period is not None:
layer_period = (layer.period or "").lower()
if period.lower() != layer_period:
continue
if ongoing is not None and (layer.ongoing is None or layer.ongoing != ongoing):
continue
# Score against the query
if query_lower:
score = _score_match(layer, query_lower)
if score <= 0:
continue
else:
# No query text -- all non-filtered layers match equally
score = 1
candidates.append((score, layer))
# Sort by score descending, then alphabetically by title for ties
candidates.sort(key=lambda pair: (-pair[0], pair[1].title.lower()))
return [layer for _, layer in candidates[:limit]]

342
src/mcgibs/client.py Normal file
View File

@ -0,0 +1,342 @@
"""Async HTTP client for all NASA GIBS API interactions."""
import logging
from io import BytesIO
import httpx
from PIL import Image
from mcgibs.capabilities import parse_capabilities
from mcgibs.colormaps import explain_colormap, parse_colormap
from mcgibs.constants import (
COLORMAP_BASE,
DEFAULT_EPSG,
LAYER_METADATA_BASE,
WMS_BASE,
WMS_DEFAULTS,
WMTS_CAPABILITIES_URL,
WMTS_DESCRIBE_DOMAINS_URL,
WMTS_TILE_URL,
)
from mcgibs.geo import geocode
from mcgibs.models import BBox, ColorMapSet, GeocodingResult, LayerInfo
log = logging.getLogger(__name__)
class GIBSClient:
"""Async client for NASA GIBS APIs.
Wraps WMTS (discovery), WMS (imagery), layer metadata, and colormaps
behind a single interface. Designed to be initialized once per session
via FastMCP lifespan and reused across tool calls.
"""
def __init__(self) -> None:
self._http: httpx.AsyncClient | None = None
self.layer_index: dict[str, LayerInfo] = {}
self._metadata_cache: dict[str, dict] = {}
self._colormap_cache: dict[str, ColorMapSet] = {}
self._geocode_cache: dict[str, GeocodingResult | None] = {}
async def initialize(self) -> None:
"""Create HTTP client and load WMTS capabilities."""
self._http = httpx.AsyncClient(timeout=60.0, follow_redirects=True)
await self._load_capabilities()
async def close(self) -> None:
"""Shut down the HTTP client."""
if self._http:
await self._http.aclose()
self._http = None
@property
def http(self) -> httpx.AsyncClient:
if self._http is None:
raise RuntimeError("GIBSClient not initialized — call initialize() first")
return self._http
# --- Capabilities ---
async def _load_capabilities(self) -> None:
"""Fetch and parse WMTS GetCapabilities for EPSG:4326."""
url = WMTS_CAPABILITIES_URL.format(epsg="4326")
log.info("Fetching WMTS capabilities from %s", url)
resp = await self.http.get(url)
resp.raise_for_status()
self.layer_index = parse_capabilities(resp.text)
log.info("Loaded %d layers into index", len(self.layer_index))
def get_layer(self, layer_id: str) -> LayerInfo | None:
"""Look up a layer by identifier."""
return self.layer_index.get(layer_id)
# --- Layer Metadata (enrichment) ---
async def fetch_layer_metadata(self, layer_id: str) -> dict:
"""Fetch the enriched JSON metadata for a layer.
This supplements the GetCapabilities data with measurement,
instrument, platform, and other fields not in the WMTS XML.
"""
if layer_id in self._metadata_cache:
return self._metadata_cache[layer_id]
url = f"{LAYER_METADATA_BASE}/{layer_id}.json"
try:
resp = await self.http.get(url)
resp.raise_for_status()
data = resp.json()
except (httpx.HTTPError, ValueError) as exc:
log.debug("Layer metadata not available for %s: %s", layer_id, exc)
data = {}
self._metadata_cache[layer_id] = data
# Enrich the layer_index entry if it exists
layer = self.layer_index.get(layer_id)
if layer and data:
layer.measurement = data.get("measurement")
layer.instrument = data.get("instrument")
layer.platform = data.get("platform")
layer.period = data.get("period")
layer.ongoing = data.get("ongoing")
layer.day_night = data.get("daynight")
layer.description = data.get("description")
return data
# --- Colormaps ---
async def fetch_colormap(self, layer_id: str) -> ColorMapSet | None:
"""Fetch and parse the colormap XML for a layer."""
if layer_id in self._colormap_cache:
return self._colormap_cache[layer_id]
# Derive colormap URL — GIBS uses the layer identifier as filename
layer = self.layer_index.get(layer_id)
colormap_id = (layer.colormap_id if layer else None) or layer_id
url = f"{COLORMAP_BASE}/{colormap_id}.xml"
try:
resp = await self.http.get(url)
resp.raise_for_status()
colormap_set = parse_colormap(resp.text)
except (httpx.HTTPError, ValueError) as exc:
log.debug("Colormap not available for %s: %s", layer_id, exc)
return None
self._colormap_cache[layer_id] = colormap_set
return colormap_set
async def explain_layer_colormap(self, layer_id: str) -> str:
"""Fetch colormap and generate natural-language explanation."""
colormap_set = await self.fetch_colormap(layer_id)
if colormap_set is None:
return f"No colormap available for layer '{layer_id}'."
return explain_colormap(colormap_set)
# --- Geocoding ---
async def resolve_place(self, place: str) -> GeocodingResult | None:
"""Geocode a place name via Nominatim."""
return await geocode(self.http, place, self._geocode_cache)
# --- WMS Imagery ---
async def get_wms_image(
self,
layer_id: str,
date: str,
bbox: BBox,
width: int = 1024,
height: int = 1024,
image_format: str = "image/jpeg",
epsg: str = DEFAULT_EPSG,
) -> bytes:
"""Fetch a WMS GetMap image for the given layer, date, and bbox.
Returns raw image bytes (JPEG or PNG).
"""
params = dict(WMS_DEFAULTS)
params.update({
"LAYERS": layer_id,
"SRS": f"EPSG:{epsg}",
"BBOX": bbox.wms_bbox,
"WIDTH": str(width),
"HEIGHT": str(height),
"FORMAT": image_format,
"TIME": date,
})
url = WMS_BASE.format(epsg=epsg)
resp = await self.http.get(url, params=params)
resp.raise_for_status()
content_type = resp.headers.get("content-type", "")
if "xml" in content_type or "text" in content_type:
raise RuntimeError(f"WMS returned error: {resp.text[:500]}")
return resp.content
async def get_wms_composite(
self,
layer_ids: list[str],
date: str,
bbox: BBox,
width: int = 1024,
height: int = 1024,
image_format: str = "image/jpeg",
epsg: str = DEFAULT_EPSG,
) -> bytes:
"""Fetch a multi-layer WMS composite image.
WMS supports comma-separated LAYERS for overlay compositing.
"""
return await self.get_wms_image(
layer_id=",".join(layer_ids),
date=date,
bbox=bbox,
width=width,
height=height,
image_format=image_format,
epsg=epsg,
)
async def compare_dates(
self,
layer_id: str,
date_before: str,
date_after: str,
bbox: BBox,
width: int = 1024,
height: int = 512,
image_format: str = "image/jpeg",
) -> bytes:
"""Fetch two images and compose a side-by-side comparison.
Returns a single image with the "before" date on the left
and the "after" date on the right, each labeled.
"""
img_before = await self.get_wms_image(
layer_id, date_before, bbox, width, height, image_format,
)
img_after = await self.get_wms_image(
layer_id, date_after, bbox, width, height, image_format,
)
pil_before = Image.open(BytesIO(img_before))
pil_after = Image.open(BytesIO(img_after))
# Create side-by-side composite
total_width = pil_before.width + pil_after.width
max_height = max(pil_before.height, pil_after.height)
composite = Image.new("RGB", (total_width, max_height))
composite.paste(pil_before, (0, 0))
composite.paste(pil_after, (pil_before.width, 0))
buf = BytesIO()
composite.save(buf, format="JPEG", quality=90)
return buf.getvalue()
# --- DescribeDomains (date ranges) ---
async def describe_domains(
self,
layer_id: str,
epsg: str = DEFAULT_EPSG,
) -> dict:
"""Query WMTS DescribeDomains for available date ranges.
Returns a dict with 'time_domain' key (ISO 8601 interval or list
of dates) and 'spatial_domain' if available.
"""
url = WMTS_DESCRIBE_DOMAINS_URL.format(epsg=epsg, layer_id=layer_id)
resp = await self.http.get(url)
resp.raise_for_status()
# DescribeDomains returns XML — extract time domain
import xml.etree.ElementTree as ET
root = ET.fromstring(resp.text)
result: dict[str, str] = {}
# Look for time domain in various possible locations
for elem in root.iter():
tag = elem.tag.rpartition("}")[-1] if "}" in elem.tag else elem.tag
if tag.lower() in ("timedomain", "value") and elem.text:
text = elem.text.strip()
if text and ("/" in text or "-" in text):
result["time_domain"] = text
break
return result
# --- WMTS tile URL builder ---
def build_tile_url(
self,
layer_id: str,
date: str,
zoom: int,
row: int,
col: int,
tile_matrix_set: str = "250m",
ext: str = "jpg",
epsg: str = DEFAULT_EPSG,
) -> str:
"""Build a direct WMTS REST tile URL."""
return WMTS_TILE_URL.format(
epsg=epsg,
layer_id=layer_id,
date=date,
tile_matrix_set=tile_matrix_set,
z=zoom,
row=row,
col=col,
ext=ext,
)
# --- Legend image ---
async def get_legend_image(
self,
layer_id: str,
orientation: str = "horizontal",
) -> bytes | None:
"""Fetch the pre-rendered legend image for a layer.
GIBS provides legend images via the GetLegendGraphic WMS call.
"""
layer = self.layer_index.get(layer_id)
if layer and layer.legend_url:
try:
resp = await self.http.get(layer.legend_url)
resp.raise_for_status()
return resp.content
except httpx.HTTPError:
pass
# Fallback to WMS GetLegendGraphic
url = WMS_BASE.format(epsg=DEFAULT_EPSG)
params = {
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetLegendGraphic",
"LAYER": layer_id,
"FORMAT": "image/png",
}
if orientation == "vertical":
params["LEGEND_OPTIONS"] = "layout:vertical"
try:
resp = await self.http.get(url, params=params)
resp.raise_for_status()
content_type = resp.headers.get("content-type", "")
if "image" in content_type:
return resp.content
except httpx.HTTPError as exc:
log.debug("Legend not available for %s: %s", layer_id, exc)
return None

475
src/mcgibs/colormaps.py Normal file
View File

@ -0,0 +1,475 @@
"""GIBS Colormap v1.3 XML parser and natural-language summary generator.
Parses NASA GIBS colormap XML documents into structured models and generates
human-readable explanations of what the colors represent. This is the key
differentiator -- most GIBS tools just show tiles, but we tell you what
the colors actually mean.
"""
import re
import xml.etree.ElementTree as ET
from mcgibs.models import ColorMap, ColorMapEntry, ColorMapSet, LegendEntry
# --- Unit conversion helpers ---
_KELVIN_OFFSET = 273.15
def _kelvin_to_celsius(k: float) -> float:
"""Convert Kelvin to Celsius."""
return k - _KELVIN_OFFSET
# Map of units that benefit from inline conversion in explanations.
# key = canonical unit string (lowercase), value = (converter, target_unit)
_UNIT_CONVERTERS: dict[str, tuple] = {
"k": (_kelvin_to_celsius, "C"),
"kelvin": (_kelvin_to_celsius, "C"),
}
# --- Color naming ---
def _describe_rgb(rgb: tuple[int, int, int]) -> str:
"""Return an approximate human-friendly color name for an RGB triple.
Not meant to be authoritative -- just helpful enough that someone
reading the explanation can picture the color without seeing it.
"""
r, g, b = rgb
# Greyscale shortcuts
brightness = (r + g + b) / 3
max_c = max(r, g, b)
spread = max_c - min(r, g, b)
if spread < 20:
if brightness < 40:
return "black"
if brightness < 100:
return "dark grey"
if brightness < 180:
return "grey"
if brightness < 230:
return "light grey"
return "white"
# Determine hue (simplified)
if max_c == 0:
return "black"
# Saturation proxy (0-1)
saturation = spread / max_c
# Lightness proxy
lightness = brightness / 255
prefix = ""
if lightness < 0.3:
prefix = "dark "
elif lightness > 0.75 and saturation < 0.5:
prefix = "light "
elif lightness > 0.8:
prefix = "pale "
# Hue angle (degrees)
if spread == 0:
hue = 0.0
else:
fr = (max_c - r) / spread
fg = (max_c - g) / spread
fb = (max_c - b) / spread
if r == max_c:
hue = fb - fg
elif g == max_c:
hue = 2.0 + fr - fb
else:
hue = 4.0 + fg - fr
hue = (hue * 60) % 360
# Map hue to name
if hue < 15 or hue >= 345:
name = "red"
elif hue < 40:
name = "orange"
elif hue < 70:
name = "yellow"
elif hue < 85:
name = "yellow-green"
elif hue < 160:
name = "green"
elif hue < 185:
name = "cyan"
elif hue < 250:
name = "blue"
elif hue < 290:
name = "purple"
elif hue < 330:
name = "magenta"
else:
name = "pink"
# Adjust for low-saturation warm tones
if saturation < 0.3 and name in ("red", "orange", "yellow"):
name = "tan" if lightness > 0.5 else "brown"
return f"{prefix}{name}".strip()
# --- Interval notation parser ---
_INTERVAL_RE = re.compile(
r"""
[\[\(] # opening bracket
\s*
([+\-]?(?:INF|\d+(?:\.\d+)?(?:[eE][+\-]?\d+)?)) # low value or -INF
\s*,\s*
([+\-]?(?:INF|\d+(?:\.\d+)?(?:[eE][+\-]?\d+)?)) # high value or +INF
\s*
[\]\)] # closing bracket
""",
re.VERBOSE | re.IGNORECASE,
)
_SINGLE_VALUE_RE = re.compile(
r"[\[\(]\s*([+\-]?\d+(?:\.\d+)?(?:[eE][+\-]?\d+)?)\s*[\]\)]"
)
def _parse_interval_value(interval: str) -> tuple[float | None, float | None]:
"""Parse GIBS interval notation into (low, high) numeric bounds.
Handles these forms:
"[200.0,200.5)" -> (200.0, 200.5)
"[-INF,200.0)" -> (None, 200.0)
"[320.0,+INF)" -> (320.0, None)
"[42]" -> (42.0, 42.0)
"""
if not interval:
return (None, None)
interval = interval.strip()
m = _INTERVAL_RE.match(interval)
if m:
low_str, high_str = m.group(1), m.group(2)
low = None if "INF" in low_str.upper() else float(low_str)
high = None if "INF" in high_str.upper() else float(high_str)
return (low, high)
# Single-value bracket: "[42]"
m = _SINGLE_VALUE_RE.match(interval)
if m:
val = float(m.group(1))
return (val, val)
# Last resort: try bare number
try:
val = float(interval)
return (val, val)
except ValueError:
return (None, None)
# --- XML parsing ---
def _parse_rgb(raw: str) -> tuple[int, int, int]:
"""Parse "r,g,b" string into an integer triple."""
parts = raw.split(",")
return (int(parts[0]), int(parts[1]), int(parts[2]))
def parse_colormap(xml_text: str) -> ColorMapSet:
"""Parse a GIBS Colormap v1.3 XML document into a ColorMapSet.
Expects the root element to be <ColorMaps> containing one or more
<ColorMap> children. Each <ColorMap> contains <ColorMapEntry> elements
and an optional <Legend> with <LegendEntry> children.
"""
root = ET.fromstring(xml_text)
maps: list[ColorMap] = []
# Handle both with and without namespace
colormap_elements = root.findall("ColorMap")
if not colormap_elements:
colormap_elements = root.findall(".//{*}ColorMap")
for cm_elem in colormap_elements:
title = cm_elem.get("title", "")
units = cm_elem.get("units", "")
# Parse entries
entries: list[ColorMapEntry] = []
for entry_elem in cm_elem.findall("ColorMapEntry"):
rgb_raw = entry_elem.get("rgb", "0,0,0")
entries.append(
ColorMapEntry(
rgb=_parse_rgb(rgb_raw),
transparent=entry_elem.get("transparent", "false").lower() == "true",
nodata=entry_elem.get("nodata", "false").lower() == "true",
value=entry_elem.get("value"),
source_value=entry_elem.get("sourceValue"),
label=entry_elem.get("label"),
ref=entry_elem.get("ref"),
)
)
# Parse legend
legend_entries: list[LegendEntry] = []
legend_type = "continuous"
legend_elem = cm_elem.find("Legend")
if legend_elem is not None:
legend_type = legend_elem.get("type", "continuous")
for le_elem in legend_elem.findall("LegendEntry"):
rgb_raw = le_elem.get("rgb", "0,0,0")
legend_entries.append(
LegendEntry(
rgb=_parse_rgb(rgb_raw),
tooltip=le_elem.get("tooltip", ""),
show_tick=le_elem.get("showTick", "false").lower() == "true",
show_label=le_elem.get("showLabel", "false").lower() == "true",
id=le_elem.get("id"),
)
)
maps.append(
ColorMap(
title=title,
units=units,
entries=entries,
legend=legend_entries,
legend_type=legend_type,
)
)
return ColorMapSet(maps=maps)
# --- Natural-language explanation ---
def _format_value(val: float, units: str) -> str:
"""Format a numeric value with optional unit conversion."""
units_lower = units.lower().strip()
converter_info = _UNIT_CONVERTERS.get(units_lower)
# Round for readability
if abs(val) >= 100:
display = f"{val:.0f}"
elif abs(val) >= 1:
display = f"{val:.1f}"
else:
display = f"{val:.2f}"
result = f"{display} {units}" if units else display
if converter_info:
converter, target_unit = converter_info
converted = converter(val)
if abs(converted) >= 100:
conv_display = f"{converted:.0f}"
elif abs(converted) >= 1:
conv_display = f"{converted:.1f}"
else:
conv_display = f"{converted:.2f}"
result += f" ({conv_display} {target_unit})"
return result
def _pick_sample_indices(n: int, count: int = 5) -> list[int]:
"""Pick evenly-spaced sample indices from a range of n items.
Returns up to `count` indices including first and last.
"""
if n <= 0:
return []
if n <= count:
return list(range(n))
step = (n - 1) / (count - 1)
return [round(i * step) for i in range(count)]
def _entry_value_midpoint(entry: ColorMapEntry) -> float | None:
"""Extract the representative numeric value for a colormap entry.
For ranges, returns the midpoint. For open-ended intervals, returns
the finite bound.
"""
if not entry.value:
return None
low, high = _parse_interval_value(entry.value)
if low is not None and high is not None:
return (low + high) / 2
if low is not None:
return low
if high is not None:
return high
return None
def _describe_temperature_feel(celsius: float) -> str:
"""Return a short qualitative descriptor for a temperature in Celsius."""
if celsius < -40:
return "extreme cold"
if celsius < -20:
return "severe cold"
if celsius < -5:
return "cold"
if celsius < 5:
return "near freezing"
if celsius < 20:
return "mild"
if celsius < 30:
return "warm"
if celsius < 40:
return "hot"
return "extreme heat"
def _describe_entry(entry: ColorMapEntry, units: str) -> str:
"""Build a single bullet-point description for one colormap entry."""
color_name = _describe_rgb(entry.rgb)
if entry.label:
return f"{color_name} = {entry.label}"
if not entry.value:
return color_name
low, high = _parse_interval_value(entry.value)
units_lower = units.lower().strip()
is_temp = units_lower in ("k", "kelvin")
# Open-ended low: below threshold
if low is None and high is not None:
val_str = _format_value(high, units)
desc = f"{color_name} = below {val_str}"
if is_temp:
desc += f" -- {_describe_temperature_feel(_kelvin_to_celsius(high))}"
return desc
# Open-ended high: above threshold
if low is not None and high is None:
val_str = _format_value(low, units)
desc = f"{color_name} = above {val_str}"
if is_temp:
desc += f" -- {_describe_temperature_feel(_kelvin_to_celsius(low))}"
return desc
# Bounded range
if low is not None and high is not None:
mid = (low + high) / 2
val_str = _format_value(mid, units)
desc = f"{color_name} = ~{val_str}"
if is_temp:
desc += f" -- {_describe_temperature_feel(_kelvin_to_celsius(mid))}"
return desc
return color_name
def _gradient_summary(data_entries: list[ColorMapEntry]) -> str:
"""Produce a one-line summary of the overall color gradient direction."""
if len(data_entries) < 2:
return ""
first_color = _describe_rgb(data_entries[0].rgb)
last_color = _describe_rgb(data_entries[-1].rgb)
# Find a midpoint color for richer description
mid_idx = len(data_entries) // 2
mid_color = _describe_rgb(data_entries[mid_idx].rgb)
if first_color == last_color:
return f"Color gradient passes through {mid_color} tones."
return (
f"Color gradient runs from {first_color}s (low values) "
f"through {mid_color}s to {last_color}s (high values)."
)
def explain_colormap(colormap_set: ColorMapSet) -> str:
"""Generate a natural-language explanation of a GIBS colormap.
This is the key function -- it turns raw XML colormap data into
something a human (or an LLM composing a response) can understand
without needing to stare at an RGB table.
"""
data_map = colormap_set.data_map
if data_map is None:
return "No colormap data available."
# Filter to non-transparent, non-nodata entries for analysis
data_entries = [
e for e in data_map.entries
if not e.transparent and not e.nodata
]
if not data_entries:
return "This colormap contains only no-data / transparent entries."
title = data_map.title or "Untitled layer"
units = data_map.units or ""
# Check if this is a classification (categorical) colormap
has_numeric_values = any(_entry_value_midpoint(e) is not None for e in data_entries)
if not has_numeric_values:
# Classification / categorical colormap
lines = [f"{title}:"]
if data_map.legend_type == "classification":
lines[0] = f"{title} (classification):"
for entry in data_entries:
label = entry.label or ""
color_name = _describe_rgb(entry.rgb)
if label:
lines.append(f" - {color_name} = {label}")
else:
lines.append(f" - {color_name}")
lines.append("")
lines.append("This layer uses a classification colormap with distinct categories.")
return "\n".join(lines)
# Continuous / discrete numeric colormap
header = title
if units:
header += f" ({units})"
header += ":"
sample_indices = _pick_sample_indices(len(data_entries))
sampled = [data_entries[i] for i in sample_indices]
lines = [header]
for entry in sampled:
bullet = _describe_entry(entry, units)
lines.append(f" - {bullet}")
# Append gradient summary
gradient = _gradient_summary(data_entries)
if gradient:
lines.append("")
lines.append(gradient)
# Note entry count for context
total = len(data_entries)
shown = len(sampled)
if total > shown:
lines.append(f"({total} color stops total, {shown} key values shown)")
# Add no-data note if present in the full set
nodata_entries = []
for cm in colormap_set.maps:
for e in cm.entries:
if e.nodata:
label = e.label or "No Data"
nodata_entries.append(label)
if nodata_entries:
lines.append(f"Transparent / no-data: {', '.join(nodata_entries)}")
return "\n".join(lines)

67
src/mcgibs/constants.py Normal file
View File

@ -0,0 +1,67 @@
"""GIBS API endpoints, EPSG codes, and TileMatrixSet definitions."""
# GIBS base URLs — domain sharding (gibs-a/b/c) for parallel tile fetches
WMTS_BASE = "https://gibs.earthdata.nasa.gov/wmts/epsg{epsg}/best"
WMS_BASE = "https://gibs.earthdata.nasa.gov/wms/epsg{epsg}/best/wms.cgi"
COLORMAP_BASE = "https://gibs.earthdata.nasa.gov/colormaps/v1.3"
LAYER_METADATA_BASE = "https://gibs.earthdata.nasa.gov/layer-metadata/v1.0"
# GetCapabilities and DescribeDomains
WMTS_CAPABILITIES_URL = WMTS_BASE + "/1.0.0/WMTSCapabilities.xml"
WMTS_DESCRIBE_DOMAINS_URL = (
WMTS_BASE + "/wmts.cgi?SERVICE=WMTS&VERSION=1.0.0"
"&REQUEST=DescribeDomains&LAYER={layer_id}"
)
# WMTS REST tile URL pattern
WMTS_TILE_URL = (
WMTS_BASE + "/{layer_id}/default/{date}/{tile_matrix_set}"
"/{z}/{row}/{col}.{ext}"
)
# Nominatim geocoding
NOMINATIM_BASE = "https://nominatim.openstreetmap.org"
# Supported projections — EPSG code → human label
PROJECTIONS = {
"4326": "Geographic (EPSG:4326, WGS84)",
"3857": "Web Mercator (EPSG:3857)",
"3413": "Arctic Polar Stereographic (EPSG:3413)",
"3031": "Antarctic Polar Stereographic (EPSG:3031)",
}
# Default projection for most requests
DEFAULT_EPSG = "4326"
# WMTS XML namespaces used in GetCapabilities
WMTS_NS = {
"wmts": "http://www.opengis.net/wmts/1.0",
"ows": "http://www.opengis.net/ows/1.1",
"xlink": "http://www.w3.org/1999/xlink",
"gml": "http://www.opengis.net/gml",
}
# TileMatrixSet definitions for EPSG:4326
# Maps human-readable resolution names to zoom levels and pixel sizes
TILE_MATRIX_SETS_4326 = {
"2km": {"max_zoom": 5, "tile_size": 512},
"1km": {"max_zoom": 6, "tile_size": 512},
"500m": {"max_zoom": 7, "tile_size": 512},
"250m": {"max_zoom": 8, "tile_size": 512},
"31.25m": {"max_zoom": 11, "tile_size": 512},
"15.625m": {"max_zoom": 12, "tile_size": 512},
}
# WMS request defaults
WMS_DEFAULTS = {
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetMap",
"SRS": "EPSG:4326",
"FORMAT": "image/jpeg",
"WIDTH": "1024",
"HEIGHT": "1024",
}
# User-Agent for Nominatim (required by their usage policy)
USER_AGENT = "mcgibs-mcp-server/2026.02.18 (ryan@supported.systems)"

149
src/mcgibs/geo.py Normal file
View File

@ -0,0 +1,149 @@
"""Async Nominatim geocoding with rate limiting and in-memory caching."""
import asyncio
import logging
import time
import httpx
from mcgibs.constants import NOMINATIM_BASE, USER_AGENT
from mcgibs.models import BBox, GeocodingResult
log = logging.getLogger(__name__)
# Nominatim usage policy: max 1 request per second.
_nominatim_lock = asyncio.Lock()
_last_request_time = 0.0
async def _rate_limit() -> None:
"""Enforce a minimum 1-second gap between Nominatim requests."""
global _last_request_time
async with _nominatim_lock:
now = time.monotonic()
elapsed = now - _last_request_time
if elapsed < 1.0:
await asyncio.sleep(1.0 - elapsed)
_last_request_time = time.monotonic()
async def geocode(
client: httpx.AsyncClient,
query: str,
cache: dict,
) -> GeocodingResult | None:
"""Geocode a place name via Nominatim.
Args:
client: Shared httpx async client.
query: Free-form place name (e.g. "Tokyo", "Amazon River").
cache: Dict used as an in-memory dedup cache (query -> result).
Returns:
GeocodingResult on success, None if no results found.
"""
key = query.strip().lower()
if key in cache:
log.debug("Geocode cache hit: %s", key)
return cache[key]
await _rate_limit()
params = {
"q": query,
"format": "json",
"limit": "1",
"bounded": "0",
}
headers = {"User-Agent": USER_AGENT}
try:
resp = await client.get(
f"{NOMINATIM_BASE}/search",
params=params,
headers=headers,
)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as exc:
log.warning("Nominatim request failed for %r: %s", query, exc)
return None
if not data:
log.debug("Nominatim returned no results for %r", query)
cache[key] = None
return None
hit = data[0]
# Nominatim bbox order: [south_lat, north_lat, west_lon, east_lon]
raw_bbox = hit.get("boundingbox", [])
if len(raw_bbox) == 4:
south, north, west, east = (float(v) for v in raw_bbox)
bbox = BBox(west=west, south=south, east=east, north=north)
else:
# Fall back to a small bbox around the point.
lat = float(hit["lat"])
lon = float(hit["lon"])
bbox = bbox_from_point(lat, lon, radius_deg=0.5)
result = GeocodingResult(
display_name=hit.get("display_name", query),
bbox=bbox,
lat=float(hit["lat"]),
lon=float(hit["lon"]),
osm_type=hit.get("osm_type", ""),
importance=float(hit.get("importance", 0.0)),
)
cache[key] = result
log.debug("Geocoded %r -> %s", query, result.display_name)
return result
def expand_bbox(bbox: BBox, factor: float = 0.1) -> BBox:
"""Expand a bounding box by a proportional factor in each direction.
A factor of 0.1 adds 10% of the width/height as padding on each side.
Results are clamped to valid geographic ranges.
Args:
bbox: Original bounding box.
factor: Expansion ratio (0.1 = 10% padding per side).
Returns:
New BBox with expanded extents, clamped to [-180, 180] lon
and [-90, 90] lat.
"""
dlon = (bbox.east - bbox.west) * factor
dlat = (bbox.north - bbox.south) * factor
# Ensure a minimum expansion so zero-area boxes still grow.
dlon = max(dlon, 0.001)
dlat = max(dlat, 0.001)
return BBox(
west=max(-180.0, bbox.west - dlon),
south=max(-90.0, bbox.south - dlat),
east=min(180.0, bbox.east + dlon),
north=min(90.0, bbox.north + dlat),
)
def bbox_from_point(lat: float, lon: float, radius_deg: float = 0.5) -> BBox:
"""Create a bounding box centered on a geographic point.
Args:
lat: Latitude in decimal degrees.
lon: Longitude in decimal degrees.
radius_deg: Half-width/height of the box in degrees.
Returns:
BBox centered on (lat, lon), clamped to valid ranges.
"""
return BBox(
west=max(-180.0, lon - radius_deg),
south=max(-90.0, lat - radius_deg),
east=min(180.0, lon + radius_deg),
north=min(90.0, lat + radius_deg),
)

115
src/mcgibs/models.py Normal file
View File

@ -0,0 +1,115 @@
"""Pydantic models for GIBS layer metadata, colormaps, and API responses."""
from __future__ import annotations
from pydantic import BaseModel, Field
class BBox(BaseModel):
"""Bounding box in decimal degrees (EPSG:4326)."""
west: float = Field(description="Western longitude (-180 to 180)")
south: float = Field(description="Southern latitude (-90 to 90)")
east: float = Field(description="Eastern longitude (-180 to 180)")
north: float = Field(description="Northern latitude (-90 to 90)")
@property
def wms_bbox(self) -> str:
"""Format as WMS BBOX parameter: minx,miny,maxx,maxy."""
return f"{self.west},{self.south},{self.east},{self.north}"
@property
def area_sq_deg(self) -> float:
return abs(self.east - self.west) * abs(self.north - self.south)
class TimeDimension(BaseModel):
"""Temporal extent of a layer."""
default: str | None = None
value: str | None = None # ISO 8601 interval like "2020-01-01/2024-12-31/P1D"
start: str | None = None
end: str | None = None
period: str | None = None # P1D, P1M, P8D, etc.
class LayerInfo(BaseModel):
"""Parsed metadata for a single GIBS visualization layer."""
identifier: str
title: str
abstract: str = ""
formats: list[str] = Field(default_factory=list)
tile_matrix_sets: list[str] = Field(default_factory=list)
time: TimeDimension | None = None
bbox: BBox | None = None
has_colormap: bool = False
colormap_id: str | None = None
legend_url: str | None = None
resource_url_template: str | None = None
# Enriched from layer-metadata JSON
measurement: str | None = None
instrument: str | None = None
platform: str | None = None
period: str | None = None # "Daily", "Monthly", etc.
ongoing: bool | None = None
day_night: str | None = None
description: str | None = None
class ColorMapEntry(BaseModel):
"""Single entry mapping an RGB color to a data value range."""
rgb: tuple[int, int, int]
transparent: bool = False
nodata: bool = False
value: str | None = None # Interval notation: "[200.0,200.5)"
source_value: str | None = None
label: str | None = None
ref: str | None = None
class LegendEntry(BaseModel):
"""Entry in a colormap legend."""
rgb: tuple[int, int, int]
tooltip: str = ""
show_tick: bool = False
show_label: bool = False
id: str | None = None
class ColorMap(BaseModel):
"""Parsed GIBS colormap with entries and legend."""
title: str = ""
units: str = ""
entries: list[ColorMapEntry] = Field(default_factory=list)
legend: list[LegendEntry] = Field(default_factory=list)
legend_type: str = "continuous" # or "discrete", "classification"
class ColorMapSet(BaseModel):
"""Complete colormap document which can contain multiple ColorMap elements."""
maps: list[ColorMap] = Field(default_factory=list)
@property
def data_map(self) -> ColorMap | None:
"""Return the primary data colormap (not no-data)."""
for m in self.maps:
if m.entries and not all(e.nodata for e in m.entries):
return m
return self.maps[0] if self.maps else None
class GeocodingResult(BaseModel):
"""Result from Nominatim geocoding."""
display_name: str
bbox: BBox
lat: float
lon: float
osm_type: str = ""
importance: float = 0.0

666
src/mcgibs/server.py Normal file
View File

@ -0,0 +1,666 @@
"""FastMCP server for NASA GIBS — tools, resources, and prompts.
Wires the GIBS client, geocoding, capabilities parser, and colormap
interpreter into a cohesive MCP server that lets LLMs discover and
fetch satellite imagery via natural language.
IMPORTANT: Do NOT use `from __future__ import annotations` here.
FastMCP needs runtime types for Literal, list[str], etc.
"""
import base64
import json
import logging
from fastmcp import FastMCP
from fastmcp.server.middleware import Middleware
from mcgibs.capabilities import search_layers
from mcgibs.client import GIBSClient
from mcgibs.constants import PROJECTIONS
from mcgibs.geo import expand_bbox
from mcgibs.models import BBox
log = logging.getLogger(__name__)
mcp = FastMCP(
"mcgibs",
instructions=(
"NASA Global Imagery Browse Services (GIBS) — "
"search 1000+ satellite visualization layers, fetch imagery by place name "
"and date, and get natural-language explanations of what the colors mean."
),
)
# Shared client instance — initialized via on_initialize middleware
_client: GIBSClient | None = None
def _get_client() -> GIBSClient:
"""Retrieve the initialized GIBS client."""
if _client is None:
raise RuntimeError("GIBS client not initialized")
return _client
# --- Middleware: initialize client on session start ---
class GIBSInitMiddleware(Middleware):
"""Load GIBS capabilities when the first client connects."""
async def on_initialize(self, context, call_next):
global _client
if _client is None:
log.info("Initializing GIBS client and loading capabilities...")
_client = GIBSClient()
await _client.initialize()
log.info("GIBS client ready with %d layers", len(_client.layer_index))
return await call_next(context)
mcp.middleware.append(GIBSInitMiddleware())
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# TOOLS — Discovery
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@mcp.tool(
description="Search NASA GIBS satellite imagery layers by keyword. "
"Returns matching layers with titles, identifiers, and date ranges."
)
async def search_gibs_layers(
query: str,
measurement: str | None = None,
period: str | None = None,
ongoing: bool | None = None,
limit: int = 20,
) -> str:
"""Search across 1000+ GIBS visualization layers.
Args:
query: Keywords to search (e.g. "sea ice", "temperature", "MODIS fire").
measurement: Filter by measurement category (e.g. "Sea Ice", "Temperature").
period: Filter by temporal period ("Daily", "Monthly", "8-Day").
ongoing: If True, only actively updated layers. If False, only ended datasets.
limit: Maximum results to return.
"""
client = _get_client()
results = search_layers(
client.layer_index, query, measurement, period, ongoing, limit,
)
if not results:
return f"No layers found matching '{query}'."
lines = [f"Found {len(results)} layer(s) matching '{query}':\n"]
for layer in results:
date_info = ""
if layer.time:
parts = []
if layer.time.start:
parts.append(f"from {layer.time.start}")
if layer.time.end:
parts.append(f"to {layer.time.end}")
if layer.time.period:
parts.append(f"({layer.time.period})")
date_info = "" + " ".join(parts) if parts else ""
lines.append(f" {layer.identifier}")
lines.append(f" {layer.title}{date_info}")
if layer.has_colormap:
lines.append(" [has colormap]")
lines.append("")
return "\n".join(lines)
@mcp.tool(
description="Get detailed metadata for a specific GIBS layer including "
"dates, resolution, formats, projections, and colormap availability."
)
async def get_layer_info(layer_id: str) -> str:
"""Fetch full metadata for a layer.
Args:
layer_id: The GIBS layer identifier (e.g. "MODIS_Terra_CorrectedReflectance_TrueColor").
"""
client = _get_client()
layer = client.get_layer(layer_id)
if layer is None:
return f"Layer '{layer_id}' not found. Use search_gibs_layers to find valid identifiers."
# Enrich with layer-metadata JSON
await client.fetch_layer_metadata(layer_id)
info = {
"identifier": layer.identifier,
"title": layer.title,
"formats": layer.formats,
"tile_matrix_sets": layer.tile_matrix_sets,
"has_colormap": layer.has_colormap,
}
if layer.abstract:
info["abstract"] = layer.abstract
if layer.measurement:
info["measurement"] = layer.measurement
if layer.instrument:
info["instrument"] = layer.instrument
if layer.platform:
info["platform"] = layer.platform
if layer.period:
info["period"] = layer.period
if layer.ongoing is not None:
info["ongoing"] = layer.ongoing
if layer.day_night:
info["day_night"] = layer.day_night
if layer.description:
info["description"] = layer.description
if layer.time:
info["time"] = {
"start": layer.time.start,
"end": layer.time.end,
"period": layer.time.period,
"default": layer.time.default,
}
if layer.bbox:
info["bbox"] = {
"west": layer.bbox.west,
"south": layer.bbox.south,
"east": layer.bbox.east,
"north": layer.bbox.north,
}
return json.dumps(info, indent=2)
@mcp.tool(
description="List all measurement categories available in GIBS with layer counts."
)
async def list_measurements() -> str:
"""List measurement categories across all layers."""
client = _get_client()
# Collect measurements — we need to fetch metadata for layers
# that haven't been enriched yet. For efficiency, work from
# the layer titles which often encode the measurement.
measurements: dict[str, int] = {}
for layer in client.layer_index.values():
key = layer.measurement or "Unknown"
measurements[key] = measurements.get(key, 0) + 1
if not measurements:
return "No measurement categories available."
lines = ["Measurement categories:\n"]
for name, count in sorted(measurements.items()):
lines.append(f" {name}: {count} layer(s)")
return "\n".join(lines)
@mcp.tool(
description="Check available date ranges for a GIBS layer via WMTS DescribeDomains."
)
async def check_layer_dates(
layer_id: str,
start_date: str | None = None,
end_date: str | None = None,
) -> str:
"""Query what dates are available for a specific layer.
Args:
layer_id: The GIBS layer identifier.
start_date: Optional start date filter (YYYY-MM-DD).
end_date: Optional end date filter (YYYY-MM-DD).
"""
client = _get_client()
layer = client.get_layer(layer_id)
if layer is None:
return f"Layer '{layer_id}' not found."
# First, report what the capabilities document says
lines = [f"Date information for {layer_id}:"]
if layer.time:
if layer.time.start:
lines.append(f" Start: {layer.time.start}")
if layer.time.end:
lines.append(f" End: {layer.time.end}")
if layer.time.period:
lines.append(f" Period: {layer.time.period}")
if layer.time.default:
lines.append(f" Default/latest: {layer.time.default}")
else:
lines.append(" No time dimension (static layer)")
# Try DescribeDomains for more precise info
try:
domains = await client.describe_domains(layer_id)
if "time_domain" in domains:
lines.append(f" Live time domain: {domains['time_domain']}")
except Exception as exc:
log.debug("DescribeDomains failed for %s: %s", layer_id, exc)
return "\n".join(lines)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# TOOLS — Imagery
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
async def _resolve_bbox(
client: GIBSClient,
bbox: list[float] | None,
place: str | None,
) -> BBox:
"""Resolve a bounding box from either explicit coordinates or a place name."""
if bbox and len(bbox) == 4:
return BBox(west=bbox[0], south=bbox[1], east=bbox[2], north=bbox[3])
if place:
result = await client.resolve_place(place)
if result:
return expand_bbox(result.bbox, factor=0.1)
raise ValueError(f"Could not geocode '{place}'. Try providing explicit bbox coordinates.")
raise ValueError("Provide either 'bbox' [west, south, east, north] or 'place' name.")
@mcp.tool(
description="Fetch satellite imagery for a specific layer, date, and region. "
"Provide either a place name or bbox coordinates. Returns the image."
)
async def get_imagery(
layer_id: str,
date: str,
bbox: list[float] | None = None,
place: str | None = None,
width: int = 1024,
height: int = 1024,
format: str = "jpeg",
) -> list[dict]:
"""Fetch GIBS imagery via WMS.
Args:
layer_id: GIBS layer identifier.
date: Date in YYYY-MM-DD format.
bbox: Bounding box as [west, south, east, north] in decimal degrees.
place: Place name to geocode (e.g. "California", "Amazon Basin").
width: Image width in pixels.
height: Image height in pixels.
format: Image format "jpeg" or "png".
"""
client = _get_client()
layer = client.get_layer(layer_id)
if layer is None:
return [{"type": "text", "text": f"Layer '{layer_id}' not found."}]
resolved_bbox = await _resolve_bbox(client, bbox, place)
image_format = f"image/{format}"
image_bytes = await client.get_wms_image(
layer_id, date, resolved_bbox, width, height, image_format,
)
description = (
f"{layer.title}{date}\n"
f"Region: {place or resolved_bbox.wms_bbox}\n"
f"Size: {width}x{height}"
)
# If the layer has a colormap, add a hint about explain_colormap
if layer.has_colormap:
description += "\nTip: use explain_colormap to understand what the colors represent."
mime = f"image/{format}"
b64 = base64.b64encode(image_bytes).decode()
return [
{"type": "text", "text": description},
{"type": "image", "data": b64, "mimeType": mime},
]
@mcp.tool(
description="Compare a layer across two dates with a side-by-side image. "
"Useful for tracking changes over time (fires, floods, ice extent)."
)
async def compare_dates(
layer_id: str,
date_before: str,
date_after: str,
bbox: list[float] | None = None,
place: str | None = None,
) -> list[dict]:
"""Side-by-side comparison of two dates.
Args:
layer_id: GIBS layer identifier.
date_before: Earlier date (YYYY-MM-DD).
date_after: Later date (YYYY-MM-DD).
bbox: Bounding box as [west, south, east, north].
place: Place name to geocode.
"""
client = _get_client()
layer = client.get_layer(layer_id)
if layer is None:
return [{"type": "text", "text": f"Layer '{layer_id}' not found."}]
resolved_bbox = await _resolve_bbox(client, bbox, place)
composite_bytes = await client.compare_dates(
layer_id, date_before, date_after, resolved_bbox,
)
description = (
f"{layer.title} — comparison\n"
f"Left: {date_before} | Right: {date_after}\n"
f"Region: {place or resolved_bbox.wms_bbox}"
)
b64 = base64.b64encode(composite_bytes).decode()
return [
{"type": "text", "text": description},
{"type": "image", "data": b64, "mimeType": "image/jpeg"},
]
@mcp.tool(
description="Overlay multiple GIBS layers into a single composite image. "
"WMS supports up to 5 layers composited together."
)
async def get_imagery_composite(
layer_ids: list[str],
date: str,
bbox: list[float] | None = None,
place: str | None = None,
width: int = 1024,
height: int = 1024,
) -> list[dict]:
"""Multi-layer composite image.
Args:
layer_ids: List of GIBS layer identifiers to overlay (max 5).
date: Date in YYYY-MM-DD format.
bbox: Bounding box as [west, south, east, north].
place: Place name to geocode.
width: Image width in pixels.
height: Image height in pixels.
"""
client = _get_client()
if len(layer_ids) > 5:
return [{"type": "text", "text": "WMS supports at most 5 layers per composite."}]
resolved_bbox = await _resolve_bbox(client, bbox, place)
image_bytes = await client.get_wms_composite(
layer_ids, date, resolved_bbox, width, height,
)
layer_names = ", ".join(layer_ids)
description = (
f"Composite: {layer_names}\n"
f"Date: {date}\n"
f"Region: {place or resolved_bbox.wms_bbox}"
)
b64 = base64.b64encode(image_bytes).decode()
return [
{"type": "text", "text": description},
{"type": "image", "data": b64, "mimeType": "image/jpeg"},
]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# TOOLS — Interpretation
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@mcp.tool(
description="Explain what the colors in a GIBS layer mean. "
"Returns a natural-language description mapping colors to scientific values and units."
)
async def explain_layer_colormap(layer_id: str) -> str:
"""Get natural-language explanation of a layer's colormap.
Args:
layer_id: GIBS layer identifier.
"""
client = _get_client()
return await client.explain_layer_colormap(layer_id)
@mcp.tool(
description="Fetch the pre-rendered legend image for a GIBS layer."
)
async def get_legend(
layer_id: str,
orientation: str = "horizontal",
) -> list[dict]:
"""Fetch the legend graphic for a layer.
Args:
layer_id: GIBS layer identifier.
orientation: "horizontal" or "vertical".
"""
client = _get_client()
legend_bytes = await client.get_legend_image(layer_id, orientation)
if legend_bytes is None:
return [{"type": "text", "text": f"No legend available for '{layer_id}'."}]
b64 = base64.b64encode(legend_bytes).decode()
return [
{"type": "text", "text": f"Legend for {layer_id}"},
{"type": "image", "data": b64, "mimeType": "image/png"},
]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# TOOLS — Utility
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@mcp.tool(
description="Geocode a place name to geographic coordinates and bounding box. "
"Uses OpenStreetMap Nominatim."
)
async def resolve_place(place: str) -> str:
"""Resolve a place name to coordinates and bounding box.
Args:
place: Place name (e.g. "Tokyo", "California", "Great Barrier Reef").
"""
client = _get_client()
result = await client.resolve_place(place)
if result is None:
return f"Could not geocode '{place}'. Try a more specific name."
return json.dumps({
"display_name": result.display_name,
"lat": result.lat,
"lon": result.lon,
"bbox": {
"west": result.bbox.west,
"south": result.bbox.south,
"east": result.bbox.east,
"north": result.bbox.north,
},
}, indent=2)
@mcp.tool(
description="Construct a direct WMTS tile URL for a specific "
"layer, date, zoom level, and tile coordinates."
)
async def build_tile_url(
layer_id: str,
date: str,
zoom: int,
row: int,
col: int,
projection: str = "4326",
) -> str:
"""Build a WMTS REST tile URL.
Args:
layer_id: GIBS layer identifier.
date: Date in YYYY-MM-DD format.
zoom: Zoom level (0-12 depending on resolution).
row: Tile row.
col: Tile column.
projection: EPSG code ("4326", "3857", "3413", "3031").
"""
client = _get_client()
layer = client.get_layer(layer_id)
ext = "jpg"
tile_matrix_set = "250m"
if layer:
if layer.formats and "image/png" in layer.formats:
ext = "png"
if layer.tile_matrix_sets:
tile_matrix_set = layer.tile_matrix_sets[0]
url = client.build_tile_url(
layer_id, date, zoom, row, col, tile_matrix_set, ext, projection,
)
return url
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# RESOURCES
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@mcp.resource("gibs://catalog")
async def catalog_resource() -> str:
"""Full GIBS layer catalog grouped by measurement category."""
client = _get_client()
by_measurement: dict[str, list[dict]] = {}
for layer in client.layer_index.values():
key = layer.measurement or "Unknown"
if key not in by_measurement:
by_measurement[key] = []
by_measurement[key].append({
"id": layer.identifier,
"title": layer.title,
"has_colormap": layer.has_colormap,
})
return json.dumps(by_measurement, indent=2)
@mcp.resource("gibs://layer/{layer_id}")
async def layer_resource(layer_id: str) -> str:
"""Individual layer metadata as JSON."""
client = _get_client()
layer = client.get_layer(layer_id)
if layer is None:
return json.dumps({"error": f"Layer '{layer_id}' not found"})
await client.fetch_layer_metadata(layer_id)
return layer.model_dump_json(indent=2)
@mcp.resource("gibs://projections")
async def projections_resource() -> str:
"""Available GIBS projections with endpoint information."""
return json.dumps(PROJECTIONS, indent=2)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# PROMPTS
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@mcp.prompt
def investigate_event(
event_type: str,
location: str,
date: str,
) -> str:
"""Guided workflow for investigating a natural event with GIBS imagery.
Args:
event_type: Type of event (e.g. "wildfire", "hurricane", "flood", "volcanic eruption").
location: Where the event occurred.
date: Approximate date of the event (YYYY-MM-DD).
"""
lines = [
f"Investigate the {event_type} near {location} "
f"around {date} using NASA GIBS satellite imagery.",
"",
"Follow this workflow:",
"",
f'1. **Find relevant layers**: Search for layers related to '
f'"{event_type}" (e.g. fire layers for wildfires, '
f'precipitation for floods). Also search for "true color" '
f'or "corrected reflectance" for visual context.',
"",
f"2. **Verify dates**: Check that the layers have data "
f"available for {date}. Adjust the date if needed.",
"",
f'3. **Get imagery**: Fetch imagery for "{location}" on '
f"{date}. Use the true color layer first for visual context.",
"",
"4. **Compare before/after**: Get imagery from a few days "
"or weeks before the event and compare with imagery "
"during/after to show the impact.",
"",
"5. **Explain the data**: If the layers have colormaps, use "
"explain_colormap to understand what the colors represent.",
"",
f"6. **Summarize findings**: Describe what the satellite "
f"data reveals about the {event_type} — extent, intensity, "
f"progression, or impact visible from space.",
]
return "\n".join(lines)
@mcp.prompt
def earth_overview() -> str:
"""Introduction to GIBS and suggested explorations."""
lines = [
"You have access to NASA's Global Imagery Browse Services "
"(GIBS) through the mcgibs MCP server. GIBS provides 1000+ "
"satellite visualization layers covering the entire Earth.",
"",
"Here are some things you can explore:",
"",
'**Recent events**: Search for "fire", "aerosol", or '
'"flood" layers to investigate recent natural events.',
"",
'**Climate data**: Search for "temperature", "sea ice", '
'"snow cover", or "vegetation" to explore climate '
"indicators.",
"",
"**True color imagery**: The "
'"MODIS_Terra_CorrectedReflectance_TrueColor" layer '
"provides daily true-color Earth imagery — great for "
"seeing cloud patterns, smoke plumes, or algal blooms.",
"",
"**Time comparisons**: Use compare_dates to see how a "
"region has changed over time — glacier retreat, urban "
"expansion, seasonal changes.",
"",
"**Understanding the data**: Use explain_colormap on any "
"scientific layer to understand what the colors represent "
"in physical units.",
"",
"Start by searching for a topic that interests you, "
"or ask about a specific location and date!",
]
return "\n".join(lines)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Entry point
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def main():
mcp.run()

0
tests/__init__.py Normal file
View File

22
tests/conftest.py Normal file
View File

@ -0,0 +1,22 @@
"""Shared fixtures for mcgibs tests."""
from pathlib import Path
import pytest
FIXTURES_DIR = Path(__file__).parent / "fixtures"
@pytest.fixture
def capabilities_xml() -> str:
return (FIXTURES_DIR / "capabilities_sample.xml").read_text()
@pytest.fixture
def colormap_xml() -> str:
return (FIXTURES_DIR / "colormap_sample.xml").read_text()
@pytest.fixture
def layer_metadata_json() -> str:
return (FIXTURES_DIR / "layer_metadata_sample.json").read_text()

80
tests/fixtures/capabilities_sample.xml vendored Normal file
View File

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<Capabilities xmlns="http://www.opengis.net/wmts/1.0"
xmlns:ows="http://www.opengis.net/ows/1.1"
xmlns:xlink="http://www.w3.org/1999/xlink"
xmlns:gml="http://www.opengis.net/gml"
version="1.0.0">
<ows:ServiceIdentification>
<ows:Title>NASA GIBS WMTS</ows:Title>
<ows:ServiceType>OGC WMTS</ows:ServiceType>
<ows:ServiceTypeVersion>1.0.0</ows:ServiceTypeVersion>
</ows:ServiceIdentification>
<Contents>
<Layer>
<ows:Title xml:lang="en">Corrected Reflectance (True Color, Terra/MODIS)</ows:Title>
<ows:WGS84BoundingBox crs="urn:ogc:def:crs:OGC:2:84">
<ows:LowerCorner>-180 -90</ows:LowerCorner>
<ows:UpperCorner>180 90</ows:UpperCorner>
</ows:WGS84BoundingBox>
<ows:Identifier>MODIS_Terra_CorrectedReflectance_TrueColor</ows:Identifier>
<Style isDefault="true">
<ows:Title xml:lang="en">default</ows:Title>
<ows:Identifier>default</ows:Identifier>
</Style>
<Dimension>
<ows:Identifier>Time</ows:Identifier>
<ows:UOM>ISO8601</ows:UOM>
<Default>2025-12-01</Default>
<Value>2000-02-24/2025-12-01/P1D</Value>
</Dimension>
<TileMatrixSetLink><TileMatrixSet>250m</TileMatrixSet></TileMatrixSetLink>
<Format>image/jpeg</Format>
<ResourceURL template="https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/MODIS_Terra_CorrectedReflectance_TrueColor/default/{Time}/{TileMatrixSet}/{TileMatrix}/{TileRow}/{TileCol}.jpg" format="image/jpeg" resourceType="tile"/>
</Layer>
<Layer>
<ows:Title xml:lang="en">Sea Ice Concentration (AMSR2, Monthly)</ows:Title>
<ows:WGS84BoundingBox crs="urn:ogc:def:crs:OGC:2:84">
<ows:LowerCorner>-180 -90</ows:LowerCorner>
<ows:UpperCorner>180 90</ows:UpperCorner>
</ows:WGS84BoundingBox>
<ows:Identifier>AMSR2_Sea_Ice_Concentration_12km_Monthly</ows:Identifier>
<ows:Metadata xlink:role="http://earthdata.nasa.gov/gibs/metadata-type/colormap" xlink:href="https://gibs.earthdata.nasa.gov/colormaps/v1.3/AMSR2_Sea_Ice_Concentration_12km.xml" xlink:title="GIBS Colormap: Sea Ice Concentration"/>
<Style isDefault="true">
<ows:Title xml:lang="en">default</ows:Title>
<ows:Identifier>default</ows:Identifier>
<LegendURL width="378" height="86" xlink:href="https://gibs.earthdata.nasa.gov/legends/AMSR2_Sea_Ice_Concentration_12km.png"/>
</Style>
<Dimension>
<ows:Identifier>Time</ows:Identifier>
<ows:UOM>ISO8601</ows:UOM>
<Default>2024-06-01</Default>
<Value>2012-07-01/2024-06-01/P1M</Value>
</Dimension>
<TileMatrixSetLink><TileMatrixSet>2km</TileMatrixSet></TileMatrixSetLink>
<Format>image/png</Format>
<ResourceURL template="https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/AMSR2_Sea_Ice_Concentration_12km_Monthly/default/{Time}/{TileMatrixSet}/{TileMatrix}/{TileRow}/{TileCol}.png" format="image/png" resourceType="tile"/>
</Layer>
<Layer>
<ows:Title xml:lang="en">Surface Air Temperature (Daily, Day, AIRS)</ows:Title>
<ows:WGS84BoundingBox crs="urn:ogc:def:crs:OGC:2:84">
<ows:LowerCorner>-180 -90</ows:LowerCorner>
<ows:UpperCorner>180 90</ows:UpperCorner>
</ows:WGS84BoundingBox>
<ows:Identifier>AIRS_L3_Surface_Air_Temperature_Daily_Day</ows:Identifier>
<ows:Metadata xlink:role="http://earthdata.nasa.gov/gibs/metadata-type/colormap" xlink:href="https://gibs.earthdata.nasa.gov/colormaps/v1.3/AIRS_Surface_Air_Temperature_Daily_Day.xml" xlink:title="GIBS Colormap"/>
<Style isDefault="true">
<ows:Title xml:lang="en">default</ows:Title>
<ows:Identifier>default</ows:Identifier>
<LegendURL width="378" height="86" xlink:href="https://gibs.earthdata.nasa.gov/legends/AIRS_Temperature.png"/>
</Style>
<Dimension>
<ows:Identifier>Time</ows:Identifier>
<ows:UOM>ISO8601</ows:UOM>
<Default>2024-11-15</Default>
<Value>2002-08-30/2024-11-15/P1D</Value>
</Dimension>
<TileMatrixSetLink><TileMatrixSet>2km</TileMatrixSet></TileMatrixSetLink>
<Format>image/png</Format>
</Layer>
</Contents>
</Capabilities>

32
tests/fixtures/colormap_sample.xml vendored Normal file
View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<ColorMaps xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<ColorMap title="Surface Air Temperature" units="K">
<ColorMapEntry rgb="227,245,255" transparent="false" sourceValue="[0,1)" value="[-INF,200.0)" ref="1"/>
<ColorMapEntry rgb="204,234,252" transparent="false" sourceValue="[1,2)" value="[200.0,210.0)" ref="2"/>
<ColorMapEntry rgb="150,200,230" transparent="false" sourceValue="[2,3)" value="[210.0,220.0)" ref="3"/>
<ColorMapEntry rgb="100,180,220" transparent="false" sourceValue="[3,4)" value="[220.0,230.0)" ref="4"/>
<ColorMapEntry rgb="80,160,200" transparent="false" sourceValue="[4,5)" value="[230.0,240.0)" ref="5"/>
<ColorMapEntry rgb="50,140,180" transparent="false" sourceValue="[5,6)" value="[240.0,250.0)" ref="6"/>
<ColorMapEntry rgb="100,200,100" transparent="false" sourceValue="[6,7)" value="[250.0,260.0)" ref="7"/>
<ColorMapEntry rgb="180,220,50" transparent="false" sourceValue="[7,8)" value="[260.0,270.0)" ref="8"/>
<ColorMapEntry rgb="255,200,50" transparent="false" sourceValue="[8,9)" value="[270.0,280.0)" ref="9"/>
<ColorMapEntry rgb="255,150,50" transparent="false" sourceValue="[9,10)" value="[280.0,290.0)" ref="10"/>
<ColorMapEntry rgb="255,100,50" transparent="false" sourceValue="[10,11)" value="[290.0,300.0)" ref="11"/>
<ColorMapEntry rgb="220,50,30" transparent="false" sourceValue="[11,12)" value="[300.0,310.0)" ref="12"/>
<ColorMapEntry rgb="180,20,50" transparent="false" sourceValue="[12,13)" value="[310.0,320.0)" ref="13"/>
<ColorMapEntry rgb="251,3,207" transparent="false" sourceValue="[13,14)" value="[320.0,+INF)" ref="14"/>
<Legend type="continuous">
<LegendEntry rgb="227,245,255" tooltip="&lt; 200 K" id="1"/>
<LegendEntry rgb="204,234,252" tooltip="200 - 210 K" id="2" showTick="true" showLabel="true"/>
<LegendEntry rgb="100,200,100" tooltip="250 - 260 K" id="7" showTick="true" showLabel="true"/>
<LegendEntry rgb="255,100,50" tooltip="290 - 300 K" id="11" showTick="true" showLabel="true"/>
<LegendEntry rgb="251,3,207" tooltip="&gt; 320 K" id="14"/>
</Legend>
</ColorMap>
<ColorMap>
<ColorMapEntry rgb="0,0,0" transparent="true" sourceValue="-9999" nodata="true" label="Missing Data"/>
<Legend type="classification">
<LegendEntry rgb="0,0,0" tooltip="Missing Data" id="1"/>
</Legend>
</ColorMap>
</ColorMaps>

View File

@ -0,0 +1,20 @@
{
"title": "Corrected Reflectance (True Color)",
"measurement": "Corrected Reflectance",
"instrument": "MODIS",
"platform": "Terra",
"period": "Daily",
"ongoing": true,
"daynight": "Day",
"description": "True color imagery from the Moderate Resolution Imaging Spectroradiometer (MODIS) on the Terra satellite.",
"orbitTracks": [
{"type": "descending"}
],
"conceptIds": [
{
"type": "NRT",
"value": "C1219032686-LANCEMODIS",
"shortName": "MOD021KM"
}
]
}

104
tests/test_capabilities.py Normal file
View File

@ -0,0 +1,104 @@
"""Tests for WMTS GetCapabilities XML parsing and layer search."""
from mcgibs.capabilities import parse_capabilities, search_layers
def test_parse_capabilities_layer_count(capabilities_xml: str):
"""parse_capabilities should return exactly 3 layers from the sample XML."""
layers = parse_capabilities(capabilities_xml)
assert len(layers) == 3
def test_parse_capabilities_true_color_layer(capabilities_xml: str):
"""MODIS true color layer should have correct title, formats, and tile matrix sets."""
layers = parse_capabilities(capabilities_xml)
layer = layers["MODIS_Terra_CorrectedReflectance_TrueColor"]
assert layer.title == "Corrected Reflectance (True Color, Terra/MODIS)"
assert layer.formats == ["image/jpeg"]
assert layer.tile_matrix_sets == ["250m"]
def test_parse_capabilities_time_dimension(capabilities_xml: str):
"""True color layer time dimension should have correct start, end, period, and default."""
layers = parse_capabilities(capabilities_xml)
layer = layers["MODIS_Terra_CorrectedReflectance_TrueColor"]
assert layer.time is not None
assert layer.time.start == "2000-02-24"
assert layer.time.end == "2025-12-01"
assert layer.time.period == "P1D"
assert layer.time.default == "2025-12-01"
def test_parse_capabilities_bbox(capabilities_xml: str):
"""All sample layers have a global bounding box of -180,-90 to 180,90."""
layers = parse_capabilities(capabilities_xml)
layer = layers["MODIS_Terra_CorrectedReflectance_TrueColor"]
assert layer.bbox is not None
assert layer.bbox.west == -180.0
assert layer.bbox.south == -90.0
assert layer.bbox.east == 180.0
assert layer.bbox.north == 90.0
def test_parse_capabilities_colormap_detection(capabilities_xml: str):
"""AMSR2 layer should have a colormap; true color layer should not."""
layers = parse_capabilities(capabilities_xml)
amsr2 = layers["AMSR2_Sea_Ice_Concentration_12km_Monthly"]
assert amsr2.has_colormap is True
true_color = layers["MODIS_Terra_CorrectedReflectance_TrueColor"]
assert true_color.has_colormap is False
def test_parse_capabilities_legend_url(capabilities_xml: str):
"""AMSR2 layer should have a legend URL pointing to the PNG legend image."""
layers = parse_capabilities(capabilities_xml)
amsr2 = layers["AMSR2_Sea_Ice_Concentration_12km_Monthly"]
assert amsr2.legend_url == (
"https://gibs.earthdata.nasa.gov/legends/AMSR2_Sea_Ice_Concentration_12km.png"
)
def test_search_layers_keyword(capabilities_xml: str):
"""Searching for 'sea ice' should return the AMSR2 sea ice layer."""
index = parse_capabilities(capabilities_xml)
results = search_layers(index, "sea ice")
assert len(results) >= 1
identifiers = [r.identifier for r in results]
assert "AMSR2_Sea_Ice_Concentration_12km_Monthly" in identifiers
def test_search_layers_no_results(capabilities_xml: str):
"""Searching for a nonexistent term should return an empty list."""
index = parse_capabilities(capabilities_xml)
results = search_layers(index, "nonexistent")
assert results == []
def test_search_layers_limit(capabilities_xml: str):
"""Search with limit=1 should return at most 1 result."""
index = parse_capabilities(capabilities_xml)
# Use a broad query that could match multiple layers
results = search_layers(index, "temperature", limit=1)
assert len(results) <= 1
def test_parse_capabilities_resource_url(capabilities_xml: str):
"""True color layer should have the expected ResourceURL template."""
layers = parse_capabilities(capabilities_xml)
layer = layers["MODIS_Terra_CorrectedReflectance_TrueColor"]
expected = (
"https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/"
"MODIS_Terra_CorrectedReflectance_TrueColor/default/"
"{Time}/{TileMatrixSet}/{TileMatrix}/{TileRow}/{TileCol}.jpg"
)
assert layer.resource_url_template == expected

184
tests/test_client.py Normal file
View File

@ -0,0 +1,184 @@
"""Tests for GIBSClient using respx mocks — no real HTTP calls."""
from io import BytesIO
import httpx
import respx
from PIL import Image
from mcgibs.client import GIBSClient
from mcgibs.constants import WMTS_TILE_URL
@respx.mock
async def test_client_initialize(capabilities_xml):
"""Loading capabilities populates layer_index with all three sample layers."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
client = GIBSClient()
await client.initialize()
assert len(client.layer_index) == 3
assert "MODIS_Terra_CorrectedReflectance_TrueColor" in client.layer_index
assert "AMSR2_Sea_Ice_Concentration_12km_Monthly" in client.layer_index
assert "AIRS_L3_Surface_Air_Temperature_Daily_Day" in client.layer_index
# Spot-check a parsed layer
modis = client.layer_index["MODIS_Terra_CorrectedReflectance_TrueColor"]
assert modis.title == "Corrected Reflectance (True Color, Terra/MODIS)"
assert "image/jpeg" in modis.formats
assert modis.time is not None
assert modis.time.start == "2000-02-24"
assert modis.has_colormap is False
await client.close()
@respx.mock
async def test_client_fetch_layer_metadata(capabilities_xml, layer_metadata_json):
"""Fetching layer metadata enriches the layer with instrument/platform fields."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
layer_id = "MODIS_Terra_CorrectedReflectance_TrueColor"
respx.get(url__regex=rf".*layer-metadata.*/{layer_id}\.json").mock(
return_value=httpx.Response(200, text=layer_metadata_json)
)
client = GIBSClient()
await client.initialize()
data = await client.fetch_layer_metadata(layer_id)
assert data["instrument"] == "MODIS"
assert data["platform"] == "Terra"
assert data["ongoing"] is True
# Verify the layer_index entry was enriched
layer = client.get_layer(layer_id)
assert layer is not None
assert layer.instrument == "MODIS"
assert layer.platform == "Terra"
assert layer.measurement == "Corrected Reflectance"
assert layer.day_night == "Day"
# Second call should use cache
data2 = await client.fetch_layer_metadata(layer_id)
assert data2 is data
await client.close()
@respx.mock
async def test_client_fetch_colormap(capabilities_xml, colormap_xml):
"""Parsing colormap XML produces entries and legend data."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
respx.get(url__regex=r".*colormaps/v1\.3/.*\.xml").mock(
return_value=httpx.Response(200, text=colormap_xml)
)
client = GIBSClient()
await client.initialize()
colormap_set = await client.fetch_colormap("AIRS_L3_Surface_Air_Temperature_Daily_Day")
assert colormap_set is not None
assert len(colormap_set.maps) == 2
data_map = colormap_set.data_map
assert data_map is not None
assert data_map.title == "Surface Air Temperature"
assert data_map.units == "K"
assert len(data_map.entries) == 14
assert data_map.legend_type == "continuous"
# Verify the nodata map
nodata_map = colormap_set.maps[1]
assert any(e.nodata for e in nodata_map.entries)
# Second fetch should be cached
cached = await client.fetch_colormap("AIRS_L3_Surface_Air_Temperature_Daily_Day")
assert cached is colormap_set
await client.close()
@respx.mock
async def test_client_get_wms_image(capabilities_xml):
"""WMS GetMap returns raw image bytes when content-type is image/*."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
# Build a tiny valid JPEG to return as the mock response
buf = BytesIO()
Image.new("RGB", (10, 10), "blue").save(buf, format="JPEG")
fake_jpeg = buf.getvalue()
respx.get(url__regex=r".*wms\.cgi.*").mock(
return_value=httpx.Response(
200,
content=fake_jpeg,
headers={"content-type": "image/jpeg"},
)
)
client = GIBSClient()
await client.initialize()
from mcgibs.models import BBox
bbox = BBox(west=-120.0, south=30.0, east=-110.0, north=40.0)
result = await client.get_wms_image(
"MODIS_Terra_CorrectedReflectance_TrueColor",
"2025-06-01",
bbox,
)
assert isinstance(result, bytes)
assert len(result) > 0
# Verify the returned bytes are valid JPEG (starts with FFD8)
assert result[:2] == b"\xff\xd8"
await client.close()
def test_client_build_tile_url():
"""build_tile_url produces the expected WMTS REST URL format."""
client = GIBSClient()
url = client.build_tile_url(
layer_id="MODIS_Terra_CorrectedReflectance_TrueColor",
date="2025-06-01",
zoom=3,
row=2,
col=4,
tile_matrix_set="250m",
ext="jpg",
epsg="4326",
)
assert "MODIS_Terra_CorrectedReflectance_TrueColor" in url
assert "2025-06-01" in url
assert "/250m/" in url
assert "/3/" in url
assert "/2/" in url
assert "/4.jpg" in url
assert "epsg4326" in url
# Verify it matches the constant template
expected = WMTS_TILE_URL.format(
epsg="4326",
layer_id="MODIS_Terra_CorrectedReflectance_TrueColor",
date="2025-06-01",
tile_matrix_set="250m",
z=3,
row=2,
col=4,
ext="jpg",
)
assert url == expected

143
tests/test_colormaps.py Normal file
View File

@ -0,0 +1,143 @@
"""Tests for mcgibs.colormaps -- XML parsing and natural-language explanations."""
import pytest
from mcgibs.colormaps import (
_describe_rgb,
_parse_interval_value,
explain_colormap,
parse_colormap,
)
# ---------------------------------------------------------------------------
# parse_colormap
# ---------------------------------------------------------------------------
def test_parse_colormap_map_count(colormap_xml: str):
"""Sample XML contains exactly 2 ColorMap elements."""
result = parse_colormap(colormap_xml)
assert len(result.maps) == 2
def test_parse_colormap_data_entries(colormap_xml: str):
"""First ColorMap has 14 data entries, correct title and units."""
result = parse_colormap(colormap_xml)
first = result.maps[0]
assert len(first.entries) == 14
assert first.title == "Surface Air Temperature"
assert first.units == "K"
def test_parse_colormap_nodata_entry(colormap_xml: str):
"""Second ColorMap has a nodata entry labelled 'Missing Data'."""
result = parse_colormap(colormap_xml)
second = result.maps[1]
nodata = [e for e in second.entries if e.nodata]
assert len(nodata) == 1
assert nodata[0].label == "Missing Data"
def test_parse_colormap_rgb_values(colormap_xml: str):
"""First entry of the first ColorMap has rgb (227, 245, 255)."""
result = parse_colormap(colormap_xml)
first_entry = result.maps[0].entries[0]
assert first_entry.rgb == (227, 245, 255)
def test_parse_colormap_value_intervals(colormap_xml: str):
"""First entry value is '[-INF,200.0)', last entry is '[320.0,+INF)'."""
result = parse_colormap(colormap_xml)
entries = result.maps[0].entries
assert entries[0].value == "[-INF,200.0)"
assert entries[-1].value == "[320.0,+INF)"
def test_parse_colormap_legend(colormap_xml: str):
"""First ColorMap has legend entries with expected tooltips."""
result = parse_colormap(colormap_xml)
legend = result.maps[0].legend
assert len(legend) == 5
tooltips = [le.tooltip for le in legend]
assert "< 200 K" in tooltips
assert "200 - 210 K" in tooltips
assert "> 320 K" in tooltips
# ---------------------------------------------------------------------------
# _parse_interval_value
# ---------------------------------------------------------------------------
def test_parse_interval_value_bounded():
"""Bounded interval '[200.0,200.5)' parses to (200.0, 200.5)."""
assert _parse_interval_value("[200.0,200.5)") == (200.0, 200.5)
def test_parse_interval_value_neg_inf():
"""Negative infinity '[-INF,200.0)' parses to (None, 200.0)."""
assert _parse_interval_value("[-INF,200.0)") == (None, 200.0)
def test_parse_interval_value_pos_inf():
"""Positive infinity '[320.0,+INF)' parses to (320.0, None)."""
assert _parse_interval_value("[320.0,+INF)") == (320.0, None)
def test_parse_interval_value_single():
"""Single value '[42]' parses to (42.0, 42.0)."""
assert _parse_interval_value("[42]") == (42.0, 42.0)
# ---------------------------------------------------------------------------
# explain_colormap
# ---------------------------------------------------------------------------
def test_explain_colormap_includes_title(colormap_xml: str):
"""Explanation text contains the layer title."""
cms = parse_colormap(colormap_xml)
text = explain_colormap(cms)
assert "Surface Air Temperature" in text
def test_explain_colormap_includes_units(colormap_xml: str):
"""Explanation mentions the native unit and Celsius conversion."""
cms = parse_colormap(colormap_xml)
text = explain_colormap(cms)
assert "(K)" in text
assert "C)" in text # Celsius conversion appears as e.g. "(-73 C)"
def test_explain_colormap_nodata_mention(colormap_xml: str):
"""Explanation mentions 'Missing Data' from the nodata entry."""
cms = parse_colormap(colormap_xml)
text = explain_colormap(cms)
assert "Missing Data" in text
# ---------------------------------------------------------------------------
# _describe_rgb
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
("rgb", "expected_substring"),
[
((255, 0, 0), "red"),
((0, 128, 0), "green"),
((0, 0, 255), "blue"),
((255, 255, 255), "white"),
((0, 0, 0), "black"),
((255, 255, 0), "yellow"),
],
)
def test_describe_rgb_basic(rgb: tuple[int, int, int], expected_substring: str):
"""Known RGB triples produce color names containing the expected word."""
result = _describe_rgb(rgb)
assert expected_substring in result.lower()

168
tests/test_geo.py Normal file
View File

@ -0,0 +1,168 @@
"""Tests for mcgibs.geo — geocoding, bbox helpers, and caching."""
import httpx
import respx
import mcgibs.geo as _geo_module
from mcgibs.geo import bbox_from_point, expand_bbox, geocode
from mcgibs.models import BBox
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
TOKYO_HIT = {
"display_name": "Tokyo, Japan",
"lat": "35.6762",
"lon": "139.6503",
"boundingbox": ["35.5191", "35.8170", "139.5601", "139.9200"],
"osm_type": "relation",
"importance": 0.82,
}
def _reset_rate_limit() -> None:
"""Reset the module-level rate-limit timestamp so tests don't stall."""
_geo_module._last_request_time = 0.0
# ---------------------------------------------------------------------------
# geocode() tests
# ---------------------------------------------------------------------------
@respx.mock
async def test_geocode_success():
_reset_rate_limit()
respx.get(NOMINATIM_URL).mock(
return_value=httpx.Response(200, json=[TOKYO_HIT]),
)
async with httpx.AsyncClient() as client:
cache: dict = {}
result = await geocode(client, "Tokyo", cache)
assert result is not None
assert result.display_name == "Tokyo, Japan"
assert result.lat == 35.6762
assert result.lon == 139.6503
assert result.osm_type == "relation"
assert result.importance == 0.82
# BBox built from Nominatim's [south, north, west, east] order
assert result.bbox.south == 35.5191
assert result.bbox.north == 35.8170
assert result.bbox.west == 139.5601
assert result.bbox.east == 139.9200
@respx.mock
async def test_geocode_no_results():
_reset_rate_limit()
respx.get(NOMINATIM_URL).mock(
return_value=httpx.Response(200, json=[]),
)
async with httpx.AsyncClient() as client:
cache: dict = {}
result = await geocode(client, "xyznonexistent", cache)
assert result is None
@respx.mock
async def test_geocode_caching():
"""Second call with the same query must be served from cache — no extra HTTP request."""
_reset_rate_limit()
route = respx.get(NOMINATIM_URL).mock(
return_value=httpx.Response(200, json=[TOKYO_HIT]),
)
async with httpx.AsyncClient() as client:
cache: dict = {}
first = await geocode(client, "Tokyo", cache)
second = await geocode(client, "Tokyo", cache)
assert route.call_count == 1
assert first is not None
assert second is not None
assert first.display_name == second.display_name
@respx.mock
async def test_geocode_http_error():
"""A 500 response should return None without raising an exception."""
_reset_rate_limit()
respx.get(NOMINATIM_URL).mock(
return_value=httpx.Response(500, text="Internal Server Error"),
)
async with httpx.AsyncClient() as client:
cache: dict = {}
result = await geocode(client, "ServerError", cache)
assert result is None
# ---------------------------------------------------------------------------
# expand_bbox() tests
# ---------------------------------------------------------------------------
def test_expand_bbox():
bbox = BBox(west=10.0, south=20.0, east=20.0, north=30.0)
expanded = expand_bbox(bbox, factor=0.1)
# Width is 10 deg, so 10% padding = 1 deg each side.
assert expanded.west < bbox.west
assert expanded.east > bbox.east
assert expanded.south < bbox.south
assert expanded.north > bbox.north
# Verify exact amounts (width = 10, dlon = 1; height = 10, dlat = 1).
assert expanded.west == 9.0
assert expanded.east == 21.0
assert expanded.south == 19.0
assert expanded.north == 31.0
def test_expand_bbox_clamping():
"""Expanding a bbox near the poles must clamp latitude to [-90, 90]."""
polar = BBox(west=-170.0, south=85.0, east=170.0, north=89.5)
expanded = expand_bbox(polar, factor=0.5)
assert expanded.north <= 90.0
assert expanded.south >= -90.0
# Longitude should also stay within bounds.
assert expanded.west >= -180.0
assert expanded.east <= 180.0
# ---------------------------------------------------------------------------
# bbox_from_point() tests
# ---------------------------------------------------------------------------
def test_bbox_from_point():
bbox = bbox_from_point(35.0, 139.0, radius_deg=1.0)
assert bbox.south == 34.0
assert bbox.north == 36.0
assert bbox.west == 138.0
assert bbox.east == 140.0
def test_bbox_from_point_clamping():
"""A point near the pole should have its bbox clamped to valid lat range."""
bbox = bbox_from_point(89.5, 0.0, radius_deg=2.0)
assert bbox.north == 90.0 # clamped, not 91.5
assert bbox.south == 87.5
assert bbox.west == -2.0
assert bbox.east == 2.0

173
tests/test_tools.py Normal file
View File

@ -0,0 +1,173 @@
"""FastMCP integration tests — tool calls against the real server, HTTP mocked."""
import json
import httpx
import respx
from fastmcp import Client
import mcgibs.server as server_module
from mcgibs.client import GIBSClient
from mcgibs.server import mcp
async def _init_mock_client(capabilities_xml: str) -> GIBSClient:
"""Create and initialize a GIBSClient with mocked capabilities."""
client = GIBSClient()
await client.initialize()
return client
@respx.mock
async def test_search_tool(capabilities_xml):
"""search_gibs_layers finds the sea ice layer by keyword."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
server_module._client = await _init_mock_client(capabilities_xml)
try:
async with Client(mcp) as client:
result = await client.call_tool("search_gibs_layers", {"query": "sea ice"})
text = result.content[0].text
assert "AMSR2" in text
assert "Sea Ice" in text
# Should not match unrelated layers
assert "Surface Air Temperature" not in text
finally:
await server_module._client.close()
server_module._client = None
@respx.mock
async def test_get_layer_info_tool(capabilities_xml, layer_metadata_json):
"""get_layer_info returns enriched JSON for the true color layer."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
layer_id = "MODIS_Terra_CorrectedReflectance_TrueColor"
respx.get(url__regex=rf".*layer-metadata.*/{layer_id}\.json").mock(
return_value=httpx.Response(200, text=layer_metadata_json)
)
server_module._client = await _init_mock_client(capabilities_xml)
try:
async with Client(mcp) as client:
result = await client.call_tool("get_layer_info", {"layer_id": layer_id})
text = result.content[0].text
info = json.loads(text)
assert info["identifier"] == layer_id
assert info["instrument"] == "MODIS"
assert info["platform"] == "Terra"
assert info["ongoing"] is True
assert "time" in info
assert info["time"]["start"] == "2000-02-24"
finally:
await server_module._client.close()
server_module._client = None
@respx.mock
async def test_resolve_place_tool(capabilities_xml):
"""resolve_place geocodes a place name via mocked Nominatim."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
nominatim_response = [
{
"display_name": "Tokyo, Japan",
"lat": "35.6762",
"lon": "139.6503",
"boundingbox": ["35.5190", "35.8178", "138.9428", "139.9200"],
"osm_type": "relation",
"importance": 0.82,
}
]
respx.get(url__regex=r".*nominatim.*search.*").mock(
return_value=httpx.Response(200, json=nominatim_response)
)
server_module._client = await _init_mock_client(capabilities_xml)
try:
async with Client(mcp) as client:
result = await client.call_tool("resolve_place", {"place": "Tokyo"})
text = result.content[0].text
data = json.loads(text)
assert data["display_name"] == "Tokyo, Japan"
assert abs(data["lat"] - 35.6762) < 0.01
assert abs(data["lon"] - 139.6503) < 0.01
assert "bbox" in data
assert data["bbox"]["west"] < data["bbox"]["east"]
finally:
await server_module._client.close()
server_module._client = None
@respx.mock
async def test_build_tile_url_tool(capabilities_xml):
"""build_tile_url returns a properly formatted WMTS tile URL."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
server_module._client = await _init_mock_client(capabilities_xml)
try:
async with Client(mcp) as client:
result = await client.call_tool(
"build_tile_url",
{
"layer_id": "MODIS_Terra_CorrectedReflectance_TrueColor",
"date": "2025-06-01",
"zoom": 3,
"row": 2,
"col": 4,
},
)
url = result.content[0].text
assert "MODIS_Terra_CorrectedReflectance_TrueColor" in url
assert "2025-06-01" in url
assert "epsg4326" in url
# True color layer uses JPEG and 250m matrix set
assert url.endswith(".jpg")
assert "/250m/" in url
finally:
await server_module._client.close()
server_module._client = None
@respx.mock
async def test_list_tools(capabilities_xml):
"""All expected tools are registered on the MCP server."""
respx.get(url__regex=r".*WMTSCapabilities\.xml").mock(
return_value=httpx.Response(200, text=capabilities_xml)
)
server_module._client = await _init_mock_client(capabilities_xml)
try:
async with Client(mcp) as client:
tools = await client.list_tools()
tool_names = {t.name for t in tools}
expected = {
"search_gibs_layers",
"get_layer_info",
"list_measurements",
"check_layer_dates",
"get_imagery",
"compare_dates",
"get_imagery_composite",
"explain_layer_colormap",
"get_legend",
"resolve_place",
"build_tile_url",
}
for name in expected:
assert name in tool_names, f"Missing tool: {name}"
finally:
await server_module._client.close()
server_module._client = None

1453
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff